/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hcatalog.hbase;

import java.io.IOException;
import java.net.URI;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner.DEFAULT_PATH;

/**
* MapReduce job which reads a series of Puts stored in a SequenceFile
* and imports the data into HBase. It writes HFiles laid out for the
* table's existing regions using HFileOutputFormat and then asks the
* region servers to bulk load them via LoadIncrementalHFiles.doBulkLoad().
* It is used after an MR job has written the SequenceFile and the data
* needs to be bulk loaded into HBase.
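* <p>A rough sketch of the parent-job side; {@code parentJob} and all values
* shown are illustrative assumptions, not settings defined by this class:
* <pre>{@code
* // The parent job writes <ImmutableBytesWritable, Put> pairs to a SequenceFile...
* parentJob.setOutputFormatClass(SequenceFileOutputFormat.class);
* parentJob.setOutputKeyClass(ImmutableBytesWritable.class);
* parentJob.setOutputValueClass(Put.class);
* // ...and its OutputCommitter.commitJob() later hands that output directory
* // to ImportSequenceFile.runJob() as the input to bulk load.
* }</pre>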
*/
class ImportSequenceFile {
private final static Logger LOG = LoggerFactory.getLogger(ImportSequenceFile.class);
private final static String NAME = "HCatImportSequenceFile";
private final static String IMPORTER_WORK_DIR = "_IMPORTER_MR_WORK_DIR";
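/**
* Mapper that reads the (row key, Put) records from the SequenceFile and
* re-emits each Put keyed by its own row, ready for PutSortReducer.
*/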
private static class SequenceFileImporter extends Mapper<ImmutableBytesWritable, Put, ImmutableBytesWritable, Put> {
@Override
public void map(ImmutableBytesWritable rowKey, Put value,
Context context)
throws IOException {
try {
context.write(new ImmutableBytesWritable(value.getRow()), value);
} catch (InterruptedException e) {
//restore the interrupt status and surface the failure instead of silently dropping the record
Thread.currentThread().interrupt();
throw new IOException("Interrupted while writing Put for row", e);
}
}
}
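/**
* HFileOutputFormat whose committer additionally bulk loads the generated
* HFiles into the target table during commitJob() and always deletes the
* scratch output directory when the job ends.
*/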
private static class ImporterOutputFormat extends HFileOutputFormat {
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
final OutputCommitter baseOutputCommitter = super.getOutputCommitter(context);
return new OutputCommitter() {
@Override
public void setupJob(JobContext jobContext) throws IOException {
baseOutputCommitter.setupJob(jobContext);
}
@Override
public void setupTask(TaskAttemptContext taskContext) throws IOException {
baseOutputCommitter.setupTask(taskContext);
}
@Override
public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
return baseOutputCommitter.needsTaskCommit(taskContext);
}
@Override
public void commitTask(TaskAttemptContext taskContext) throws IOException {
baseOutputCommitter.commitTask(taskContext);
}
@Override
public void abortTask(TaskAttemptContext taskContext) throws IOException {
baseOutputCommitter.abortTask(taskContext);
}
@Override
public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
try {
baseOutputCommitter.abortJob(jobContext, state);
} finally {
cleanupScratch(jobContext);
}
}
@Override
public void commitJob(JobContext jobContext) throws IOException {
try {
baseOutputCommitter.commitJob(jobContext);
Configuration conf = jobContext.getConfiguration();
try {
//import the generated HFiles into the live table
new LoadIncrementalHFiles(conf)
.doBulkLoad(HFileOutputFormat.getOutputPath(jobContext),
new HTable(conf,
conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY)));
} catch (Exception e) {
throw new IOException("BulkLoad failed.", e);
}
} finally {
cleanupScratch(jobContext);
}
}
@Override
public void cleanupJob(JobContext context) throws IOException {
try {
baseOutputCommitter.cleanupJob(context);
} finally {
cleanupScratch(context);
}
}
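/**
* Delete the scratch directory holding the intermediate HFiles.
*/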
private void cleanupScratch(JobContext context) throws IOException {
FileSystem fs = FileSystem.get(context.getConfiguration());
fs.delete(HFileOutputFormat.getOutputPath(context), true);
}
};
}
}
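/**
* Set up the importer job: SequenceFile input, SequenceFileImporter mapper,
* PutSortReducer, and HFileOutputFormat.configureIncrementalLoad() to install
* a TotalOrderPartitioner matching the table's regions; HFiles are written to
* the scratch directory through ImporterOutputFormat.
*/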
private static Job createSubmittableJob(Configuration conf, String tableName, Path inputDir, Path scratchDir, boolean localMode)
throws IOException {
Job job = new Job(conf, NAME + "_" + tableName);
job.setJarByClass(SequenceFileImporter.class);
FileInputFormat.setInputPaths(job, inputDir);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setMapperClass(SequenceFileImporter.class);
HTable table = new HTable(conf, tableName);
job.setReducerClass(PutSortReducer.class);
FileOutputFormat.setOutputPath(job, scratchDir);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
HFileOutputFormat.configureIncrementalLoad(job, table);
//override OutputFormatClass with our own so we can include cleanup in the committer
job.setOutputFormatClass(ImporterOutputFormat.class);
//local mode doesn't support symbolic links so we have to manually set the actual path
if (localMode) {
String partitionFile = null;
for (URI uri : DistributedCache.getCacheFiles(job.getConfiguration())) {
if (DEFAULT_PATH.equals(uri.getFragment())) {
partitionFile = uri.toString();
break;
}
}
if (partitionFile == null)
throw new IOException("Could not find the TotalOrderPartitioner partition file in the distributed cache");
//strip the "#<link name>" fragment to get the real path of the partition file
partitionFile = partitionFile.substring(0, partitionFile.lastIndexOf("#"));
job.getConfiguration().set(TotalOrderPartitioner.PARTITIONER_PATH, partitionFile);
}
return job;
}
/**
* Method to run the importer MapReduce job. Normally this will be called by
* another MR job during OutputCommitter.commitJob().
* @param parentContext JobContext of the parent job
* @param tableName name of the table to bulk load data into
* @param inputDir path of the SequenceFile-formatted data to read
* @param scratchDir temporary path where the importer MR job builds the HFiles that will be bulk loaded
* @return true if the importer job (and the subsequent bulk load) completed successfully, false otherwise
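* <p>A minimal call-site sketch (the table name and paths are illustrative):
* <pre>{@code
* if (!ImportSequenceFile.runJob(jobContext, "my_table", sequenceFileDir, scratchDir)) {
*     throw new IOException("Bulk load from SequenceFile failed");
* }
* }</pre>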
*/
static boolean runJob(JobContext parentContext, String tableName, Path inputDir, Path scratchDir) {
Configuration parentConf = parentContext.getConfiguration();
Configuration conf = new Configuration();
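//Only propagate the HBase connection settings and distributed cache archive
//entries from the parent job's configuration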
for (Map.Entry<String, String> el : parentConf) {
if (el.getKey().startsWith("hbase."))
conf.set(el.getKey(), el.getValue());
if (el.getKey().startsWith("mapred.cache.archives"))
conf.set(el.getKey(), el.getValue());
}
//Inherit jar dependencies added to distributed cache loaded by parent job
conf.set("mapred.job.classpath.archives", parentConf.get("mapred.job.classpath.archives", ""));
conf.set("mapreduce.job.cache.archives.visibilities", parentConf.get("mapreduce.job.cache.archives.visibilities", ""));
//Temporary fix until HBase security is ready:
//the written HFiles need to be world readable so that the
//HBase region server user has the privileges to perform an HDFS move
if (parentConf.getBoolean("hadoop.security.authorization", false)) {
FsPermission.setUMask(conf, FsPermission.valueOf("----------"));
}
conf.set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, tableName);
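//Don't cancel the delegation tokens when this job completes; they are borrowed
//from the parent job's credentials (added below) and the parent still needs them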
conf.setBoolean(JobContext.JOB_CANCEL_DELEGATION_TOKEN, false);
boolean localMode = "local".equals(conf.get("mapred.job.tracker"));
boolean success = false;
try {
FileSystem fs = FileSystem.get(parentConf);
Path workDir = new Path(new Job(parentConf).getWorkingDirectory(), IMPORTER_WORK_DIR);
if (!fs.mkdirs(workDir))
throw new IOException("Importer work directory already exists: " + workDir);
Job job = createSubmittableJob(conf, tableName, inputDir, scratchDir, localMode);
job.setWorkingDirectory(workDir);
job.getCredentials().addAll(parentContext.getCredentials());
success = job.waitForCompletion(true);
fs.delete(workDir, true);
//We only commit (and clean up) on success because a failure might have been caused by the target directory already existing
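//In local mode the job-level committer does not appear to be invoked by the framework,
//so run commitJob() explicitly; for this output format that performs the bulk load
//and deletes the scratch directory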
if (localMode && success) {
new ImporterOutputFormat()
.getOutputCommitter(org.apache.hadoop.mapred.HCatMapRedUtil.createTaskAttemptContext(conf, new TaskAttemptID()))
.commitJob(job);
}
} catch (InterruptedException e) {
LOG.error("ImportSequenceFile Failed", e);
} catch (ClassNotFoundException e) {
LOG.error("ImportSequenceFile Failed", e);
} catch (IOException e) {
LOG.error("ImportSequenceFile Failed", e);
}
return success;
}
}