/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import static org.junit.Assert.assertEquals;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
/**
* Test Bulk Load and MR on a distributed cluster.
* It starts an MR job that creates linked chains.
*
* The format of rows is like this:
* Row Key -> Long
*
* L:<< Chain Id >> -> Row Key of the next link in the chain
* S:<< Chain Id >> -> The step in the chain that this link is.
* D:<< Chain Id >> -> Random Data.
*
* All chains start on row 0.
* All rk's are > 0.
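*
* For example (an illustrative sketch only; the numbers are arbitrary), the link of chain 42
* that sits at step 7 on row 1234 would carry:
*   L:42 -> 5678   (row key of the next link)
*   S:42 -> 7      (position of this link in the chain)
*   D:42 -> 50 random alphabetic characters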
*
* After creating the linked lists, they are walked over using a TableMapper-based MapReduce job.
*
* There are a few options exposed:
*
* hbase.IntegrationTestBulkLoad.chainLength
* The number of rows that will be part of each and every chain.
*
* hbase.IntegrationTestBulkLoad.numMaps
* The number of mappers that will be run. Each mapper creates one linked list chain.
*
* hbase.IntegrationTestBulkLoad.numImportRounds
* How many jobs will be run to create linked lists.
*
* hbase.IntegrationTestBulkLoad.tableName
* The name of the table.
*
* hbase.IntegrationTestBulkLoad.replicaCount
* How many region replicas to configure for the table under test.
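*
* Example invocation (a sketch only; the property names mirror the keys documented above, and
* the exact launcher command may differ per deployment):
* <pre>
* hbase org.apache.hadoop.hbase.mapreduce.IntegrationTestBulkLoad \
*     -Dhbase.IntegrationTestBulkLoad.chainLength=1000000 \
*     -Dhbase.IntegrationTestBulkLoad.numMaps=10 \
*     -Dhbase.IntegrationTestBulkLoad.numImportRounds=2 \
*     -Dhbase.IntegrationTestBulkLoad.replicaCount=3
* </pre>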
*/
@Category(IntegrationTests.class)
public class IntegrationTestBulkLoad extends IntegrationTestBase {
private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestBulkLoad.class);
private static final byte[] CHAIN_FAM = Bytes.toBytes("L");
private static final byte[] SORT_FAM = Bytes.toBytes("S");
private static final byte[] DATA_FAM = Bytes.toBytes("D");
private static String CHAIN_LENGTH_KEY = "hbase.IntegrationTestBulkLoad.chainLength";
private static int CHAIN_LENGTH = 500000;
private static String NUM_MAPS_KEY = "hbase.IntegrationTestBulkLoad.numMaps";
private static int NUM_MAPS = 1;
private static String NUM_IMPORT_ROUNDS_KEY = "hbase.IntegrationTestBulkLoad.numImportRounds";
private static int NUM_IMPORT_ROUNDS = 1;
private static String ROUND_NUM_KEY = "hbase.IntegrationTestBulkLoad.roundNum";
private static String TABLE_NAME_KEY = "hbase.IntegrationTestBulkLoad.tableName";
private static String TABLE_NAME = "IntegrationTestBulkLoad";
private static String NUM_REPLICA_COUNT_KEY = "hbase.IntegrationTestBulkLoad.replicaCount";
private static int NUM_REPLICA_COUNT_DEFAULT = 1;
private static final String OPT_LOAD = "load";
private static final String OPT_CHECK = "check";
private boolean load = false;
private boolean check = false;
public static class SlowMeCoproScanOperations implements RegionCoprocessor, RegionObserver {
static final AtomicLong sleepTime = new AtomicLong(2000);
Random r = new Random();
AtomicLong countOfNext = new AtomicLong(0);
AtomicLong countOfOpen = new AtomicLong(0);
public SlowMeCoproScanOperations() {}
@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(this);
}
@Override
public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
final Scan scan) throws IOException {
if (countOfOpen.incrementAndGet() == 2) { // slow down the second openScanner call
slowdownCode(e);
}
}
@Override
public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e,
final InternalScanner s, final List<Result> results,
final int limit, final boolean hasMore) throws IOException {
// this will slow down a certain next operation if the conditions are met. The slowness
// will allow the call to go to a replica
countOfNext.incrementAndGet();
if (countOfNext.get() == 0 || countOfNext.get() == 4) {
slowdownCode(e);
}
return true;
}
protected void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
try {
if (sleepTime.get() > 0) {
LOG.info("Sleeping for " + sleepTime.get() + " ms");
Thread.sleep(sleepTime.get());
}
} catch (InterruptedException e1) {
LOG.error(e1.toString(), e1);
}
}
}
}
/**
* Modify table {@code getTableName()} to carry {@link SlowMeCoproScanOperations}.
*/
private void installSlowingCoproc() throws IOException, InterruptedException {
int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return;
TableName t = getTablename();
Admin admin = util.getAdmin();
TableDescriptor desc = admin.getDescriptor(t);
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(desc);
builder.setCoprocessor(SlowMeCoproScanOperations.class.getName());
HBaseTestingUtility.modifyTableSync(admin, builder.build());
}
@Test
public void testBulkLoad() throws Exception {
runLoad();
installSlowingCoproc();
runCheckWithRetry();
}
public void runLoad() throws Exception {
setupTable();
int numImportRounds = getConf().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
LOG.info("Running load with numIterations:" + numImportRounds);
for (int i = 0; i < numImportRounds; i++) {
runLinkedListMRJob(i);
}
}
private byte[][] getSplits(int numRegions) {
RegionSplitter.UniformSplit split = new RegionSplitter.UniformSplit();
split.setFirstRow(Bytes.toBytes(0L));
split.setLastRow(Bytes.toBytes(Long.MAX_VALUE));
return split.split(numRegions);
}
private void setupTable() throws IOException, InterruptedException {
if (util.getAdmin().tableExists(getTablename())) {
util.deleteTable(getTablename());
}
util.createTable(
getTablename(),
new byte[][]{CHAIN_FAM, SORT_FAM, DATA_FAM},
getSplits(16)
);
int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return;
TableName t = getTablename();
HBaseTestingUtility.setReplicas(util.getAdmin(), t, replicaCount);
}
private void runLinkedListMRJob(int iteration) throws Exception {
String jobName = IntegrationTestBulkLoad.class.getSimpleName() + " - " +
EnvironmentEdgeManager.currentTime();
Configuration conf = new Configuration(util.getConfiguration());
Path p = null;
if (conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) {
p = util.getDataTestDirOnTestFS(getTablename() + "-" + iteration);
} else {
p = new Path(conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY));
}
conf.setBoolean("mapreduce.map.speculative", false);
conf.setBoolean("mapreduce.reduce.speculative", false);
conf.setInt(ROUND_NUM_KEY, iteration);
Job job = new Job(conf);
job.setJobName(jobName);
// set the input format so that we can create map tasks with no data input.
job.setInputFormatClass(ITBulkLoadInputFormat.class);
// Set the mapper classes.
job.setMapperClass(LinkedListCreationMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
// Use the identity reducer
// So nothing to do here.
// Set this jar.
job.setJarByClass(getClass());
// Set where to place the hfiles.
FileOutputFormat.setOutputPath(job, p);
try (Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin();
RegionLocator regionLocator = conn.getRegionLocator(getTablename())) {
// Configure the partitioner and other things needed for HFileOutputFormat.
HFileOutputFormat2.configureIncrementalLoad(job, admin.getDescriptor(getTablename()),
regionLocator);
// Run the job making sure it works.
assertEquals(true, job.waitForCompletion(true));
}
// Create a new loader.
BulkLoadHFiles loader = BulkLoadHFiles.create(conf);
// Load the HFiles in.
loader.bulkLoad(getTablename(), p);
// Delete the files.
util.getTestFileSystem().delete(p, true);
}
public static class EmptySplit extends InputSplit implements Writable {
@Override
public void write(DataOutput out) throws IOException { }
@Override
public void readFields(DataInput in) throws IOException { }
@Override
public long getLength() { return 0L; }
@Override
public String[] getLocations() { return new String[0]; }
}
public static class FixedRecordReader<K, V> extends RecordReader<K, V> {
private int index = -1;
private K[] keys;
private V[] values;
public FixedRecordReader(K[] keys, V[] values) {
this.keys = keys;
this.values = values;
}
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
InterruptedException { }
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
return ++index < keys.length;
}
@Override
public K getCurrentKey() throws IOException, InterruptedException {
return keys[index];
}
@Override
public V getCurrentValue() throws IOException, InterruptedException {
return values[index];
}
@Override
public float getProgress() throws IOException, InterruptedException {
return (float)index / keys.length;
}
@Override
public void close() throws IOException {
}
}
public static class ITBulkLoadInputFormat extends InputFormat<LongWritable, LongWritable> {
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
int numSplits = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
ArrayList<InputSplit> ret = new ArrayList<>(numSplits);
for (int i = 0; i < numSplits; ++i) {
ret.add(new EmptySplit());
}
return ret;
}
@Override
public RecordReader<LongWritable, LongWritable> createRecordReader(InputSplit split,
TaskAttemptContext context)
throws IOException, InterruptedException {
int taskId = context.getTaskAttemptID().getTaskID().getId();
int numMapTasks = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
int numIterations = context.getConfiguration().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
int iteration = context.getConfiguration().getInt(ROUND_NUM_KEY, 0);
taskId = taskId + iteration * numMapTasks;
numMapTasks = numMapTasks * numIterations;
long chainId = Math.abs(new Random().nextLong());
chainId = chainId - (chainId % numMapTasks) + taskId; // ensure that chainId is unique per task and across iterations
LongWritable[] keys = new LongWritable[] {new LongWritable(chainId)};
return new FixedRecordReader<>(keys, keys);
}
}
/**
* Mapper that creates a linked list of KeyValues.
*
* Each map task generates one linked list.
* All lists start on row key 0L.
* All lists should be CHAIN_LENGTH long.
*/
public static class LinkedListCreationMapper
extends Mapper<LongWritable, LongWritable, ImmutableBytesWritable, KeyValue> {
private Random rand = new Random();
@Override
protected void map(LongWritable key, LongWritable value, Context context)
throws IOException, InterruptedException {
long chainId = value.get();
LOG.info("Starting mapper with chainId:" + chainId);
byte[] chainIdArray = Bytes.toBytes(chainId);
long currentRow = 0;
long chainLength = context.getConfiguration().getLong(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
long nextRow = getNextRow(0, chainLength);
for (long i = 0; i < chainLength; i++) {
byte[] rk = Bytes.toBytes(currentRow);
// Next link in the chain.
KeyValue linkKv = new KeyValue(rk, CHAIN_FAM, chainIdArray, Bytes.toBytes(nextRow));
// What link in the chain this is.
KeyValue sortKv = new KeyValue(rk, SORT_FAM, chainIdArray, Bytes.toBytes(i));
// Added data so that large stores are created.
KeyValue dataKv = new KeyValue(rk, DATA_FAM, chainIdArray,
Bytes.toBytes(RandomStringUtils.randomAlphabetic(50))
);
// Emit the key values.
context.write(new ImmutableBytesWritable(rk), linkKv);
context.write(new ImmutableBytesWritable(rk), sortKv);
context.write(new ImmutableBytesWritable(rk), dataKv);
// Move to the next row.
currentRow = nextRow;
nextRow = getNextRow(i+1, chainLength);
}
}
/** Returns a unique row id within this chain for this index */
private long getNextRow(long index, long chainLength) {
long nextRow = Math.abs(rand.nextLong());
// use significant bits from the random number, but pad with index to ensure it is unique
// this also ensures that we do not reuse row = 0
// row collisions from multiple mappers are fine, since we guarantee unique chainIds
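// Worked example (illustrative numbers only): with chainLength=500000 and rand=1234567,
// 1234567 - (1234567 % 500000) = 1000000, so the row becomes 1000000 + index; because
// index < chainLength, every index in this chain maps to a distinct row.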
nextRow = nextRow - (nextRow % chainLength) + index;
return nextRow;
}
}
/**
* Writable class used as the key to group links in the linked list.
*
* Used as the key emitted from a pass over the table.
*/
public static class LinkKey implements WritableComparable<LinkKey> {
private Long chainId;
private Long order;
public Long getChainId() {
return chainId;
}
public Long getOrder() {
return order;
}
public LinkKey() {}
public LinkKey(long chainId, long order) {
this.chainId = chainId;
this.order = order;
}
@Override
public int compareTo(LinkKey linkKey) {
int res = getChainId().compareTo(linkKey.getChainId());
if (res == 0) {
res = getOrder().compareTo(linkKey.getOrder());
}
return res;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
WritableUtils.writeVLong(dataOutput, chainId);
WritableUtils.writeVLong(dataOutput, order);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
chainId = WritableUtils.readVLong(dataInput);
order = WritableUtils.readVLong(dataInput);
}
}
/**
* Writable used as the value emitted from a pass over the hbase table.
*/
public static class LinkChain implements WritableComparable<LinkChain> {
private Long rk;
private Long next;
public LinkChain() {}
public LinkChain(Long rk, Long next) {
this.rk = rk;
this.next = next;
}
public Long getRk() {
return rk;
}
public Long getNext() {
return next;
}
@Override
public int compareTo(LinkChain linkChain) {
int res = getRk().compareTo(linkChain.getRk());
if (res == 0) {
res = getNext().compareTo(linkChain.getNext());
}
return res;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
WritableUtils.writeVLong(dataOutput, rk);
WritableUtils.writeVLong(dataOutput, next);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
rk = WritableUtils.readVLong(dataInput);
next = WritableUtils.readVLong(dataInput);
}
}
/**
* Class to figure out what partition to send a link in the chain to. This is based upon
* the linkKey's ChainId.
*/
public static class NaturalKeyPartitioner extends Partitioner<LinkKey, LinkChain> {
@Override
public int getPartition(LinkKey linkKey,
LinkChain linkChain,
int numPartitions) {
int hash = linkKey.getChainId().hashCode();
return Math.abs(hash % numPartitions);
}
}
/**
* Comparator used to figure out if a linkKey should be grouped together. This is based upon the
* linkKey's ChainId.
*/
public static class NaturalKeyGroupingComparator extends WritableComparator {
protected NaturalKeyGroupingComparator() {
super(LinkKey.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
LinkKey k1 = (LinkKey) w1;
LinkKey k2 = (LinkKey) w2;
return k1.getChainId().compareTo(k2.getChainId());
}
}
/**
* Comparator used to order linkKeys so that they are passed to a reducer in order. This is based
* upon linkKey ChainId and Order.
*/
public static class CompositeKeyComparator extends WritableComparator {
protected CompositeKeyComparator() {
super(LinkKey.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
LinkKey k1 = (LinkKey) w1;
LinkKey k2 = (LinkKey) w2;
return k1.compareTo(k2);
}
}
/**
* Mapper to pass over the table.
*
* For every row there could be multiple chains that landed on it, so emit a LinkKey and
* LinkChain value for each.
*/
public static class LinkedListCheckingMapper extends TableMapper<LinkKey, LinkChain> {
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context)
throws IOException, InterruptedException {
long longRk = Bytes.toLong(value.getRow());
for (Map.Entry<byte[], byte[]> entry : value.getFamilyMap(CHAIN_FAM).entrySet()) {
long chainId = Bytes.toLong(entry.getKey());
long next = Bytes.toLong(entry.getValue());
Cell c = value.getColumnCells(SORT_FAM, entry.getKey()).get(0);
long order = Bytes.toLong(CellUtil.cloneValue(c));
context.write(new LinkKey(chainId, order), new LinkChain(longRk, next));
}
}
}
/**
* Class that does the actual checking of the links.
*
* All links in the chain should be grouped and sorted when sent to this class. Then the chain
* will be traversed making sure that no link is missing and that the chain is the correct length.
*
* This will throw an exception if anything is not correct. That causes the job to fail if any
* data is corrupt.
*/
public static class LinkedListCheckingReducer
extends Reducer<LinkKey, LinkChain, NullWritable, NullWritable> {
@Override
protected void reduce(LinkKey key, Iterable<LinkChain> values, Context context)
throws java.io.IOException, java.lang.InterruptedException {
long next = -1L;
long prev = -1L;
long count = 0L;
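// Walk the chain in the order supplied by the secondary sort: the first link must start at
// row 0, and each subsequent link's row key must equal the previous link's "next" pointer.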
for (LinkChain lc : values) {
if (next == -1) {
if (lc.getRk() != 0L) {
String msg = "Chains should all start at rk 0, but read rk " + lc.getRk()
+ ". Chain:" + key.chainId + ", order:" + key.order;
logError(msg, context);
throw new RuntimeException(msg);
}
next = lc.getNext();
} else {
if (next != lc.getRk()) {
String msg = "Missing a link in the chain. Prev rk " + prev + " was, expecting "
+ next + " but got " + lc.getRk() + ". Chain:" + key.chainId
+ ", order:" + key.order;
logError(msg, context);
throw new RuntimeException(msg);
}
prev = lc.getRk();
next = lc.getNext();
}
count++;
}
int expectedChainLen = context.getConfiguration().getInt(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
if (count != expectedChainLen) {
String msg = "Chain wasn't the correct length. Expected " + expectedChainLen + " got "
+ count + ". Chain:" + key.chainId + ", order:" + key.order;
logError(msg, context);
throw new RuntimeException(msg);
}
}
private static void logError(String msg, Context context) throws IOException {
TableName table = getTableName(context.getConfiguration());
LOG.error("Failure in chain verification: " + msg);
try (Connection connection = ConnectionFactory.createConnection(context.getConfiguration());
Admin admin = connection.getAdmin()) {
LOG.error("cluster metrics:\n" + admin.getClusterMetrics());
LOG.error("table regions:\n"
+ Joiner.on("\n").join(admin.getRegions(table)));
}
}
}
private void runCheckWithRetry() throws IOException, ClassNotFoundException, InterruptedException {
try {
runCheck();
} catch (Throwable t) {
LOG.warn("Received " + StringUtils.stringifyException(t));
LOG.warn("Running the check MR Job again to see whether an ephemeral problem or not");
runCheck();
throw t; // we should still fail the test even if second retry succeeds
}
// everything green
}
/**
* After adding data to the table, start an MR job that walks the linked lists and verifies them.
* @throws IOException
* @throws ClassNotFoundException
* @throws InterruptedException
*/
private void runCheck() throws IOException, ClassNotFoundException, InterruptedException {
LOG.info("Running check");
Configuration conf = getConf();
String jobName = getTablename() + "_check" + EnvironmentEdgeManager.currentTime();
Path p = util.getDataTestDirOnTestFS(jobName);
Job job = new Job(conf);
job.setJarByClass(getClass());
job.setJobName(jobName);
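// Secondary sort: partition and group by chainId so each reduce() call sees exactly one whole
// chain, while the sort comparator orders that chain's links by their position (order).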
job.setPartitionerClass(NaturalKeyPartitioner.class);
job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
job.setSortComparatorClass(CompositeKeyComparator.class);
Scan scan = new Scan();
scan.addFamily(CHAIN_FAM);
scan.addFamily(SORT_FAM);
scan.setMaxVersions(1);
scan.setCacheBlocks(false);
scan.setBatch(1000);
int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
if (replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
scan.setConsistency(Consistency.TIMELINE);
}
TableMapReduceUtil.initTableMapperJob(
getTablename().getName(),
scan,
LinkedListCheckingMapper.class,
LinkKey.class,
LinkChain.class,
job
);
job.setReducerClass(LinkedListCheckingReducer.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(NullWritable.class);
FileOutputFormat.setOutputPath(job, p);
assertEquals(true, job.waitForCompletion(true));
// Delete the files.
util.getTestFileSystem().delete(p, true);
}
@Override
public void setUpCluster() throws Exception {
util = getTestingUtil(getConf());
util.initializeCluster(1);
int replicaCount = getConf().getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
if (LOG.isDebugEnabled() && replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
LOG.debug("Region Replicas enabled: " + replicaCount);
}
// Scale this up on a real cluster
if (util.isDistributedCluster()) {
util.getConfiguration().setIfUnset(NUM_MAPS_KEY,
Integer.toString(util.getAdmin().getRegionServers().size() * 10));
util.getConfiguration().setIfUnset(NUM_IMPORT_ROUNDS_KEY, "5");
} else {
util.startMiniMapReduceCluster();
}
}
@Override
protected void addOptions() {
super.addOptions();
super.addOptNoArg(OPT_CHECK, "Run check only");
super.addOptNoArg(OPT_LOAD, "Run load only");
}
@Override
protected void processOptions(CommandLine cmd) {
super.processOptions(cmd);
check = cmd.hasOption(OPT_CHECK);
load = cmd.hasOption(OPT_LOAD);
}
@Override
public int runTestFromCommandLine() throws Exception {
if (load) {
runLoad();
} else if (check) {
installSlowingCoproc();
runCheckWithRetry();
} else {
testBulkLoad();
}
return 0;
}
@Override
public TableName getTablename() {
return getTableName(getConf());
}
public static TableName getTableName(Configuration conf) {
return TableName.valueOf(conf.get(TABLE_NAME_KEY, TABLE_NAME));
}
@Override
protected Set<String> getColumnFamilies() {
return Sets.newHashSet(Bytes.toString(CHAIN_FAM) , Bytes.toString(DATA_FAM),
Bytes.toString(SORT_FAM));
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
IntegrationTestingUtility.setUseDistributedCluster(conf);
int status = ToolRunner.run(conf, new IntegrationTestBulkLoad(), args);
System.exit(status);
}
}