blob: afa88dc7968729d07cf1c523563f343cf4b450f4 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.brooklyn.core.sensor;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonSetter;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.sensor.Sensor;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
import org.apache.brooklyn.core.entity.EntityInitializers;
import org.apache.brooklyn.core.entity.EntityPredicates;
import org.apache.brooklyn.core.feed.PollConfig;
import org.apache.brooklyn.core.mgmt.internal.AppGroupTraverser;
import org.apache.brooklyn.core.resolve.jackson.PrimitiveTokenOrExpectedObject;
import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.core.config.ConfigBag;
import org.apache.brooklyn.util.core.predicates.DslPredicates;
import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.time.Duration;
import org.apache.commons.lang3.tuple.Pair;
* Super-class for entity initializers that add feeds.
public abstract class AbstractAddTriggerableSensor<T> extends AbstractAddSensorFeed<T> {
public static final ConfigKey<Duration> SENSOR_PERIOD = ConfigKeys.newConfigKey(Duration.class, "period", "Period, including units e.g. 1m or 5s or 200ms", null);
public static final ConfigKey<Object> SENSOR_TRIGGERS = ConfigKeys.newConfigKey(new TypeToken<Object>() {}, "triggers",
"Sensors which should trigger this feed, supplied with list of maps containing sensor (name or sensor instance) and entity (ID or entity instance), or just sensor names or just one sensor");
public static final ConfigKey<Boolean> SKIP_INITIAL_RUN = ConfigKeys.newConfigKey(Boolean.class, "skip_initial_run", "If set, skips running when added; runs only after the period or on a subsequent trigger");
public static final ConfigKey<DslPredicates.DslPredicate> CONDITION = ConfigKeys.newConfigKey(DslPredicates.DslPredicate.class, "condition", "Optional condition required for this sensor feed to run");
public static final ConfigKey<Boolean> ONLY_IF_SERVICE_UP = ConfigKeys.newBooleanConfigKey("onlyIfServiceUp", "Whether to run only if service is up.", null);
protected AbstractAddTriggerableSensor() {}
public AbstractAddTriggerableSensor(ConfigBag parameters) {
protected Duration getPeriod(Entity context, ConfigBag config) {
if (config.containsKey(SENSOR_PERIOD) || !hasTriggers(config)) {
if (context!=null) return Tasks.resolving(config, SENSOR_PERIOD).context(context).immediately(true).get();
else return config.get(SENSOR_PERIOD);
protected Maybe<Object> getTriggersMaybe(Entity context, ConfigBag config) {
return Tasks.resolving(config, SENSOR_TRIGGERS).context(context).deep().immediately(true).getMaybe();
public static List<Pair<Entity,Sensor>> resolveTriggers(Entity context, Object otherTriggers) {
Object triggers = Tasks.resolving(otherTriggers, Object.class).context(context).deep().immediately(true).get();
if (triggers==null || (triggers instanceof Collection && ((Collection)triggers).isEmpty())) return Collections.emptyList();
if (triggers instanceof String) {
SensorFeedTrigger t = new SensorFeedTrigger();
t.sensorName = (String)triggers;
triggers = MutableList.of(t);
if (!(triggers instanceof Collection)) {
throw new IllegalStateException("Triggers should be a list containing sensors or sensor names");
return ((Collection<?>)triggers).stream().map(ti -> {
SensorFeedTrigger t;
if (ti instanceof SensorFeedTrigger) {
t = (SensorFeedTrigger) ti;
} else {
if (ti instanceof Map) {
t = Tasks.resolving(ti, SensorFeedTrigger.class).context(context).deep().get();
} else if (ti instanceof String) {
t = new SensorFeedTrigger();
t.sensorName = (String) ti;
} else {
throw new IllegalStateException("Trigger should be a map specifying entity and sensor");
Entity entity = t.entity;
if (entity==null) {
if (t.entityId != null) {
String desiredComponentId = t.entityId;
List<Entity> firstGroupOfMatches = AppGroupTraverser.findFirstGroupOfMatches(context, true,
Predicates.and(EntityPredicates.configEqualTo(BrooklynConfigKeys.PLAN_ID, desiredComponentId), x -> true)::apply);
if (firstGroupOfMatches.isEmpty()) {
firstGroupOfMatches = AppGroupTraverser.findFirstGroupOfMatches(context, true,
Predicates.and(EntityPredicates.idEqualTo(desiredComponentId), x -> true)::apply);
if (!firstGroupOfMatches.isEmpty()) {
entity = firstGroupOfMatches.get(0);
} else {
throw new IllegalStateException("Cannot find entity with ID '" + desiredComponentId + "'");
} else {
entity = context;
Sensor sensor = t.sensor;
if (sensor==null) {
if (t.sensorName!=null) {
sensor = entity.getEntityType().getSensor(t.sensorName);
if (sensor==null) sensor = Sensors.newSensor(Object.class, t.sensorName);
} else {
throw new IllegalStateException("Sensor is required for a trigger");
return Pair.of(entity, sensor);
protected boolean hasTriggers(ConfigBag config) {
Maybe<Object> triggers = getTriggersMaybe(null, config);
if (triggers==null || triggers.isAbsent()) return false;
if (triggers.get() instanceof Collection && ((Collection)triggers.get()).isEmpty()) return false;
return true;
public static class SensorFeedTrigger {
Entity entity;
String entityId;
Sensor<?> sensor;
String sensorName;
// could support predicates on the value; but we do it on the entity which is enough
// @JsonDeserialize(using = PrimitiveOrObjectDeserializer.class)
// @JsonTypeInfo(use = JsonTypeInfo.Id.NONE)
public void setEntity(PrimitiveTokenOrExpectedObject<Entity> po) {
//@JsonTypeInfo(use = JsonTypeInfo.Id.NONE) @JsonDeserialize(using = PrimitiveOrObjectDeserializer.class)
// Object entity) {
if (po.hasObject()) setEntity(po.asObject());
else if (po.hasStringPrimitive()) setEntity(po.asString());
else if (entity==null) { /* do nothing */ }
else throw new IllegalArgumentException("Invalid input for entity to "+this+": "+entity);
public void setEntity(Entity entity) {
this.entity = entity;
public void setEntity(String entityId) {
this.entityId = entityId;
public Object getEntity() {
return entity!=null ? entity : entityId;
public void setSensor(Sensor<?> sensor) {
this.sensor = sensor;
public void setSensor(String sensorName) {
this.sensorName = sensorName;
public Object getSensor() {
return sensor!=null ? sensor : sensorName;
protected void standardPollConfig(Entity entity, ConfigBag configBag, PollConfig<?,?,?> poll) {
final Boolean suppressDuplicates = EntityInitializers.resolve(configBag, SUPPRESS_DUPLICATES);
final Duration logWarningGraceTimeOnStartup = EntityInitializers.resolve(configBag, LOG_WARNING_GRACE_TIME_ON_STARTUP);
final Duration logWarningGraceTime = EntityInitializers.resolve(configBag, LOG_WARNING_GRACE_TIME);
.period(getPeriod(entity, initParams()))
.otherTriggers(getTriggersMaybe(entity, configBag).orNull())
.condition(new ConditionSupplierFromConfigBag(configBag, entity));
if (!Boolean.FALSE.equals(configBag.get(FAIL_TASK_ON_ERROR))) {
poll.onException(new RethrowException());
static class ConditionSupplierFromConfigBag implements Supplier<DslPredicates.DslPredicate> {
final ConfigBag configBag;
final Entity entity;
ConditionSupplierFromConfigBag(ConfigBag configBag, Entity entity) {
this.configBag = configBag;
this.entity = entity;
public DslPredicates.DslPredicate get() {
return Tasks.resolving(configBag, CONDITION).context(entity).deep().immediately(true).get();
public static class RethrowException<T> implements<Throwable,T> {
public T apply(Throwable err) {
throw Exceptions.propagate(err);