blob: 9fdedca922e59d4c72cfe56d0b5e8a3a3029b631 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
/** Creates multiple threads that write key/values into the */
public class MultiThreadedUpdater extends MultiThreadedWriterBase {
private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedUpdater.class);
protected Set<HBaseUpdaterThread> updaters = new HashSet<>();
private MultiThreadedWriterBase writer = null;
private boolean isBatchUpdate = false;
private boolean ignoreNonceConflicts = false;
private final double updatePercent;
public MultiThreadedUpdater(LoadTestDataGenerator dataGen, Configuration conf,
TableName tableName, double updatePercent) throws IOException {
super(dataGen, conf, tableName, "U");
this.updatePercent = updatePercent;
}
/** Use batch vs. separate updates for every column in a row */
public void setBatchUpdate(boolean isBatchUpdate) {
this.isBatchUpdate = isBatchUpdate;
}
public void linkToWriter(MultiThreadedWriterBase writer) {
this.writer = writer;
writer.setTrackWroteKeys(true);
}
@Override
public void start(long startKey, long endKey, int numThreads) throws IOException {
super.start(startKey, endKey, numThreads);
if (verbose) {
LOG.debug("Updating keys [" + startKey + ", " + endKey + ")");
}
addUpdaterThreads(numThreads);
startThreads(updaters);
}
protected void addUpdaterThreads(int numThreads) throws IOException {
for (int i = 0; i < numThreads; ++i) {
HBaseUpdaterThread updater = new HBaseUpdaterThread(i);
updaters.add(updater);
}
}
private long getNextKeyToUpdate() {
if (writer == null) {
return nextKeyToWrite.getAndIncrement();
}
synchronized (this) {
if (nextKeyToWrite.get() >= endKey) {
// Finished the whole key range
return endKey;
}
while (nextKeyToWrite.get() > writer.wroteUpToKey()) {
Threads.sleepWithoutInterrupt(100);
}
long k = nextKeyToWrite.getAndIncrement();
if (writer.failedToWriteKey(k)) {
failedKeySet.add(k);
return getNextKeyToUpdate();
}
return k;
}
}
protected class HBaseUpdaterThread extends Thread {
protected final Table table;
public HBaseUpdaterThread(int updaterId) throws IOException {
setName(getClass().getSimpleName() + "_" + updaterId);
table = createTable();
}
protected Table createTable() throws IOException {
return connection.getTable(tableName);
}
@Override
public void run() {
try {
long rowKeyBase;
StringBuilder buf = new StringBuilder();
byte[][] columnFamilies = dataGenerator.getColumnFamilies();
while ((rowKeyBase = getNextKeyToUpdate()) < endKey) {
if (RandomUtils.nextInt(0, 100) < updatePercent) {
byte[] rowKey = dataGenerator.getDeterministicUniqueKey(rowKeyBase);
Increment inc = new Increment(rowKey);
Append app = new Append(rowKey);
numKeys.addAndGet(1);
int columnCount = 0;
for (byte[] cf : columnFamilies) {
long cfHash = Arrays.hashCode(cf);
inc.addColumn(cf, INCREMENT, cfHash);
buf.setLength(0); // Clear the buffer
buf.append("#").append(Bytes.toString(INCREMENT));
buf.append(":").append(MutationType.INCREMENT.getNumber());
app.addColumn(cf, MUTATE_INFO, Bytes.toBytes(buf.toString()));
++columnCount;
if (!isBatchUpdate) {
mutate(table, inc, rowKeyBase);
numCols.addAndGet(1);
inc = new Increment(rowKey);
mutate(table, app, rowKeyBase);
numCols.addAndGet(1);
app = new Append(rowKey);
}
Get get = new Get(rowKey);
get.addFamily(cf);
try {
get = dataGenerator.beforeGet(rowKeyBase, get);
} catch (Exception e) {
// Ideally wont happen
LOG.warn("Failed to modify the get from the load generator = [" + Bytes.toString(get.getRow())
+ "], column family = [" + Bytes.toString(cf) + "]", e);
}
Result result = getRow(get, rowKeyBase, cf);
Map<byte[], byte[]> columnValues =
result != null ? result.getFamilyMap(cf) : null;
if (columnValues == null) {
int specialPermCellInsertionFactor = Integer.parseInt(dataGenerator.getArgs()[2]);
if (((int) rowKeyBase % specialPermCellInsertionFactor == 0)) {
LOG.info("Null result expected for the rowkey " + Bytes.toString(rowKey));
} else {
failedKeySet.add(rowKeyBase);
LOG.error("Failed to update the row with key = [" + Bytes.toString(rowKey)
+ "], since we could not get the original row");
}
}
if(columnValues != null) {
for (byte[] column : columnValues.keySet()) {
if (Bytes.equals(column, INCREMENT) || Bytes.equals(column, MUTATE_INFO)) {
continue;
}
MutationType mt = MutationType
.valueOf(RandomUtils.nextInt(0, MutationType.values().length));
long columnHash = Arrays.hashCode(column);
long hashCode = cfHash + columnHash;
byte[] hashCodeBytes = Bytes.toBytes(hashCode);
byte[] checkedValue = HConstants.EMPTY_BYTE_ARRAY;
if (hashCode % 2 == 0) {
Cell kv = result.getColumnLatestCell(cf, column);
checkedValue = kv != null ? CellUtil.cloneValue(kv) : null;
Preconditions.checkNotNull(checkedValue,
"Column value to be checked should not be null");
}
buf.setLength(0); // Clear the buffer
buf.append("#").append(Bytes.toString(column)).append(":");
++columnCount;
switch (mt) {
case PUT:
Put put = new Put(rowKey);
put.addColumn(cf, column, hashCodeBytes);
mutate(table, put, rowKeyBase, rowKey, cf, column, checkedValue);
buf.append(MutationType.PUT.getNumber());
break;
case DELETE:
Delete delete = new Delete(rowKey);
// Delete all versions since a put
// could be called multiple times if CM is used
delete.addColumns(cf, column);
mutate(table, delete, rowKeyBase, rowKey, cf, column, checkedValue);
buf.append(MutationType.DELETE.getNumber());
break;
default:
buf.append(MutationType.APPEND.getNumber());
app.addColumn(cf, column, hashCodeBytes);
}
app.addColumn(cf, MUTATE_INFO, Bytes.toBytes(buf.toString()));
if (!isBatchUpdate) {
mutate(table, app, rowKeyBase);
numCols.addAndGet(1);
app = new Append(rowKey);
}
}
}
}
if (isBatchUpdate) {
if (verbose) {
LOG.debug("Preparing increment and append for key = ["
+ Bytes.toString(rowKey) + "], " + columnCount + " columns");
}
mutate(table, inc, rowKeyBase);
mutate(table, app, rowKeyBase);
numCols.addAndGet(columnCount);
}
}
if (trackWroteKeys) {
wroteKeys.add(rowKeyBase);
}
}
} finally {
closeHTable();
numThreadsWorking.decrementAndGet();
}
}
protected void closeHTable() {
try {
if (table != null) {
table.close();
}
} catch (IOException e) {
LOG.error("Error closing table", e);
}
}
protected Result getRow(Get get, long rowKeyBase, byte[] cf) {
Result result = null;
try {
result = table.get(get);
} catch (IOException ie) {
LOG.warn(
"Failed to get the row for key = [" + Bytes.toString(get.getRow()) + "], column family = ["
+ Bytes.toString(cf) + "]", ie);
}
return result;
}
public void mutate(Table table, Mutation m, long keyBase) {
mutate(table, m, keyBase, null, null, null, null);
}
public void mutate(Table table, Mutation m,
long keyBase, byte[] row, byte[] cf, byte[] q, byte[] v) {
long start = System.currentTimeMillis();
try {
m = dataGenerator.beforeMutate(keyBase, m);
if (m instanceof Increment) {
table.increment((Increment)m);
} else if (m instanceof Append) {
table.append((Append)m);
} else if (m instanceof Put) {
table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenPut((Put)m);
} else if (m instanceof Delete) {
table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenDelete((Delete)m);
} else {
throw new IllegalArgumentException(
"unsupported mutation " + m.getClass().getSimpleName());
}
totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
} catch (IOException e) {
if (ignoreNonceConflicts) {
LOG.info("Detected nonce conflict, ignoring: " + e.getMessage());
totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
return;
}
failedKeySet.add(keyBase);
String exceptionInfo;
if (e instanceof RetriesExhaustedWithDetailsException) {
RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e;
exceptionInfo = aggEx.getExhaustiveDescription();
} else {
exceptionInfo = StringUtils.stringifyException(e);
}
LOG.error("Failed to mutate: " + keyBase + " after " +
(System.currentTimeMillis() - start) +
"ms; region information: " + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: "
+ exceptionInfo);
}
}
}
@Override
public void waitForFinish() {
super.waitForFinish();
System.out.println("Failed to update keys: " + failedKeySet.size());
for (Long key : failedKeySet) {
System.out.println("Failed to update key: " + key);
}
}
public void mutate(Table table, Mutation m, long keyBase) {
mutate(table, m, keyBase, null, null, null, null);
}
public void mutate(Table table, Mutation m,
long keyBase, byte[] row, byte[] cf, byte[] q, byte[] v) {
long start = System.currentTimeMillis();
try {
m = dataGenerator.beforeMutate(keyBase, m);
if (m instanceof Increment) {
table.increment((Increment)m);
} else if (m instanceof Append) {
table.append((Append)m);
} else if (m instanceof Put) {
table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenPut((Put)m);
} else if (m instanceof Delete) {
table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenDelete((Delete)m);
} else {
throw new IllegalArgumentException(
"unsupported mutation " + m.getClass().getSimpleName());
}
totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
} catch (IOException e) {
failedKeySet.add(keyBase);
String exceptionInfo;
if (e instanceof RetriesExhaustedWithDetailsException) {
RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e;
exceptionInfo = aggEx.getExhaustiveDescription();
} else {
StringWriter stackWriter = new StringWriter();
PrintWriter pw = new PrintWriter(stackWriter);
e.printStackTrace(pw);
pw.flush();
exceptionInfo = StringUtils.stringifyException(e);
}
LOG.error("Failed to mutate: " + keyBase + " after " + (System.currentTimeMillis() - start) +
"ms; region information: " + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: "
+ exceptionInfo);
}
}
public void setIgnoreNonceConflicts(boolean value) {
this.ignoreNonceConflicts = value;
}
}