blob: 0ac21501d2e4fe38800b941d67f0fe45cfc1503d [file] [log] [blame]
package brooklyn.event.feed.http;
import static com.google.common.base.Preconditions.checkNotNull;

import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.Credentials;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.scheme.Scheme;
import org.apache.http.conn.ssl.SSLSocketFactory;
import org.apache.http.conn.ssl.TrustStrategy;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.AbstractHttpClient;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import brooklyn.entity.basic.Entities;
import brooklyn.entity.basic.EntityLocal;
import brooklyn.event.feed.AbstractFeed;
import brooklyn.event.feed.AttributePollHandler;
import brooklyn.event.feed.DelegatingPollHandler;
import brooklyn.event.feed.Poller;
import brooklyn.util.Suppliers2;
import brooklyn.util.exceptions.Exceptions;

import com.google.common.base.Objects;
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
/**
* Provides a feed of attribute values, by polling over http.
*
* Example usage (e.g. in an entity that extends SoftwareProcessImpl):
* <pre>
* {@code
* private HttpFeed feed;
*
* //@Override
* protected void connectSensors() {
* super.connectSensors();
*
* feed = HttpFeed.builder()
* .entity(this)
* .period(200)
* .baseUri(String.format("http://%s:%s/management/subsystem/web/connector/http/read-resource", host, port))
* .baseUriVars(ImmutableMap.of("include-runtime","true"))
* .poll(new HttpPollConfig<Boolean>(SERVICE_UP)
* .onSuccess(HttpValueFunctions.responseCodeEquals(200))
* .onError(Functions.constant(false)))
* .poll(new HttpPollConfig<Integer>(REQUEST_COUNT)
* .onSuccess(HttpValueFunctions.jsonContents("requestCount", Integer.class)))
* .build();
* }
*
* {@literal @}Override
* protected void disconnectSensors() {
* super.disconnectSensors();
* if (feed != null) feed.stop();
* }
* }
* </pre>
* <p>
*
* This also supports giving a Supplier for the URL
* (e.g. {@link Entities#attributeSupplier(brooklyn.entity.Entity, brooklyn.event.AttributeSensor)}),
* so that the URL can be taken from a sensor. Note however that if the supplied URL is <em>https</em>,
* the https-specific initialization may not occur if the URL is not yet available at start time,
* and the feed may report errors while that sensor is not available.
* Some guidance for controlling enablement of a feed based on availability of a sensor
* can be seen in HttpLatencyDetector (in brooklyn-policy).
*
* @author aled
*/
public class HttpFeed extends AbstractFeed {
public static final Logger log = LoggerFactory.getLogger(HttpFeed.class);
/**
 * Creates a new {@link Builder} for configuring an {@link HttpFeed}.
 *
 * @return a fresh builder; call {@link Builder#build()} on it to create and start the feed
 */
public static Builder builder() {
return new Builder();
}
/**
 * Fluent builder for {@link HttpFeed}.
 * <p>
 * The target is given either as a fixed base URI (optionally with URI variables) or as a
 * {@code Supplier<URI>}; supplying both is rejected. Unless changed, the poll period is
 * 500 milliseconds. {@link #build()} constructs the feed and starts it.
 */
public static class Builder {
    // What is being polled, and on whose behalf.
    private EntityLocal entity;
    private URI baseUri;
    private Supplier<URI> baseUriProvider;
    private Map<String, String> baseUriVars = Maps.newLinkedHashMap();
    private Map<String, String> headers = Maps.newLinkedHashMap();
    private Credentials credentials;
    private List<HttpPollConfig<?>> polls = Lists.newArrayList();

    // Scheduling defaults.
    private long period = 500;
    private TimeUnit periodUnits = TimeUnit.MILLISECONDS;
    private boolean suspended = false;

    // Set by build(); read by finalize() to detect builders that were never used.
    private volatile boolean built;

    /** Sets the entity the feed is created for. */
    public Builder entity(EntityLocal val) {
        entity = val;
        return this;
    }

