blob: 4e94167d4254d8b7eea598c2c3f82c9e8715e02a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.chukwa.datatrigger;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.conf.Configuration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.BufferedReader;
import java.io.OutputStreamWriter;
import java.net.URL;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Map;
import java.util.HashMap;
import java.util.Map.Entry;
/**
* Trigger action that makes an HTTP request when executed.
* <P>
* To use this trigger, two types of configurations must be set. First, this class
* must be configured to be invoked for a given trigger event. Second, the
* relevant settings for the HTTP request(s) to be made must be set as
* described below.
* <P>
* The general format of this class's configs is
* <code>chukwa.trigger.[eventName].http.[N].[paramName]</code> where
* <code>eventName</code> is the name of the event the request values are bound
* to (see TriggerEvent), <code>N</code> is a counter for each request configured (starting at 1)
* and <code>paramName</code> is the request parameter being set.
* <P>
* Using the post demux success trigger event as an example, the first request
* to be fired would use the following configurations
* <ul>
* <li><code>chukwa.trigger.post.demux.success.http.1.url</code> - The HTTP url to
* invoke.</li>
* <li><code>chukwa.trigger.post.demux.success.http.1.method</code> - The HTTP method
* (optional, default=GET).</li>
* <li><code>chukwa.trigger.post.demux.success.http.1.headers</code> - A comma-delimited
* set of HTTP headers (in <code>[headerName]:[headerValue]</code> form) to
* include (optional).</li>
* <li><code>chukwa.trigger.post.demux.success.http.1.body</code> - The text HTTP body
* to include (optional).</li>
* <li><code>chukwa.trigger.post.demux.success.http.1.connect.timeout</code> - The
* HTTP connection timeout setting in milliseconds (optional, default=5000ms).</li>
* <li><code>chukwa.trigger.post.demux.success.http.1.read.timeout</code> - The
* HTTP read timeout setting in milliseconds (optional, default=5000ms).</li>
* </ul>
* @see TriggerAction
* @see TriggerEvent
*/
public class HttpTriggerAction implements TriggerAction {

  /** Timeout (in milliseconds) used when no connect/read timeout is configured. */
  private static final int DEFAULT_TIMEOUT_MS = 5000;

  protected Log log = LogFactory.getLog(getClass());

  /**
   * Iterates over each configured URL, fetches the other settings for that
   * request and fires an HTTP request.
   * <P>
   * Requests are numbered starting at 1; iteration stops at the first request
   * number for which no URL is configured. A failure while reading the
   * remaining settings of a request, or while making the request itself, is
   * logged and does not prevent the following requests from being attempted.
   *
   * @param conf is Chukwa configuration
   * @param fs is HDFS File System
   * @param src is list of sources to look for data
   * @param triggerEvent is type of processing to happen
   * @throws IOException if error in process triggers (e.g. a configured URL
   *         is malformed)
   */
  public void execute(Configuration conf, FileSystem fs,
                      FileStatus[] src, TriggerEvent triggerEvent) throws IOException {
    if (log.isDebugEnabled() && src != null) {
      for (FileStatus file : src) {
        log.debug("Execute file: " + file.getPath());
      }
    }

    int reqNumber = 1;
    URL url = null;
    while ((url = getUrl(conf, triggerEvent, reqNumber)) != null) {
      try {
        // Settings are read inside the try block so that a bad value for one
        // request (e.g. a non-numeric timeout) is logged like any other
        // request failure instead of aborting the remaining requests.
        String method = getMethod(conf, triggerEvent, reqNumber);
        Map<String, String> headers = getHeaders(conf, triggerEvent, reqNumber);
        String body = getBody(conf, triggerEvent, reqNumber);
        int connectTimeout = getConnectTimeout(conf, triggerEvent, reqNumber);
        int readTimeout = getReadTimeout(conf, triggerEvent, reqNumber);

        // make the request
        makeHttpRequest(url, method, headers, body, connectTimeout, readTimeout);
      }
      catch (Exception e) {
        log.error("Error making request to " + url, e);
      }
      reqNumber++;
    }
  }

