blob: f6d2615d05ca24f6aae8b59f4526ed235e667981 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.brooklyn.entity.chef;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntityLocal;
import org.apache.brooklyn.api.mgmt.ExecutionContext;
import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.feed.AbstractFeed;
import org.apache.brooklyn.core.feed.PollHandler;
import org.apache.brooklyn.core.feed.Poller;
import org.apache.brooklyn.feed.ssh.SshPollValue;
import org.apache.brooklyn.util.core.flags.TypeCoercions;
import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper;
import org.apache.brooklyn.util.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
/**
* A sensor feed that retrieves attributes from Chef server and converts selected attributes to sensors.
*
* <p>To use this feed, you must provide the entity, the name of the node as it is known to Chef, and a collection of attribute
* sensors. The attribute sensors must follow the naming convention of starting with the string <tt>chef.attribute.</tt>
* followed by a period-separated path through the Chef attribute hierarchy. For example, an attribute sensor named
* <tt>chef.attribute.sql_server.instance_name</tt> would cause the feed to search for a Chef attribute called
* <tt>sql_server</tt>, and within that an attribute <tt>instance_name</tt>, and set the sensor to the value of this
* attribute.</p>
*
* <p>This feed uses the <tt>knife</tt> tool to query all the attributes on a named node. It then iterates over the configured
* list of attribute sensors, using the sensor name to locate an equivalent Chef attribute. The sensor is then set to the value
* of the Chef attribute.</p>
*
* <p>Example:</p>
*
* <pre>{@code
* @Override
* protected void connectSensors() {
*     nodeAttributesFeed = ChefAttributeFeed.builder()
*             .entity(this)
*             .nodeName(nodeName)
*             .addSensors(new AttributeSensor[]{
*                 SqlServerNode.CHEF_ATTRIBUTE_NODE_NAME,
*                 SqlServerNode.CHEF_ATTRIBUTE_SQL_SERVER_INSTANCE_NAME,
*                 SqlServerNode.CHEF_ATTRIBUTE_SQL_SERVER_PORT,
*                 SqlServerNode.CHEF_ATTRIBUTE_SQL_SERVER_SA_PASSWORD
*             })
*             .build();
* }
* }</pre>
*
* @since 0.6.0
* @author richardcloudsoft
*/
public class ChefAttributeFeed extends AbstractFeed {
// Shared logger; also used by the nested Builder and poll-handler classes.
private static final Logger log = LoggerFactory.getLogger(ChefAttributeFeed.class);
/**
* Prefix for attribute sensor names.
*
* <p>{@code Builder.addSensors(Iterable)} requires every sensor name to start with this prefix and uses the
* remainder of the name as the Chef attribute path.</p>
*/
public static final String CHEF_ATTRIBUTE_PREFIX = "chef.attribute.";
/**
* Config key holding the set of poll configurations for this feed. It is written by the builder-based
* constructor and read back in {@code preStart()}.
*/
@SuppressWarnings("serial")
public static final ConfigKey<Set<ChefAttributePollConfig<?>>> POLLS = ConfigKeys.newConfigKey(
new TypeToken<Set<ChefAttributePollConfig<?>>>() {},
"polls");
/**
* Config key holding the name of the node to query. It is written by the builder-based constructor and read
* back in {@code preStart()}.
*/
public static final ConfigKey<String> NODE_NAME = ConfigKeys.newStringConfigKey("nodeName");
/**
 * Entry point for configuring a new feed.
 *
 * @return a fresh {@link Builder} with no entity, node name or sensors set
 */
public static Builder builder() {
    Builder freshBuilder = new Builder();
    return freshBuilder;
}
/**
 * Fluent builder for {@link ChefAttributeFeed}.
 *
 * <p>An entity and a node name must be supplied before {@link #build()} is called. Sensors may be added
 * individually with an explicit Chef attribute path, or in bulk using the <tt>chef.attribute.</tt> naming
 * convention.</p>
 */
@SuppressWarnings("rawtypes")
public static class Builder {
    private Entity entity;
    private boolean onlyIfServiceUp = false;
    private String nodeName;
    // Linked set so polls keep the order in which they were added.
    private final Set<ChefAttributePollConfig> polls = Sets.newLinkedHashSet();
    // Applied by the feed constructor to every poll that reports a negative period.
    private Duration period = Duration.of(30, TimeUnit.SECONDS);
    private String uniqueTag;
    // Only consulted by finalize() to warn about builders that were created but never built.
    private volatile boolean built;

