/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
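
/**
 * Re-runs a single map or reduce task in an isolated JVM for debugging,
 * reporting progress to the log instead of to a TaskTracker. The task's
 * localized files must still be on disk, which they are when the job ran
 * with keep.failed.task.files set to true. A sketch of a typical invocation,
 * assuming the usual TaskTracker layout (exact paths vary by configuration):
 * <pre>
 * $ cd &lt;mapred.local.dir&gt;/taskTracker/jobcache/&lt;job-id&gt;/&lt;attempt-id&gt;/work
 * $ hadoop org.apache.hadoop.mapred.IsolationRunner ../job.xml
 * </pre>
 */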
public class IsolationRunner {
  private static final Log LOG =
    LogFactory.getLog(IsolationRunner.class.getName());
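
  /**
   * A stand-in for the TaskTracker end of the umbilical protocol: every
   * call that would normally travel over RPC is simply logged, which is all
   * an isolated single-JVM run needs.
   */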
  private static class FakeUmbilical implements TaskUmbilicalProtocol {

    public long getProtocolVersion(String protocol, long clientVersion) {
      return TaskUmbilicalProtocol.versionID;
    }

    public void done(TaskAttemptID taskid, boolean shouldPromote)
        throws IOException {
      LOG.info("Task " + taskid + " reporting done.");
    }

    public void fsError(TaskAttemptID taskId, String message)
        throws IOException {
      LOG.info("Task " + taskId + " reporting file system error: " + message);
    }

    public void shuffleError(TaskAttemptID taskId, String message)
        throws IOException {
      LOG.info("Task " + taskId + " reporting shuffle error: " + message);
    }

    public Task getTask(TaskAttemptID taskid) throws IOException {
      return null;
    }

    public boolean ping(TaskAttemptID taskid) throws IOException {
      return true;
    }

    public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
        throws IOException, InterruptedException {
      StringBuffer buf = new StringBuffer("Task ");
      buf.append(taskId);
      buf.append(" making progress to ");
      buf.append(taskStatus.getProgress());
      String state = taskStatus.getStateString();
      if (state != null) {
        buf.append(" and state of ");
        buf.append(state);
      }
      LOG.info(buf.toString());
      // ignore phase
      // ignore counters
      return true;
    }

    public void reportDiagnosticInfo(TaskAttemptID taskid, String trace)
        throws IOException {
      LOG.info("Task " + taskid + " has problem " + trace);
    }

    public TaskCompletionEvent[] getMapCompletionEvents(JobID jobId,
        int fromEventId, int maxLocs) throws IOException {
      return TaskCompletionEvent.EMPTY_ARRAY;
    }
  }
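
  /**
   * Build a classloader with the classpath a child task JVM would see:
   * each jar under the job's unpacked lib/ directory, the unpacked classes/
   * directory, and the work directory itself.
   */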
  private static ClassLoader makeClassLoader(JobConf conf,
                                             File workDir) throws IOException {
    List<URL> cp = new ArrayList<URL>();
    String jar = conf.getJar();
    if (jar != null) {   // if a jar exists, it has been unjarred into workDir
      File[] libs = new File(workDir, "lib").listFiles();
      if (libs != null) {
        for (int i = 0; i < libs.length; i++) {
          cp.add(new URL("file:" + libs[i].toString()));
        }
      }
      cp.add(new URL("file:" + new File(workDir, "classes/").toString()));
      cp.add(new URL("file:" + workDir.toString() + "/"));
    }
    return new URLClassLoader(cp.toArray(new URL[cp.size()]));
  }

  /**
   * Create empty sequence files for any of the map outputs that we don't have.
   * @param fs the filesystem to create the files in
   * @param taskId the reduce task attempt whose map inputs are being filled in
   * @param numMaps the number of map tasks in the job
   * @param conf the jobconf
   * @throws IOException if something goes wrong writing
   */
  private static void fillInMissingMapOutputs(FileSystem fs,
                                              TaskAttemptID taskId,
                                              int numMaps,
                                              JobConf conf) throws IOException {
    Class keyClass = conf.getMapOutputKeyClass();
    Class valueClass = conf.getMapOutputValueClass();
    MapOutputFile namer = new MapOutputFile(taskId.getJobID());
    namer.setConf(conf);
    for (int i = 0; i < numMaps; i++) {
      Path f = namer.getInputFile(i, taskId);
      if (!fs.exists(f)) {
        LOG.info("Create missing input: " + f);
        SequenceFile.Writer out =
          SequenceFile.createWriter(fs, conf, f, keyClass, valueClass);
        out.close();
      }
    }
  }

  /**
   * Run a single task.
   * @param args the first argument is the path to the task's job.xml file
   */
  public static void main(String[] args) throws IOException {
    if (args.length != 1) {
      System.out.println("Usage: IsolationRunner <path>/job.xml");
      System.exit(1);
    }
    File jobFilename = new File(args[0]);
    if (!jobFilename.exists() || !jobFilename.isFile()) {
      System.out.println(jobFilename + " is not a valid job file.");
      System.exit(1);
    }
    JobConf conf = new JobConf(new Path(jobFilename.toString()));
    TaskAttemptID taskId = TaskAttemptID.forName(conf.get("mapred.task.id"));
    boolean isMap = conf.getBoolean("mapred.task.is.map", true);
    int partition = conf.getInt("mapred.task.partition", 0);

    // set up the local and user working directories
    FileSystem local = FileSystem.getLocal(conf);
    LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
    File workDirName = new File(lDirAlloc.getLocalPathToRead(
                                    TaskTracker.getJobCacheSubdir()
                                    + Path.SEPARATOR + taskId.getJobID()
                                    + Path.SEPARATOR + taskId
                                    + Path.SEPARATOR + "work",
                                    conf).toString());
    local.setWorkingDirectory(new Path(workDirName.toString()));
    FileSystem.get(conf).setWorkingDirectory(conf.getWorkingDirectory());

    // set up a classloader with the right classpath
    ClassLoader classLoader = makeClassLoader(conf, workDirName);
    Thread.currentThread().setContextClassLoader(classLoader);
    conf.setClassLoader(classLoader);

    Task task;
    if (isMap) {
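      // Map tasks re-read the InputSplit as the TaskTracker saved it: the
      // split class name followed by the serialized split bytes, stored in
      // split.dta alongside the job.xml.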
      Path localSplit = new Path(new Path(jobFilename.toString()).getParent(),
                                 "split.dta");
      DataInputStream splitFile = FileSystem.getLocal(conf).open(localSplit);
      String splitClass = Text.readString(splitFile);
      BytesWritable split = new BytesWritable();
      split.readFields(splitFile);
      splitFile.close();
      task = new MapTask(jobFilename.toString(), taskId, partition,
                         splitClass, split);
    } else {
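      // A reduce expects one input file per map; substitute an empty
      // sequence file for any map output that is no longer present so the
      // sort/merge phase can run.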
      int numMaps = conf.getNumMapTasks();
      fillInMissingMapOutputs(local, taskId, numMaps, conf);
      task = new ReduceTask(jobFilename.toString(), taskId, partition, numMaps);
    }
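
    // Run the task in this JVM; progress and diagnostics that would normally
    // go to the TaskTracker over RPC are simply logged by FakeUmbilical.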
    task.setConf(conf);
    task.run(conf, new FakeUmbilical());
  }
}