    /**
     * Sets a supplier that provides the URI to poll.
     *
     * @throws IllegalStateException if a non-null supplier is given after a fixed URI was set
     */
    public Builder baseUri(Supplier<URI> val) {
        boolean clashesWithFixedUri = (val != null) && (baseUri != null);
        if (clashesWithFixedUri) {
            throw new IllegalStateException("Builder cannot take both a URI and a URI Provider");
        }
        baseUriProvider = val;
        return this;
    }

    /**
     * Sets the fixed base URI to poll.
     *
     * @throws IllegalStateException if a non-null URI is given after a URI supplier was set
     */
    public Builder baseUri(URI val) {
        boolean clashesWithProvider = (val != null) && (baseUriProvider != null);
        if (clashesWithProvider) {
            throw new IllegalStateException("Builder cannot take both a URI and a URI Provider");
        }
        baseUri = val;
        return this;
    }

    /** Sets the fixed base URI from a {@link URL}, converted via its string form. */
    public Builder baseUrl(URL val) {
        String text = val.toString();
        return baseUri(URI.create(text));
    }

    /** Sets the fixed base URI by parsing the given string with {@link URI#create(String)}. */
    public Builder baseUri(String val) {
        URI parsed = URI.create(val);
        return baseUri(parsed);
    }

    /** Adds all the given entries to the base URI variables. */
    public Builder baseUriVars(Map<String,String> vals) {
        this.baseUriVars.putAll(vals);
        return this;
    }

    /** Adds (or replaces) a single base URI variable. */
    public Builder baseUriVar(String key, String val) {
        this.baseUriVars.put(key, val);
        return this;
    }

    /** Adds all the given entries to the request headers. */
    public Builder headers(Map<String,String> vals) {
        this.headers.putAll(vals);
        return this;
    }

    /** Adds (or replaces) a single request header. */
    public Builder header(String key, String val) {
        this.headers.put(key, val);
        return this;
    }

    /** Sets the default poll period, in milliseconds. */
    public Builder period(long millis) {
        return period(millis, TimeUnit.MILLISECONDS);
    }

    /** Sets the default poll period in the given units. */
    public Builder period(long val, TimeUnit units) {
        period = val;
        periodUnits = units;
        return this;
    }

    /** Registers a poll configuration with the feed being built. */
    public Builder poll(HttpPollConfig<?> config) {
        polls.add(config);
        return this;
    }

    /** Marks the feed to be suspended when built; same as {@code suspended(true)}. */
    public Builder suspended() {
        return suspended(true);
    }

    /** Chooses whether {@link #build()} suspends the feed before starting it. */
    public Builder suspended(boolean startsSuspended) {
        suspended = startsSuspended;
        return this;
    }

    /** Sets username/password credentials for the feed's requests. */
    public Builder credentials(String username, String password) {
        credentials = new UsernamePasswordCredentials(username, password);
        return this;
    }

    /**
     * Creates the feed from the current settings, suspends it first if so configured,
     * and then starts it.
     *
     * @return the started feed
     */
    public HttpFeed build() {
        built = true;
        HttpFeed feed = new HttpFeed(this);
        if (suspended) {
            feed.suspend();
        }
        feed.start();
        return feed;
    }

    /** Logs a warning if this builder is collected without {@link #build()} having been called. */
    @Override
    protected void finalize() {
        if (built) {
            return;
        }
        log.warn("HttpFeed.Builder created, but build() never called");
    }
}
/**
 * Identifies one distinct HTTP request: method, URI supplier, headers, body and optional
 * credentials. Instances are used as keys of the {@code polls} multimap, so that poll
 * configurations with an equal identifier are grouped under the same key.
 * <p>
 * The body is compared and hashed by content rather than by array identity, so two
 * identifiers built from separate but equal byte arrays are equal.
 * NOTE(review): equality of {@code uriProvider} is whatever the supplier's own
 * {@code equals} provides — confirm suppliers from {@code Suppliers2.ofInstance} compare by value.
 */
private static class HttpPollIdentifier {
    final String method;
    final Supplier<URI> uriProvider;
    final Map<String,String> headers;
    final byte[] body;
    final Optional<Credentials> credentials;