    /**
     * Sets the entity whose sensors the feed will update.
     *
     * @throws NullPointerException if {@code val} is null
     */
    public Builder entity(Entity val) {
        this.entity = checkNotNull(val, "entity");
        return this;
    }

    /** Shorthand for {@code onlyIfServiceUp(true)}. */
    public Builder onlyIfServiceUp() {
        return onlyIfServiceUp(true);
    }

    /** Sets the value stored under the feed's {@code ONLY_IF_SERVICE_UP} config key. */
    public Builder onlyIfServiceUp(boolean onlyIfServiceUp) {
        this.onlyIfServiceUp = onlyIfServiceUp;
        return this;
    }

    /**
     * Sets the name of the node to query.
     *
     * @throws NullPointerException if {@code nodeName} is null
     */
    public Builder nodeName(String nodeName) {
        this.nodeName = checkNotNull(nodeName, "nodeName");
        return this;
    }

    /**
     * Adds a fully specified poll configuration.
     *
     * @throws NullPointerException if {@code config} is null
     */
    public Builder addSensor(ChefAttributePollConfig config) {
        // Fail here rather than later inside the feed constructor, where the cause would be harder to trace.
        polls.add(checkNotNull(config, "config"));
        return this;
    }

    /**
     * Adds a sensor that is populated from the given period-separated Chef attribute path.
     */
    @SuppressWarnings("unchecked")
    public Builder addSensor(String chefAttributePath, AttributeSensor sensor) {
        return addSensor(new ChefAttributePollConfig(sensor).chefAttributePath(chefAttributePath));
    }

    /**
     * Adds one sensor per map entry; the key is the Chef attribute path and the value is the sensor.
     *
     * @throws NullPointerException if {@code sensors} is null
     */
    public Builder addSensors(Map<String, AttributeSensor> sensors) {
        for (Map.Entry<String, AttributeSensor> entry : checkNotNull(sensors, "sensors").entrySet()) {
            addSensor(entry.getKey(), entry.getValue());
        }
        return this;
    }

    /**
     * Array variant of {@link #addSensors(Iterable)}.
     *
     * @throws NullPointerException if {@code sensors} is null
     */
    public Builder addSensors(AttributeSensor[] sensors) {
        return addSensors(Arrays.asList(checkNotNull(sensors, "sensors")));
    }

    /**
     * Adds sensors whose Chef attribute path is derived from the sensor name: the name must start with
     * {@link ChefAttributeFeed#CHEF_ATTRIBUTE_PREFIX}, and the remainder of the name is used as the path.
     *
     * @throws NullPointerException if {@code sensors} or any element of it is null
     * @throws IllegalArgumentException if a sensor name does not start with the prefix
     */
    public Builder addSensors(Iterable<AttributeSensor> sensors) {
        for (AttributeSensor sensor : checkNotNull(sensors, "sensors")) {
            checkNotNull(sensor, "sensors collection contains a null value");
            checkArgument(sensor.getName().startsWith(CHEF_ATTRIBUTE_PREFIX), "sensor name must be prefixed "+CHEF_ATTRIBUTE_PREFIX+" for autodetection to work");
            addSensor(sensor.getName().substring(CHEF_ATTRIBUTE_PREFIX.length()), sensor);
        }
        return this;
    }

    /** Sets the default poll period, used for polls that report a negative period of their own. */
    public Builder period(Duration period) {
        this.period = period;
        return this;
    }

    /** Sets the default poll period in milliseconds. */
    public Builder period(long millis) {
        return period(Duration.of(millis, TimeUnit.MILLISECONDS));
    }

    /** Sets the default poll period in the given units. */
    public Builder period(long val, TimeUnit units) {
        return period(Duration.of(val, units));
    }

    /** Sets the unique tag passed to the feed when it is constructed. */
    public Builder uniqueTag(String uniqueTag) {
        this.uniqueTag = uniqueTag;
        return this;
    }

    /**
     * Creates the feed, attaches it to the configured entity and starts it.
     *
     * @return the started feed
     * @throws NullPointerException if no node name or no entity has been set
     */
    public ChefAttributeFeed build() {
        // Set first so that a failed build is not later reported as "never called" by finalize().
        built = true;
        ChefAttributeFeed result = new ChefAttributeFeed(this);
        result.setEntity(checkNotNull((EntityLocal)entity, "entity"));
        result.start();
        return result;
    }

