// blob: 9a2aabd30077e62141278f215728a4d16b1f7b4b [file] [log] [blame]
package brooklyn.entity.monitoring.zabbix;
import static com.google.common.base.Preconditions.checkNotNull;
import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.scheme.Scheme;
import org.apache.http.conn.ssl.SSLSocketFactory;
import org.apache.http.conn.ssl.TrustStrategy;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.NoConnectionReuseStrategy;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import brooklyn.entity.Entity;
import brooklyn.entity.basic.Attributes;
import brooklyn.entity.basic.EntityLocal;
import brooklyn.event.feed.AbstractFeed;
import brooklyn.event.feed.AttributePollHandler;
import brooklyn.event.feed.PollHandler;
import brooklyn.event.feed.Poller;
import brooklyn.event.feed.http.HttpPollValue;
import brooklyn.event.feed.http.HttpValueFunctions;
import brooklyn.location.Location;
import brooklyn.location.MachineLocation;
import brooklyn.location.access.BrooklynAccessUtils;
import brooklyn.location.basic.SupportsPortForwarding;
import brooklyn.util.exceptions.Exceptions;
import brooklyn.util.net.Cidr;
import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.net.HostAndPort;
import com.google.gson.JsonObject;
public class ZabbixFeed extends AbstractFeed {
/** Logger used by the feed, its builder and its scheduled jobs. */
public static final Logger log = LoggerFactory.getLogger(ZabbixFeed.class);
/**
 * JSON-RPC 2.0 request template for the "item.get" method.
 * Placeholders replaced before sending: "{{hostId}}", "{{itemKey}}", "{{token}}" and "{{id}}".
 */
public static final String JSON_ITEM_GET =
"{ \"jsonrpc\":\"2.0\",\"method\":\"item.get\"," +
"\"params\":{\"output\":\"extend\"," +
"\"filter\":{\"hostid\":[\"{{hostId}}\"],\"key_\":\"{{itemKey}}\"}}," +
"\"auth\":\"{{token}}\",\"id\":{{id}}}";
/**
 * JSON-RPC 2.0 request template for the "user.login" method, with a fixed request id of 0.
 * Placeholders: "{{username}}" and "{{password}}".
 * NOTE(review): not referenced by the code visible in this class; presumably used by the server entity - confirm.
 */
public static final String JSON_USER_LOGIN =
"{ \"jsonrpc\":\"2.0\",\"method\":\"user.login\"," +
"\"params\":{\"user\":\"{{username}}\",\"password\":\"{{password}}\"}," +
"\"id\":0 }";
/**
 * JSON-RPC 2.0 request template for the "host.create" method, registering a host with a single
 * IP-based interface, one group and one template.
 * Placeholders: "{{host}}", "{{ip}}", "{{port}}", "{{groupId}}", "{{templateId}}", "{{token}}" and "{{id}}".
 */
public static final String JSON_HOST_CREATE =
"{ \"jsonrpc\":\"2.0\",\"method\":\"host.create\"," +
"\"params\":{\"host\":\"{{host}}\"," +
"\"interfaces\":[{\"type\":1,\"main\":1,\"useip\":1,\"ip\":\"{{ip}}\",\"dns\":\"\",\"port\":\"{{port}}\"}]," +
"\"groups\":[{\"groupid\":\"{{groupId}}\"}]," +
"\"templates\":[{\"templateid\":\"{{templateId}}\"}]}," +
"\"auth\":\"{{token}}\",\"id\":{{id}}}";
/** Counter shared by all feeds; incremented to produce the "{{id}}" value of each request built from the templates above. */
private static final AtomicInteger id = new AtomicInteger(0);
/**
 * Creates a fresh builder for a {@link ZabbixFeed}.
 *
 * @return a new builder with default settings
 */
public static Builder<ZabbixFeed, ?> builder() {
    Builder<ZabbixFeed, ?> freshBuilder = new ConcreteBuilder();
    return freshBuilder;
}
/**
 * Concrete, non-generic closure of {@link Builder}: binds the self-type parameter so that
 * {@link ZabbixFeed#builder()} can instantiate a builder. Adds no behaviour of its own.
 */
private static class ConcreteBuilder extends Builder<ZabbixFeed, ConcreteBuilder> {
}
/**
 * Fluent builder for {@link ZabbixFeed}.
 * <p>
 * The API location may be given either as a fixed {@link URI} or as a {@link Supplier} of URIs,
 * but not both. {@code groupId} and {@code templateId} must be set before {@link #build()},
 * because the feed's constructor rejects null values for them.
 *
 * @param <T> the type of feed produced
 * @param <B> the concrete builder type, returned by every fluent setter
 */
public static class Builder<T extends ZabbixFeed, B extends Builder<T,B>> {
    private EntityLocal entity;
    private Supplier<URI> baseUriProvider;
    // Default period applied to polls that do not specify their own (see the ZabbixFeed constructor)
    private long period = 500;
    private TimeUnit periodUnits = TimeUnit.MILLISECONDS;
    private List<ZabbixPollConfig<?>> polls = Lists.newArrayList();
    private URI baseUri;
    private boolean suspended = false;
    // Only used by finalize() to warn about builders that were never built
    private volatile boolean built;
    private ZabbixServer server;
    private String username;
    private String password;
    private Integer sessionTimeout;
    private Integer groupId;
    private Integer templateId;
    // Default: the id of the first MachineLocation among the entity's locations
    // (Iterables.find fails if the entity has no such location)
    private Function<? super EntityLocal, String> uniqueHostnameGenerator = new Function<EntityLocal, String>() {
        @Override public String apply(EntityLocal entity) {
            Location loc = Iterables.find(entity.getLocations(), Predicates.instanceOf(MachineLocation.class));
            return loc.getId();
        }};