    /** Convenience constructor for a fixed URI; wraps it in a constant supplier. */
    private HttpPollIdentifier(String method, URI uri, Map<String,String> headers, byte[] body,
            Optional<Credentials> credentials) {
        this(method, Suppliers2.ofInstance(uri), headers, body, credentials);
    }

    /**
     * @param method       HTTP method name; stored lower-cased, only "get" and "post" are accepted
     * @param uriProvider  supplier of the URI to request; must not be null
     * @param headers      request headers; must not be null
     * @param body         request body, or null for none; must be null for GET
     * @param credentials  optional credentials; the Optional itself must not be null
     * @throws NullPointerException     if method, uriProvider, headers or credentials is null
     * @throws IllegalArgumentException if the method is unsupported, or a body is given for GET
     */
    private HttpPollIdentifier(String method, Supplier<URI> uriProvider, Map<String, String> headers, byte[] body,
            Optional<Credentials> credentials) {
        this.method = checkNotNull(method, "method").toLowerCase();
        this.uriProvider = checkNotNull(uriProvider, "uriProvider");
        this.headers = checkNotNull(headers, "headers");
        this.body = body;
        this.credentials = checkNotNull(credentials, "credentials");

        if (!(this.method.equals("get") || this.method.equals("post"))) {
            throw new IllegalArgumentException("Unsupported HTTP method (only supports GET and POST): "+method);
        }
        if (body != null && method.equalsIgnoreCase("get")) {
            throw new IllegalArgumentException("Must not set body for http GET method");
        }
    }

    @Override
    public int hashCode() {
        // Arrays.hashCode hashes the array contents (and tolerates null); passing the array
        // itself would hash by identity and break the equals/hashCode contract below.
        return Objects.hashCode(method, uriProvider, headers, Arrays.hashCode(body), credentials);
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof HttpPollIdentifier)) {
            return false;
        }
        HttpPollIdentifier o = (HttpPollIdentifier) other;
        return Objects.equal(method, o.method) &&
                Objects.equal(uriProvider, o.uriProvider) &&
                Objects.equal(headers, o.headers) &&
                // Content comparison: Objects.equal on arrays is reference equality.
                Arrays.equals(body, o.body) &&
                Objects.equal(credentials, o.credentials);
    }
}
// Poll configurations grouped by request identity: preStart() schedules one HTTP job per
// distinct HttpPollIdentifier key and fans the result out to every config stored under it.
// Treat as immutable once built
private final SetMultimap<HttpPollIdentifier, HttpPollConfig<?>> polls = HashMultimap.<HttpPollIdentifier,HttpPollConfig<?>>create();
/**
 * Builds the feed from a {@link Builder}: each registered poll config is copied, given the
 * builder's default period if its own period is negative, and filed under the
 * {@link HttpPollIdentifier} describing the request it needs.
 *
 * @param builder the configured builder
 * @throws IllegalStateException if both a base URI and a URI provider are set, or if URI
 *         variables are supplied together with a URI provider
 * @throws NullPointerException if neither a base URI nor a URI provider is available
 */