    /** Warns when a builder is garbage-collected without {@link #build()} ever having been called. */
    @Override
    protected void finalize() {
        if (!built) log.warn("ChefAttributeFeed.Builder created, but build() never called");
    }
}
// Factory for the knife task run on each poll; assigned in preStart() and used by the poll callable created there.
private KnifeTaskFactory<String> knifeTaskFactory;
/**
* No-argument constructor that exists for rebind. Do not call directly; create feeds via {@link #builder()}.
*/
public ChefAttributeFeed() {
}
/**
 * Creates a feed from the builder's settings, storing them as config on the feed.
 *
 * <p>Disabled polls are dropped. Every remaining poll is copied, and a copy that reports a negative period
 * takes the builder's default period instead.</p>
 *
 * @param builder the configured builder
 * @throws NullPointerException if the builder has no node name
 */
protected ChefAttributeFeed(Builder builder) {
    setConfig(ONLY_IF_SERVICE_UP, builder.onlyIfServiceUp);
    setConfig(NODE_NAME, checkNotNull(builder.nodeName, "builder.nodeName"));

    Set<ChefAttributePollConfig<?>> enabledPolls = Sets.newLinkedHashSet();
    for (ChefAttributePollConfig<?> requested : builder.polls) {
        if (requested.isEnabled()) {
            // Copy so that later changes to the builder's config objects cannot affect this feed.
            @SuppressWarnings({ "unchecked", "rawtypes" })
            ChefAttributePollConfig<?> copy = new ChefAttributePollConfig(requested);
            boolean needsDefaultPeriod = copy.getPeriod() < 0;
            if (needsDefaultPeriod) {
                copy.period(builder.period);
            }
            enabledPolls.add(copy);
        }
    }

    setConfig(POLLS, enabledPolls);
    initUniqueTag(builder.uniqueTag, enabledPolls);
}
/**
 * Schedules the single recurring knife query that backs every configured sensor.
 *
 * <p>The query runs at the shortest period found among the configured polls, and its result is passed to a
 * {@link SendChefAttributesToSensors} handler that updates the sensors.</p>
 */
@Override
protected void preStart() {
    final String nodeName = getConfig(NODE_NAME);
    final Set<ChefAttributePollConfig<?>> polls = getConfig(POLLS);

    // One knife call serves all sensors, so poll as often as the most demanding sensor requires.
    long shortestPeriod = Integer.MAX_VALUE;
    for (ChefAttributePollConfig<?> poll : polls) {
        long candidate = poll.getPeriod();
        if (candidate < shortestPeriod) {
            shortestPeriod = candidate;
        }
    }

    knifeTaskFactory = new KnifeNodeAttributeQueryTaskFactory(nodeName);

    final Callable<SshPollValue> knifeQuery = new Callable<SshPollValue>() {
        @Override
        public SshPollValue call() throws Exception {
            ProcessTaskWrapper<String> knifeTask = knifeTaskFactory.newTask();
            final ExecutionContext context = ((EntityInternal) entity).getExecutionContext();
            log.debug("START: Running knife to query attributes of Chef node {}", nodeName);
            context.submit(knifeTask);
            // Wait for knife to finish before reading its exit code and output.
            knifeTask.block();
            log.debug("DONE: Running knife to query attributes of Chef node {}", nodeName);
            return new SshPollValue(null, knifeTask.getExitCode(), knifeTask.getStdout(), knifeTask.getStderr());
        }
    };

    Poller<SshPollValue> poller = getPoller();
    Callable<SshPollValue> pollJob = new CallInEntityExecutionContext<SshPollValue>(entity, knifeQuery);
    PollHandler<SshPollValue> sensorUpdater = new SendChefAttributesToSensors(entity, polls);
    poller.scheduleAtFixedRate(pollJob, sensorUpdater, shortestPeriod);
}
/**
 * Narrows the inherited poller to the {@link SshPollValue} type this feed schedules.
 *
 * @return the poller returned by the superclass, cast to {@code Poller<SshPollValue>}
 */
@Override
@SuppressWarnings("unchecked")
protected Poller<SshPollValue> getPoller() {
    Poller<SshPollValue> typedPoller = (Poller<SshPollValue>) super.getPoller();
    return typedPoller;
}
/**
 * A {@link KnifeTaskFactory} whose knife parameters are <tt>node show -l &lt;nodeName&gt; --format json</tt>,
 * i.e. a query for the attributes of a single node.
 */
private static class KnifeNodeAttributeQueryTaskFactory extends KnifeTaskFactory<String> {
    private final String nodeName;

