// blob: d86b62363151a9c2a752301a3936b5bdeb6127d6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.brooklyn.core.workflow;
import com.google.common.annotations.Beta;
import com.google.common.reflect.TypeToken;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntityInitializer;
import org.apache.brooklyn.api.entity.EntityLocal;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.objs.BrooklynObject;
import org.apache.brooklyn.api.objs.EntityAdjunct;
import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.effector.AddSensorInitializer;
import org.apache.brooklyn.core.effector.AddSensorInitializerAbstractProto;
import org.apache.brooklyn.core.entity.EntityInitializers;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor;
import org.apache.brooklyn.core.sensor.Sensors;
import org.apache.brooklyn.core.workflow.steps.appmodel.EntityValueToSet;
import org.apache.brooklyn.feed.function.FunctionFeed;
import org.apache.brooklyn.feed.function.FunctionPollConfig;
import org.apache.brooklyn.util.core.config.ConfigBag;
import org.apache.brooklyn.util.core.flags.TypeCoercions;
import org.apache.brooklyn.util.core.task.DynamicTasks;
import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.guava.Maybe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
/**
 * Configurable {@link EntityInitializer} which adds a sensor feed running a given workflow.
 * <p>
 * The sensor to publish can be described with the sensor name and sensor type parameters,
 * with the structured {@link #SENSOR} key, or with both provided they agree
 * (see {@link #lookupSensorReference(Entity)}). The sensor type defaults to {@link Object} here.
 *
 * @param <T> type of the value published on the sensor
 */
@Beta
public class WorkflowSensor<T> extends AbstractAddTriggerableSensor<T> implements WorkflowCommonConfig {

    private static final Logger LOG = LoggerFactory.getLogger(WorkflowSensor.class);

    // override to be object
    public static final ConfigKey<String> SENSOR_TYPE = ConfigKeys.newConfigKeyWithDefault(AddSensorInitializer.SENSOR_TYPE, Object.class.getName());

    /** Structured alternative to the separate sensor name / sensor type parameters. */
    public static final ConfigKey<EntityValueToSet> SENSOR = ConfigKeys.newConfigKey(EntityValueToSet.class, "sensor");

    // the unique tag is accepted under three spellings; see getUniqueTag for the precedence
    public static final ConfigKey<String> UNIQUE_TAG_CAMEL = ConfigKeys.newStringConfigKey("uniqueTag");
    public static final ConfigKey<String> UNIQUE_TAG_UNDERSCORE = ConfigKeys.newStringConfigKey("unique_tag");
    public static final ConfigKey<String> UNIQUE_TAG_DASH = ConfigKeys.newStringConfigKey("unique-tag");

    /**
     * Finds the unique tag configured in the given bag.
     *
     * @param bag config to inspect
     * @param defaultValue value to return when none of the unique-tag keys yields a non-null value
     * @return the first non-null value among {@link #UNIQUE_TAG_CAMEL}, {@link #UNIQUE_TAG_UNDERSCORE}
     *         and {@link #UNIQUE_TAG_DASH} (checked in that order), otherwise {@code defaultValue}
     */
    static String getUniqueTag(ConfigBag bag, String defaultValue) {
        String result = bag.get(UNIQUE_TAG_CAMEL);
        if (result == null) result = bag.get(UNIQUE_TAG_UNDERSCORE);
        if (result == null) result = bag.get(UNIQUE_TAG_DASH);
        if (result == null) result = defaultValue;
        return result;
    }

    // do we need to have an option not to run when initialization is done?

    public WorkflowSensor() {}

    public WorkflowSensor(ConfigBag params) {
        super(params);
    }

    /**
     * Adds the sensor to the entity and builds a {@link FunctionFeed} which polls the configured workflow,
     * using this initializer's parameters.
     */
    @Override
    public void apply(final EntityLocal entity) {
        apply(entity, initParams());
    }

