blob: 341c49a9c99f891f5a29cb2c1d5ee8653d712e1a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.gridmix;
import java.io.IOException;
import java.nio.channels.ClosedByInterruptException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobStatus;
/**
* Component accepting submitted, running {@link Statistics.JobStats} and
* responsible for monitoring jobs for success and failure. Once a job is
* submitted, it is polled for status until complete. If a job is complete,
* then the monitor thread returns immediately to the queue. If not, the monitor
* will sleep for some duration.
*
* {@link JobMonitor} can be configured to use multiple threads for polling
* the job statuses. Use {@link Gridmix#GRIDMIX_JOBMONITOR_THREADS} to specify
* the total number of monitoring threads.
*
* The duration for which a monitoring thread sleeps if the first job in the
* queue is running can also be configured. Use
* {@link Gridmix#GRIDMIX_JOBMONITOR_SLEEPTIME_MILLIS} to specify a custom
* value.
*/
class JobMonitor implements Gridmix.Component<JobStats> {
public static final Log LOG = LogFactory.getLog(JobMonitor.class);
private final Queue<JobStats> mJobs;
private ExecutorService executor;
private int numPollingThreads;
private final BlockingQueue<JobStats> runningJobs;
private final long pollDelayMillis;
private Statistics statistics;
private boolean graceful = false;
private boolean shutdown = false;
/**
* Create a JobMonitor that sleeps for the specified duration after
* polling a still-running job.
* @param pollDelay Delay after polling a running job
* @param unit Time unit for pollDelaySec (rounded to milliseconds)
* @param statistics StatCollector , listener to job completion.
*/
public JobMonitor(int pollDelay, TimeUnit unit, Statistics statistics,
int numPollingThreads) {
executor = Executors.newCachedThreadPool();
this.numPollingThreads = numPollingThreads;
runningJobs = new LinkedBlockingQueue<JobStats>();
mJobs = new LinkedList<JobStats>();
this.pollDelayMillis = TimeUnit.MILLISECONDS.convert(pollDelay, unit);
this.statistics = statistics;
}
/**
* Add a running job's status to the polling queue.
*/
public void add(JobStats job) throws InterruptedException {
runningJobs.put(job);
}
/**
* Add a submission failed job's status, such that it can be communicated
* back to serial.
* TODO: Cleaner solution for this problem
* @param job
*/
public void submissionFailed(JobStats job) {
String jobID = job.getJob().getConfiguration().get(Gridmix.ORIGINAL_JOB_ID);
LOG.info("Job submission failed notification for job " + jobID);
synchronized (statistics) {
this.statistics.add(job);
}
}
/**
* Temporary hook for recording job success.
*/
protected void onSuccess(Job job) {
LOG.info(job.getJobName() + " (" + job.getJobID() + ")" + " success");
}
/**
* Temporary hook for recording job failure.
*/
protected void onFailure(Job job) {
LOG.info(job.getJobName() + " (" + job.getJobID() + ")" + " failure");
}
/**
* If shutdown before all jobs have completed, any still-running jobs
* may be extracted from the component.
* @throws IllegalStateException If monitoring thread is still running.
* @return Any jobs submitted and not known to have completed.
*/
List<JobStats> getRemainingJobs() {
synchronized (mJobs) {
return new ArrayList<JobStats>(mJobs);
}
}
/**
* Monitoring thread pulling running jobs from the component and into
* a queue to be polled for status.
*/
private class MonitorThread extends Thread {
public MonitorThread(int i) {
super("GridmixJobMonitor-" + i);
}
@Override
public void run() {
boolean graceful;
boolean shutdown;
while (true) {
try {
synchronized (mJobs) {
graceful = JobMonitor.this.graceful;
shutdown = JobMonitor.this.shutdown;
runningJobs.drainTo(mJobs);
}
// shutdown conditions; either shutdown requested and all jobs
// have completed or abort requested and there are recently
// submitted jobs not in the monitored set
if (shutdown) {
if (!graceful) {
while (!runningJobs.isEmpty()) {
synchronized (mJobs) {
runningJobs.drainTo(mJobs);
}
}
break;
}
synchronized (mJobs) {
if (graceful && mJobs.isEmpty()) {
break;
}
}
}
JobStats jobStats = null;
synchronized (mJobs) {
jobStats = mJobs.poll();
}
while (jobStats != null) {
Job job = jobStats.getJob();
try {
// get the job status
long start = System.currentTimeMillis();
JobStatus status = job.getStatus(); // cache the job status
long end = System.currentTimeMillis();
if (LOG.isDebugEnabled()) {
LOG.debug("Status polling for job " + job.getJobID() + " took "
+ (end-start) + "ms.");
}
// update the job progress
jobStats.updateJobStatus(status);
// if the job is complete, let others know
if (status.isJobComplete()) {
if (status.getState() == JobStatus.State.SUCCEEDED) {
onSuccess(job);
} else {
onFailure(job);
}
synchronized (statistics) {
statistics.add(jobStats);
}
} else {
// add the running job back and break
synchronized (mJobs) {
if (!mJobs.offer(jobStats)) {
LOG.error("Lost job " + (null == job.getJobName()
? "<unknown>" : job.getJobName())); // should never
// happen
}
}
break;
}
} catch (IOException e) {
if (e.getCause() instanceof ClosedByInterruptException) {
// Job doesn't throw InterruptedException, but RPC socket layer
// is blocking and may throw a wrapped Exception if this thread
// is interrupted. Since the lower level cleared the flag,
// reset it here
Thread.currentThread().interrupt();
} else {
LOG.warn("Lost job " + (null == job.getJobName()
? "<unknown>" : job.getJobName()), e);
synchronized (statistics) {
statistics.add(jobStats);
}
}
}
// get the next job
synchronized (mJobs) {
jobStats = mJobs.poll();
}
}
// sleep for a while before checking again
try {
TimeUnit.MILLISECONDS.sleep(pollDelayMillis);
} catch (InterruptedException e) {
shutdown = true;
continue;
}
} catch (Throwable e) {
LOG.warn("Unexpected exception: ", e);
}
}
}
}
/**
* Start the internal, monitoring thread.
*/
public void start() {
for (int i = 0; i < numPollingThreads; ++i) {
executor.execute(new MonitorThread(i));
}
}
/**
* Wait for the monitor to halt, assuming shutdown or abort have been
* called. Note that, since submission may be sporatic, this will hang
* if no form of shutdown has been requested.
*/
public void join(long millis) throws InterruptedException {
executor.awaitTermination(millis, TimeUnit.MILLISECONDS);
}
/**
* Drain all submitted jobs to a queue and stop the monitoring thread.
* Upstream submitter is assumed dead.
*/
public void abort() {
synchronized (mJobs) {
graceful = false;
shutdown = true;
}
executor.shutdown();
}
/**
* When all monitored jobs have completed, stop the monitoring thread.
* Upstream submitter is assumed dead.
*/
public void shutdown() {
synchronized (mJobs) {
graceful = true;
shutdown = true;
}
executor.shutdown();
}
}