    /**
     * @param nodeName the name of the node to query
     */
    public KnifeNodeAttributeQueryTaskFactory(String nodeName) {
        super("retrieve attributes of node " + nodeName);
        this.nodeName = nodeName;
    }

    /** Returns the knife arguments for the node query. */
    @Override
    protected List<String> initialKnifeParameters() {
        return ImmutableList.<String>builder()
                .add("node", "show", "-l")
                .add(nodeName)
                .add("--format", "json")
                .build();
    }
}
/**
 * A {@link Callable} that wraps another {@link Callable}, where the inner {@link Callable} is executed in the context of a
 * specific entity.
 *
 * <p>Both fields are <tt>final</tt>: the wrapper is created in one thread and invoked by the poller, and
 * final fields are guaranteed to be visible to other threads once construction has completed.</p>
 *
 * @param <T> The type of the {@link Callable}.
 */
private static class CallInEntityExecutionContext<T> implements Callable<T> {
    private final Callable<T> job;
    private final Entity entity;

    /**
     * @param entity the entity whose execution context runs the job
     * @param job the work to run
     */
    private CallInEntityExecutionContext(Entity entity, Callable<T> job) {
        this.job = job;
        this.entity = entity;
    }

    /**
     * Submits the wrapped job to the entity's execution context and waits for its result.
     *
     * @return the value produced by the wrapped job
     * @throws Exception if submitting the job or waiting for its result fails
     */
    @Override
    public T call() throws Exception {
        final ExecutionContext executionContext = ((EntityInternal) entity).getExecutionContext();
        return executionContext.submit(Maps.newHashMap(), job).get();
    }
}
/**
 * A poll handler that takes the result of the <tt>knife</tt> invocation and sets the appropriate sensors.
 *
 * <p>For each configured sensor the handler looks up the Chef attribute path in the JSON output, first as
 * given and then under each of the prefixes in {@link #PREFIXES}, in that order. The first match wins. A
 * sensor with no match, or whose match is JSON <tt>null</tt>, is set to <tt>null</tt>.</p>
 */
private static class SendChefAttributesToSensors implements PollHandler<SshPollValue> {
    private static final Iterable<String> PREFIXES = ImmutableList.of("", "automatic", "force_override", "override", "normal", "force_default", "default");
    private static final Splitter SPLITTER = Splitter.on('.');

    private final Entity entity;
    // Chef attribute path -> sensor to populate; linked map keeps the configured order.
    private final Map<String, AttributeSensor<?>> chefAttributeSensors;

    /**
     * @param entity the entity whose sensors are updated
     * @param polls the poll configurations supplying each sensor and its Chef attribute path
     */
    public SendChefAttributesToSensors(Entity entity, Set<ChefAttributePollConfig<?>> polls) {
        this.entity = entity;
        chefAttributeSensors = Maps.newLinkedHashMap();
        for (ChefAttributePollConfig<?> config : polls) {
            chefAttributeSensors.put(config.getChefAttributePath(), config.getSensor());
        }
    }

    /**
     * A poll counts as successful only when knife exited with status 0, wrote nothing to stderr, and wrote
     * stdout containing at least one <tt>{</tt>.
     */
    @Override
    public boolean checkSuccess(SshPollValue val) {
        if (val.getExitStatus() != 0) return false;
        String stderr = val.getStderr();
        if (stderr == null || stderr.length() != 0) return false;
        String out = val.getStdout();
        if (out == null || out.length() == 0) return false;
        if (!out.contains("{")) return false;
        return true;
    }

