/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
/**
* A test tool that uses multiple threads to read and write multi-family rows into a table,
* verifying that reads never see partially-complete writes.
*/
@InterfaceAudience.Private
public class AcidGuaranteesTestTool extends AbstractHBaseTool {
private static final Logger LOG = LoggerFactory.getLogger(AcidGuaranteesTestTool.class);
public static final TableName TABLE_NAME = TableName.valueOf("TestAcidGuarantees");
public static final byte[] FAMILY_A = Bytes.toBytes("A");
public static final byte[] FAMILY_B = Bytes.toBytes("B");
public static final byte[] FAMILY_C = Bytes.toBytes("C");
public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data");
public static final byte[][] FAMILIES = new byte[][] { FAMILY_A, FAMILY_B, FAMILY_C };
public static int NUM_COLS_TO_CHECK = 50;
private ExecutorService sharedPool;
private long millisToRun;
private int numWriters;
private int numGetters;
private int numScanners;
private int numUniqueRows;
private boolean crazyFlush;
private boolean useMob;
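/**
 * Builds the shared thread pool used by every reader and writer connection. Core threads are
 * allowed to time out so the pool shrinks when idle; the work queue is bounded by the maximum
 * thread count times HBase's default per-client task limit, and all threads are daemons that log
 * uncaught exceptions.
 */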
private ExecutorService createThreadPool() {
int maxThreads = 256;
int coreThreads = 128;
long keepAliveTime = 60;
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(
maxThreads * HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
ThreadPoolExecutor tpe =
new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue,
new ThreadFactoryBuilder().setNameFormat(toString() + "-shared-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
tpe.allowCoreThreadTimeOut(true);
return tpe;
}
@Override
protected void addOptions() {
addOptWithArg("millis", "time limit in milliseconds");
addOptWithArg("numWriters", "number of write threads");
addOptWithArg("numGetters", "number of get threads");
addOptWithArg("numScanners", "number of scan threads");
addOptWithArg("numUniqueRows", "number of unique rows to test");
addOptNoArg("crazyFlush",
"if specified we will flush continuously otherwise will flush every minute");
addOptNoArg("useMob", "if specified we will enable mob on the first column family");
}
@Override
protected void processOptions(CommandLine cmd) {
millisToRun = getOptionAsLong(cmd, "millis", 5000);
numWriters = getOptionAsInt(cmd, "numWriters", 50);
numGetters = getOptionAsInt(cmd, "numGetters", 2);
numScanners = getOptionAsInt(cmd, "numScanners", 2);
numUniqueRows = getOptionAsInt(cmd, "numUniqueRows", 3);
crazyFlush = cmd.hasOption("crazyFlush");
useMob = cmd.hasOption("useMob");
}
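/**
 * Creates the shared pool, opens a cluster connection, runs the atomicity test against it, and
 * shuts the pool down when the run finishes.
 */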
@Override
protected int doWork() throws Exception {
sharedPool = createThreadPool();
try (Connection conn = ConnectionFactory.createConnection(getConf())) {
runTestAtomicity(conn.getAdmin());
} finally {
sharedPool.shutdown();
}
return 0;
}
/**
* Thread that does random full-row writes into a table.
*/
public static class AtomicityWriter extends RepeatingTestThread {
Random rand = new Random();
byte[] data = new byte[10];
byte[][] targetRows;
byte[][] targetFamilies;
Connection connection;
Table table;
AtomicLong numWritten = new AtomicLong();
public AtomicityWriter(TestContext ctx, byte[][] targetRows, byte[][] targetFamilies,
ExecutorService pool) throws IOException {
super(ctx);
this.targetRows = targetRows;
this.targetFamilies = targetFamilies;
connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
table = connection.getTable(TABLE_NAME);
}
@Override
public void doAnAction() throws Exception {
// Pick a random row to write into
byte[] targetRow = targetRows[rand.nextInt(targetRows.length)];
Put p = new Put(targetRow);
rand.nextBytes(data);
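// Write the same random value into every column of every target family in one Put, so readers
// can detect a torn row: all cells of a row must always carry an identical value.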
for (byte[] family : targetFamilies) {
for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
byte qualifier[] = Bytes.toBytes("col" + i);
p.addColumn(family, qualifier, data);
}
}
table.put(p);
numWritten.getAndIncrement();
}
@Override
public void workDone() throws IOException {
try {
table.close();
} finally {
connection.close();
}
}
}
/**
* Thread that does single-row reads in a table, looking for partially completed rows.
*/
public static class AtomicGetReader extends RepeatingTestThread {
byte[] targetRow;
byte[][] targetFamilies;
Connection connection;
Table table;
int numVerified = 0;
AtomicLong numRead = new AtomicLong();
public AtomicGetReader(TestContext ctx, byte[] targetRow, byte[][] targetFamilies,
ExecutorService pool) throws IOException {
super(ctx);
this.targetRow = targetRow;
this.targetFamilies = targetFamilies;
connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
table = connection.getTable(TABLE_NAME);
}
@Override
public void doAnAction() throws Exception {
Get g = new Get(targetRow);
Result res = table.get(g);
byte[] gotValue = null;
if (res.getRow() == null) {
// Trying to verify but we didn't find the row - the writing
// thread probably just hasn't started writing yet, so we can
// ignore this action
return;
}
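// Every writer puts a single value across all columns and families of a row, so every cell we
// read back must match; a mismatch means this get observed a partially-complete write.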
for (byte[] family : targetFamilies) {
for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
byte qualifier[] = Bytes.toBytes("col" + i);
byte thisValue[] = res.getValue(family, qualifier);
if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
gotFailure(gotValue, res);
}
numVerified++;
gotValue = thisValue;
}
}
numRead.getAndIncrement();
}
@Override
public void workDone() throws IOException {
try {
table.close();
} finally {
connection.close();
}
}
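/**
 * Reports an atomicity violation by throwing a RuntimeException that lists every cell of the
 * offending row next to the value the rest of the row carried.
 */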
private void gotFailure(byte[] expected, Result res) {
StringBuilder msg = new StringBuilder();
msg.append("Failed after ").append(numVerified).append("!");
msg.append("Expected=").append(Bytes.toStringBinary(expected));
msg.append("Got:\n");
for (Cell kv : res.listCells()) {
msg.append(kv.toString());
msg.append(" val= ");
msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
msg.append("\n");
}
throw new RuntimeException(msg.toString());
}
}
/**
* Thread that does full scans of the table looking for any partially completed rows.
*/
public static class AtomicScanReader extends RepeatingTestThread {
byte[][] targetFamilies;
Table table;
Connection connection;
AtomicLong numScans = new AtomicLong();
AtomicLong numRowsScanned = new AtomicLong();
public AtomicScanReader(TestContext ctx, byte[][] targetFamilies, ExecutorService pool)
throws IOException {
super(ctx);
this.targetFamilies = targetFamilies;
connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
table = connection.getTable(TABLE_NAME);
}
@Override
public void doAnAction() throws Exception {
Scan s = new Scan();
for (byte[] family : targetFamilies) {
s.addFamily(family);
}
ResultScanner scanner = table.getScanner(s);
for (Result res : scanner) {
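// As with the get reader, all cells of a scanned row must carry the same value; any mismatch
// means the scan observed a partially-complete write.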
byte[] gotValue = null;
for (byte[] family : targetFamilies) {
for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
byte qualifier[] = Bytes.toBytes("col" + i);
byte thisValue[] = res.getValue(family, qualifier);
if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
gotFailure(gotValue, res);
}
gotValue = thisValue;
}
}
numRowsScanned.getAndIncrement();
}
numScans.getAndIncrement();
}
@Override
public void workDone() throws IOException {
try {
table.close();
} finally {
connection.close();
}
}
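/**
 * Same failure report as the get reader's: throws a RuntimeException listing every cell of the
 * offending row found during a scan.
 */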
private void gotFailure(byte[] expected, Result res) {
StringBuilder msg = new StringBuilder();
msg.append("Failed after ").append(numRowsScanned).append("!");
msg.append("Expected=").append(Bytes.toStringBinary(expected));
msg.append("Got:\n");
for (Cell kv : res.listCells()) {
msg.append(kv.toString());
msg.append(" val= ");
msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
msg.append("\n");
}
throw new RuntimeException(msg.toString());
}
}
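/**
 * Creates the test table with the three column families if it does not exist, then enables or
 * disables MOB on the first column family (with a small MOB threshold) so that it matches the
 * requested setting.
 */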
private void createTableIfMissing(Admin admin, boolean useMob) throws IOException {
if (!admin.tableExists(TABLE_NAME)) {
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME);
Stream.of(FAMILIES).map(ColumnFamilyDescriptorBuilder::of)
.forEachOrdered(builder::setColumnFamily);
admin.createTable(builder.build());
}
ColumnFamilyDescriptor cfd = admin.getDescriptor(TABLE_NAME).getColumnFamilies()[0];
if (cfd.isMobEnabled() != useMob) {
admin.modifyColumnFamily(TABLE_NAME, ColumnFamilyDescriptorBuilder.newBuilder(cfd)
.setMobEnabled(useMob).setMobThreshold(4).build());
}
}
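/**
 * Creates the table if needed, wires up the writer, flusher, getter and scanner threads, runs
 * them for the configured time window, and logs per-thread counts once the run completes. An
 * atomicity violation is raised as a RuntimeException by the reader thread that detects it.
 */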
private void runTestAtomicity(Admin admin) throws Exception {
createTableIfMissing(admin, useMob);
TestContext ctx = new TestContext(conf);
byte[][] rows = new byte[numUniqueRows][];
for (int i = 0; i < numUniqueRows; i++) {
rows[i] = Bytes.toBytes("test_row_" + i);
}
List<AtomicityWriter> writers = Lists.newArrayList();
for (int i = 0; i < numWriters; i++) {
AtomicityWriter writer = new AtomicityWriter(ctx, rows, FAMILIES, sharedPool);
writers.add(writer);
ctx.addThread(writer);
}
// Add a flusher
ctx.addThread(new RepeatingTestThread(ctx) {
@Override
public void doAnAction() throws Exception {
try {
admin.flush(TABLE_NAME);
} catch (IOException ioe) {
LOG.warn("Ignoring exception while flushing: " + StringUtils.stringifyException(ioe));
}
// Flushing has been a source of ACID violations previously (see HBASE-2856), so ideally,
// we would flush as often as possible. On a running cluster, this isn't practical:
// (1) we will cause a lot of load due to all the flushing and compacting
// (2) we cannot change the flushing/compacting related Configuration options to try to
// alleviate this
// (3) it is an unrealistic workload, since no one would actually flush that often.
// Therefore, let's flush every minute to have more flushes than usual, but not overload
// the running cluster.
if (!crazyFlush) {
Thread.sleep(60000);
}
}
});
List<AtomicGetReader> getters = Lists.newArrayList();
for (int i = 0; i < numGetters; i++) {
AtomicGetReader getter =
new AtomicGetReader(ctx, rows[i % numUniqueRows], FAMILIES, sharedPool);
getters.add(getter);
ctx.addThread(getter);
}
List<AtomicScanReader> scanners = Lists.newArrayList();
for (int i = 0; i < numScanners; i++) {
AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES, sharedPool);
scanners.add(scanner);
ctx.addThread(scanner);
}
ctx.startThreads();
ctx.waitFor(millisToRun);
ctx.stop();
LOG.info("Finished test. Writers:");
for (AtomicityWriter writer : writers) {
LOG.info(" wrote " + writer.numWritten.get());
}
LOG.info("Readers:");
for (AtomicGetReader reader : getters) {
LOG.info(" read " + reader.numRead.get());
}
LOG.info("Scanners:");
for (AtomicScanReader scanner : scanners) {
LOG.info(" scanned " + scanner.numScans.get());
LOG.info(" verified " + scanner.numRowsScanned.get() + " rows");
}
}
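/**
 * Command-line entry point. Runs the tool through ToolRunner so the generic Hadoop options are
 * honored, and exits with a non-zero status if the run throws.
 */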
public static void main(String[] args) {
Configuration c = HBaseConfiguration.create();
int status;
try {
AcidGuaranteesTestTool test = new AcidGuaranteesTestTool();
status = ToolRunner.run(c, test, args);
} catch (Exception e) {
LOG.error("Exiting due to error", e);
status = -1;
}
System.exit(status);
}
}