/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hama.bsp;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.util.Arrays;
import java.util.Map;
import java.util.Random;
import java.util.StringTokenizer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.message.MessageManager;
import org.apache.hama.bsp.message.OutgoingMessageManager;
import org.apache.hama.bsp.message.OutgoingPOJOMessageBundle;
import org.apache.hama.bsp.message.queue.MemoryQueue;
import org.apache.hama.bsp.message.queue.MessageQueue;
import org.apache.hama.ipc.HamaRPCProtocolVersion;
import org.apache.hama.ipc.JobSubmissionProtocol;
import org.apache.hama.ipc.RPC;
/**
* BSPJobClient is the primary interface for the user-job to interact with the
* BSPMaster.
*
* BSPJobClient provides facilities to submit jobs, track their progress, access
* component-tasks' reports/logs, get the BSP cluster status information etc.
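*
* <p>A minimal usage sketch ({@code MyBSP} is a hypothetical user-defined
* {@link BSP} implementation, not part of this class):
*
* <pre>
* HamaConfiguration conf = new HamaConfiguration();
* BSPJob job = new BSPJob(conf);
* job.setJobName("example job");
* job.setBspClass(MyBSP.class);
* BSPJobClient.runJob(job);
* </pre>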
*/
public class BSPJobClient extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(BSPJobClient.class);
public static enum TaskStatusFilter {
NONE, KILLED, FAILED, SUCCEEDED, ALL
}
private static final long MAX_JOBPROFILE_AGE = 1000 * 2;
// job files are world readable and owner writable
final private static FsPermission JOB_FILE_PERMISSION = FsPermission
.createImmutable((short) 0644); // rw-r--r--
// job submission directory is world readable/writable/executable
final static FsPermission JOB_DIR_PERMISSION = FsPermission
.createImmutable((short) 0777); // rwx-rwx-rwx
private JobSubmissionProtocol jobSubmitClient = null;
private Path sysDir = null;
private FileSystem fs = null;
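/**
* A {@link RunningJob} view backed by RPC calls to the BSPMaster; the cached
* {@link JobStatus} is refreshed when it is older than MAX_JOBPROFILE_AGE.
*/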
class NetworkedJob implements RunningJob {
JobProfile profile;
JobStatus status;
long statustime;
public NetworkedJob() {
}
public NetworkedJob(JobStatus job) throws IOException {
this.status = job;
this.profile = jobSubmitClient.getJobProfile(job.getJobID());
this.statustime = System.currentTimeMillis();
}
/**
* Some methods rely on having a recent job status. Refresh it if it is
* older than MAX_JOBPROFILE_AGE.
*/
synchronized void ensureFreshStatus() throws IOException {
if (System.currentTimeMillis() - statustime > MAX_JOBPROFILE_AGE) {
updateStatus();
}
}
/**
* Some methods need an up-to-date status, so refresh it unconditionally.
*
* @throws IOException
*/
synchronized void updateStatus() throws IOException {
this.status = jobSubmitClient.getJobStatus(profile.getJobID());
this.statustime = System.currentTimeMillis();
}
@Override
public BSPJobID getID() {
return profile.getJobID();
}
@Override
public String getJobName() {
return profile.getJobName();
}
@Override
public String getJobFile() {
return profile.getJobFile();
}
@Override
public long progress() throws IOException {
ensureFreshStatus();
return status.progress();
}
/**
* Returns immediately whether the whole job is done (succeeded, failed, or killed).
*/
@Override
public synchronized boolean isComplete() throws IOException {
updateStatus();
return (status.getRunState() == JobStatus.SUCCEEDED
|| status.getRunState() == JobStatus.FAILED || status.getRunState() == JobStatus.KILLED);
}
/**
* True if job completed successfully.
*/
@Override
public synchronized boolean isSuccessful() throws IOException {
updateStatus();
return status.getRunState() == JobStatus.SUCCEEDED;
}
@Override
public synchronized long getSuperstepCount() throws IOException {
ensureFreshStatus();
return status.getSuperstepCount();
}
/**
* Blocks until the job is finished
*/
@Override
public void waitForCompletion() throws IOException {
while (!isComplete()) {
try {
Thread.sleep(5000);
} catch (InterruptedException ie) {
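// ignore the interrupt and keep polling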
}
}
}
/**
* Tells the service to get the state of the current job.
*/
@Override
public synchronized int getJobState() throws IOException {
updateStatus();
return status.getRunState();
}
@Override
public JobStatus getStatus() {
return status;
}
/**
* Tells the service to terminate the current job.
*/
@Override
public synchronized void killJob() throws IOException {
jobSubmitClient.killJob(getID());
}
@Override
public void killTask(TaskAttemptID taskId, boolean shouldFail)
throws IOException {
jobSubmitClient.killTask(taskId, shouldFail);
}
@Override
public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom) {
return jobSubmitClient.getTaskCompletionEvents(getID(), startFrom, 10);
}
}
public BSPJobClient(Configuration conf) throws IOException {
setConf(conf);
init(conf);
}
public BSPJobClient() {
}
public void init(Configuration conf) throws IOException {
String masterAddress = conf.get("bsp.master.address");
if (masterAddress != null && !masterAddress.equals("local")) {
this.jobSubmitClient = (JobSubmissionProtocol) RPC.getProxy(
JobSubmissionProtocol.class, HamaRPCProtocolVersion.versionID,
BSPMaster.getAddress(conf), conf,
NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
} else {
LOG.debug("Using local BSP runner.");
this.jobSubmitClient = new LocalBSPRunner(conf);
}
}
/**
* Close the <code>JobClient</code>.
*/
public synchronized void close() throws IOException {
String masterAddress = this.getConf().get("bsp.master.address");
if (masterAddress != null && !masterAddress.equals("local")) {
RPC.stopProxy(jobSubmitClient);
}
}
/**
* Get a filesystem handle. We need this to prepare jobs for submission to the
* BSP system.
*
* @return the filesystem handle.
*/
public synchronized FileSystem getFs() throws IOException {
if (this.fs == null) {
Path sysDir = getSystemDir();
this.fs = sysDir.getFileSystem(getConf());
}
return fs;
}
/**
* Gets the jobs that are submitted.
*
* @return array of {@link JobStatus} for the submitted jobs.
* @throws IOException
*/
public JobStatus[] getAllJobs() throws IOException {
return jobSubmitClient.getAllJobs();
}
/**
* Gets the jobs that are not completed and not failed.
*
* @return array of {@link JobStatus} for the running/to-be-run jobs.
* @throws IOException
*/
public JobStatus[] jobsToComplete() throws IOException {
return jobSubmitClient.jobsToComplete();
}
/**
* Submit a job to the BSP system. This returns a handle to the
* {@link RunningJob} which can be used to track the running-job.
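*
* <p>A minimal sketch, assuming {@code client} is an initialized
* {@code BSPJobClient} and {@code job} a configured {@code BSPJob}:
*
* <pre>
* RunningJob running = client.submitJob(job);
* running.waitForCompletion();
* if (running.isSuccessful()) {
* // job finished successfully
* }
* </pre>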
*
* @param job the job configuration.
* @return a handle to the {@link RunningJob} which can be used to track the
* running-job.
* @throws FileNotFoundException
* @throws IOException
*/
public RunningJob submitJob(BSPJob job) throws FileNotFoundException,
IOException {
return submitJobInternal(job, jobSubmitClient.getNewJobId());
}
static Random r = new Random();
public RunningJob submitJobInternal(BSPJob pJob, BSPJobID jobId)
throws IOException {
BSPJob job = pJob;
job.setJobID(jobId);
int maxTasks;
int configured = job.getConfiguration().getInt(Constants.MAX_TASKS_PER_JOB,
job.getNumBspTask());
ClusterStatus clusterStatus = getClusterStatus(true);
// Re-adjust the maxTasks based on cluster status.
if (clusterStatus != null) {
maxTasks = clusterStatus.getMaxTasks() - clusterStatus.getTasks();
if (configured > maxTasks) {
LOG.warn("The configured number of tasks has exceeded the maximum allowed. Job will run with "
+ (maxTasks) + " tasks.");
job.setNumBspTask(maxTasks);
}
} else {
maxTasks = configured;
}
// nextInt(bound) always yields a non-negative suffix for the directory name
Path submitJobDir = new Path(getSystemDir(), "submit_"
+ Integer.toString(r.nextInt(Integer.MAX_VALUE), 36));
Path submitSplitFile = new Path(submitJobDir, "job.split");
Path submitJarFile = new Path(submitJobDir, "job.jar");
Path submitJobFile = new Path(submitJobDir, "job.xml");
LOG.debug("BSPJobClient.submitJobDir: " + submitJobDir);
FileSystem fs = getFs();
// Create a number of filenames in the BSPMaster's fs namespace
fs.delete(submitJobDir, true);
submitJobDir = fs.makeQualified(submitJobDir);
submitJobDir = new Path(submitJobDir.toUri().getPath());
FsPermission bspSysPerms = new FsPermission(JOB_DIR_PERMISSION);
FileSystem.mkdirs(fs, submitJobDir, bspSysPerms);
short replication = (short) job.getInt("bsp.submit.replication", 10);
// only create the splits if we have an input
if ((job.get(Constants.JOB_INPUT_DIR) != null)
|| (job.get("bsp.join.expr") != null)) {
// Create the splits for the job
LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
InputSplit[] splits = job.getInputFormat().getSplits(job,
Math.min(configured, maxTasks));
if (job.getConfiguration().getBoolean(
Constants.ENABLE_RUNTIME_PARTITIONING, false)) {
job = partition(job, splits, maxTasks);
maxTasks = job.getInt("hama.partition.count", maxTasks);
}
if (job.getBoolean("input.has.partitioned", false)) {
splits = job.getInputFormat().getSplits(job, maxTasks);
}
if (maxTasks < splits.length) {
throw new IOException("Job failed! The number of splits ("
+ splits.length + ") exceeds the maximum number of tasks ("
+ maxTasks + ").");
}
int numOfSplits = writeSplits(job, splits, submitSplitFile, maxTasks);
if (numOfSplits > configured
|| !job.getConfiguration().getBoolean(Constants.FORCE_SET_BSP_TASKS,
false)) {
job.setNumBspTask(numOfSplits);
}
job.set("bsp.job.split.file", submitSplitFile.toString());
}
String originalJarPath = job.getJar();
if (originalJarPath != null) { // copy jar to BSPMaster's fs
// use jar name if job is not named.
if ("".equals(job.getJobName())) {
job.setJobName(new Path(originalJarPath).getName());
}
job.setJar(submitJarFile.toString());
fs.copyFromLocalFile(new Path(originalJarPath), submitJarFile);
fs.setReplication(submitJarFile, replication);
fs.setPermission(submitJarFile, new FsPermission(JOB_FILE_PERMISSION));
} else {
LOG.warn("No job jar file set. User classes may not be found. "
+ "See BSPJob#setJar(String) or check Your jar file.");
}
// Set the user's name and working directory
job.setUser(getUnixUserName());
job.set("group.name", getUnixUserGroupName(job.getUser()));
if (job.getWorkingDirectory() == null) {
job.setWorkingDirectory(fs.getWorkingDirectory());
}
// Write job file to BSPMaster's fs
FSDataOutputStream out = FileSystem.create(fs, submitJobFile,
new FsPermission(JOB_FILE_PERMISSION));
try {
job.writeXml(out);
} finally {
out.close();
}
return launchJob(jobId, job, submitJobFile, fs);
}
protected BSPJob partition(BSPJob job, InputSplit[] splits, int maxTasks)
throws IOException {
String inputPath = job.getConfiguration().get(Constants.JOB_INPUT_DIR);
Path partitionDir = new Path("/tmp/hama-parts/" + job.getJobID() + "/");
if (fs.exists(partitionDir)) {
fs.delete(partitionDir, true);
}
if (job.get("bsp.partitioning.runner.job") != null) {
return job;
}// Early exit for the partitioner job.
if (inputPath != null) {
int numSplits = splits.length;
int numTasks = job.getConfiguration().getInt("bsp.peers.num", 0);
if (LOG.isDebugEnabled()) {
LOG.debug(" numTasks = "
+ numTasks
+ " numSplits = "
+ numSplits
+ " enable = "
+ (job.getConfiguration().getBoolean(
Constants.ENABLE_RUNTIME_PARTITIONING, false)
+ " class = " + job.getConfiguration().get(
Constants.RUNTIME_PARTITIONING_CLASS)));
}
if (numTasks == 0) {
numTasks = numSplits;
}
if (job.getConfiguration().getBoolean(
Constants.ENABLE_RUNTIME_PARTITIONING, false)
&& job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_CLASS) != null) {
HamaConfiguration conf = new HamaConfiguration(job.getConfiguration());
if (job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_DIR) != null) {
partitionDir = new Path(job.getConfiguration().get(
Constants.RUNTIME_PARTITIONING_DIR));
}
conf.set(Constants.RUNTIME_PARTITIONING_CLASS,
job.get(Constants.RUNTIME_PARTITIONING_CLASS));
BSPJob partitioningJob = new BSPJob(conf);
partitioningJob.setJobName("Runtime partitioning job for "
+ partitioningJob.getJobName());
LOG.debug("partitioningJob input: "
+ partitioningJob.get(Constants.JOB_INPUT_DIR));
partitioningJob.getConfiguration().setClass(
MessageManager.OUTGOING_MESSAGE_MANAGER_CLASS,
OutgoingPOJOMessageBundle.class, OutgoingMessageManager.class);
partitioningJob.getConfiguration().setClass(
MessageManager.RECEIVE_QUEUE_TYPE_CLASS, MemoryQueue.class,
MessageQueue.class);
partitioningJob.setBoolean(Constants.FORCE_SET_BSP_TASKS, true);
partitioningJob.setInputFormat(job.getInputFormat().getClass());
partitioningJob.setInputKeyClass(job.getInputKeyClass());
partitioningJob.setInputValueClass(job.getInputValueClass());
partitioningJob.setOutputFormat(SequenceFileOutputFormat.class);
partitioningJob.setOutputKeyClass(job.getInputKeyClass());
partitioningJob.setOutputValueClass(job.getInputValueClass());
partitioningJob.setBspClass(PartitioningRunner.class);
partitioningJob.setMessageClass(MapWritable.class);
partitioningJob.set("bsp.partitioning.runner.job", "true");
partitioningJob.getConfiguration().setBoolean(
Constants.ENABLE_RUNTIME_PARTITIONING, false);
partitioningJob.setOutputPath(partitionDir);
boolean isPartitioned = false;
try {
isPartitioned = partitioningJob.waitForCompletion(true);
} catch (InterruptedException e) {
LOG.error("Interrupted partitioning run-time.", e);
} catch (ClassNotFoundException e) {
LOG.error("Class not found error partitioning run-time.", e);
}
if (isPartitioned) {
if (job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_DIR) != null) {
job.setInputPath(new Path(conf
.get(Constants.RUNTIME_PARTITIONING_DIR)));
} else {
job.setInputPath(partitionDir);
}
job.setBoolean("input.has.partitioned", true);
job.setInputFormat(NonSplitSequenceFileInputFormat.class);
} else {
LOG.error("Error partitioning the input path.");
throw new IOException("Runtime partition failed for the job.");
}
}
}
return job;
}
protected RunningJob launchJob(BSPJobID jobId, BSPJob job,
Path submitJobFile, FileSystem fs) throws IOException {
//
// Now, actually submit the job (using the submit name)
//
JobStatus status = jobSubmitClient.submitJob(jobId, submitJobFile
.makeQualified(fs).toString());
if (status != null) {
return new NetworkedJob(status);
} else {
throw new IOException("Could not launch job");
}
}
/**
* Get the {@link CompressionType} for the output {@link SequenceFile}.
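*
* <p>For example (a sketch; the value must name a {@link CompressionType}
* constant): {@code job.set("bsp.partitioning.compression.type", "BLOCK")}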
*
* @param job the {@link BSPJob}
* @return the {@link CompressionType} for the output {@link SequenceFile},
* defaulting to {@link CompressionType#NONE} when
* {@code bsp.partitioning.compression.type} is not set
*/
static CompressionType getOutputCompressionType(BSPJob job) {
String val = job.get("bsp.partitioning.compression.type");
if (val != null) {
return CompressionType.valueOf(val);
} else {
return CompressionType.NONE;
}
}
/**
* Get the {@link CompressionCodec} for compressing the job outputs.
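*
* <p>For example (a sketch; {@code GzipCodec} ships with Hadoop):
* {@code conf.set("bsp.partitioning.compression.codec",
* "org.apache.hadoop.io.compress.GzipCodec")}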
*
* @param job the {@link BSPJob} to look in
* @param defaultValue the {@link CompressionCodec} to return if not set
* @return the {@link CompressionCodec} to be used to compress the job outputs
* @throws IllegalArgumentException if the class was specified, but not found
*/
static Class<? extends CompressionCodec> getOutputCompressorClass(BSPJob job,
Class<? extends CompressionCodec> defaultValue) {
Class<? extends CompressionCodec> codecClass = defaultValue;
Configuration conf = job.getConfiguration();
String name = conf.get("bsp.partitioning.compression.codec");
if (name != null) {
try {
codecClass = conf.getClassByName(name).asSubclass(
CompressionCodec.class);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("Compression codec " + name
+ " was not found.", e);
}
}
return codecClass;
}
private static int writeSplits(BSPJob job, InputSplit[] splits,
Path submitSplitFile, int maxTasks) throws IOException {
final DataOutputStream out = writeSplitsFileHeader(job.getConfiguration(),
submitSplitFile, splits.length);
try {
DataOutputBuffer buffer = new DataOutputBuffer();
RawSplit rawSplit = new RawSplit();
for (InputSplit split : splits) {
// extract the partition ID from the split file name (e.g. part-00001)
// and set it on the raw split
if (split.getClass().getName().equals(FileSplit.class.getName())
&& job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_CLASS) != null
&& job.get("bsp.partitioning.runner.job") == null) {
String[] extractPartitionID = ((FileSplit) split).getPath().getName().split("[-]");
rawSplit.setPartitionID(Integer.parseInt(extractPartitionID[1]));
}
rawSplit.setClassName(split.getClass().getName());
buffer.reset();
split.write(buffer);
rawSplit.setDataLength(split.getLength());
rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
rawSplit.setLocations(split.getLocations());
rawSplit.write(out);
}
} finally {
out.close();
}
return splits.length;
}
private static final int CURRENT_SPLIT_FILE_VERSION = 0;
private static final byte[] SPLIT_FILE_HEADER = "SPL".getBytes();
private static DataOutputStream writeSplitsFileHeader(Configuration conf,
Path filename, int length) throws IOException {
// write the splits to a file for the bsp master
FileSystem fs = filename.getFileSystem(conf);
FSDataOutputStream out = FileSystem.create(fs, filename, new FsPermission(
JOB_FILE_PERMISSION));
out.write(SPLIT_FILE_HEADER);
WritableUtils.writeVInt(out, CURRENT_SPLIT_FILE_VERSION);
WritableUtils.writeVInt(out, length);
return out;
}
/**
* Read a splits file into a list of raw splits
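*
* <p>The expected layout, as written by {@code writeSplitsFileHeader} and
* {@code writeSplits}: the "SPL" magic bytes, a vint file version, a vint
* split count, then one serialized {@link RawSplit} per split.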
*
* @param in the stream to read from
* @return the complete list of splits
* @throws IOException
*/
static RawSplit[] readSplitFile(DataInput in) throws IOException {
byte[] header = new byte[SPLIT_FILE_HEADER.length];
in.readFully(header);
if (!Arrays.equals(SPLIT_FILE_HEADER, header)) {
throw new IOException("Invalid header on split file");
}
int vers = WritableUtils.readVInt(in);
if (vers != CURRENT_SPLIT_FILE_VERSION) {
throw new IOException("Unsupported split version " + vers);
}
int len = WritableUtils.readVInt(in);
RawSplit[] result = new RawSplit[len];
for (int i = 0; i < len; ++i) {
RawSplit split = new RawSplit();
split.readFields(in);
if (split.getPartitionID() != Integer.MIN_VALUE)
result[split.getPartitionID()] = split;
else
result[i] = split;
}
return result;
}
/**
* Monitor a job and print status in real-time as progress is made and tasks
* fail.
*
* @param job the job to monitor
* @param info the {@link RunningJob} handle for the submitted job
* @return true, if job is successful
* @throws IOException
* @throws InterruptedException
*/
public boolean monitorAndPrintJob(BSPJob job, RunningJob info)
throws IOException, InterruptedException {
String lastReport = null;
LOG.info("Running job: " + info.getID());
int eventCounter = 0;
while (!job.isComplete()) {
Thread.sleep(3000);
long step = job.progress();
String report = "Current superstep number: " + step;
if (!report.equals(lastReport)) {
LOG.info(report);
lastReport = report;
}
TaskCompletionEvent[] events = info.getTaskCompletionEvents(eventCounter);
eventCounter += events.length;
for (TaskCompletionEvent event : events) {
if (event.getTaskStatus() == TaskCompletionEvent.Status.FAILED) {
// Displaying the task logs
displayTaskLogs(event.getTaskAttemptId(), event.getGroomServerInfo());
}
}
}
if (job.isSuccessful()) {
LOG.info("The total number of supersteps: " + info.getSuperstepCount());
info.getStatus()
.getCounter()
.incrCounter(JobInProgress.JobCounter.SUPERSTEPS,
info.getSuperstepCount());
info.getStatus().getCounter().log(LOG);
} else {
LOG.info("Job failed.");
}
return job.isSuccessful();
}
static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
return (baseUrl + "/tasklog?plaintext=true&taskid=" + taskId);
}
private static void displayTaskLogs(TaskAttemptID taskId, String baseUrl)
throws MalformedURLException {
// The groom server for a 'failed/killed' task might not be around...
if (baseUrl != null) {
// Construct the url for the tasklogs
String taskLogUrl = getTaskLogURL(taskId, baseUrl);
// Copy the task's stdout to the JobClient's output stream
getTaskLogs(taskId, new URL(taskLogUrl + "&filter=stdout"), System.out);
}
}
private static void getTaskLogs(TaskAttemptID taskId, URL taskLogUrl,
OutputStream out) {
try {
URLConnection connection = taskLogUrl.openConnection();
BufferedReader input = new BufferedReader(new InputStreamReader(
connection.getInputStream()));
BufferedWriter output = new BufferedWriter(new OutputStreamWriter(out));
try {
String logData = null;
while ((logData = input.readLine()) != null) {
if (logData.length() > 0) {
output.write(taskId + ": " + logData + "\n");
output.flush();
}
}
} finally {
input.close();
}
} catch (IOException ioe) {
LOG.warn("Error reading task output" + ioe.getMessage());
}
}
/**
* Grab the bspmaster system directory path where job-specific files are to be
* placed.
*
* @return the system directory where job-specific files are to be placed.
*/
public Path getSystemDir() {
if (sysDir == null) {
sysDir = new Path(jobSubmitClient.getSystemDir());
}
return sysDir;
}
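/**
* Submit the given job and block until it completes, polling once a second;
* superstep progress and counters are written to the log.
*
* @param job the job to run
* @throws IOException if submission or status polling fails
*/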
public static void runJob(BSPJob job) throws FileNotFoundException,
IOException {
BSPJobClient jc = new BSPJobClient(job.getConfiguration());
if (job.getNumBspTask() == 0
|| job.getNumBspTask() > jc.getClusterStatus(false).getMaxTasks()) {
job.setNumBspTask(jc.getClusterStatus(false).getMaxTasks());
}
RunningJob running = jc.submitJob(job);
BSPJobID jobId = running.getID();
LOG.info("Running job: " + jobId.toString());
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
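// ignore the interrupt and keep polling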
}
if (running.isComplete()) {
break;
}
running = jc.getJob(jobId);
}
if (running.isSuccessful()) {
LOG.info("Job complete: " + jobId);
LOG.info("The total number of supersteps: " + running.getSuperstepCount());
running.getStatus().getCounter().log(LOG);
} else {
LOG.info("Job failed.");
}
// TODO if error found, kill job
// running.killJob();
jc.close();
}
/**
* Get a {@link RunningJob} object to track an ongoing job. Returns null if the id
* does not correspond to any known job.
*
* @throws IOException
*/
private RunningJob getJob(BSPJobID jobId) throws IOException {
JobStatus status = jobSubmitClient.getJobStatus(jobId);
if (status != null) {
return new NetworkedJob(status);
} else {
return null;
}
}
/**
* Get status information about the BSP cluster
*
* @param detailed if true then get a detailed status including the
* groomserver names
*
* @return the status information about the BSP cluster as an object of
* {@link ClusterStatus}.
*
* @throws IOException
*/
public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
return (jobSubmitClient != null) ? jobSubmitClient
.getClusterStatus(detailed) : null;
}
// for the testcase
JobSubmissionProtocol getJobSubmissionProtocol() {
return jobSubmitClient;
}
@Override
public int run(String[] args) throws Exception {
int exitCode = -1;
if (args.length < 1) {
displayUsage("");
return exitCode;
}
// process arguments
String cmd = args[0];
boolean listJobs = false;
boolean listAllJobs = false;
boolean listActiveGrooms = false;
boolean killJob = false;
boolean submitJob = false;
boolean getStatus = false;
String submitJobFile = null;
String jobid = null;
HamaConfiguration conf = new HamaConfiguration(getConf());
init(conf);
if ("-list".equals(cmd)) {
if (args.length != 1 && !(args.length == 2 && "all".equals(args[1]))) {
displayUsage(cmd);
return exitCode;
}
if (args.length == 2 && "all".equals(args[1])) {
listAllJobs = true;
} else {
listJobs = true;
}
} else if ("-list-active-grooms".equals(cmd)) {
if (args.length != 1) {
displayUsage(cmd);
return exitCode;
}
listActiveGrooms = true;
} else if ("-submit".equals(cmd)) {
if (args.length == 1) {
displayUsage(cmd);
return exitCode;
}
submitJob = true;
submitJobFile = args[1];
} else if ("-kill".equals(cmd)) {
if (args.length == 1) {
displayUsage(cmd);
return exitCode;
}
killJob = true;
jobid = args[1];
} else if ("-status".equals(cmd)) {
if (args.length != 2) {
displayUsage(cmd);
return exitCode;
}
jobid = args[1];
getStatus = true;
// TODO: the commands below should be implemented together
// with the fault-tolerance mechanism.
} else if ("-list-attempt-ids".equals(cmd)) {
System.out.println("This function is not implemented yet.");
return exitCode;
} else if ("-kill-task".equals(cmd)) {
System.out.println("This function is not implemented yet.");
return exitCode;
} else if ("-fail-task".equals(cmd)) {
System.out.println("This function is not implemented yet.");
return exitCode;
}
BSPJobClient jc = new BSPJobClient(conf);
if (listJobs) {
listJobs();
exitCode = 0;
} else if (listAllJobs) {
listAllJobs();
exitCode = 0;
} else if (listActiveGrooms) {
listActiveGrooms();
exitCode = 0;
} else if (submitJob) {
HamaConfiguration tConf = new HamaConfiguration(new Path(submitJobFile));
RunningJob job = jc.submitJob(new BSPJob(tConf));
System.out.println("Created job " + job.getID().toString());
exitCode = 0;
} else if (killJob) {
RunningJob job = jc.getJob(BSPJobID.forName(jobid));
if (job == null) {
System.out.println("Could not find job " + jobid);
} else {
job.killJob();
System.out.println("Killed job " + jobid);
}
exitCode = 0;
} else if (getStatus) {
RunningJob job = jc.getJob(BSPJobID.forName(jobid));
if (job == null) {
System.out.println("Could not find job " + jobid);
} else {
JobStatus jobStatus = jobSubmitClient.getJobStatus(job.getID());
System.out.println("Job name: " + job.getJobName());
System.out.printf("States are:\n\tRunning : 1\tSucceded : 2"
+ "\tFailed : 3\tPrep : 4\n");
System.out.printf("%s\t%d\t%d\t%s\n", jobStatus.getJobID(),
jobStatus.getRunState(), jobStatus.getStartTime(),
jobStatus.getUsername());
exitCode = 0;
}
}
return exitCode;
}
/**
* Display usage of the command-line tool.
*/
private static void displayUsage(String cmd) {
String prefix = "Usage: hama job ";
String taskStates = "running, completed";
if ("-submit".equals(cmd)) {
System.err.println(prefix + "[" + cmd + " <job-file>]");
} else if ("-status".equals(cmd) || "-kill".equals(cmd)) {
System.err.println(prefix + "[" + cmd + " <job-id>]");
} else if ("-list".equals(cmd)) {
System.err.println(prefix + "[" + cmd + " [all]]");
} else if ("-kill-task".equals(cmd) || "-fail-task".equals(cmd)) {
System.err.println(prefix + "[" + cmd + " <task-id>]");
} else if ("-list-active-grooms".equals(cmd)) {
System.err.println(prefix + "[" + cmd + "]");
} else if ("-list-attempt-ids".equals(cmd)) {
System.err.println(prefix + "[" + cmd + " <job-id> <task-state>]. "
+ "Valid values for <task-state> are " + taskStates);
} else {
System.err.printf(prefix + "<command> <args>\n");
System.err.printf("\t[-submit <job-file>]\n");
System.err.printf("\t[-status <job-id>]\n");
System.err.printf("\t[-kill <job-id>]\n");
System.err.printf("\t[-list [all]]\n");
System.err.printf("\t[-list-active-grooms]\n");
System.err.println("\t[-list-attempt-ids <job-id> " + "<task-state>]\n");
System.err.printf("\t[-kill-task <task-id>]\n");
System.err.printf("\t[-fail-task <task-id>]\n\n");
}
}
/**
* Dump a list of currently running jobs
*
* @throws IOException
*/
private void listJobs() throws IOException {
JobStatus[] jobs = jobsToComplete();
if (jobs == null)
jobs = new JobStatus[0];
System.out.printf("%d jobs currently running\n", jobs.length);
displayJobList(jobs);
}
/**
* Dump a list of all jobs submitted.
*
* @throws IOException
*/
private void listAllJobs() throws IOException {
JobStatus[] jobs = getAllJobs();
if (jobs == null)
jobs = new JobStatus[0];
System.out.printf("%d jobs submitted\n", jobs.length);
System.out.printf("States are:\n\tRunning : 1\tSucceded : 2"
+ "\tFailed : 3\tPrep : 4\n");
displayJobList(jobs);
}
void displayJobList(JobStatus[] jobs) {
System.out.printf("JobId\tState\tStartTime\tUserName\n");
for (JobStatus job : jobs) {
System.out.printf("%s\t%d\t%d\t%s\n", job.getJobID(), job.getRunState(),
job.getStartTime(), job.getUsername());
}
}
/**
* Display the list of active groom servers
*/
private void listActiveGrooms() throws IOException {
ClusterStatus c = jobSubmitClient.getClusterStatus(true);
Map<String, String> grooms = c.getActiveGroomNames();
for (String groomName : grooms.keySet()) {
System.out.println(groomName);
}
}
/*
* Helper methods for unix operations
*/
protected static String getUnixUserName() throws IOException {
String[] result = executeShellCommand(new String[] { Shell.USER_NAME_COMMAND });
if (result.length != 1) {
throw new IOException("Expect one token as the result of "
+ Shell.USER_NAME_COMMAND + ": " + toString(result));
}
return fixCygwinName(result[0]);
}
private static String fixCygwinName(String in) {
String string = in;
if (string.contains("\\")) {
// on cygwin the name may come as DOMAIN\user; strip the domain prefix
string = string.substring(string.indexOf("\\") + 1);
}
return string;
}
static String getUnixUserGroupName(String user) throws IOException {
String[] result = executeShellCommand(new String[] { "bash", "-c",
"id -Gn " + user });
if (result.length < 1) {
throw new IOException("Expect one token as the result of "
+ "bash -c id -Gn " + user + ": " + toString(result));
}
return result[0];
}
protected static String toString(String[] strArray) {
if (strArray == null || strArray.length == 0) {
return "";
}
StringBuilder buf = new StringBuilder(strArray[0]);
for (int i = 1; i < strArray.length; i++) {
buf.append(' ');
buf.append(strArray[i]);
}
return buf.toString();
}
protected static String[] executeShellCommand(String[] command)
throws IOException {
String groups = Shell.execCommand(command);
StringTokenizer tokenizer = new StringTokenizer(groups);
int numOfTokens = tokenizer.countTokens();
String[] tokens = new String[numOfTokens];
for (int i = 0; tokenizer.hasMoreTokens(); i++) {
tokens[i] = tokenizer.nextToken();
}
return tokens;
}
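/**
* A serialized {@link InputSplit} together with its metadata: the split
* class name, the raw split bytes, the preferred host locations, and an
* optional partition ID.
*/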
public static class RawSplit implements Writable {
private String splitClass;
private BytesWritable bytes = new BytesWritable();
private String[] locations;
private int partitionID = Integer.MIN_VALUE;
long dataLength;
public void setBytes(byte[] data, int offset, int length) {
bytes.set(data, offset, length);
}
public void setPartitionID(int id) {
this.partitionID = id;
}
public int getPartitionID() {
return partitionID;
}
public void setClassName(String className) {
splitClass = className;
}
public String getClassName() {
return splitClass;
}
public BytesWritable getBytes() {
return bytes;
}
public void clearBytes() {
bytes = null;
}
public void setLocations(String[] locations) {
this.locations = locations;
}
public String[] getLocations() {
return locations;
}
@Override
public void readFields(DataInput in) throws IOException {
splitClass = Text.readString(in);
dataLength = in.readLong();
bytes.readFields(in);
int len = WritableUtils.readVInt(in);
locations = new String[len];
for (int i = 0; i < len; ++i) {
locations[i] = Text.readString(in);
}
}
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, splitClass);
out.writeLong(dataLength);
bytes.write(out);
WritableUtils.writeVInt(out, locations.length);
for (String location : locations) {
Text.writeString(out, location);
}
}
public long getDataLength() {
return dataLength;
}
public void setDataLength(long l) {
dataLength = l;
}
}
/**
* Command-line entry point; runs this tool via {@link ToolRunner}.
*/
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new BSPJobClient(), args);
System.exit(res);
}
}