blob: d28e72290fd131bc6c415bbf7daa6c4a98f0d948 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpMethod;
import org.apache.commons.httpclient.URI;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class JobEndNotifier {
private static final Log LOG =
LogFactory.getLog(JobEndNotifier.class.getName());
private static Thread thread;
private static volatile boolean running;
private static BlockingQueue<JobEndStatusInfo> queue =
new DelayQueue<JobEndStatusInfo>();
public static void startNotifier() {
running = true;
thread = new Thread(
new Runnable() {
public void run() {
try {
while (running) {
sendNotification(queue.take());
}
}
catch (InterruptedException irex) {
if (running) {
LOG.error("Thread has ended unexpectedly", irex);
}
}
}
private void sendNotification(JobEndStatusInfo notification) {
try {
int code = httpNotification(notification.getUri());
if (code != 200) {
throw new IOException("Invalid response status code: " + code);
}
}
catch (IOException ioex) {
LOG.error("Notification failure [" + notification + "]", ioex);
if (notification.configureForRetry()) {
try {
queue.put(notification);
}
catch (InterruptedException iex) {
LOG.error("Notification queuing error [" + notification + "]",
iex);
}
}
}
catch (Exception ex) {
LOG.error("Notification failure [" + notification + "]", ex);
}
}
}
);
thread.start();
}
public static void stopNotifier() {
running = false;
thread.interrupt();
}
private static JobEndStatusInfo createNotification(JobConf conf,
JobStatus status) {
JobEndStatusInfo notification = null;
String uri = conf.getJobEndNotificationURI();
if (uri != null) {
// +1 to make logic for first notification identical to a retry
int retryAttempts = conf.getInt(JobContext.END_NOTIFICATION_RETRIES, 0) + 1;
long retryInterval = conf.getInt(JobContext.END_NOTIFICATION_RETRIE_INTERVAL, 30000);
if (uri.contains("$jobId")) {
uri = uri.replace("$jobId", status.getJobID().toString());
}
if (uri.contains("$jobStatus")) {
String statusStr =
(status.getRunState() == JobStatus.SUCCEEDED) ? "SUCCEEDED" :
(status.getRunState() == JobStatus.FAILED) ? "FAILED" : "KILLED";
uri = uri.replace("$jobStatus", statusStr);
}
notification = new JobEndStatusInfo(uri, retryAttempts, retryInterval);
}
return notification;
}
public static void registerNotification(JobConf jobConf, JobStatus status) {
JobEndStatusInfo notification = createNotification(jobConf, status);
if (notification != null) {
try {
queue.put(notification);
}
catch (InterruptedException iex) {
LOG.error("Notification queuing failure [" + notification + "]", iex);
}
}
}
private static int httpNotification(String uri) throws IOException {
URI url = new URI(uri, false);
HttpClient m_client = new HttpClient();
HttpMethod method = new GetMethod(url.getEscapedURI());
method.setRequestHeader("Accept", "*/*");
return m_client.executeMethod(method);
}
// for use by the LocalJobRunner, without using a thread&queue,
// simple synchronous way
public static void localRunnerNotification(JobConf conf, JobStatus status) {
JobEndStatusInfo notification = createNotification(conf, status);
if (notification != null) {
while (notification.configureForRetry()) {
try {
int code = httpNotification(notification.getUri());
if (code != 200) {
throw new IOException("Invalid response status code: " + code);
}
else {
break;
}
}
catch (IOException ioex) {
LOG.error("Notification error [" + notification.getUri() + "]", ioex);
}
catch (Exception ex) {
LOG.error("Notification error [" + notification.getUri() + "]", ex);
}
try {
Thread.sleep(notification.getRetryInterval());
}
catch (InterruptedException iex) {
LOG.error("Notification retry error [" + notification + "]", iex);
}
}
}
}
private static class JobEndStatusInfo implements Delayed {
private String uri;
private int retryAttempts;
private long retryInterval;
private long delayTime;
JobEndStatusInfo(String uri, int retryAttempts, long retryInterval) {
this.uri = uri;
this.retryAttempts = retryAttempts;
this.retryInterval = retryInterval;
this.delayTime = System.currentTimeMillis();
}
public String getUri() {
return uri;
}
public int getRetryAttempts() {
return retryAttempts;
}
public long getRetryInterval() {
return retryInterval;
}
public long getDelayTime() {
return delayTime;
}
public boolean configureForRetry() {
boolean retry = false;
if (getRetryAttempts() > 0) {
retry = true;
delayTime = System.currentTimeMillis() + retryInterval;
}
retryAttempts--;
return retry;
}
public long getDelay(TimeUnit unit) {
long n = this.delayTime - System.currentTimeMillis();
return unit.convert(n, TimeUnit.MILLISECONDS);
}
public int compareTo(Delayed d) {
return (int)(delayTime - ((JobEndStatusInfo)d).delayTime);
}
@Override
public boolean equals(Object o) {
if (!(o instanceof JobEndStatusInfo)) {
return false;
}
if (delayTime == ((JobEndStatusInfo)o).delayTime) {
return true;
}
return false;
}
@Override
public int hashCode() {
return 37 * 17 + (int) (delayTime^(delayTime>>>32));
}
@Override
public String toString() {
return "URL: " + uri + " remaining retries: " + retryAttempts +
" interval: " + retryInterval;
}
}
}