blob: 84a279d974e598d3d98423619148a2642203ec43 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapred;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
/**
* Utility for {@link TableMap} and {@link TableReduce}
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
@SuppressWarnings({ "rawtypes", "unchecked" })
public class TableMapReduceUtil {
/**
* Use this before submitting a TableMap job. It will
* appropriately set up the JobConf.
*
* @param table The table name to read from.
* @param columns The columns to scan.
* @param mapper The mapper class to use.
* @param outputKeyClass The class of the output key.
* @param outputValueClass The class of the output value.
* @param job The current job configuration to adjust.
*/
public static void initTableMapJob(String table, String columns,
Class<? extends TableMap> mapper,
Class<?> outputKeyClass,
Class<?> outputValueClass, JobConf job) {
initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
true, TableInputFormat.class);
}
public static void initTableMapJob(String table, String columns,
Class<? extends TableMap> mapper,
Class<?> outputKeyClass,
Class<?> outputValueClass, JobConf job, boolean addDependencyJars) {
initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
addDependencyJars, TableInputFormat.class);
}
/**
* Use this before submitting a TableMap job. It will
* appropriately set up the JobConf.
*
* @param table The table name to read from.
* @param columns The columns to scan.
* @param mapper The mapper class to use.
* @param outputKeyClass The class of the output key.
* @param outputValueClass The class of the output value.
* @param job The current job configuration to adjust.
* @param addDependencyJars upload HBase jars and jars for any of the configured
* job classes via the distributed cache (tmpjars).
*/
public static void initTableMapJob(String table, String columns,
Class<? extends TableMap> mapper,
Class<?> outputKeyClass,
Class<?> outputValueClass, JobConf job, boolean addDependencyJars,
Class<? extends InputFormat> inputFormat) {
job.setInputFormat(inputFormat);
job.setMapOutputValueClass(outputValueClass);
job.setMapOutputKeyClass(outputKeyClass);
job.setMapperClass(mapper);
job.setStrings("io.serializations", job.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName());
FileInputFormat.addInputPaths(job, table);
job.set(TableInputFormat.COLUMN_LIST, columns);
if (addDependencyJars) {
try {
addDependencyJars(job);
} catch (IOException e) {
e.printStackTrace();
}
}
try {
initCredentials(job);
} catch (IOException ioe) {
// just spit out the stack trace? really?
ioe.printStackTrace();
}
}
/**
* Sets up the job for reading from one or more multiple table snapshots, with one or more scans
* per snapshot.
* It bypasses hbase servers and read directly from snapshot files.
*
* @param snapshotScans map of snapshot name to scans on that snapshot.
* @param mapper The mapper class to use.
* @param outputKeyClass The class of the output key.
* @param outputValueClass The class of the output value.
* @param job The current job to adjust. Make sure the passed job is
* carrying all necessary HBase configuration.
* @param addDependencyJars upload HBase jars and jars for any of the configured
* job classes via the distributed cache (tmpjars).
*/
public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
Class<? extends TableMap> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
JobConf job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
MultiTableSnapshotInputFormat.setInput(job, snapshotScans, tmpRestoreDir);
job.setInputFormat(MultiTableSnapshotInputFormat.class);
if (outputValueClass != null) {
job.setMapOutputValueClass(outputValueClass);
}
if (outputKeyClass != null) {
job.setMapOutputKeyClass(outputKeyClass);
}
job.setMapperClass(mapper);
if (addDependencyJars) {
addDependencyJars(job);
}
org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
}
/**
* Sets up the job for reading from a table snapshot. It bypasses hbase servers
* and read directly from snapshot files.
*
* @param snapshotName The name of the snapshot (of a table) to read from.
* @param columns The columns to scan.
* @param mapper The mapper class to use.
* @param outputKeyClass The class of the output key.
* @param outputValueClass The class of the output value.
* @param job The current job to adjust. Make sure the passed job is
* carrying all necessary HBase configuration.
* @param addDependencyJars upload HBase jars and jars for any of the configured
* job classes via the distributed cache (tmpjars).
* @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should
* have write permissions to this directory, and this should not be a subdirectory of rootdir.
* After the job is finished, restore directory can be deleted.
* @throws IOException When setting up the details fails.
* @see TableSnapshotInputFormat
*/
public static void initTableSnapshotMapJob(String snapshotName, String columns,
Class<? extends TableMap> mapper,
Class<?> outputKeyClass,
Class<?> outputValueClass, JobConf job,
boolean addDependencyJars, Path tmpRestoreDir)
throws IOException {
TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, job,
addDependencyJars, TableSnapshotInputFormat.class);
org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
}
/**
* Use this before submitting a TableReduce job. It will
* appropriately set up the JobConf.
*
* @param table The output table.
* @param reducer The reducer class to use.
* @param job The current job configuration to adjust.
* @throws IOException When determining the region count fails.
*/
public static void initTableReduceJob(String table,
Class<? extends TableReduce> reducer, JobConf job)
throws IOException {
initTableReduceJob(table, reducer, job, null);
}
/**
* Use this before submitting a TableReduce job. It will
* appropriately set up the JobConf.
*
* @param table The output table.
* @param reducer The reducer class to use.
* @param job The current job configuration to adjust.
* @param partitioner Partitioner to use. Pass <code>null</code> to use
* default partitioner.
* @throws IOException When determining the region count fails.
*/
public static void initTableReduceJob(String table,
Class<? extends TableReduce> reducer, JobConf job, Class partitioner)
throws IOException {
initTableReduceJob(table, reducer, job, partitioner, true);
}
/**
* Use this before submitting a TableReduce job. It will
* appropriately set up the JobConf.
*
* @param table The output table.
* @param reducer The reducer class to use.
* @param job The current job configuration to adjust.
* @param partitioner Partitioner to use. Pass <code>null</code> to use
* default partitioner.
* @param addDependencyJars upload HBase jars and jars for any of the configured
* job classes via the distributed cache (tmpjars).
* @throws IOException When determining the region count fails.
*/
public static void initTableReduceJob(String table,
Class<? extends TableReduce> reducer, JobConf job, Class partitioner,
boolean addDependencyJars) throws IOException {
job.setOutputFormat(TableOutputFormat.class);
job.setReducerClass(reducer);
job.set(TableOutputFormat.OUTPUT_TABLE, table);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);
job.setStrings("io.serializations", job.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName());
if (partitioner == HRegionPartitioner.class) {
job.setPartitionerClass(HRegionPartitioner.class);
int regions =
MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
if (job.getNumReduceTasks() > regions) {
job.setNumReduceTasks(regions);
}
} else if (partitioner != null) {
job.setPartitionerClass(partitioner);
}
if (addDependencyJars) {
addDependencyJars(job);
}
initCredentials(job);
}
public static void initCredentials(JobConf job) throws IOException {
UserProvider userProvider = UserProvider.instantiate(job);
if (userProvider.isHadoopSecurityEnabled()) {
// propagate delegation related props from launcher job to MR job
if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
job.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
}
}
if (userProvider.isHBaseSecurityEnabled()) {
Connection conn = ConnectionFactory.createConnection(job);
try {
// login the server principal (if using secure Hadoop)
User user = userProvider.getCurrent();
TokenUtil.addTokenForJob(conn, job, user);
} catch (InterruptedException ie) {
ie.printStackTrace();
Thread.currentThread().interrupt();
} finally {
conn.close();
}
}
}
/**
* Ensures that the given number of reduce tasks for the given job
* configuration does not exceed the number of regions for the given table.
*
* @param table The table to get the region count for.
* @param job The current job configuration to adjust.
* @throws IOException When retrieving the table details fails.
*/
// Used by tests.
public static void limitNumReduceTasks(String table, JobConf job)
throws IOException {
int regions =
MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
if (job.getNumReduceTasks() > regions)
job.setNumReduceTasks(regions);
}
/**
* Ensures that the given number of map tasks for the given job
* configuration does not exceed the number of regions for the given table.
*
* @param table The table to get the region count for.
* @param job The current job configuration to adjust.
* @throws IOException When retrieving the table details fails.
*/
// Used by tests.
public static void limitNumMapTasks(String table, JobConf job)
throws IOException {
int regions =
MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
if (job.getNumMapTasks() > regions)
job.setNumMapTasks(regions);
}
/**
* Sets the number of reduce tasks for the given job configuration to the
* number of regions the given table has.
*
* @param table The table to get the region count for.
* @param job The current job configuration to adjust.
* @throws IOException When retrieving the table details fails.
*/
public static void setNumReduceTasks(String table, JobConf job)
throws IOException {
job.setNumReduceTasks(MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job),
TableName.valueOf(table)));
}
/**
* Sets the number of map tasks for the given job configuration to the
* number of regions the given table has.
*
* @param table The table to get the region count for.
* @param job The current job configuration to adjust.
* @throws IOException When retrieving the table details fails.
*/
public static void setNumMapTasks(String table, JobConf job)
throws IOException {
job.setNumMapTasks(MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job),
TableName.valueOf(table)));
}
/**
* Sets the number of rows to return and cache with each scanner iteration.
* Higher caching values will enable faster mapreduce jobs at the expense of
* requiring more heap to contain the cached rows.
*
* @param job The current job configuration to adjust.
* @param batchSize The number of rows to return in batch with each scanner
* iteration.
*/
public static void setScannerCaching(JobConf job, int batchSize) {
job.setInt("hbase.client.scanner.caching", batchSize);
}
/**
* @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job)
*/
public static void addDependencyJars(JobConf job) throws IOException {
org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(job);
org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(
job,
// when making changes here, consider also mapreduce.TableMapReduceUtil
// pull job classes
job.getMapOutputKeyClass(),
job.getMapOutputValueClass(),
job.getOutputKeyClass(),
job.getOutputValueClass(),
job.getPartitionerClass(),
job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class),
job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class),
job.getCombinerClass());
}
}