  /**
   * Performs a single HTTP request and logs the outcome.
   * <P>
   * The response body is read and logged only for a 200 response; for any
   * other status the response message is logged instead. The connection and
   * any opened streams are always released, even when the request fails.
   *
   * @param url target of the request; when null the method returns immediately
   * @param method HTTP method name passed to the connection
   * @param headers request headers to set, may be null
   * @param body request body sent as UTF-8, or null to send no body
   * @param connectTimeout connection timeout in milliseconds
   * @param readTimeout read timeout in milliseconds
   * @throws IOException if the connection cannot be opened or the exchange fails
   */
  private void makeHttpRequest(URL url, String method,
                               Map<String, String> headers, String body,
                               int connectTimeout, int readTimeout) throws IOException {
    if (url == null) {
      return;
    }

    Charset utf8 = Charset.forName("UTF-8");

    // initialize the connection
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    try {
      conn.setRequestMethod(method);
      conn.setDoInput(true);
      conn.setConnectTimeout(connectTimeout);
      conn.setReadTimeout(readTimeout);

      // set headers
      boolean contentLengthExists = false;
      if (headers != null) {
        for (Entry<String, String> entry : headers.entrySet()) {
          String name = entry.getKey();
          String value = entry.getValue();
          if (log.isDebugEnabled()) {
            log.debug("Setting header " + name + ": " + value);
          }
          if (name.equalsIgnoreCase("content-length")) {
            contentLengthExists = true;
          }
          conn.setRequestProperty(name, value);
        }
      }

      // set content-length if not already set; the length is the number of
      // encoded bytes, which differs from String.length() for non-ASCII text
      if (!"GET".equals(method) && !contentLengthExists) {
        int contentLength = body != null ? body.getBytes(utf8).length : 0;
        conn.setRequestProperty("Content-Length", String.valueOf(contentLength));
      }

      log.info("Making HTTP " + method + " to: " + url);

      // send body if it exists
      if (body != null) {
        conn.setDoOutput(true);
        OutputStreamWriter writer = new OutputStreamWriter(conn.getOutputStream(), utf8);
        try {
          writer.write(body);
          writer.flush();
        }
        finally {
          writer.close();
        }
      }
      else {
        conn.setDoOutput(false);
      }

      // read response code/message and dump response
      int responseCode = conn.getResponseCode();
      log.info("HTTP Response code: " + responseCode);
      if (responseCode != 200) {
        log.info("HTTP Response message: " + conn.getResponseMessage());
      }
      else {
        BufferedReader reader = new BufferedReader(
            new InputStreamReader(conn.getInputStream(), utf8));
        try {
          String line;
          StringBuilder sb = new StringBuilder();
          while ((line = reader.readLine()) != null) {
            if (sb.length() > 0) {
              sb.append("\n");
            }
            sb.append(line);
          }
          log.info("HTTP Response:\n" + sb);
        }
        finally {
          reader.close();
        }
      }
    }
    finally {
      conn.disconnect();
    }
  }

  /**
   * Looks up the URL configured for the given request number.
   *
   * @param conf is Chukwa configuration
   * @param triggerEvent event whose configuration key base is used
   * @param reqNumber request counter (the first request is 1)
   * @return the configured URL, or null when no URL is configured
   * @throws MalformedURLException if the configured value is not a valid URL
   */
  protected URL getUrl(Configuration conf,
                       TriggerEvent triggerEvent,
                       int reqNumber) throws MalformedURLException {
    String urlString = conf.get(getConfigKey(triggerEvent, reqNumber, "url"), null);
    if (urlString == null) {
      return null;
    }
    return new URL(urlString);
  }

