/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.State;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
/** Implements MapReduce locally, in-process, for debugging. */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class LocalJobRunner implements ClientProtocol {
public static final Log LOG =
LogFactory.getLog(LocalJobRunner.class);
/** The maximum number of map tasks to run in parallel in LocalJobRunner */
public static final String LOCAL_MAX_MAPS =
"mapreduce.local.map.tasks.maximum";
private FileSystem fs;
private HashMap<JobID, Job> jobs = new HashMap<JobID, Job>();
private JobConf conf;
  private AtomicInteger mapTasks = new AtomicInteger(0);
  private int reduceTasks = 0;
final Random rand = new Random();
private JobTrackerInstrumentation myMetrics = null;
private static final String jobDir = "localRunner/";
private static final Counters EMPTY_COUNTERS = new Counters();
public long getProtocolVersion(String protocol, long clientVersion) {
return ClientProtocol.versionID;
}
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
return ProtocolSignature.getProtocolSignature(
this, protocol, clientVersion, clientMethodsHash);
}
private class Job extends Thread implements TaskUmbilicalProtocol {
// The job directory on the system: JobClient places job configurations here.
// This is analogous to JobTracker's system directory.
private Path systemJobDir;
private Path systemJobFile;
    // The job directory for the task. Analogous to a task's job directory.
private Path localJobDir;
private Path localJobFile;
private JobID id;
private JobConf job;
private int numMapTasks;
    private float[] partialMapProgress;
    private Counters[] mapCounters;
private Counters reduceCounters;
private JobStatus status;
private List<TaskAttemptID> mapIds = Collections.synchronizedList(
new ArrayList<TaskAttemptID>());
private JobProfile profile;
private FileSystem localFs;
boolean killed = false;
    private TrackerDistributedCacheManager trackerDistributedCacheManager;
private TaskDistributedCacheManager taskDistributedCacheManager;
public long getProtocolVersion(String protocol, long clientVersion) {
return TaskUmbilicalProtocol.versionID;
}
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
return ProtocolSignature.getProtocolSignature(
this, protocol, clientVersion, clientMethodsHash);
}
public Job(JobID jobid, String jobSubmitDir) throws IOException {
this.systemJobDir = new Path(jobSubmitDir);
this.systemJobFile = new Path(systemJobDir, "job.xml");
this.id = jobid;
JobConf conf = new JobConf(systemJobFile);
this.localFs = FileSystem.getLocal(conf);
this.localJobDir = localFs.makeQualified(conf.getLocalPath(jobDir));
this.localJobFile = new Path(this.localJobDir, id + ".xml");
      // Manage the distributed cache. If there are files to be copied,
      // setup() below may update the configuration, so the localized job
      // file is re-written afterwards.
      this.trackerDistributedCacheManager =
        new TrackerDistributedCacheManager(conf, new DefaultTaskController());
      this.taskDistributedCacheManager =
        trackerDistributedCacheManager.newTaskDistributedCacheManager(conf);
taskDistributedCacheManager.setup(
new LocalDirAllocator(MRConfig.LOCAL_DIR),
new File(systemJobDir.toString()),
"archive", "archive");
if (DistributedCache.getSymlink(conf)) {
        // This is not supported largely because, unlike for a Child
        // subprocess, the cwd in LocalJobRunner is not a fresh slate
        // but the user's working directory. It is further complicated
        // because the logic in setupWorkDir only creates symlinks if
        // there is a jarfile in the configuration.
LOG.warn("LocalJobRunner does not support " +
"symlinking into current working dir.");
}
// Setup the symlinks for the distributed cache.
TaskRunner.setupWorkDir(conf, new File(localJobDir.toUri()).getAbsoluteFile());
// Write out configuration file. Instead of copying it from
// systemJobFile, we re-write it, since setup(), above, may have
// updated it.
OutputStream out = localFs.create(localJobFile);
try {
conf.writeXml(out);
} finally {
out.close();
}
this.job = new JobConf(localJobFile);
// Job (the current object) is a Thread, so we wrap its class loader.
if (!taskDistributedCacheManager.getClassPaths().isEmpty()) {
setContextClassLoader(taskDistributedCacheManager.makeClassLoader(
getContextClassLoader()));
}
profile = new JobProfile(job.getUser(), id, systemJobFile.toString(),
"http://localhost:8080/", job.getJobName());
status = new JobStatus(id, 0.0f, 0.0f, JobStatus.RUNNING,
profile.getUser(), profile.getJobName(), profile.getJobFile(),
profile.getURL().toString());
jobs.put(id, this);
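      // Note: the job thread is started from within the constructor, so
      // execution begins as soon as construction completes.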
this.start();
}
JobProfile getProfile() {
return profile;
}
/**
* A Runnable instance that handles a map task to be run by an executor.
*/
protected class MapTaskRunnable implements Runnable {
private final int taskId;
private final TaskSplitMetaInfo info;
private final JobID jobId;
private final JobConf localConf;
// This is a reference to a shared object passed in by the
// external context; this delivers state to the reducers regarding
// where to fetch mapper outputs.
private final Map<TaskAttemptID, MapOutputFile> mapOutputFiles;
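      // Any Throwable raised while running the map task is captured here
      // (volatile, so the job thread sees it) and re-thrown by Job.run()
      // once the executor has drained.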
public volatile Throwable storedException;
public MapTaskRunnable(TaskSplitMetaInfo info, int taskId, JobID jobId,
Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {
this.info = info;
this.taskId = taskId;
this.mapOutputFiles = mapOutputFiles;
this.jobId = jobId;
this.localConf = new JobConf(job);
}
public void run() {
try {
TaskAttemptID mapId = new TaskAttemptID(new TaskID(
jobId, TaskType.MAP, taskId), 0);
LOG.info("Starting task: " + mapId);
mapIds.add(mapId);
MapTask map = new MapTask(systemJobFile.toString(), mapId, taskId,
info.getSplitIndex(), 1);
map.setUser(UserGroupInformation.getCurrentUser().
getShortUserName());
TaskRunner.setupChildMapredLocalDirs(map, localConf);
MapOutputFile mapOutput = new MapOutputFile();
mapOutput.setConf(localConf);
mapOutputFiles.put(mapId, mapOutput);
map.setJobFile(localJobFile.toString());
localConf.setUser(map.getUser());
map.localizeConfiguration(localConf);
map.setConf(localConf);
try {
            mapTasks.getAndIncrement();
myMetrics.launchMap(mapId);
map.run(localConf, Job.this);
myMetrics.completeMap(mapId);
} finally {
            mapTasks.getAndDecrement();
}
LOG.info("Finishing task: " + mapId);
} catch (Throwable e) {
this.storedException = e;
}
}
}
/**
* Create Runnables to encapsulate map tasks for use by the executor
* service.
* @param taskInfo Info about the map task splits
* @param jobId the job id
* @param mapOutputFiles a mapping from task attempts to output files
* @return a List of Runnables, one per map task.
*/
protected List<MapTaskRunnable> getMapTaskRunnables(
        TaskSplitMetaInfo[] taskInfo, JobID jobId,
Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {
int numTasks = 0;
ArrayList<MapTaskRunnable> list = new ArrayList<MapTaskRunnable>();
for (TaskSplitMetaInfo task : taskInfo) {
list.add(new MapTaskRunnable(task, numTasks++, jobId,
mapOutputFiles));
}
return list;
}
/**
* Initialize the counters that will hold partial-progress from
* the various task attempts.
* @param numMaps the number of map tasks in this job.
*/
private synchronized void initCounters(int numMaps) {
// Initialize state trackers for all map tasks.
this.partialMapProgress = new float[numMaps];
this.mapCounters = new Counters[numMaps];
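      // Every slot starts as the shared EMPTY_COUNTERS placeholder and is
      // replaced with the task's real counters by statusUpdate().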
for (int i = 0; i < numMaps; i++) {
this.mapCounters[i] = EMPTY_COUNTERS;
}
this.reduceCounters = EMPTY_COUNTERS;
}
/**
* Creates the executor service used to run map tasks.
*
* @param numMapTasks the total number of map tasks to be run
* @return an ExecutorService instance that handles map tasks
*/
protected ExecutorService createMapExecutor(int numMapTasks) {
// Determine the size of the thread pool to use
int maxMapThreads = job.getInt(LOCAL_MAX_MAPS, 1);
if (maxMapThreads < 1) {
throw new IllegalArgumentException(
"Configured " + LOCAL_MAX_MAPS + " must be >= 1");
}
this.numMapTasks = numMapTasks;
maxMapThreads = Math.min(maxMapThreads, this.numMapTasks);
maxMapThreads = Math.max(maxMapThreads, 1); // In case of no tasks.
initCounters(this.numMapTasks);
LOG.debug("Starting thread pool executor.");
LOG.debug("Max local threads: " + maxMapThreads);
LOG.debug("Map tasks to process: " + this.numMapTasks);
      // Create a new executor service to drain the work queue.
      return Executors.newFixedThreadPool(maxMapThreads);
}
@SuppressWarnings("unchecked")
@Override
public void run() {
JobID jobId = profile.getJobID();
JobContext jContext = new JobContextImpl(job, jobId);
OutputCommitter outputCommitter = job.getOutputCommitter();
try {
TaskSplitMetaInfo[] taskSplitMetaInfos =
SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);
int numReduceTasks = job.getNumReduceTasks();
if (numReduceTasks > 1 || numReduceTasks < 0) {
// we only allow 0 or 1 reducer in local mode
numReduceTasks = 1;
job.setNumReduceTasks(1);
}
outputCommitter.setupJob(jContext);
status.setSetupProgress(1.0f);
Map<TaskAttemptID, MapOutputFile> mapOutputFiles =
Collections.synchronizedMap(new HashMap<TaskAttemptID, MapOutputFile>());
List<MapTaskRunnable> taskRunnables = getMapTaskRunnables(taskSplitMetaInfos,
jobId, mapOutputFiles);
ExecutorService mapService = createMapExecutor(taskRunnables.size());
// Start populating the executor with work units.
// They may begin running immediately (in other threads).
for (Runnable r : taskRunnables) {
mapService.submit(r);
}
try {
mapService.shutdown(); // Instructs queue to drain.
// Wait for tasks to finish; do not use a time-based timeout.
// (See http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6179024)
LOG.info("Waiting for map tasks");
mapService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException ie) {
// Cancel all threads.
mapService.shutdownNow();
throw ie;
}
LOG.info("Map task executor complete.");
// After waiting for the map tasks to complete, if any of these
// have thrown an exception, rethrow it now in the main thread context.
for (MapTaskRunnable r : taskRunnables) {
if (r.storedException != null) {
throw new Exception(r.storedException);
}
}
TaskAttemptID reduceId =
new TaskAttemptID(new TaskID(jobId, TaskType.REDUCE, 0), 0);
try {
if (numReduceTasks > 0) {
ReduceTask reduce = new ReduceTask(systemJobFile.toString(),
reduceId, 0, mapIds.size(), 1);
reduce.setUser(UserGroupInformation.getCurrentUser().
getShortUserName());
JobConf localConf = new JobConf(job);
localConf.set("mapreduce.jobtracker.address", "local");
TaskRunner.setupChildMapredLocalDirs(reduce, localConf);
// move map output to reduce input
for (int i = 0; i < mapIds.size(); i++) {
if (!this.isInterrupted()) {
TaskAttemptID mapId = mapIds.get(i);
Path mapOut = mapOutputFiles.get(mapId).getOutputFile();
MapOutputFile localOutputFile = new MapOutputFile();
localOutputFile.setConf(localConf);
Path reduceIn =
localOutputFile.getInputFileForWrite(mapId.getTaskID(),
localFs.getFileStatus(mapOut).getLen());
if (!localFs.mkdirs(reduceIn.getParent())) {
throw new IOException("Mkdirs failed to create "
+ reduceIn.getParent().toString());
}
              if (!localFs.rename(mapOut, reduceIn)) {
                throw new IOException("Couldn't rename " + mapOut);
              }
} else {
throw new InterruptedException();
}
}
if (!this.isInterrupted()) {
reduce.setJobFile(localJobFile.toString());
localConf.setUser(reduce.getUser());
reduce.localizeConfiguration(localConf);
reduce.setConf(localConf);
            reduceTasks += 1;
myMetrics.launchReduce(reduce.getTaskID());
reduce.run(localConf, this);
myMetrics.completeReduce(reduce.getTaskID());
            reduceTasks -= 1;
} else {
throw new InterruptedException();
}
}
} finally {
for (MapOutputFile output : mapOutputFiles.values()) {
output.removeAll();
}
}
        // Delete the temporary directory in the output directory.
outputCommitter.commitJob(jContext);
status.setCleanupProgress(1.0f);
if (killed) {
this.status.setRunState(JobStatus.KILLED);
} else {
this.status.setRunState(JobStatus.SUCCEEDED);
}
JobEndNotifier.localRunnerNotification(job, status);
} catch (Throwable t) {
try {
outputCommitter.abortJob(jContext,
org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
} catch (IOException ioe) {
LOG.info("Error cleaning up job:" + id);
}
status.setCleanupProgress(1.0f);
if (killed) {
this.status.setRunState(JobStatus.KILLED);
} else {
this.status.setRunState(JobStatus.FAILED);
}
LOG.warn(id, t);
JobEndNotifier.localRunnerNotification(job, status);
} finally {
try {
fs.delete(systemJobFile.getParent(), true); // delete submit dir
localFs.delete(localJobFile, true); // delete local copy
// Cleanup distributed cache
taskDistributedCacheManager.release();
          trackerDistributedCacheManager.purgeCache();
} catch (IOException e) {
LOG.warn("Error cleaning up "+id+": "+e);
}
}
}
// TaskUmbilicalProtocol methods
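    // Tasks run in-process, so no child JVM ever asks the umbilical for
    // work; most of these callbacks are therefore trivial.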
public JvmTask getTask(JvmContext context) { return null; }
public synchronized boolean statusUpdate(TaskAttemptID taskId,
TaskStatus taskStatus) throws IOException, InterruptedException {
LOG.info(taskStatus.getStateString());
int taskIndex = mapIds.indexOf(taskId);
if (taskIndex >= 0) { // mapping
float numTasks = (float) this.numMapTasks;
partialMapProgress[taskIndex] = taskStatus.getProgress();
mapCounters[taskIndex] = taskStatus.getCounters();
float partialProgress = 0.0f;
for (float f : partialMapProgress) {
partialProgress += f;
}
status.setMapProgress(partialProgress / numTasks);
} else {
reduceCounters = taskStatus.getCounters();
status.setReduceProgress(taskStatus.getProgress());
}
// ignore phase
return true;
}
/** Return the current values of the counters for this job,
* including tasks that are in progress.
*/
public synchronized Counters getCurrentCounters() {
if (null == mapCounters) {
// Counters not yet initialized for job.
return EMPTY_COUNTERS;
}
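      // Counters.sum builds a fresh Counters instance, so the shared
      // EMPTY_COUNTERS seed is not mutated by this accumulation.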
Counters current = EMPTY_COUNTERS;
for (Counters c : mapCounters) {
current = Counters.sum(current, c);
}
current = Counters.sum(current, reduceCounters);
return current;
}
    /**
     * The task is reporting that it is in the commit-pending state
     * and is waiting for the commit response.
     */
public void commitPending(TaskAttemptID taskid,
TaskStatus taskStatus)
throws IOException, InterruptedException {
statusUpdate(taskid, taskStatus);
}
public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) {
// Ignore for now
}
public void reportNextRecordRange(TaskAttemptID taskid,
SortedRanges.Range range) throws IOException {
LOG.info("Task " + taskid + " reportedNextRecordRange " + range);
}
public boolean ping(TaskAttemptID taskid) throws IOException {
return true;
}
public boolean canCommit(TaskAttemptID taskid)
throws IOException {
return true;
}
public void done(TaskAttemptID taskId) throws IOException {
int taskIndex = mapIds.indexOf(taskId);
if (taskIndex >= 0) { // mapping
status.setMapProgress(1.0f);
} else {
status.setReduceProgress(1.0f);
}
}
public synchronized void fsError(TaskAttemptID taskId, String message)
throws IOException {
LOG.fatal("FSError: "+ message + "from task: " + taskId);
}
public void shuffleError(TaskAttemptID taskId, String message) throws IOException {
LOG.fatal("shuffleError: "+ message + "from task: " + taskId);
}
public synchronized void fatalError(TaskAttemptID taskId, String msg)
throws IOException {
LOG.fatal("Fatal: "+ msg + "from task: " + taskId);
}
public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId,
int fromEventId, int maxLocs, TaskAttemptID id) throws IOException {
return new MapTaskCompletionEventsUpdate(
org.apache.hadoop.mapred.TaskCompletionEvent.EMPTY_ARRAY, false);
}
}
public LocalJobRunner(Configuration conf) throws IOException {
this(new JobConf(conf));
}
@Deprecated
public LocalJobRunner(JobConf conf) throws IOException {
this.fs = FileSystem.getLocal(conf);
this.conf = conf;
myMetrics = new JobTrackerMetricsInst(null, new JobConf(conf));
}
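  // Clients normally reach this class indirectly: when
  // "mapreduce.jobtracker.address" is set to "local", JobClient submits
  // jobs in-process through this runner rather than to a JobTracker.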
  // ClientProtocol (job submission) methods
private static int jobid = 0;
public synchronized org.apache.hadoop.mapreduce.JobID getNewJobID() {
return new org.apache.hadoop.mapreduce.JobID("local", ++jobid);
}
public org.apache.hadoop.mapreduce.JobStatus submitJob(
org.apache.hadoop.mapreduce.JobID jobid, String jobSubmitDir,
Credentials credentials) throws IOException {
Job job = new Job(JobID.downgrade(jobid), jobSubmitDir);
job.job.setCredentials(credentials);
return job.status;
}
  public void killJob(org.apache.hadoop.mapreduce.JobID id) {
    Job job = jobs.get(JobID.downgrade(id));
    job.killed = true;
    job.interrupt();
  }
public void setJobPriority(org.apache.hadoop.mapreduce.JobID id,
String jp) throws IOException {
throw new UnsupportedOperationException("Changing job priority " +
"in LocalJobRunner is not supported.");
}
/** Throws {@link UnsupportedOperationException} */
public boolean killTask(org.apache.hadoop.mapreduce.TaskAttemptID taskId,
boolean shouldFail) throws IOException {
throw new UnsupportedOperationException("Killing tasks in " +
"LocalJobRunner is not supported");
}
public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(
org.apache.hadoop.mapreduce.JobID id, TaskType type) {
return new org.apache.hadoop.mapreduce.TaskReport[0];
}
public org.apache.hadoop.mapreduce.JobStatus getJobStatus(
org.apache.hadoop.mapreduce.JobID id) {
Job job = jobs.get(JobID.downgrade(id));
    if (job != null) {
      return job.status;
    } else {
      return null;
    }
}
public org.apache.hadoop.mapreduce.Counters getJobCounters(
org.apache.hadoop.mapreduce.JobID id) {
Job job = jobs.get(JobID.downgrade(id));
return new org.apache.hadoop.mapreduce.Counters(job.getCurrentCounters());
}
public String getFilesystemName() throws IOException {
return fs.getUri().toString();
}
public ClusterMetrics getClusterMetrics() {
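    // The in-flight map and reduce counts double as the occupied-slot
    // figures; the local runner reports one tracker and no blacklisted
    // or decommissioned nodes.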
    int numMapTasks = mapTasks.get();
    return new ClusterMetrics(numMapTasks, reduceTasks, numMapTasks,
        reduceTasks, 0, 0, 1, 1, jobs.size(), 1, 0, 0);
}
/**
* @deprecated Use {@link #getJobTrackerStatus()} instead.
*/
@Deprecated
public State getJobTrackerState() throws IOException, InterruptedException {
return State.RUNNING;
}
public JobTrackerStatus getJobTrackerStatus() {
return JobTrackerStatus.RUNNING;
}
public long getTaskTrackerExpiryInterval() throws IOException, InterruptedException {
return 0;
}
  /**
   * Get all active trackers in the cluster. Not applicable to the
   * local runner, which manages no task trackers.
   * @return null
   */
public TaskTrackerInfo[] getActiveTrackers()
throws IOException, InterruptedException {
return null;
}
  /**
   * Get all blacklisted trackers in the cluster. Not applicable to the
   * local runner, which manages no task trackers.
   * @return null
   */
public TaskTrackerInfo[] getBlacklistedTrackers()
throws IOException, InterruptedException {
return null;
}
public TaskCompletionEvent[] getTaskCompletionEvents(
org.apache.hadoop.mapreduce.JobID jobid
, int fromEventId, int maxEvents) throws IOException {
return TaskCompletionEvent.EMPTY_ARRAY;
}
  public org.apache.hadoop.mapreduce.JobStatus[] getAllJobs() { return null; }
/**
* Returns the diagnostic information for a particular task in the given job.
* To be implemented
*/
  public String[] getTaskDiagnostics(
      org.apache.hadoop.mapreduce.TaskAttemptID taskid) throws IOException {
    return new String[0];
}
/**
* @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getSystemDir()
*/
public String getSystemDir() {
Path sysDir = new Path(
conf.get(JTConfig.JT_SYSTEM_DIR, "/tmp/hadoop/mapred/system"));
return fs.makeQualified(sysDir).toString();
}
  /**
   * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getQueueAdmins(String)
   */
public AccessControlList getQueueAdmins(String queueName) throws IOException {
    return new AccessControlList(" "); // no queue admins for the local job runner
}
/**
* @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getStagingAreaDir()
*/
public String getStagingAreaDir() throws IOException {
Path stagingRootDir = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT,
"/tmp/hadoop/mapred/staging"));
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
String user;
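    // A random suffix keeps concurrent local jobs from colliding in the
    // staging directory.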
if (ugi != null) {
user = ugi.getShortUserName() + rand.nextInt();
} else {
user = "dummy" + rand.nextInt();
}
return fs.makeQualified(new Path(stagingRootDir, user+"/.staging")).toString();
}
public String getJobHistoryDir() {
return null;
}
@Override
public QueueInfo[] getChildQueues(String queueName) throws IOException {
return null;
}
@Override
public QueueInfo[] getRootQueues() throws IOException {
return null;
}
@Override
public QueueInfo[] getQueues() throws IOException {
return null;
}
@Override
public QueueInfo getQueue(String queue) throws IOException {
return null;
}
  @Override
  public org.apache.hadoop.mapreduce.QueueAclsInfo[]
      getQueueAclsForCurrentUser() throws IOException {
return null;
}
/**
* Set the max number of map tasks to run concurrently in the LocalJobRunner.
* @param job the job to configure
* @param maxMaps the maximum number of map tasks to allow.
*/
public static void setLocalMaxRunningMaps(
org.apache.hadoop.mapreduce.JobContext job,
int maxMaps) {
job.getConfiguration().setInt(LOCAL_MAX_MAPS, maxMaps);
}
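  // Illustrative usage, assuming "job" is the JobContext of a job that
  // will execute with the local runner:
  //   LocalJobRunner.setLocalMaxRunningMaps(job, 8); // allow 8 parallel maps
  //   int max = LocalJobRunner.getLocalMaxRunningMaps(job);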
/**
* @return the max number of map tasks to run concurrently in the
* LocalJobRunner.
*/
public static int getLocalMaxRunningMaps(
org.apache.hadoop.mapreduce.JobContext job) {
return job.getConfiguration().getInt(LOCAL_MAX_MAPS, 1);
}
  @Override
  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
      throws IOException, InterruptedException {
  }
@Override
public Token<DelegationTokenIdentifier>
getDelegationToken(Text renewer) throws IOException, InterruptedException {
return null;
}
  @Override
  public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
      throws IOException, InterruptedException {
    return 0;
  }
}