blob: ca7f625c279f9b3ff14e7dffdf9b337fc6f466d5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.Proxy;
import java.net.URL;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.mortbay.log.Log;
/**
* <p>This class handles job end notification. Submitters of jobs can choose to
* be notified of the end of a job by supplying a URL to which a connection
* will be established.
* <ul><li> The URL connection is fire and forget by default.</li> <li>
* User can specify number of retry attempts and a time interval at which to
* attempt retries</li><li>
* Cluster administrators can set final parameters to set maximum number of
* tries (0 would disable job end notification) and max time interval and a
* proxy if needed</li><li>
* The URL may contain sentinels which will be replaced by jobId and jobStatus
* (eg. SUCCEEDED/KILLED/FAILED) </li> </ul>
* </p>
*/
public class JobEndNotifier implements Configurable {
private static final String JOB_ID = "$jobId";
private static final String JOB_STATUS = "$jobStatus";
private Configuration conf;
protected String userUrl;
protected String proxyConf;
protected int numTries; //Number of tries to attempt notification
protected int waitInterval; //Time to wait between retrying notification
protected URL urlToNotify; //URL to notify read from the config
protected Proxy proxyToUse = Proxy.NO_PROXY; //Proxy to use for notification
/**
* Parse the URL that needs to be notified of the end of the job, along
* with the number of retries in case of failure, the amount of time to
* wait between retries and proxy settings
* @param conf the configuration
*/
public void setConf(Configuration conf) {
this.conf = conf;
numTries = Math.min(
conf.getInt(MRJobConfig.MR_JOB_END_RETRY_ATTEMPTS, 0) + 1
, conf.getInt(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, 1)
);
waitInterval = Math.min(
conf.getInt(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, 5)
, conf.getInt(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL, 5)
);
waitInterval = (waitInterval < 0) ? 5 : waitInterval;
userUrl = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL);
proxyConf = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY);
//Configure the proxy to use if its set. It should be set like
//proxyType@proxyHostname:port
if(proxyConf != null && !proxyConf.equals("") &&
proxyConf.lastIndexOf(":") != -1) {
int typeIndex = proxyConf.indexOf("@");
Proxy.Type proxyType = Proxy.Type.HTTP;
if(typeIndex != -1 &&
proxyConf.substring(0, typeIndex).compareToIgnoreCase("socks") == 0) {
proxyType = Proxy.Type.SOCKS;
}
String hostname = proxyConf.substring(typeIndex + 1,
proxyConf.lastIndexOf(":"));
String portConf = proxyConf.substring(proxyConf.lastIndexOf(":") + 1);
try {
int port = Integer.parseInt(portConf);
proxyToUse = new Proxy(proxyType,
new InetSocketAddress(hostname, port));
Log.info("Job end notification using proxy type \"" + proxyType +
"\" hostname \"" + hostname + "\" and port \"" + port + "\"");
} catch(NumberFormatException nfe) {
Log.warn("Job end notification couldn't parse configured proxy's port "
+ portConf + ". Not going to use a proxy");
}
}
}
public Configuration getConf() {
return conf;
}
/**
* Notify the URL just once. Use best effort. Timeout hard coded to 5
* seconds.
*/
protected boolean notifyURLOnce() {
boolean success = false;
try {
Log.info("Job end notification trying " + urlToNotify);
HttpURLConnection conn =
(HttpURLConnection) urlToNotify.openConnection(proxyToUse);
conn.setConnectTimeout(5*1000);
conn.setReadTimeout(5*1000);
conn.setAllowUserInteraction(false);
if(conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
Log.warn("Job end notification to " + urlToNotify +" failed with code: "
+ conn.getResponseCode() + " and message \"" + conn.getResponseMessage()
+"\"");
}
else {
success = true;
Log.info("Job end notification to " + urlToNotify + " succeeded");
}
} catch(IOException ioe) {
Log.warn("Job end notification to " + urlToNotify + " failed", ioe);
}
return success;
}
/**
* Notify a server of the completion of a submitted job. The user must have
* configured MRJobConfig.MR_JOB_END_NOTIFICATION_URL
* @param jobReport JobReport used to read JobId and JobStatus
* @throws InterruptedException
*/
public void notify(JobReport jobReport)
throws InterruptedException {
// Do we need job-end notification?
if (userUrl == null) {
Log.info("Job end notification URL not set, skipping.");
return;
}
//Do string replacements for jobId and jobStatus
if (userUrl.contains(JOB_ID)) {
userUrl = userUrl.replace(JOB_ID, jobReport.getJobId().toString());
}
if (userUrl.contains(JOB_STATUS)) {
userUrl = userUrl.replace(JOB_STATUS, jobReport.getJobState().toString());
}
// Create the URL, ensure sanity
try {
urlToNotify = new URL(userUrl);
} catch (MalformedURLException mue) {
Log.warn("Job end notification couldn't parse " + userUrl, mue);
return;
}
// Send notification
boolean success = false;
while (numTries-- > 0 && !success) {
Log.info("Job end notification attempts left " + numTries);
success = notifyURLOnce();
if (!success) {
Thread.sleep(waitInterval);
}
}
if (!success) {
Log.warn("Job end notification failed to notify : " + urlToNotify);
} else {
Log.info("Job end notification succeeded for " + jobReport.getJobId());
}
}
}