/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hcatalog.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hcatalog.cli.HCatDriver;
import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.DefaultHCatRecord;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.hbase.HBaseBulkOutputFormat.HBaseBulkOutputCommitter;
import org.apache.hcatalog.hbase.TestHBaseDirectOutputFormat.MapReadAbortedTransaction;
import org.apache.hcatalog.hbase.TestHBaseDirectOutputFormat.MapWriteAbortTransaction;
import org.apache.hcatalog.hbase.snapshot.FamilyRevision;
import org.apache.hcatalog.hbase.snapshot.RevisionManager;
import org.apache.hcatalog.hbase.snapshot.RevisionManagerConfiguration;
import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
import org.apache.hcatalog.hbase.snapshot.Transaction;
import org.apache.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.InputJobInfo;
import org.apache.hcatalog.mapreduce.OutputJobInfo;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
 * Tests components of HBaseHCatStorageHandler using ManyMiniCluster,
 * including ImportSequenceFile and HBaseBulkOutputFormat.
 */
public class TestHBaseBulkOutputFormat extends SkeletonHBaseTest {
private final static Logger LOG = LoggerFactory.getLogger(TestHBaseBulkOutputFormat.class);
private final HiveConf allConf;
private final HCatDriver hcatDriver;
public TestHBaseBulkOutputFormat() {
allConf = getHiveConf();
allConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
HCatSemanticAnalyzer.class.getName());
allConf.set(HiveConf.ConfVars.HADOOPFS.varname, getFileSystem().getUri().toString());
allConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, new Path(getTestDir(), "warehouse").toString());
//Add hbase properties
for (Map.Entry<String, String> el : getHbaseConf())
allConf.set(el.getKey(), el.getValue());
for (Map.Entry<String, String> el : getJobConf())
allConf.set(el.getKey(), el.getValue());
HBaseConfiguration.merge(
allConf,
RevisionManagerConfiguration.create());
SessionState.start(new CliSessionState(allConf));
hcatDriver = new HCatDriver();
}
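
/**
 * Old-API (org.apache.hadoop.mapred) mapper that turns each CSV input line
 * of the form "rowkey,qualifier:value,..." into a Put on the "my_family"
 * column family.
 */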
public static class MapWriteOldMapper implements org.apache.hadoop.mapred.Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
@Override
public void close() throws IOException {
}
@Override
public void configure(JobConf job) {
}
@Override
public void map(LongWritable key, Text value,
OutputCollector<ImmutableBytesWritable, Put> output,
Reporter reporter) throws IOException {
String vals[] = value.toString().split(",");
Put put = new Put(Bytes.toBytes(vals[0]));
for (int i = 1; i < vals.length; i++) {
String pair[] = vals[i].split(":");
put.add(Bytes.toBytes("my_family"),
Bytes.toBytes(pair[0]),
Bytes.toBytes(pair[1]));
}
output.collect(new ImmutableBytesWritable(Bytes.toBytes(vals[0])), put);
}
}
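
/**
 * New-API (org.apache.hadoop.mapreduce) counterpart of MapWriteOldMapper;
 * emits one Put per CSV input line.
 */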
public static class MapWrite extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String vals[] = value.toString().split(",");
Put put = new Put(Bytes.toBytes(vals[0]));
for (int i = 1; i < vals.length; i++) {
String pair[] = vals[i].split(":");
put.add(Bytes.toBytes("my_family"),
Bytes.toBytes(pair[0]),
Bytes.toBytes(pair[1]));
}
context.write(new ImmutableBytesWritable(Bytes.toBytes(vals[0])), put);
}
}
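
/**
 * Mapper that writes through HCatOutputFormat: it rebuilds the output schema
 * from the OutputJobInfo serialized in the job configuration and emits one
 * HCatRecord per CSV input line.
 */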
public static class MapHCatWrite extends Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
OutputJobInfo jobInfo = (OutputJobInfo) HCatUtil.deserialize(context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
HCatRecord record = new DefaultHCatRecord(3);
HCatSchema schema = jobInfo.getOutputSchema();
String vals[] = value.toString().split(",");
record.setInteger("key", schema, Integer.parseInt(vals[0]));
for (int i = 1; i < vals.length; i++) {
String pair[] = vals[i].split(":");
record.set(pair[0], schema, pair[1]);
}
context.write(null, record);
}
}
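
/**
 * Drives HBaseBulkOutputFormat directly through the old mapred API. A write
 * transaction is opened by hand via the RevisionManager and serialized into
 * the job conf; after the job runs, the test scans the rows back and checks
 * that the intermediate sequence-file directory was cleaned up.
 */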
@Test
public void hbaseBulkOutputFormatTest() throws IOException, ClassNotFoundException, InterruptedException {
String testName = "hbaseBulkOutputFormatTest";
Path methodTestDir = new Path(getTestDir(), testName);
LOG.info("starting: " + testName);
String tableName = newTableName(testName).toLowerCase();
String familyName = "my_family";
byte[] familyNameBytes = Bytes.toBytes(familyName);
//include hbase config in conf file
Configuration conf = new Configuration(allConf);
//create table
conf.set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, tableName);
createTable(tableName, new String[]{familyName});
String data[] = {"1,english:one,spanish:uno",
"2,english:two,spanish:dos",
"3,english:three,spanish:tres"};
// input/output settings
Path inputPath = new Path(methodTestDir, "mr_input");
FSDataOutputStream os = getFileSystem().create(new Path(inputPath, "inputFile.txt"));
for (String line : data)
os.write(Bytes.toBytes(line + "\n"));
os.close();
Path interPath = new Path(methodTestDir, "inter");
//create job
JobConf job = new JobConf(conf);
job.setWorkingDirectory(new Path(methodTestDir, "mr_work"));
job.setJarByClass(this.getClass());
job.setMapperClass(MapWriteOldMapper.class);
job.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class);
org.apache.hadoop.mapred.TextInputFormat.setInputPaths(job, inputPath);
job.setOutputFormat(HBaseBulkOutputFormat.class);
org.apache.hadoop.mapred.SequenceFileOutputFormat.setOutputPath(job, interPath);
job.setOutputCommitter(HBaseBulkOutputCommitter.class);
//manually create a write transaction and serialize it into the job conf,
//as HCatOutputFormat.setOutput would do for the HCat-driven path
RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
try {
OutputJobInfo outputJobInfo = OutputJobInfo.create("default", tableName, null);
Transaction txn = rm.beginWriteTransaction(tableName, Arrays.asList(familyName));
outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
HCatUtil.serialize(txn));
job.set(HCatConstants.HCAT_KEY_OUTPUT_INFO,
HCatUtil.serialize(outputJobInfo));
} finally {
rm.close();
}
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
//the old-API mapper emits Put values, not HCatRecords
job.setMapOutputValueClass(Put.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);
job.setNumReduceTasks(0);
RunningJob runJob = JobClient.runJob(job);
runJob.waitForCompletion();
assertTrue(runJob.isSuccessful());
//verify
HTable table = new HTable(conf, tableName);
Scan scan = new Scan();
scan.addFamily(familyNameBytes);
ResultScanner scanner = table.getScanner(scan);
int index = 0;
for (Result result : scanner) {
String vals[] = data[index].split(",");
for (int i = 1; i < vals.length; i++) {
String pair[] = vals[i].split(":");
assertTrue(result.containsColumn(familyNameBytes, Bytes.toBytes(pair[0])));
assertEquals(pair[1], Bytes.toString(result.getValue(familyNameBytes, Bytes.toBytes(pair[0]))));
}
index++;
}
//test if load count is the same
assertEquals(data.length, index);
//test if scratch directory was erased
assertFalse(FileSystem.get(job).exists(interPath));
}
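
/**
 * Writes Puts to an intermediate SequenceFile with a plain map-only job and
 * then loads them into HBase via ImportSequenceFile, verifying the table
 * contents and that the scratch directory was removed.
 */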
@Test
public void importSequenceFileTest() throws IOException, ClassNotFoundException, InterruptedException {
String testName = "importSequenceFileTest";
Path methodTestDir = new Path(getTestDir(), testName);
LOG.info("starting: " + testName);
String tableName = newTableName(testName).toLowerCase();
String familyName = "my_family";
byte[] familyNameBytes = Bytes.toBytes(familyName);
//include hbase config in conf file
Configuration conf = new Configuration(allConf);
//create table
createTable(tableName, new String[]{familyName});
String data[] = {"1,english:one,spanish:uno",
"2,english:two,spanish:dos",
"3,english:three,spanish:tres"};
// input/output settings
Path inputPath = new Path(methodTestDir, "mr_input");
getFileSystem().mkdirs(inputPath);
FSDataOutputStream os = getFileSystem().create(new Path(inputPath, "inputFile.txt"));
for (String line : data)
os.write(Bytes.toBytes(line + "\n"));
os.close();
Path interPath = new Path(methodTestDir, "inter");
Path scratchPath = new Path(methodTestDir, "scratch");
//create job
Job job = new Job(conf, testName);
job.setWorkingDirectory(new Path(methodTestDir, "mr_work"));
job.setJarByClass(this.getClass());
job.setMapperClass(MapWrite.class);
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.setInputPaths(job, inputPath);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setOutputPath(job, interPath);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);
job.setNumReduceTasks(0);
assertTrue(job.waitForCompletion(true));
job = new Job(new Configuration(allConf), testName + "_importer");
assertTrue(ImportSequenceFile.runJob(job, tableName, interPath, scratchPath));
//verify
HTable table = new HTable(conf, tableName);
Scan scan = new Scan();
scan.addFamily(familyNameBytes);
ResultScanner scanner = table.getScanner(scan);
int index = 0;
for (Result result : scanner) {
String vals[] = data[index].split(",");
for (int i = 1; i < vals.length; i++) {
String pair[] = vals[i].split(":");
assertTrue(result.containsColumn(familyNameBytes, Bytes.toBytes(pair[0])));
assertEquals(pair[1], Bytes.toString(result.getValue(familyNameBytes, Bytes.toBytes(pair[0]))));
}
index++;
}
//test if load count is the same
assertEquals(data.length, index);
//test if scratch directory was erased
assertFalse(FileSystem.get(job.getConfiguration()).exists(scratchPath));
}
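
/**
 * End-to-end bulk-mode write through HCatOutputFormat against a table created
 * with the bulk output mode table property. Multiple input files force
 * multiple mappers; afterwards the revision manager snapshot and the cell
 * timestamps are checked to confirm a single committed revision.
 */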
@Test
public void bulkModeHCatOutputFormatTest() throws Exception {
String testName = "bulkModeHCatOutputFormatTest";
Path methodTestDir = new Path(getTestDir(), testName);
LOG.info("starting: " + testName);
String databaseName = testName.toLowerCase();
String dbDir = new Path(methodTestDir, "DB_" + testName).toString();
String tableName = newTableName(testName).toLowerCase();
String familyName = "my_family";
byte[] familyNameBytes = Bytes.toBytes(familyName);
//include hbase config in conf file
Configuration conf = new Configuration(allConf);
conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir + "'";
String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
"(key int, english string, spanish string) STORED BY " +
"'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
"TBLPROPERTIES ('" + HBaseConstants.PROPERTY_BULK_OUTPUT_MODE_KEY + "'='true'," +
"'hbase.columns.mapping'=':key," + familyName + ":english," + familyName + ":spanish')";
assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
String data[] = {"1,english:ONE,spanish:UNO",
"2,english:TWO,spanish:DOS",
"3,english:THREE,spanish:TRES"};
// input/output settings
Path inputPath = new Path(methodTestDir, "mr_input");
getFileSystem().mkdirs(inputPath);
//create multiple files so we can test with multiple mappers
for (int i = 0; i < data.length; i++) {
FSDataOutputStream os = getFileSystem().create(new Path(inputPath, "inputFile" + i + ".txt"));
os.write(Bytes.toBytes(data[i] + "\n"));
os.close();
}
//create job
Job job = new Job(conf, testName);
job.setWorkingDirectory(new Path(methodTestDir, "mr_work"));
job.setJarByClass(this.getClass());
job.setMapperClass(MapHCatWrite.class);
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.setInputPaths(job, inputPath);
job.setOutputFormatClass(HCatOutputFormat.class);
OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName, tableName, null);
HCatOutputFormat.setOutput(job, outputJobInfo);
job.setMapOutputKeyClass(BytesWritable.class);
job.setMapOutputValueClass(HCatRecord.class);
job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(HCatRecord.class);
job.setNumReduceTasks(0);
assertTrue(job.waitForCompletion(true));
RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
try {
TableSnapshot snapshot = rm.createSnapshot(databaseName + "." + tableName);
for (String el : snapshot.getColumnFamilies()) {
assertEquals(1, snapshot.getRevision(el));
}
} finally {
rm.close();
}
//verify
HTable table = new HTable(conf, databaseName + "." + tableName);
Scan scan = new Scan();
scan.addFamily(familyNameBytes);
ResultScanner scanner = table.getScanner(scan);
int index = 0;
for (Result result : scanner) {
String vals[] = data[index].split(",");
for (int i = 1; i < vals.length; i++) {
String pair[] = vals[i].split(":");
assertTrue(result.containsColumn(familyNameBytes, Bytes.toBytes(pair[0])));
assertEquals(pair[1], Bytes.toString(result.getValue(familyNameBytes, Bytes.toBytes(pair[0]))));
//bulk mode stamps cells with the write transaction's revision number
assertEquals(1L, result.getColumn(familyNameBytes, Bytes.toBytes(pair[0])).get(0).getTimestamp());
}
index++;
}
//test if load count is the same
assertEquals(data.length, index);
}
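
/**
 * Same bulk-mode write path as bulkModeHCatOutputFormatTest, but against a
 * table in the default database.
 */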
@Test
public void bulkModeHCatOutputFormatTestWithDefaultDB() throws Exception {
String testName = "bulkModeHCatOutputFormatTestWithDefaultDB";
Path methodTestDir = new Path(getTestDir(), testName);
String databaseName = "default";
String dbDir = new Path(methodTestDir, "DB_" + testName).toString();
String tableName = newTableName(testName).toLowerCase();
String familyName = "my_family";
byte[] familyNameBytes = Bytes.toBytes(familyName);
//include hbase config in conf file
Configuration conf = new Configuration(allConf);
conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir + "'";
String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
"(key int, english string, spanish string) STORED BY " +
"'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
"TBLPROPERTIES ('" + HBaseConstants.PROPERTY_BULK_OUTPUT_MODE_KEY + "'='true'," +
"'hbase.columns.mapping'=':key," + familyName + ":english," + familyName + ":spanish')";
assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
String data[] = {"1,english:ONE,spanish:UNO",
"2,english:TWO,spanish:DOS",
"3,english:THREE,spanish:TRES"};
// input/output settings
Path inputPath = new Path(methodTestDir, "mr_input");
getFileSystem().mkdirs(inputPath);
FSDataOutputStream os = getFileSystem().create(new Path(inputPath, "inputFile.txt"));
for (String line : data)
os.write(Bytes.toBytes(line + "\n"));
os.close();
//create job
Job job = new Job(conf, testName);
job.setWorkingDirectory(new Path(methodTestDir, "mr_work"));
job.setJarByClass(this.getClass());
job.setMapperClass(MapHCatWrite.class);
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.setInputPaths(job, inputPath);
job.setOutputFormatClass(HCatOutputFormat.class);
OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName, tableName, null);
HCatOutputFormat.setOutput(job, outputJobInfo);
job.setMapOutputKeyClass(BytesWritable.class);
job.setMapOutputValueClass(HCatRecord.class);
job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(HCatRecord.class);
job.setNumReduceTasks(0);
assertTrue(job.waitForCompletion(true));
//verify
HTable table = new HTable(conf, tableName);
Scan scan = new Scan();
scan.addFamily(familyNameBytes);
ResultScanner scanner = table.getScanner(scan);
int index = 0;
for (Result result : scanner) {
String vals[] = data[index].split(",");
for (int i = 1; i < vals.length; i++) {
String pair[] = vals[i].split(":");
assertTrue(result.containsColumn(familyNameBytes, Bytes.toBytes(pair[0])));
assertEquals(pair[1], Bytes.toString(result.getValue(familyNameBytes, Bytes.toBytes(pair[0]))));
}
index++;
}
//test if load count is the same
assertEquals(data.length, index);
}
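
/**
 * Forces the job to fail via MapWriteAbortTransaction and verifies that the
 * revision manager records the write transaction as aborted, that no rows
 * reach HBase (bulk mode only writes during commitJob), and that a follow-up
 * read through HCatInputFormat sees no data from the aborted revision.
 */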
@Test
public void bulkModeAbortTest() throws Exception {
String testName = "bulkModeAbortTest";
Path methodTestDir = new Path(getTestDir(), testName);
String databaseName = testName.toLowerCase();
String dbDir = new Path(methodTestDir, "DB_" + testName).toString();
String tableName = newTableName(testName).toLowerCase();
String familyName = "my_family";
// include hbase config in conf file
Configuration conf = new Configuration(allConf);
conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir
+ "'";
String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
"(key int, english string, spanish string) STORED BY " +
"'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
"TBLPROPERTIES ('" + HBaseConstants.PROPERTY_BULK_OUTPUT_MODE_KEY + "'='true'," +
"'hbase.columns.mapping'=':key," + familyName + ":english," + familyName
+ ":spanish')";
assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
String data[] = {"1,english:ONE,spanish:UNO",
"2,english:TWO,spanish:DOS",
"3,english:THREE,spanish:TRES"};
Path inputPath = new Path(methodTestDir, "mr_input");
getFileSystem().mkdirs(inputPath);
// create multiple files so we can test with multiple mappers
for (int i = 0; i < data.length; i++) {
FSDataOutputStream os = getFileSystem().create(
new Path(inputPath, "inputFile" + i + ".txt"));
os.write(Bytes.toBytes(data[i] + "\n"));
os.close();
}
Path workingDir = new Path(methodTestDir, "mr_abort");
OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,
tableName, null);
Job job = configureJob(testName,
conf, workingDir, MapWriteAbortTransaction.class,
outputJobInfo, inputPath);
assertFalse(job.waitForCompletion(true));
// verify that revision manager has it as aborted transaction
RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
try {
TableSnapshot snapshot = rm.createSnapshot(databaseName + "." + tableName);
for (String family : snapshot.getColumnFamilies()) {
assertEquals(1, snapshot.getRevision(family));
List<FamilyRevision> abortedWriteTransactions = rm.getAbortedWriteTransactions(
databaseName + "." + tableName, family);
assertEquals(1, abortedWriteTransactions.size());
assertEquals(1, abortedWriteTransactions.get(0).getRevision());
}
} finally {
rm.close();
}
//verify that hbase does not have any of the records.
//Since records are only written during commitJob,
//hbase should not have any records.
HTable table = new HTable(conf, databaseName + "." + tableName);
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes(familyName));
ResultScanner scanner = table.getScanner(scan);
assertFalse(scanner.iterator().hasNext());
// verify that the storage handler input format returns empty results.
Path outputDir = new Path(getTestDir(),
"mapred/testHBaseTableBulkIgnoreAbortedTransactions");
FileSystem fs = getFileSystem();
if (fs.exists(outputDir)) {
fs.delete(outputDir, true);
}
job = new Job(conf, "hbase-bulk-aborted-transaction");
job.setJarByClass(this.getClass());
job.setMapperClass(MapReadAbortedTransaction.class);
job.setInputFormatClass(HCatInputFormat.class);
InputJobInfo inputJobInfo = InputJobInfo.create(databaseName,
tableName, null);
HCatInputFormat.setInput(job, inputJobInfo);
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, outputDir);
job.setMapOutputKeyClass(BytesWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(0);
assertTrue(job.waitForCompletion(true));
}
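
/**
 * Builds a map-only job that reads text input and writes through
 * HCatOutputFormat; used by bulkModeAbortTest.
 */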
private Job configureJob(String jobName, Configuration conf,
Path workingDir, Class<? extends Mapper> mapperClass,
OutputJobInfo outputJobInfo, Path inputPath) throws IOException {
Job job = new Job(conf, jobName);
job.setWorkingDirectory(workingDir);
job.setJarByClass(this.getClass());
job.setMapperClass(mapperClass);
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.setInputPaths(job, inputPath);
job.setOutputFormatClass(HCatOutputFormat.class);
HCatOutputFormat.setOutput(job, outputJobInfo);
job.setMapOutputKeyClass(BytesWritable.class);
job.setMapOutputValueClass(HCatRecord.class);
job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(HCatRecord.class);
job.setNumReduceTasks(0);
return job;
}
}