/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bigtop.itest.hbase.smoke;

import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.NMapInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
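
/**
 * MapReduce job that writes randomly generated KeyValues as HFiles laid
 * out for incremental (bulk) load into an existing HBase table. Modeled
 * on the load generator used by HBase's own HFileOutputFormat tests.
 */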
public class IncrementalPELoad extends Configured implements Tool {
  /**
   * Mapper that emits rows filled with random KeyValues. NMapInputFormat
   * feeds each map task a single NullWritable record, which the mapper
   * expands into ROWSPERSPLIT rows across both column families.
   */
public static class RandomKVGeneratingMapper
extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, KeyValue> {
    // Each map task emits this many rows.
    private static final int ROWSPERSPLIT = 1024;

    // Two column families derived from PerformanceEvaluation's default family name.
    private static final byte[][] FAMILIES =
        {Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")),
         Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))};

    private static final String KEYLEN_CONF = "randomkv.key.length";
    private static final int KEYLEN_DEFAULT = 10;
    private int keyLength;

    private static final String VALLEN_CONF = "randomkv.val.length";
    private static final int VALLEN_DEFAULT = 10;
    private int valLength;
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
super.setup(context);
Configuration conf = context.getConfiguration();
keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
}

    @Override
    protected void map(NullWritable n1, NullWritable n2, Context context)
        throws IOException, InterruptedException {
      byte[] keyBytes = new byte[keyLength];
      byte[] valBytes = new byte[valLength];
int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
Random random = new Random();
for (int i = 0; i < ROWSPERSPLIT; i++) {
random.nextBytes(keyBytes);
        // Stamp the task id into the last key byte so that keys generated
        // by different map tasks never collide.
        keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
random.nextBytes(valBytes);
ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
for (byte[] family : FAMILIES) {
KeyValue kv = new KeyValue(keyBytes, family,
PerformanceEvaluation.QUALIFIER_NAME, valBytes);
context.write(key, kv);
}
}
}
}
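
  /**
   * Builds and runs the load job. Expects two arguments: the name of an
   * existing HBase table (args[0]) and an output directory for the
   * generated HFiles (args[1]). configureIncrementalLoad() inspects the
   * table's region boundaries and sets up a TotalOrderPartitioner and a
   * sorting reducer so each output HFile fits within a single region.
   */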
  @Override
  public int run(String[] args) throws Exception {
Configuration conf = getConf();
Job job = new Job(conf, "testMRIncrementalLoad");
job.setJarByClass(IncrementalPELoad.class);
job.setInputFormatClass(NMapInputFormat.class);
job.setMapperClass(IncrementalPELoad.RandomKVGeneratingMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);
    // The table handle is only needed while configuring the job.
    HTable table = new HTable(conf, Bytes.toBytes(args[0]));
    try {
      HFileOutputFormat.configureIncrementalLoad(job, table);
    } finally {
      table.close();
    }
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
}
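
  /**
   * Entry point; runs through ToolRunner so generic options such as -D
   * are applied to the configuration. Example invocation (jar name
   * illustrative):
   *   hadoop jar <smoke-tests-jar> \
   *       org.apache.bigtop.itest.hbase.smoke.IncrementalPELoad mytable /tmp/hfiles
   * The generated HFiles can then be loaded into the table with HBase's
   * completebulkload tool (LoadIncrementalHFiles).
   */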
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new IncrementalPELoad(), args);
System.exit(exitCode);
}
}