    /**
     * Does the work of {@link #apply(EntityLocal)} against an explicit parameter bag.
     *
     * @param entity entity the sensor and feed are attached to
     * @param params workflow and polling configuration
     */
    private void apply(final EntityLocal entity, final ConfigBag params) {
        AttributeSensor<T> sensor = addSensor(entity);

        // parameterized logging, so no isDebugEnabled guard is needed
        LOG.debug("Adding workflow sensor {} to {}", sensor.getName(), entity);

        // the callable is created without a context object; it is given the feed once that has been built (below)
        WorkflowPollCallable wc = new WorkflowPollCallable("Workflow for sensor " + sensor.getName(), params, null);
        // on success the workflow output is passed through a coercion to the sensor's raw type
        FunctionPollConfig<Object,Object> pollConfig = new FunctionPollConfig<Object,T>(sensor)
                .callable(wc)
                .onSuccess(TypeCoercions.<T>function((Class)sensor.getTypeToken().getRawType()));

        // use the bag supplied by the caller throughout, rather than re-reading initParams() here
        standardPollConfig(entity, params, pollConfig);

        FunctionFeed.Builder feedBuilder = FunctionFeed.builder()
                .name("Sensor Workflow Feed: "+sensor.getName())
                .entity(entity)
                // a null ONLY_IF_SERVICE_UP is treated as false
                .onlyIfServiceUp(Maybe.ofDisallowingNull(EntityInitializers.resolve(params, ONLY_IF_SERVICE_UP)).or(false))
                .uniqueTag(getUniqueTag(params, "workflow_sensor_feed:"+sensor.getName()))
                .poll(pollConfig);

        // NOTE(review): build(true) presumably also registers the feed with the entity -- confirm against FunctionFeed.Builder
        wc.init(feedBuilder.build(true));
    }

    /**
     * Creates the sensor definition from the merged sensor reference, using {@link Object} as the type
     * when no type has been specified.
     */
    @Override
    protected AttributeSensor<T> sensor(Entity entity) {
        // overridden because sensor type defaults to object here; and we support 'sensor'
        EntityValueToSet s = lookupSensorReference(entity);
        // could assert compatibility with defined sensor, or tighten; but currently we don't
        TypeToken<T> clazz = s.type == null ? (TypeToken)TypeToken.of(Object.class) : getType(entity, s.type, s.name);
        return Sensors.newSensor(clazz, s.name);
    }

    /** Resolves the named type, supplying the sensor name taken from {@link #lookupSensorReference(Entity)}. */
    @Override
    protected TypeToken<T> getType(Entity entity, String className) {
        return getType(entity, className, lookupSensorReference(entity).name);
    }

    /** Delegates to {@link AddSensorInitializerAbstractProto#getType(Entity, String, String)}. */
    static <T> TypeToken<T> getType(Entity entity, String className, String sensorName) {
        return AddSensorInitializerAbstractProto.getType(entity, className, sensorName);
    }

    /**
     * Merges the {@link #SENSOR} parameter with the separate sensor type and sensor name parameters.
     * <p>
     * A sensor type equal to {@code Object.class.getName()} (the default for {@link #SENSOR_TYPE}) is treated as unset.
     * Where the type or name parameter is set, it is written into the returned reference.
     *
     * @param entity the entity the sensor is being added to
     * @return the merged reference; a new empty {@link EntityValueToSet} is used as the base if {@link #SENSOR} is unset
     * @throws IllegalArgumentException if the reference names an entity other than {@code entity},
     *         or if the type or the name is given in both places with different values
     */
    protected EntityValueToSet lookupSensorReference(Entity entity) {
        EntityValueToSet s = initParam(SENSOR);
        if (s == null) {
            s = new EntityValueToSet();
        }
        if (s.entity != null && !s.entity.equals(entity)) {
            throw new IllegalArgumentException("Not permitted to specify different entity ("+s.entity+") for workflow-sensor on "+entity);
        }

        String t2 = initParam(SENSOR_TYPE);
        // Object is the default sensor type here, so it carries no information about the intended type
        if (Object.class.getName().equals(t2)) {
            t2 = null;
        }
        if (s.type != null && t2 != null && !t2.equals(s.type)) {
            throw new IllegalArgumentException("Incompatible types "+s.type+" and "+t2);
        }
        if (t2 != null) {
            s.type = t2;
        }

        String n2 = initParam(SENSOR_NAME);
        if (s.name != null && n2 != null && !n2.equals(s.name)) {
            throw new IllegalArgumentException("Incompatible names "+s.name+" and "+n2);
        }
        if (n2 != null) {
            s.name = n2;
        }
        return s;
    }