protected HttpFeed(Builder builder) {
    super(builder.entity);
    Map<String,String> sharedHeaders = ImmutableMap.copyOf(checkNotNull(builder.headers, "headers"));

    for (HttpPollConfig<?> config : builder.polls) {
        // Work on a private copy so the caller's config is not modified by the period defaulting.
        @SuppressWarnings({ "unchecked", "rawtypes" })
        HttpPollConfig<?> ownedConfig = new HttpPollConfig(config);
        if (ownedConfig.getPeriod() < 0) {
            ownedConfig.period(builder.period, builder.periodUnits);
        }

        String httpMethod = config.getMethod();
        Map<String,String> requestHeaders = config.buildHeaders(sharedHeaders);
        byte[] requestBody = config.getBody();
        Optional<Credentials> auth = Optional.fromNullable(builder.credentials);

        // Resolve where to poll: a fixed URI (expanded per config) or the supplied provider.
        Supplier<URI> uriSource;
        if (builder.baseUri == null) {
            if (!builder.baseUriVars.isEmpty()) {
                throw new IllegalStateException("Not permitted to supply URI vars when using a URI provider");
            }
            uriSource = builder.baseUriProvider;
        } else {
            if (builder.baseUriProvider != null) {
                throw new IllegalStateException("Not permitted to supply baseUri and baseUriProvider");
            }
            Map<String,String> uriVars = ImmutableMap.copyOf(checkNotNull(builder.baseUriVars, "baseUriVars"));
            URI resolved = config.buildUri(builder.baseUri, uriVars);
            uriSource = Suppliers2.ofInstance(resolved);
        }
        checkNotNull(uriSource);

        HttpPollIdentifier key = new HttpPollIdentifier(httpMethod, uriSource, requestHeaders, requestBody, auth);
        polls.put(key, ownedConfig);
    }
}
/**
 * Schedules one polling job per distinct {@link HttpPollIdentifier}. Each job issues the
 * GET or POST described by its identifier and hands the result to a handler for every
 * poll config registered under that identifier. The job's period is the smallest positive
 * period among those configs, starting from {@code Integer.MAX_VALUE}.
 *
 * @throws IllegalStateException if an identifier carries a method other than "get" or "post"
 */
@Override
protected void preStart() {
    for (final HttpPollIdentifier request : polls.keySet()) {
        // One client per distinct request. The httpcomponents documentation notes that while
        // HttpClient instances are thread safe and can be shared between threads, each thread
        // should keep its own dedicated HttpContext:
        // http://hc.apache.org/httpcomponents-client-ga/tutorial/html/connmgmt.html
        final HttpClient client = createHttpClient(request);

        long shortestPeriod = Integer.MAX_VALUE;
        Set<AttributePollHandler<? super HttpPollValue>> sensorHandlers = Sets.newLinkedHashSet();
        for (HttpPollConfig<?> pollConfig : polls.get(request)) {
            sensorHandlers.add(new AttributePollHandler<HttpPollValue>(pollConfig, entity, this));
            if (pollConfig.getPeriod() > 0) {
                shortestPeriod = Math.min(shortestPeriod, pollConfig.getPeriod());
            }
        }

        final Callable<HttpPollValue> job;
        if ("get".equals(request.method)) {
            job = new Callable<HttpPollValue>() {
                public HttpPollValue call() throws Exception {
                    if (log.isTraceEnabled()) {
                        log.trace("http polling for {} sensors at {}", entity, request);
                    }
                    return httpGet(client, request.uriProvider.get(), request.headers);
                }
            };
        } else if ("post".equals(request.method)) {
            job = new Callable<HttpPollValue>() {
                public HttpPollValue call() throws Exception {
                    if (log.isTraceEnabled()) {
                        log.trace("http polling for {} sensors at {}", entity, request);
                    }
                    return httpPost(client, request.uriProvider.get(), request.headers, request.body);
                }
            };
        } else {
            throw new IllegalStateException("Unexpected http method: "+request.method);
        }

        DelegatingPollHandler<HttpPollValue> fanOut = new DelegatingPollHandler<HttpPollValue>(sensorHandlers);
        getPoller().scheduleAtFixedRate(job, fanOut, shortestPeriod);
    }
}
/**
 * Creates the HTTP client used for one poll identifier.
 * <p>
 * The URI is read from the identifier's supplier once, here. If it is null the client is
 * returned unconfigured. For an https URI a trust-all socket factory is registered on the
 * URI's port (443 when the URI has none), and any credentials are registered for the URI's
 * host and port.
 *
 * @param pollIdentifier the request description to build a client for
 * @return the configured client
 */