  /**
   * Looks up the HTTP method configured for the given request number.
   *
   * @param conf is Chukwa configuration
   * @param triggerEvent event whose configuration key base is used
   * @param reqNumber request counter (the first request is 1)
   * @return the configured method, or "GET" when none is configured
   */
  protected String getMethod(Configuration conf,
                             TriggerEvent triggerEvent,
                             int reqNumber) {
    return conf.get(getConfigKey(triggerEvent, reqNumber, "method"), "GET");
  }

  /**
   * Parses the comma-delimited <code>[headerName]:[headerValue]</code> list
   * configured for the given request number.
   * <P>
   * Entries without a ':' separator are logged as invalid and skipped. Names
   * and values are trimmed. Because entries are split on ',', a header value
   * cannot itself contain a comma.
   *
   * @param conf is Chukwa configuration
   * @param triggerEvent event whose configuration key base is used
   * @param reqNumber request counter (the first request is 1)
   * @return map of header name to header value; empty when none are configured
   */
  protected Map<String, String> getHeaders(Configuration conf,
                                           TriggerEvent triggerEvent,
                                           int reqNumber) {
    Map<String, String> headerMap = new HashMap<String, String>();
    String headers = conf.get(getConfigKey(triggerEvent, reqNumber, "headers"), null);
    if (headers != null) {
      String[] headersSplit = headers.split(",");
      for (String header : headersSplit) {
        // limit of 2 keeps any further ':' characters inside the value
        String[] nvp = header.split(":", 2);
        if (nvp.length < 2) {
          log.error("Invalid HTTP header found: " + Arrays.toString(nvp));
          continue;
        }
        headerMap.put(nvp[0].trim(), nvp[1].trim());
      }
    }
    return headerMap;
  }

  /**
   * Looks up the request body configured for the given request number.
   *
   * @param conf is Chukwa configuration
   * @param triggerEvent event whose configuration key base is used
   * @param reqNumber request counter (the first request is 1)
   * @return the configured body text, or null when no body is configured
   */
  protected String getBody(Configuration conf,
                           TriggerEvent triggerEvent,
                           int reqNumber) {
    // The body is optional: default to null (no body) rather than a literal
    // string, otherwise every request would be sent with that text as payload.
    return conf.get(getConfigKey(triggerEvent, reqNumber, "body"), null);
  }

  /**
   * Looks up the connection timeout configured for the given request number.
   *
   * @param conf is Chukwa configuration
   * @param triggerEvent event whose configuration key base is used
   * @param reqNumber request counter (the first request is 1)
   * @return the timeout in milliseconds, or 5000 when none is configured
   * @throws NumberFormatException if the configured value is not an integer
   */
  protected int getConnectTimeout(Configuration conf,
                                  TriggerEvent triggerEvent,
                                  int reqNumber) {
    return getTimeout(conf, getConfigKey(triggerEvent, reqNumber, "connect.timeout"));
  }

  /**
   * Looks up the read timeout configured for the given request number.
   *
   * @param conf is Chukwa configuration
   * @param triggerEvent event whose configuration key base is used
   * @param reqNumber request counter (the first request is 1)
   * @return the timeout in milliseconds, or 5000 when none is configured
   * @throws NumberFormatException if the configured value is not an integer
   */
  protected int getReadTimeout(Configuration conf,
                               TriggerEvent triggerEvent,
                               int reqNumber) {
    return getTimeout(conf, getConfigKey(triggerEvent, reqNumber, "read.timeout"));
  }

  /**
   * Reads a millisecond timeout from the configuration, falling back to the
   * default when the key is not set. Surrounding whitespace is ignored.
   */
  private int getTimeout(Configuration conf, String key) {
    String timeout = conf.get(key, null);
    return timeout != null ? Integer.parseInt(timeout.trim()) : DEFAULT_TIMEOUT_MS;
  }

  /**
   * Builds the configuration key
   * <code>[configKeyBase].http.[reqNumber].[name]</code> for a request setting.
   */
  private String getConfigKey(TriggerEvent triggerEvent, int reqNumber, String name) {
    return triggerEvent.getConfigKeyBase() + ".http." + reqNumber + "." + name;
  }
}