    /** Returns this builder typed as the concrete builder type, for fluent chaining. */
    @SuppressWarnings("unchecked")
    protected B self() {
        return (B) this;
    }

    /** Sets the entity the feed is attached to. */
    public B entity(EntityLocal val) {
        this.entity = val;
        return self();
    }

    /**
     * Sets a supplier for the Zabbix API URI.
     *
     * @throws IllegalStateException if a fixed URI has already been set and {@code val} is non-null
     */
    public B baseUri(Supplier<URI> val) {
        if (baseUri!=null && val!=null)
            throw new IllegalStateException("Builder cannot take both a URI and a URI Provider");
        this.baseUriProvider = val;
        return self();
    }

    /**
     * Sets a fixed Zabbix API URI.
     *
     * @throws IllegalStateException if a URI supplier has already been set and {@code val} is non-null
     */
    public B baseUri(URI val) {
        if (baseUriProvider!=null && val!=null)
            throw new IllegalStateException("Builder cannot take both a URI and a URI Provider");
        this.baseUri = val;
        return self();
    }

    /** Sets a fixed Zabbix API URI from a {@link URL}. */
    public B baseUrl(URL val) {
        return baseUri(URI.create(val.toString()));
    }

    /** Sets a fixed Zabbix API URI parsed from a string. */
    public B baseUri(String val) {
        return baseUri(URI.create(val));
    }

    /** Sets the default poll period, in milliseconds. */
    public B period(long millis) {
        return period(millis, TimeUnit.MILLISECONDS);
    }

    /** Sets the default poll period, applied to polls whose own period is negative. */
    public B period(long val, TimeUnit units) {
        this.period = val;
        this.periodUnits = units;
        return self();
    }

    /** Adds an item to poll. */
    public B poll(ZabbixPollConfig<?> config) {
        polls.add(config);
        return self();
    }

    /** Requests that the feed be suspended before it is started by {@link #build()}. */
    public B suspended() {
        return suspended(true);
    }

    /** Sets whether the feed is suspended before it is started by {@link #build()}. */
    public B suspended(boolean startsSuspended) {
        this.suspended = startsSuspended;
        return self();
    }

