blob: 3b917af6f6d7a17173a99977a73b3707d44f36ef [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.gridmix;
import java.io.IOException;
import java.nio.channels.ClosedByInterruptException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.Job;
/**
* Component accepting submitted, running jobs and responsible for
* monitoring jobs for success and failure. Once a job is submitted, it is
* polled for status until complete. If a job is complete, then the monitor
* thread returns immediately to the queue. If not, the monitor will sleep
* for some duration.
*/
class JobMonitor implements Gridmix.Component<Job> {
public static final Log LOG = LogFactory.getLog(JobMonitor.class);
private final Queue<Job> mJobs;
private final MonitorThread mThread;
private final BlockingQueue<Job> runningJobs;
private final long pollDelayMillis;
private boolean graceful = false;
private boolean shutdown = false;
/**
* Create a JobMonitor with a default polling interval of 5s.
*/
public JobMonitor() {
this(5, TimeUnit.SECONDS);
}
/**
* Create a JobMonitor that sleeps for the specified duration after
* polling a still-running job.
* @param pollDelay Delay after polling a running job
* @param unit Time unit for pollDelaySec (rounded to milliseconds)
*/
public JobMonitor(int pollDelay, TimeUnit unit) {
mThread = new MonitorThread();
runningJobs = new LinkedBlockingQueue<Job>();
mJobs = new LinkedList<Job>();
this.pollDelayMillis = TimeUnit.MILLISECONDS.convert(pollDelay, unit);
}
/**
* Add a job to the polling queue.
*/
public void add(Job job) throws InterruptedException {
runningJobs.put(job);
}
/**
* Temporary hook for recording job success.
*/
protected void onSuccess(Job job) {
LOG.info(job.getJobName() + " (" + job.getID() + ")" + " success");
}
/**
* Temporary hook for recording job failure.
*/
protected void onFailure(Job job) {
LOG.info(job.getJobName() + " (" + job.getID() + ")" + " failure");
}
/**
* If shutdown before all jobs have completed, any still-running jobs
* may be extracted from the component.
* @throws IllegalStateException If monitoring thread is still running.
* @return Any jobs submitted and not known to have completed.
*/
List<Job> getRemainingJobs() {
if (mThread.isAlive()) {
LOG.warn("Internal error: Polling running monitor for jobs");
}
synchronized (mJobs) {
return new ArrayList<Job>(mJobs);
}
}
/**
* Monitoring thread pulling running jobs from the component and into
* a queue to be polled for status.
*/
private class MonitorThread extends Thread {
public MonitorThread() {
super("GridmixJobMonitor");
}
/**
* Check a job for success or failure.
*/
public void process(Job job) throws IOException, InterruptedException {
if (job.isSuccessful()) {
onSuccess(job);
} else {
onFailure(job);
}
}
@Override
public void run() {
boolean graceful;
boolean shutdown;
while (true) {
try {
synchronized (mJobs) {
graceful = JobMonitor.this.graceful;
shutdown = JobMonitor.this.shutdown;
runningJobs.drainTo(mJobs);
}
// shutdown conditions; either shutdown requested and all jobs
// have completed or abort requested and there are recently
// submitted jobs not in the monitored set
if (shutdown) {
if (!graceful) {
while (!runningJobs.isEmpty()) {
synchronized (mJobs) {
runningJobs.drainTo(mJobs);
}
}
break;
} else if (mJobs.isEmpty()) {
break;
}
}
while (!mJobs.isEmpty()) {
Job job;
synchronized (mJobs) {
job = mJobs.poll();
}
try {
if (job.isComplete()) {
process(job);
continue;
}
} catch (IOException e) {
if (e.getCause() instanceof ClosedByInterruptException) {
// Job doesn't throw InterruptedException, but RPC socket layer
// is blocking and may throw a wrapped Exception if this thread
// is interrupted. Since the lower level cleared the flag,
// reset it here
Thread.currentThread().interrupt();
} else {
LOG.warn("Lost job " + (null == job.getJobName()
? "<unknown>" : job.getJobName()), e);
continue;
}
}
synchronized (mJobs) {
if (!mJobs.offer(job)) {
LOG.error("Lost job " + (null == job.getJobName()
? "<unknown>" : job.getJobName())); // should never
// happen
}
}
break;
}
try {
TimeUnit.MILLISECONDS.sleep(pollDelayMillis);
} catch (InterruptedException e) {
shutdown = true;
continue;
}
} catch (Throwable e) {
LOG.warn("Unexpected exception: ", e);
}
}
}
}
/**
* Start the internal, monitoring thread.
*/
public void start() {
mThread.start();
}
/**
* Wait for the monitor to halt, assuming shutdown or abort have been
* called. Note that, since submission may be sporatic, this will hang
* if no form of shutdown has been requested.
*/
public void join(long millis) throws InterruptedException {
mThread.join(millis);
}
/**
* Drain all submitted jobs to a queue and stop the monitoring thread.
* Upstream submitter is assumed dead.
*/
public void abort() {
synchronized (mJobs) {
graceful = false;
shutdown = true;
}
mThread.interrupt();
}
/**
* When all monitored jobs have completed, stop the monitoring thread.
* Upstream submitter is assumed dead.
*/
public void shutdown() {
synchronized (mJobs) {
graceful = true;
shutdown = true;
}
mThread.interrupt();
}
}