blob: 54d584b84028b900287f909b8e3db72096ce0138 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JvmTask;
import org.apache.hadoop.mapreduce.MRConfig;
/**
* IsolationRunner is intended to facilitate debugging by re-running a specific
* task, given left-over task files for a (typically failed) past job.
* Currently, it is limited to re-running map tasks.
*
* Users may coerce MapReduce to keep task files around by setting
* mapreduce.task.files.preserve.failedtasks. See mapred_tutorial.xml for more documentation.
*/
public class IsolationRunner {
private static final Log LOG =
LogFactory.getLog(IsolationRunner.class.getName());
static class FakeUmbilical implements TaskUmbilicalProtocol {
public long getProtocolVersion(String protocol, long clientVersion) {
return TaskUmbilicalProtocol.versionID;
}
public void done(TaskAttemptID taskid) throws IOException {
LOG.info("Task " + taskid + " reporting done.");
}
public void fsError(TaskAttemptID taskId, String message) throws IOException {
LOG.info("Task " + taskId + " reporting file system error: " + message);
}
public void shuffleError(TaskAttemptID taskId, String message) throws IOException {
LOG.info("Task " + taskId + " reporting shuffle error: " + message);
}
public void fatalError(TaskAttemptID taskId, String msg) throws IOException {
LOG.info("Task " + taskId + " reporting fatal error: " + msg);
}
public JvmTask getTask(JvmContext context) throws IOException {
return null;
}
public boolean ping(TaskAttemptID taskid) throws IOException {
return true;
}
public void commitPending(TaskAttemptID taskId, TaskStatus taskStatus)
throws IOException, InterruptedException {
statusUpdate(taskId, taskStatus);
}
public boolean canCommit(TaskAttemptID taskid) throws IOException {
return true;
}
public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
throws IOException, InterruptedException {
StringBuffer buf = new StringBuffer("Task ");
buf.append(taskId);
buf.append(" making progress to ");
buf.append(taskStatus.getProgress());
String state = taskStatus.getStateString();
if (state != null) {
buf.append(" and state of ");
buf.append(state);
}
LOG.info(buf.toString());
// ignore phase
// ignore counters
return true;
}
public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) throws IOException {
LOG.info("Task " + taskid + " has problem " + trace);
}
public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId,
int fromEventId, int maxLocs, TaskAttemptID id) throws IOException {
return new MapTaskCompletionEventsUpdate(TaskCompletionEvent.EMPTY_ARRAY,
false);
}
public void reportNextRecordRange(TaskAttemptID taskid,
SortedRanges.Range range) throws IOException {
LOG.info("Task " + taskid + " reportedNextRecordRange " + range);
}
}
private ClassLoader makeClassLoader(JobConf conf,
File workDir) throws IOException {
List<String> classPaths = new ArrayList<String>();
// Add jar clas files (includes lib/* and classes/*)
String jar = conf.getJar();
if (jar != null) {
TaskRunner.appendJobJarClasspaths(conf.getJar(), classPaths);
}
// Add the workdir, too.
classPaths.add(workDir.toString());
// Note: TaskRunner.run() does more, including DistributedCache files.
// Convert to URLs
URL[] urls = new URL[classPaths.size()];
for (int i = 0; i < classPaths.size(); ++i) {
urls[i] = new File(classPaths.get(i)).toURL();
}
return new URLClassLoader(urls);
}
/**
* Main method.
*/
boolean run(String[] args)
throws ClassNotFoundException, IOException, InterruptedException {
if (args.length != 1) {
System.out.println("Usage: IsolationRunner <path>/job.xml");
return false;
}
File jobFilename = new File(args[0]);
if (!jobFilename.exists() || !jobFilename.isFile()) {
System.out.println(jobFilename + " is not a valid job file.");
return false;
}
JobConf conf = new JobConf(new Path(jobFilename.toString()));
TaskAttemptID taskId = TaskAttemptID.forName(conf.get(JobContext.TASK_ATTEMPT_ID));
if (taskId == null) {
System.out.println("mapreduce.task.attempt.id not found in configuration;" +
" job.xml is not a task config");
}
boolean isMap = conf.getBoolean(JobContext.TASK_ISMAP, true);
if (!isMap) {
System.out.println("Only map tasks are supported.");
return false;
}
int partition = conf.getInt(JobContext.TASK_PARTITION, 0);
// setup the local and user working directories
FileSystem local = FileSystem.getLocal(conf);
LocalDirAllocator lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
File workDirName = TaskRunner.formWorkDir(lDirAlloc, taskId, false, conf);
local.setWorkingDirectory(new Path(workDirName.toString()));
FileSystem.get(conf).setWorkingDirectory(conf.getWorkingDirectory());
// set up a classloader with the right classpath
ClassLoader classLoader =
makeClassLoader(conf, new File(workDirName.toString()));
Thread.currentThread().setContextClassLoader(classLoader);
conf.setClassLoader(classLoader);
// split.dta file is used only by IsolationRunner. The file can now be in
// any of the configured local disks, so use LocalDirAllocator to find out
// where it is.
Path localSplit =
new LocalDirAllocator(MRConfig.LOCAL_DIR).getLocalPathToRead(
TaskTracker.getLocalSplitFile(conf.getUser(), taskId.getJobID()
.toString(), taskId.toString()), conf);
DataInputStream splitFile = FileSystem.getLocal(conf).open(localSplit);
String splitClass = Text.readString(splitFile);
BytesWritable split = new BytesWritable();
split.readFields(splitFile);
splitFile.close();
Task task = new MapTask(jobFilename.toString(), taskId, partition, splitClass, split, 1);
task.setConf(conf);
task.run(conf, new FakeUmbilical());
return true;
}
/**
* Run a single task.
*
* @param args the first argument is the task directory
*/
public static void main(String[] args)
throws ClassNotFoundException, IOException, InterruptedException {
if (!new IsolationRunner().run(args)) {
System.exit(1);
}
}
}