    /**
     * Takes the API URL, username, password and session timeout from the given server's config.
     *
     * @throws IllegalStateException if a URI supplier has already been set (via {@link #baseUri(URI)})
     */
    public B server(final ZabbixServer server) {
        this.server = server;
        baseUri(URI.create(server.getConfig(ZabbixServer.ZABBIX_SERVER_API_URL)));
        username(server.getConfig(ZabbixServer.ZABBIX_SERVER_USERNAME));
        password(server.getConfig(ZabbixServer.ZABBIX_SERVER_PASSWORD));
        sessionTimeout(server.getConfig(ZabbixServer.ZABBIX_SESSION_TIMEOUT));
        return self();
    }

    /** Sets the Zabbix username. */
    public B username(String username) {
        this.username = username;
        return self();
    }

    /** Sets the Zabbix password. */
    public B password(String password) {
        this.password = password;
        return self();
    }

    /** Sets the Zabbix session timeout. */
    public B sessionTimeout(Integer sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
        return self();
    }

    /** Sets the id of the Zabbix host group used when registering the host. */
    public B groupId(Integer groupId) {
        this.groupId = groupId;
        return self();
    }

    /** Sets the id of the Zabbix template used when registering the host. */
    public B templateId(Integer templateId) {
        this.templateId = templateId;
        return self();
    }

    /** Convenience for setting both the group id and the template id used for host registration. */
    public B register(Integer groupId, Integer templateId) {
        this.groupId = groupId;
        this.templateId = templateId;
        return self();
    }

    /**
     * For generating the name to be used when registering the zabbix agent with the zabbix server.
     * When called, guarantees that the entity will have a {@link MachineLocation} (see {@link Entity#getLocations()}).
     * Must return a non-empty string that will be unique across all machines where zabbix agents are installed.
     */
    public B uniqueHostnameGenerator(Function<? super EntityLocal, String> val) {
        this.uniqueHostnameGenerator = checkNotNull(val, "uniqueHostnameGenerator");
        return self();
    }

    /**
     * Creates the feed, suspends it if requested, starts it and returns it.
     * <p>
     * When nothing at all has been configured about the server (no server, no URI or URI
     * supplier, no username, password or session timeout), the server is looked up from the
     * entity's {@code ZABBIX_SERVER} config and applied via {@link #server(ZabbixServer)}.
     *
     * @throws NullPointerException if that lookup is needed and the entity has no {@code ZABBIX_SERVER} config
     */
    @SuppressWarnings("unchecked")
    public T build() {
        // If server not set and other config not available, try to obtain from entity config.
        // Both URI sources must be absent: they are mutually exclusive, so testing them with
        // "or" would always be true and would clobber (or clash with) an explicitly supplied URI.
        boolean noUriConfigured = baseUri == null && baseUriProvider == null;
        boolean noCredentialsConfigured = username == null && password == null && sessionTimeout == null;
        if (server == null && noUriConfigured && noCredentialsConfigured) {
            ZabbixServer configuredServer = Preconditions.checkNotNull(
                    entity.getConfig(ZabbixMonitored.ZABBIX_SERVER),
                    "The ZABBIX_SERVER config key must be set on the entity");
            server(configuredServer);
        }

        // Now create feed
        T result = (T) new ZabbixFeed(this);
        built = true;
        if (suspended) result.suspend();
        result.start();
        return result;
    }

    /** Logs a warning if this builder is collected without {@link #build()} ever having been called. */
    @Override
    protected void finalize() {
        if (!built) log.warn("ZabbixFeed.Builder created, but build() never called");
    }
}
/**
 * Identifies a poll by its item name; two identifiers are equal exactly when their item names are equal.
 */
protected static class ZabbixPollIdentifier {
    final String itemName;

    /**
     * @param itemName the item name; must not be null
     */
    protected ZabbixPollIdentifier(String itemName) {
        checkNotNull(itemName, "itemName");
        this.itemName = itemName;
    }