    /**
     * {@link Callable} which, each time it is called, creates a new persisted workflow execution in
     * {@code SENSOR} context from the stored parameters, runs it, and returns its result.
     */
    public static class WorkflowPollCallable implements Callable<Object> {
        private final String workflowCallableName;
        private BrooklynObject entityOrAdjunct;
        private final Map<String,Object> params;

        /**
         * @param workflowCallableName name given to each workflow execution created by this callable
         * @param params workflow configuration; its raw config map is retained
         * @param entityOrAdjunct context to run against; may be null, in which case it can be supplied later
         *        via {@link #init(BrooklynObject)} or is looked up from the current task when called
         */
        protected WorkflowPollCallable(String workflowCallableName, ConfigBag params, BrooklynObject entityOrAdjunct) {
            this.workflowCallableName = workflowCallableName;
            this.params = params.getAllConfigRaw();
            this.entityOrAdjunct = entityOrAdjunct;
        }

        /** used in some places where the entity/adjunct is set late */
        public void init(BrooklynObject entityOrAdjunct) {
            this.entityOrAdjunct = entityOrAdjunct;
        }

        /**
         * Runs the workflow once and returns its output.
         * <p>
         * The context is the object supplied at construction or via {@link #init(BrooklynObject)};
         * failing that the adjunct, and then the entity, associated with the current task.
         * If the context is a {@link WorkflowPolicy}, highlights are recorded on it when the workflow
         * starts, succeeds, or fails.
         *
         * @return the result of the workflow task
         * @throws IllegalStateException if no context entity or adjunct can be found
         * @throws Exception any failure from running the workflow, rethrown unchanged
         */
        @Override
        public Object call() throws Exception {
            BrooklynObject context = this.entityOrAdjunct;
            if (context == null) context = BrooklynTaskTags.getContextEntityAdjunct(Tasks.current(), false);
            if (context == null) context = BrooklynTaskTags.getContextEntity(Tasks.current());
            if (context == null) throw new IllegalStateException("No entity adjunct or entity available for "+this);

            WorkflowExecutionContext wc = WorkflowExecutionContext.newInstancePersisted(context, WorkflowExecutionContext.WorkflowContextType.SENSOR,
                    workflowCallableName, ConfigBag.newInstance(params), null, null, null);
            Task<Object> wt = wc.getTask(false /* condition checked by poll config framework */).get();

            if (context instanceof EntityAdjunct) {
                // add tag to each task so it shows up in list on mgmt context
                BrooklynTaskTags.addTagDynamically(wt, BrooklynTaskTags.tagForContextAdjunct((EntityAdjunct) context));
            }

            // non-null only when the context is a workflow policy, which additionally gets progress highlights
            final WorkflowPolicy policy = context instanceof WorkflowPolicy ? (WorkflowPolicy) context : null;
            if (policy != null) {
                policy.highlightAction("Workflow running", wt);
            }

            try {
                Object result = DynamicTasks.queue(wt).getUnchecked();
                if (policy != null) {
                    policy.highlightAction("Workflow run (success)", wt);
                }
                return result;
            } catch (Exception e) {
                if (policy != null) {
                    policy.highlightAction("Workflow run, with error: "+
                            Exceptions.collapseText(e), wt);
                }
                throw e;
            }
        }

        /** @return the raw config map captured from the {@link ConfigBag} passed at construction */
        public Map<String,Object> getParams() {
            return params;
        }

        @Override
        public String toString() {
            return "WorkflowPollCallable{" +
                    "workflowCallableName='" + workflowCallableName + '\'' +
                    ", context=" + entityOrAdjunct +
                    ", params=" + params +
                    '}';
        }
    }
}