private HttpClient createHttpClient(HttpPollIdentifier pollIdentifier) {
    final DefaultHttpClient client = new DefaultHttpClient();
    final URI target = pollIdentifier.uriProvider.get();

    // TODO if supplier returns null, we may wish to defer initialization until url available?
    if (target == null) {
        return client;
    }

    if ("https".equalsIgnoreCase(target.getScheme())) {
        try {
            int declaredPort = target.getPort();
            int httpsPort = (declaredPort >= 0) ? declaredPort : 443;
            SSLSocketFactory trustingFactory = new SSLSocketFactory(
                    new TrustAllStrategy(), SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
            Scheme httpsScheme = new Scheme("https", httpsPort, trustingFactory);
            client.getConnectionManager().getSchemeRegistry().register(httpsScheme);
        } catch (Exception e) {
            log.warn("Error in HTTP Feed of {}, setting trust for uri {}", entity, target);
            throw Exceptions.propagate(e);
        }
    }

    if (pollIdentifier.credentials.isPresent()) {
        AuthScope scope = new AuthScope(target.getHost(), target.getPort());
        client.getCredentialsProvider().setCredentials(scope, pollIdentifier.credentials.get());
    }

    return client;
}
/**
 * Returns the {@code poller} field viewed as a {@code Poller<HttpPollValue>}.
 * The cast is unchecked, hence the suppression.
 * NOTE(review): {@code poller} is not declared in this class — presumably inherited from
 * AbstractFeed with a wider type; confirm.
 */
@SuppressWarnings("unchecked")
private Poller<HttpPollValue> getPoller() {
return (Poller<HttpPollValue>) poller;
}
/**
 * Performs an HTTP GET and wraps the response in an {@link HttpPollValue}.
 * The response entity is always consumed afterwards, including when building the
 * poll value fails.
 *
 * @param httpClient client to execute the request with
 * @param uri        target of the request
 * @param headers    headers to add to the request
 * @return the poll value built from the response and the time the request was started
 * @throws ClientProtocolException if thrown by the client while executing the request
 * @throws IOException if thrown while executing the request or consuming the response
 */
private HttpPollValue httpGet(HttpClient httpClient, URI uri, Map<String,String> headers) throws ClientProtocolException, IOException {
    final HttpGet request = new HttpGet(uri);
    for (Map.Entry<String,String> header : headers.entrySet()) {
        request.addHeader(header.getKey(), header.getValue());
    }

    final long requestedAt = System.currentTimeMillis();
    final HttpResponse response = httpClient.execute(request);
    try {
        return new HttpPollValue(response, requestedAt);
    } finally {
        EntityUtils.consume(response.getEntity());
    }
}
/**
 * Performs an HTTP POST and wraps the response in an {@link HttpPollValue}.
 * The response entity is always consumed afterwards, including when building the
 * poll value fails.
 *
 * @param httpClient client to execute the request with
 * @param uri        target of the request
 * @param headers    headers to add to the request
 * @param body       bytes to send as the request entity, or null to send no entity
 * @return the poll value built from the response and the time the request was started
 * @throws ClientProtocolException if thrown by the client while executing the request
 * @throws IOException if thrown while executing the request or consuming the response
 */
private HttpPollValue httpPost(HttpClient httpClient, URI uri, Map<String,String> headers, byte[] body) throws ClientProtocolException, IOException {
    final HttpPost request = new HttpPost(uri);
    for (Map.Entry<String,String> header : headers.entrySet()) {
        request.addHeader(header.getKey(), header.getValue());
    }
    if (body != null) {
        HttpEntity payload = new ByteArrayEntity(body);
        request.setEntity(payload);
    }

    final long requestedAt = System.currentTimeMillis();
    final HttpResponse response = httpClient.execute(request);
    try {
        return new HttpPollValue(response, requestedAt);
    } finally {
        EntityUtils.consume(response.getEntity());
    }
}
/**
 * Trust strategy that accepts every certificate chain without examining it.
 * <p>
 * NOTE(review): this class is installed for https URIs together with
 * {@code SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER}, so the server's identity is not
 * verified on those connections — confirm this is acceptable for the deployments polled.
 */
private static class TrustAllStrategy implements TrustStrategy {
@Override
public boolean isTrusted(X509Certificate[] chain, String authType) throws CertificateException {
// Unconditionally trusted; neither the chain nor the auth type is inspected.
return true;
}
}
}