    @Override
    public boolean equals(Object other) {
        return other instanceof ZabbixPollIdentifier
                && Objects.equal(itemName, ((ZabbixPollIdentifier) other).itemName);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(itemName);
    }
}
// Copies of the poll configs supplied to the builder, in insertion order.
// Treat as immutable once built
protected final Set<ZabbixPollConfig<?>> polls = Sets.newLinkedHashSet();
// Supplies the URI that every API request is POSTed to
protected Supplier<URI> baseUriProvider;
// Zabbix host group id and template id sent in the host.create request
protected Integer groupId, templateId;
// Flag set when the Zabbix agent is registered for a host
// (flipped to true by the registration handler; item polls refuse to run until then)
protected final AtomicBoolean registered = new AtomicBoolean(false);
// Produces the host name sent in the host.create request
private final Function<? super EntityLocal, String> uniqueHostnameGenerator;
/**
 * Creates a feed from the settings collected by the builder.
 * <p>
 * A fixed base URI, if present, is wrapped in a constant supplier. Each poll config is copied,
 * and copies whose period is negative take the builder's default period.
 *
 * @param builder the populated builder
 * @throws IllegalStateException if the builder holds both a base URI and a base URI supplier
 * @throws NullPointerException if no base URI source, group id, template id or hostname generator is set
 */
protected ZabbixFeed(final Builder<? extends ZabbixFeed, ?> builder) {
    super(builder.entity);

    // Resolve the single source of the API URI
    if (builder.baseUri == null) {
        baseUriProvider = builder.baseUriProvider;
    } else {
        if (builder.baseUriProvider != null) {
            throw new IllegalStateException("Not permitted to supply baseUri and baseUriProvider");
        }
        URI fixedUri = builder.baseUri;
        baseUriProvider = Suppliers.ofInstance(fixedUri);
    }
    checkNotNull(baseUriProvider);

    groupId = checkNotNull(builder.groupId, "Zabbix groupId must be set");
    templateId = checkNotNull(builder.templateId, "Zabbix templateId must be set");

    // Take private copies so later changes to the builder's configs cannot affect this feed
    for (ZabbixPollConfig<?> supplied : builder.polls) {
        @SuppressWarnings({ "unchecked", "rawtypes" })
        ZabbixPollConfig<?> ownCopy = new ZabbixPollConfig(supplied);
        boolean periodUnset = ownCopy.getPeriod() < 0;
        if (periodUnset) {
            ownCopy.period(builder.period, builder.periodUnits);
        }
        polls.add(ownCopy);
    }

    uniqueHostnameGenerator = checkNotNull(builder.uniqueHostnameGenerator, "uniqueHostnameGenerator");
}
/**
 * Prepares the HTTP client and schedules this feed's jobs on the poller.
 * <p>
 * Two kinds of job are scheduled: a registration job, run every second, that calls the Zabbix
 * {@code host.create} API until the host is registered; and one {@code item.get} job per
 * configured {@link ZabbixPollConfig}, which fails with an {@link IllegalStateException} until
 * registration has completed.
 */
@Override
protected void preStart() {
    log.info("starting zabbix feed for {}", entity);

    // TODO if supplier returns null, we may wish to defer initialization until url available?
    final DefaultHttpClient httpClient = new DefaultHttpClient(new ThreadSafeClientConnManager());
    httpClient.setReuseStrategy(new NoConnectionReuseStrategy());
    try {
        registerSslSocketFactoryIfRequired(baseUriProvider.get(), httpClient);
    } catch (Exception e) {
        log.warn("Error in ZabbixFeed of {}, setting HTTP trust for uri {}", entity, baseUriProvider.get());
        throw Exceptions.propagate(e);
    }

    // Schedule registration attempt once per second
    getPoller().scheduleAtFixedRate(newRegistrationJob(httpClient), newRegistrationHandler(), 1000L); // TODO make configurable

    // Create and schedule a polling job for each Zabbix metric
    for (final ZabbixPollConfig<?> config : polls) {
        AttributePollHandler<? super HttpPollValue> pollHandler = new AttributePollHandler<HttpPollValue>(config, entity, this);
        long minPeriod = Integer.MAX_VALUE; // TODO make configurable
        if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod());
        getPoller().scheduleAtFixedRate(newPollJob(httpClient, config), pollHandler, minPeriod);
    }
}

