/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hcatalog.templeton.tool;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * A MapReduce job that launches another job.
 *
 * We run a single-mapper job that starts a child MR job. The parent
 * monitors the child job and ends when the child job exits. In
 * addition, we
 *
 * - write out the parent job id so the caller can record it.
 * - run a keep-alive thread so the map task is not timed out while
 *   the child runs.
 * - optionally store the stdout, stderr, and exit value of the child
 *   in HDFS files.
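 *
 * A minimal usage sketch (the argument list below is hypothetical; the
 * caller supplies the actual child command line to execute):
 *
 * <pre>
 * TempletonControllerJob controller = new TempletonControllerJob();
 * int ret = ToolRunner.run(controller, new String[]{"hadoop", "jar", "wc.jar"});
 * String parentJobId = controller.getSubmittedId(); // poll this id for status
 * </pre>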
*/
public class TempletonControllerJob extends Configured implements Tool {
static enum ControllerCounters { SIMPLE_COUNTER }
public static final String COPY_NAME = "templeton.copy";
public static final String STATUSDIR_NAME = "templeton.statusdir";
public static final String JAR_ARGS_NAME = "templeton.args";
public static final String OVERRIDE_CLASSPATH = "templeton.override-classpath";
public static final String STDOUT_FNAME = "stdout";
public static final String STDERR_FNAME = "stderr";
public static final String EXIT_FNAME = "exit";
public static final int WATCHER_TIMEOUT_SECS = 10;
public static final int KEEP_ALIVE_MSEC = 60 * 1000;
private static TrivialExecService execService = TrivialExecService.getInstance();
private static final Log LOG = LogFactory.getLog(TempletonControllerJob.class);
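/**
 * The single mapper: launches the child process, streams its stdout and
 * stderr through Watcher threads, and keeps the task alive until the
 * child exits.
 */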
public static class LaunchMapper
extends Mapper<NullWritable, NullWritable, Text, Text> {
protected Process startJob(Context context, String user,
String overrideClasspath)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
copyLocal(COPY_NAME, conf);
String[] jarArgs
= TempletonUtils.decodeArray(conf.get(JAR_ARGS_NAME));
ArrayList<String> removeEnv = new ArrayList<String>();
removeEnv.add("HADOOP_ROOT_LOGGER");
Map<String, String> env = TempletonUtils.hadoopUserEnv(user,
overrideClasspath);
List<String> jarArgsList = new LinkedList<String>(Arrays.asList(jarArgs));
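// If the framework handed us a credentials file, point the child at it
// so the job it submits can authenticate with the shipped tokens.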
String tokenFile = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
if (tokenFile != null) {
jarArgsList.add(1, "-Dmapreduce.job.credentials.binary=" + tokenFile);
}
return execService.run(jarArgsList, removeEnv, env);
}
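/**
 * Copy any files named by the given configuration variable from their
 * source filesystem into the task's working directory, so the child
 * process can reference them by basename.
 */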
private void copyLocal(String var, Configuration conf)
throws IOException {
String[] filenames = TempletonUtils.decodeArray(conf.get(var));
if (filenames != null) {
for (String filename : filenames) {
Path src = new Path(filename);
Path dst = new Path(src.getName());
FileSystem fs = src.getFileSystem(conf);
System.err.println("templeton: copy " + src + " => " + dst);
fs.copyToLocalFile(src, dst);
}
}
}
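/**
 * Launch the child process, mirror its stdout/stderr via Watchers,
 * tick a counter so the task stays alive, then record the child's
 * exit value and mark the job state done.
 */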
@Override
public void run(Context context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
Process proc = startJob(context,
conf.get("user.name"),
conf.get(OVERRIDE_CLASSPATH));
String statusdir = conf.get(STATUSDIR_NAME);
Counter cnt = context.getCounter(ControllerCounters.SIMPLE_COUNTER);
ExecutorService pool = Executors.newCachedThreadPool();
executeWatcher(pool, conf, context.getJobID(),
proc.getInputStream(), statusdir, STDOUT_FNAME);
executeWatcher(pool, conf, context.getJobID(),
proc.getErrorStream(), statusdir, STDERR_FNAME);
KeepAlive keepAlive = startCounterKeepAlive(pool, cnt);
proc.waitFor();
keepAlive.sendReport = false;
pool.shutdown();
if (!pool.awaitTermination(WATCHER_TIMEOUT_SECS, TimeUnit.SECONDS))
pool.shutdownNow();
writeExitValue(conf, proc.exitValue(), statusdir);
JobState state = new JobState(context.getJobID().toString(), conf);
state.setExitValue(proc.exitValue());
state.setCompleteStatus("done");
state.close();
if (proc.exitValue() != 0)
System.err.println("templeton: job failed with exit code "
+ proc.exitValue());
else
System.err.println("templeton: job completed with exit code 0");
}
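/** Run a Watcher over the given stream in the shared thread pool. */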
private void executeWatcher(ExecutorService pool, Configuration conf,
JobID jobid, InputStream in, String statusdir,
String name)
throws IOException {
Watcher w = new Watcher(conf, jobid, in, statusdir, name);
pool.execute(w);
}
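/** Start a KeepAlive ticking the counter; the caller stops it via sendReport. */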
private KeepAlive startCounterKeepAlive(ExecutorService pool, Counter cnt)
throws IOException {
KeepAlive k = new KeepAlive(cnt);
pool.execute(k);
return k;
}
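/**
 * Write the child's exit value to the "exit" file under the status
 * directory, if one was configured.
 */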
private void writeExitValue(Configuration conf, int exitValue, String statusdir)
throws IOException {
if (TempletonUtils.isset(statusdir)) {
Path p = new Path(statusdir, EXIT_FNAME);
FileSystem fs = p.getFileSystem(conf);
OutputStream out = fs.create(p);
System.err.println("templeton: Writing exit value "
+ exitValue + " to " + p);
PrintWriter writer = new PrintWriter(out);
writer.println(exitValue);
writer.close();
}
}
}
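/**
 * Streams one of the child's output streams line by line to the status
 * directory (or this process's own stdout/stderr), scraping each line
 * for percent-complete and child job id markers to record in the job
 * state.
 */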
public static class Watcher implements Runnable {
private InputStream in;
private OutputStream out;
private JobID jobid;
private Configuration conf;
public Watcher(Configuration conf, JobID jobid, InputStream in,
String statusdir, String name)
throws IOException {
this.conf = conf;
this.jobid = jobid;
this.in = in;
if (name.equals(STDERR_FNAME))
out = System.err;
else
out = System.out;
if (TempletonUtils.isset(statusdir)) {
Path p = new Path(statusdir, name);
FileSystem fs = p.getFileSystem(conf);
out = fs.create(p);
System.err.println("templeton: Writing status to " + p);
}
}
@Override
public void run() {
try {
InputStreamReader isr = new InputStreamReader(in);
BufferedReader reader = new BufferedReader(isr);
PrintWriter writer = new PrintWriter(out);
String line;
while ((line = reader.readLine()) != null) {
writer.println(line);
JobState state = null;
try {
String percent = TempletonUtils.extractPercentComplete(line);
String childid = TempletonUtils.extractChildJobId(line);
if (percent != null || childid != null) {
state = new JobState(jobid.toString(), conf);
state.setPercentComplete(percent);
state.setChildId(childid);
}
} catch (IOException e) {
System.err.println("templeton: state error: " + e);
} finally {
if (state != null) {
try {
state.close();
} catch (IOException e) {
// Best-effort close of the job state; nothing useful to do on failure.
}
}
}
}
writer.flush();
} catch (IOException e) {
System.err.println("templeton: execute error: " + e);
}
}
}
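/**
 * Periodically increments a counter so the framework sees progress and
 * does not kill the long-running map task while the child job runs.
 */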
public static class KeepAlive implements Runnable {
private Counter cnt;
// Volatile: flipped from the mapper thread to stop this runnable's loop.
public volatile boolean sendReport;
public KeepAlive(Counter cnt) {
this.cnt = cnt;
this.sendReport = true;
}
@Override
public void run() {
try {
while (sendReport) {
cnt.increment(1);
Thread.sleep(KEEP_ALIVE_MSEC);
}
} catch (InterruptedException e) {
// Ok to be interrupted
}
}
}
private JobID submittedJobId;
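/**
 * @return the id of the job this controller submitted, or null if
 *         run() has not submitted one yet.
 */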
public String getSubmittedId() {
if (submittedJobId == null)
return null;
else
return submittedJobId.toString();
}
/**
 * Enqueue the job and record the job id so the caller can retrieve it
 * via {@link #getSubmittedId()}.
 */
@Override
public int run(String[] args)
throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = getConf();
conf.set(JAR_ARGS_NAME, TempletonUtils.encodeArray(args));
conf.set("user.name", UserGroupInformation.getCurrentUser().getShortUserName());
Job job = new Job(conf);
job.setJarByClass(TempletonControllerJob.class);
job.setJobName("TempletonControllerJob");
job.setMapperClass(LaunchMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setInputFormatClass(SingleInputFormat.class);
job.setOutputFormatClass(NullOutputFormat.class);
job.setNumReduceTasks(0);
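// Fetch an MR delegation token and attach it to the job's credentials
// so the child job can be submitted on behalf of the calling user.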
JobClient jc = new JobClient(new JobConf(job.getConfiguration()));
Token<DelegationTokenIdentifier> mrdt = jc.getDelegationToken(new Text("mr token"));
job.getCredentials().addToken(new Text("mr token"), mrdt);
job.submit();
submittedJobId = job.getJobID();
return 0;
}
public static void main(String[] args) throws Exception {
int ret = ToolRunner.run(new TempletonControllerJob(), args);
if (ret != 0)
System.err.println("TempletonControllerJob failed!");
System.exit(ret);
}
}