    /**
     * Parses the knife output as JSON and updates every configured sensor from it.
     *
     * @throws IllegalArgumentException if a configured Chef attribute path contains an empty element
     */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Override
    public void onSuccess(SshPollValue val) {
        String stdout = val.getStdout();
        // Skip anything knife printed ahead of the JSON document.
        int jsonStarts = stdout.indexOf('{');
        if (jsonStarts > 0)
            stdout = stdout.substring(jsonStarts);
        JsonObject root = new Gson().fromJson(stdout, JsonElement.class).getAsJsonObject();

        for (Map.Entry<String, AttributeSensor<?>> attribute : chefAttributeSensors.entrySet()) {
            String chefAttributeName = attribute.getKey();
            AttributeSensor<?> sensor = attribute.getValue();
            log.trace("Finding value for attribute sensor {}", sensor.getName());

            Iterable<String> path = SPLITTER.split(chefAttributeName);
            JsonElement elementForSensor = null;
            Iterable<String> matchedPath = null;
            for (String prefix : PREFIXES) {
                Iterable<String> prefixedPath = !Strings.isNullOrEmpty(prefix)
                        ? Iterables.concat(ImmutableList.of(prefix), path)
                        : path;
                try {
                    elementForSensor = getElementByPath(root, prefixedPath);
                } catch (IllegalArgumentException e) {
                    log.error("Entity {}: bad Chef attribute {} for sensor {}: {}", new Object[]{
                            entity.getDisplayName(),
                            Joiner.on('.').join(prefixedPath),
                            sensor.getName(),
                            e.getMessage()});
                    throw Throwables.propagate(e);
                }
                if (elementForSensor != null) {
                    matchedPath = prefixedPath;
                    break;
                }
            }

            if (elementForSensor == null) {
                log.debug("Entity {}: no Chef attribute matching {}; setting sensor {} to null", new Object[]{
                        entity.getDisplayName(),
                        chefAttributeName,
                        sensor.getName()});
                entity.sensors().set(sensor, null);
                continue;
            }

            // Convert once, safely: getAsString() throws for JSON null, objects and multi-element arrays.
            String value = asSensorString(elementForSensor);
            log.debug("Entity {}: apply Chef attribute {} to sensor {} with value {}", new Object[]{
                    entity.getDisplayName(),
                    Joiner.on('.').join(matchedPath),
                    sensor.getName(),
                    value});
            if (value == null) {
                entity.sensors().set(sensor, null);
            } else {
                entity.sensors().set((AttributeSensor)sensor, TypeCoercions.coerce(value, sensor.getTypeToken()));
            }
        }
    }

    /**
     * Walks {@code path} down from {@code element}, one JSON object member per path element.
     *
     * @return the element at the end of the path, or <tt>null</tt> if a member is missing or an element
     *         along the way is not a JSON object (for example a string, an array or JSON null)
     * @throws IllegalArgumentException if a path element that is reached is null or empty
     */
    private static JsonElement getElementByPath(JsonElement element, Iterable<String> path) {
        JsonElement current = element;
        for (String segment : path) {
            Preconditions.checkArgument(!Strings.isNullOrEmpty(segment), "path must not contain empty or null elements");
            // Only objects have named members; anything else means the path does not exist here.
            if (!current.isJsonObject()) return null;
            current = current.getAsJsonObject().get(segment);
            if (current == null) return null;
        }
        return current;
    }

    /**
     * Renders a JSON element as the string that is coerced to the sensor's type.
     *
     * @return <tt>null</tt> for JSON null; the string form of a primitive; the string form of the only
     *         element of a single-element array; otherwise the JSON text of the element
     */
    private static String asSensorString(JsonElement element) {
        if (element.isJsonNull()) return null;
        if (element.isJsonPrimitive()) return element.getAsString();
        if (element.isJsonArray() && element.getAsJsonArray().size() == 1) {
            return asSensorString(element.getAsJsonArray().get(0));
        }
        return element.toString();
    }

    /** Logs the unexpected knife result and clears the Chef attribute sensors. */
    @Override
    public void onFailure(SshPollValue val) {
        log.error("Chef attribute query did not respond as expected. exitcode={} stdout={} stderr={}", new Object[]{val.getExitStatus(), val.getStdout(), val.getStderr()});
        clearChefAttributeSensors();
    }

    /** Logs the exception and clears the Chef attribute sensors. */
    @Override
    public void onException(Exception exception) {
        log.error("Detected exception while retrieving Chef attributes from entity " + entity.getDisplayName(), exception);
        clearChefAttributeSensors();
    }

    /**
     * Sets to <tt>null</tt> every configured sensor whose own name starts with
     * {@link ChefAttributeFeed#CHEF_ATTRIBUTE_PREFIX}; sensors with other names are left untouched.
     */
    private void clearChefAttributeSensors() {
        for (AttributeSensor<?> attribute : chefAttributeSensors.values()) {
            if (!attribute.getName().startsWith(CHEF_ATTRIBUTE_PREFIX))
                continue;
            entity.sensors().set(attribute, null);
        }
    }

    @Override
    public String toString() {
        return super.toString()+"["+getDescription()+"]";
    }

    /** Describes this handler by its map of Chef attribute paths to sensors. */
    @Override
    public String getDescription() {
        return ""+chefAttributeSensors;
    }
}
}