/**
 * Builds the registration job, which POSTs a {@code host.create} request.
 * The job returns null (nothing to do) once the host is registered, or while the entity has no
 * {@link MachineLocation} yet.
 */
private Callable<HttpPollValue> newRegistrationJob(final HttpClient httpClient) {
    return new Callable<HttpPollValue>() {
        @Override
        public HttpPollValue call() throws Exception {
            if (registered.get()) {
                return null; // Already registered
            }

            // Find the first machine, if available
            Optional<Location> location = Iterables.tryFind(entity.getLocations(), Predicates.instanceOf(MachineLocation.class));
            if (!location.isPresent()) {
                return null; // Do nothing until location is present
            }
            MachineLocation machine = (MachineLocation) location.get();

            String host = uniqueHostnameGenerator.apply(entity);

            // Select address and port using port-forwarding if available
            String address = entity.getAttribute(Attributes.ADDRESS);
            Integer port = entity.getAttribute(ZabbixMonitored.ZABBIX_AGENT_PORT);
            if (machine instanceof SupportsPortForwarding) {
                Cidr management = entity.getConfig(BrooklynAccessUtils.MANAGEMENT_ACCESS_CIDR);
                HostAndPort forwarded = ((SupportsPortForwarding) machine).getSocketEndpointFor(management, port);
                address = forwarded.getHostText();
                port = forwarded.getPort();
            }

            // Fill in the JSON template and POST it.
            // JSON must be sent as UTF-8; do not rely on the platform default charset.
            byte[] body = JSON_HOST_CREATE
                    .replace("{{token}}", entity.getConfig(ZabbixMonitored.ZABBIX_SERVER).getAttribute(ZabbixServer.ZABBIX_TOKEN))
                    .replace("{{host}}", host)
                    .replace("{{ip}}", address)
                    .replace("{{port}}", Integer.toString(port))
                    .replace("{{groupId}}", Integer.toString(groupId))
                    .replace("{{templateId}}", Integer.toString(templateId))
                    .replace("{{id}}", Integer.toString(id.incrementAndGet()))
                    .getBytes("UTF-8");

            return httpPost(httpClient, baseUriProvider.get(), body);
        }
    };
}

/**
 * Builds the handler for the registration job. On a successful {@code host.create} response it
 * stores the returned host id on the entity and marks the feed as registered.
 */
private PollHandler<HttpPollValue> newRegistrationHandler() {
    return new PollHandler<HttpPollValue>() {
        @Override
        public void onSuccess(HttpPollValue val) {
            if (registered.get() || val == null) {
                return; // Skip if we are registered already or no data from job
            }
            JsonObject response = HttpValueFunctions.jsonContents().apply(val).getAsJsonObject();
            if (response.has("error")) {
                // Parse the JSON error object and log the message
                JsonObject error = response.get("error").getAsJsonObject();
                String message = error.get("message").getAsString();
                String data = error.get("data").getAsString();
                log.warn("zabbix failed registering host - {}: {}", message, data);
            } else if (response.has("result")) {
                // Parse the JSON result object and save the hostId
                JsonObject result = response.get("result").getAsJsonObject();
                String hostId = result.get("hostids").getAsJsonArray().get(0).getAsString();
                // Update the registered status if not set
                if (registered.compareAndSet(false, true)) {
                    entity.setAttribute(ZabbixMonitored.ZABBIX_AGENT_HOSTID, hostId);
                    log.info("zabbix registered host as id {}", hostId);
                }
            } else {
                throw new IllegalStateException(String.format("zabbix host registration returned invalid result: %s", response.toString()));
            }
        }
        @Override
        public void onError(Exception error) {
            log.warn("zabbix exception registering host", error);
        }
        @Override
        public boolean checkSuccess(HttpPollValue val) {
            // The registration job returns null when it has nothing to do; that is a no-op
            // success handled by onSuccess, and must not be dereferenced here.
            return val == null || val.getResponseCode() == 200;
        }
        @Override
        public void onFailure(HttpPollValue val) {
            log.warn("zabbix server returned failure code: {}", val.getResponseCode());
        }
        @Override
        public void onException(Exception exception) {
            log.warn("zabbix exception registering host", exception);
        }
    };
}

