blob: 0df4927c1bfb7a339d56d9ffbd4196b79f8c724c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LoadTestTool;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.common.collect.Sets;
/**
* A base class for tests that do something with the cluster while running
* {@link LoadTestTool} to write and verify some data.
*/
@Category(IntegrationTests.class)
public class IntegrationTestIngest extends IntegrationTestBase {
public static final char HIPHEN = '-';
private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster
protected static final long DEFAULT_RUN_TIME = 20 * 60 * 1000;
protected static final long JUNIT_RUN_TIME = 10 * 60 * 1000;
/** A soft limit on how long we should run */
protected static final String RUN_TIME_KEY = "hbase.%s.runtime";
protected static final String NUM_KEYS_PER_SERVER_KEY = "num_keys_per_server";
protected static final long DEFAULT_NUM_KEYS_PER_SERVER = 2500;
protected static final String NUM_WRITE_THREADS_KEY = "num_write_threads";
protected static final int DEFAULT_NUM_WRITE_THREADS = 20;
protected static final String NUM_READ_THREADS_KEY = "num_read_threads";
protected static final int DEFAULT_NUM_READ_THREADS = 20;
// Log is being used in IntegrationTestIngestWithEncryption, hence it is protected
protected static final Log LOG = LogFactory.getLog(IntegrationTestIngest.class);
protected IntegrationTestingUtility util;
protected HBaseCluster cluster;
protected LoadTestTool loadTool;
protected String[] LOAD_TEST_TOOL_INIT_ARGS = {
LoadTestTool.OPT_COLUMN_FAMILIES,
LoadTestTool.OPT_COMPRESSION,
LoadTestTool.OPT_DATA_BLOCK_ENCODING,
LoadTestTool.OPT_INMEMORY,
LoadTestTool.OPT_ENCRYPTION,
LoadTestTool.OPT_NUM_REGIONS_PER_SERVER,
LoadTestTool.OPT_REGION_REPLICATION,
};
@Override
public void setUpCluster() throws Exception {
util = getTestingUtil(getConf());
LOG.debug("Initializing/checking cluster has " + SERVER_COUNT + " servers");
util.initializeCluster(getMinServerCount());
LOG.debug("Done initializing/checking cluster");
cluster = util.getHBaseClusterInterface();
deleteTableIfNecessary();
loadTool = new LoadTestTool();
loadTool.setConf(util.getConfiguration());
// Initialize load test tool before we start breaking things;
// LoadTestTool init, even when it is a no-op, is very fragile.
initTable();
}
protected int getMinServerCount() {
return SERVER_COUNT;
}
protected void initTable() throws IOException {
int ret = loadTool.run(getArgsForLoadTestToolInitTable());
Assert.assertEquals("Failed to initialize LoadTestTool", 0, ret);
}
@Override
public int runTestFromCommandLine() throws Exception {
internalRunIngestTest(DEFAULT_RUN_TIME);
return 0;
}
@Test
public void testIngest() throws Exception {
runIngestTest(JUNIT_RUN_TIME, 2500, 10, 1024, 10, 20);
}
protected void internalRunIngestTest(long runTime) throws Exception {
String clazz = this.getClass().getSimpleName();
long numKeysPerServer = conf.getLong(String.format("%s.%s", clazz, NUM_KEYS_PER_SERVER_KEY),
DEFAULT_NUM_KEYS_PER_SERVER);
int numWriteThreads = conf.getInt(
String.format("%s.%s", clazz, NUM_WRITE_THREADS_KEY), DEFAULT_NUM_WRITE_THREADS);
int numReadThreads = conf.getInt(
String.format("%s.%s", clazz, NUM_READ_THREADS_KEY), DEFAULT_NUM_READ_THREADS);
runIngestTest(runTime, numKeysPerServer, 10, 1024, numWriteThreads, numReadThreads);
}
@Override
public TableName getTablename() {
String clazz = this.getClass().getSimpleName();
return TableName.valueOf(
conf.get(String.format("%s.%s", clazz, LoadTestTool.OPT_TABLE_NAME), clazz));
}
@Override
protected Set<String> getColumnFamilies() {
Set<String> families = Sets.newHashSet();
String clazz = this.getClass().getSimpleName();
// parse conf for getting the column famly names because LTT is not initialized yet.
String familiesString = getConf().get(
String.format("%s.%s", clazz, LoadTestTool.OPT_COLUMN_FAMILIES));
if (familiesString == null) {
for (byte[] family : LoadTestTool.DEFAULT_COLUMN_FAMILIES) {
families.add(Bytes.toString(family));
}
} else {
for (String family : familiesString.split(",")) {
families.add(family);
}
}
return families;
}
private void deleteTableIfNecessary() throws IOException {
if (util.getHBaseAdmin().tableExists(getTablename())) {
util.deleteTable(getTablename());
}
}
protected void runIngestTest(long defaultRunTime, long keysPerServerPerIter, int colsPerKey,
int recordSize, int writeThreads, int readThreads) throws Exception {
LOG.info("Running ingest");
LOG.info("Cluster size:" + util.getHBaseClusterInterface().getClusterStatus().getServersSize());
long start = System.currentTimeMillis();
String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime);
long startKey = 0;
long numKeys = getNumKeys(keysPerServerPerIter);
while (System.currentTimeMillis() - start < 0.9 * runtime) {
LOG.info("Intended run time: " + (runtime/60000) + " min, left:" +
((runtime - (System.currentTimeMillis() - start))/60000) + " min");
int ret = -1;
ret = loadTool.run(getArgsForLoadTestTool("-write",
String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads), startKey, numKeys));
if (0 != ret) {
String errorMsg = "Load failed with error code " + ret;
LOG.error(errorMsg);
Assert.fail(errorMsg);
}
ret = loadTool.run(getArgsForLoadTestTool("-update", String.format("60:%d:1", writeThreads),
startKey, numKeys));
if (0 != ret) {
String errorMsg = "Update failed with error code " + ret;
LOG.error(errorMsg);
Assert.fail(errorMsg);
}
ret = loadTool.run(getArgsForLoadTestTool("-read", String.format("100:%d", readThreads)
, startKey, numKeys));
if (0 != ret) {
String errorMsg = "Verification failed with error code " + ret;
LOG.error(errorMsg + " Rerunning verification after 1 minute for debugging");
Threads.sleep(1000 * 60);
ret = loadTool.run(getArgsForLoadTestTool("-read", String.format("100:%d", readThreads)
, startKey, numKeys));
if (0 != ret) {
LOG.error("Rerun of Verification failed with error code " + ret);
}
Assert.fail(errorMsg);
}
startKey += numKeys;
}
}
protected String[] getArgsForLoadTestToolInitTable() {
List<String> args = new ArrayList<String>();
args.add("-tn");
args.add(getTablename().getNameAsString());
// pass all remaining args from conf with keys <test class name>.<load test tool arg>
String clazz = this.getClass().getSimpleName();
for (String arg : LOAD_TEST_TOOL_INIT_ARGS) {
String val = conf.get(String.format("%s.%s", clazz, arg));
if (val != null) {
args.add("-" + arg);
args.add(val);
}
}
args.add("-init_only");
return args.toArray(new String[args.size()]);
}
protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey,
long numKeys) {
List<String> args = new ArrayList<String>();
args.add("-tn");
args.add(getTablename().getNameAsString());
args.add("-families");
args.add(getColumnFamiliesAsString());
args.add(mode);
args.add(modeSpecificArg);
args.add("-start_key");
args.add(String.valueOf(startKey));
args.add("-num_keys");
args.add(String.valueOf(numKeys));
args.add("-skip_init");
return args.toArray(new String[args.size()]);
}
private String getColumnFamiliesAsString() {
return StringUtils.join(",", getColumnFamilies());
}
/** Estimates a data size based on the cluster size */
protected long getNumKeys(long keysPerServer)
throws IOException {
int numRegionServers = cluster.getClusterStatus().getServersSize();
return keysPerServer * numRegionServers;
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
IntegrationTestingUtility.setUseDistributedCluster(conf);
int ret = ToolRunner.run(conf, new IntegrationTestIngest(), args);
System.exit(ret);
}
}