/**
 * Builds the job that POSTs an {@code item.get} request for one configured item.
 * The job throws {@link IllegalStateException} while the host is not yet registered.
 */
private Callable<HttpPollValue> newPollJob(final HttpClient httpClient, final ZabbixPollConfig<?> config) {
    return new Callable<HttpPollValue>() {
        @Override
        public HttpPollValue call() throws Exception {
            if (!registered.get()) {
                throw new IllegalStateException("zabbix agent not yet registered");
            }
            if (log.isTraceEnabled()) log.trace("zabbix polling {} for {}", entity, config);
            // JSON must be sent as UTF-8; do not rely on the platform default charset.
            byte[] body = JSON_ITEM_GET
                    .replace("{{token}}", entity.getConfig(ZabbixMonitored.ZABBIX_SERVER).getAttribute(ZabbixServer.ZABBIX_TOKEN))
                    .replace("{{hostId}}", entity.getAttribute(ZabbixMonitored.ZABBIX_AGENT_HOSTID))
                    .replace("{{itemKey}}", config.getItemKey())
                    .replace("{{id}}", Integer.toString(id.incrementAndGet()))
                    .getBytes("UTF-8");
            return httpPost(httpClient, baseUriProvider.get(), body);
        }
    };
}
/**
 * Returns the inherited {@code poller} viewed as a poller of HTTP poll values.
 */
protected Poller<HttpPollValue> getPoller() {
    @SuppressWarnings("unchecked")
    Poller<HttpPollValue> typedPoller = (Poller<HttpPollValue>) poller;
    return typedPoller;
}
/**
 * Registers an "https" scheme on the client's connection manager when the URI uses HTTPS;
 * does nothing for a null URI or any other scheme.
 * <p>
 * NOTE: the registered socket factory trusts every certificate and accepts every hostname,
 * so the server's identity is not verified.
 *
 * @param uri the target URI; its port is used, or 443 when it specifies none
 * @param httpClient the client whose scheme registry is updated
 * @throws Exception if the SSL socket factory cannot be created
 */
public static void registerSslSocketFactoryIfRequired(URI uri, HttpClient httpClient) throws Exception {
    if (uri == null || !"https".equalsIgnoreCase(uri.getScheme())) {
        return;
    }

    int port = uri.getPort();
    if (port < 0) {
        port = 443;
    }

    SSLSocketFactory trustEverything = new SSLSocketFactory(new TrustAllStrategy(), SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
    httpClient.getConnectionManager().getSchemeRegistry().register(new Scheme("https", port, trustEverything));
}
/**
 * POSTs the given bytes as {@code application/json} and wraps the response.
 * The response entity is always consumed afterwards, even if wrapping fails.
 *
 * @param httpClient the client used to execute the request
 * @param uri the URI to POST to
 * @param body the request payload
 * @return the response wrapped together with the time the request was started
 * @throws ClientProtocolException on an HTTP protocol error
 * @throws IOException on a connection problem
 */
protected HttpPollValue httpPost(HttpClient httpClient, URI uri, byte[] body) throws ClientProtocolException, IOException {
    HttpPost request = new HttpPost(uri);
    request.addHeader("Content-Type", "application/json");
    request.setEntity(new ByteArrayEntity(body));

    long startTime = System.currentTimeMillis();
    HttpResponse response = httpClient.execute(request);
    try {
        return new HttpPollValue(response, startTime);
    } finally {
        EntityUtils.consume(response.getEntity());
    }
}
/**
 * Trust strategy that accepts every certificate chain without inspecting it.
 */
private static class TrustAllStrategy implements TrustStrategy {
    @Override
    public boolean isTrusted(X509Certificate[] chain, String authType) throws CertificateException {
        // Neither the chain nor the auth type is examined
        final boolean trusted = true;
        return trusted;
    }
}
}