// blob: 20a5381123300aab8456f157285fe4b8168f609a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.brooklyn.core.feed;
import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.core.entity.Attributes;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
import org.apache.brooklyn.core.entity.lifecycle.Lifecycle.Transition;
import org.apache.brooklyn.util.core.flags.TypeCoercions;
import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Objects;
/**
 * Handler for the results of polling an entity's attribute. On each poll result the entity's
 * attribute is set.
 * <p>
 * Calls to the callback methods ({@link #onSuccess(Object)}, {@link #onFailure(Object)} and
 * {@link #onException(Exception)}) will happen sequentially, but may be made from different
 * threads each time. Note that no guarantees of a synchronized block exist, so additional
 * synchronization would be required for the Java memory model's "happens before" relationship.
 * <p>
 * The handler also tracks whether the most recent poll was a problem, so that a recurring
 * problem is logged as a warning only once, and only after a configurable grace period.
 *
 * @param <V> the type of the raw value produced by a poll
 *
 * @author aled
 */
public class AttributePollHandler<V> implements PollHandler<V> {

    public static final Logger log = LoggerFactory.getLogger(AttributePollHandler.class);

    /** Supplies the sensor, the success/failure/exception handlers and the logging grace times. */
    private final FeedConfig<V,?,?> config;

    /** The entity whose sensor is updated; never null (checked in the constructor). */
    private final Entity entity;

    /** The sensor to set; may be the {@link FeedConfig#NO_SENSOR} marker, in which case nothing is set. */
    @SuppressWarnings("rawtypes")
    private final AttributeSensor sensor;

    /** The owning feed; consulted for its connected state and notified of sensor changes. */
    private final AbstractFeed feed;

    /** When true, a value equal to the entity's current attribute value is not published again. */
    private final boolean suppressDuplicates;

    // internal state to look after whether to log warnings
    /** Time (epoch millis) of the last successful poll, or null if there has not been one. */
    private volatile Long lastSuccessTime = null;
    /** Time (epoch millis) at which the current run of problems started, or null if there is none. */
    private volatile Long currentProblemStartTime = null;
    /** Whether the current run of problems has already reached its grace-period expiry. */
    private volatile boolean currentProblemLoggedAsWarning = false;
    /** Whether the most recent poll result was a problem (failure or exception). */
    private volatile boolean lastWasProblem = false;

    /**
     * Creates a handler which applies poll results to the sensor named in the given config.
     *
     * @param config the feed configuration; must be non-null and must supply a non-null sensor
     * @param entity the entity whose sensor is to be set; must be non-null
     * @param feed the feed on whose behalf polling is done; must be non-null
     * @throws NullPointerException if any argument, or the config's sensor, is null
     */
    public AttributePollHandler(FeedConfig<V,?,?> config, Entity entity, AbstractFeed feed) {
        this.config = checkNotNull(config, "config");
        this.entity = checkNotNull(entity, "entity");
        this.sensor = checkNotNull(config.getSensor(), "sensor");
        this.feed = checkNotNull(feed, "feed");
        this.suppressDuplicates = config.getSupressDuplicates();
    }

    /**
     * Applies the configured check-success predicate to the polled value.
     *
     * @param val the value returned by the poll
     * @return true if no check-success predicate is configured, otherwise the predicate's result
     */
    @Override
    public boolean checkSuccess(V val) {
        // Always true if no checkSuccess predicate was configured.
        return !config.hasCheckSuccessHandler() || config.getCheckSuccess().apply(val);
    }

    /**
     * Sets the sensor from a successful poll result, and clears any record of a previous problem
     * (logging the recovery at info if the problem had been warned about, otherwise at debug).
     *
     * @param val the value returned by the poll
     */
    @Override
    public void onSuccess(V val) {
        if (log.isTraceEnabled()) {
            log.trace("poll for "+getBriefDescription()+" got: "+val);
        }

        setSensor(transformValueOnSuccess(val));

        if (lastWasProblem) {
            if (currentProblemLoggedAsWarning) {
                log.info("Success (following previous problem) reading "+getBriefDescription());
            } else {
                log.debug("Success (following previous problem) reading "+getBriefDescription());
            }
            lastWasProblem = false;
            currentProblemStartTime = null;
            currentProblemLoggedAsWarning = false;
        }
        lastSuccessTime = System.currentTimeMillis();
    }

    /** allows post-processing, such as applying a success handler;
     * default applies the onSuccess handler (which is recommended)
     *
     * @param val the value returned by the poll
     * @return the success handler's result if one is configured, otherwise {@code val} itself
     */
    protected Object transformValueOnSuccess(V val) {
        return config.hasSuccessHandler() ? config.getOnSuccess().apply(val) : val;
    }

    /**
     * Handles a poll whose value did not pass {@link #checkSuccess(Object)}.
     * <p>
     * If no failure handler is configured this is treated as an exception and passed to
     * {@link #onException(Exception)}. Otherwise the problem is logged and the sensor is set to
     * the failure handler's result; an error thrown while doing so is logged (as a warning if
     * the feed is connected, otherwise at debug) and not rethrown.
     *
     * @param val the value returned by the poll
     */
    @Override
    public void onFailure(V val) {
        if (!config.hasFailureHandler()) {
            onException(new Exception("checkSuccess of "+this+" for "+getBriefDescription()+" was false but poller has no failure handler"));
        } else {
            logProblem("failure", val);

            try {
                // a failure handler is known to exist in this branch, so it is applied unconditionally
                setSensor(config.getOnFailure().apply(val));
            } catch (Exception e) {
                if (feed.isConnected()) {
                    log.warn("Error computing " + getBriefDescription() + "; val=" + val+": "+ e, e);
                } else {
                    if (log.isDebugEnabled()) {
                        log.debug("Error computing " + getBriefDescription() + "; val=" + val + " (when inactive)", e);
                    }
                }
            }
        }
    }

    /**
     * Handles an exception raised by a poll.
     * <p>
     * The problem is logged via {@link #logProblem(String, Object)} when the feed is connected
     * (and only traced when it is not). If an exception handler is configured, the sensor is
     * then set to that handler's result. If the handler itself throws, the error is rethrown
     * when the feed is connected, and only logged at debug when it is not.
     *
     * @param exception the exception raised by the poll
     */
    @Override
    public void onException(Exception exception) {
        if (!feed.isConnected()) {
            if (log.isTraceEnabled()) {
                log.trace("Read of "+this+" in "+getBriefDescription()+" gave exception (while not connected or not yet connected): "+ exception);
            }
        } else {
            logProblem("exception", exception);
        }

        if (config.hasExceptionHandler()) {
            try {
                setSensor( config.getOnException().apply(exception) );
            } catch (Exception e) {
                if (feed.isConnected()) {
                    if (!e.equals(exception)) {
                        // the handler failed with its own error; make sure the original is not lost
                        log.warn("Exception handler for "+getBriefDescription()+" returned a different error than the original computation; original error was "+exception, exception);
                    }
                    log.debug("Exception handler failed computing "+getBriefDescription()+" (rethrowing): "+e);
                    throw Exceptions.propagate(e);
                } else {
                    if (log.isDebugEnabled()) {
                        log.debug("Unable to compute "+getBriefDescription()+"; exception="+exception+" (when inactive)", e);
                    }
                }
            }
        }
    }

    /**
     * Logs a problem (a failure or an exception) at a level which depends on how long the
     * problem has been occurring, and records that the last poll was a problem.
     * <p>
     * A recurring problem which has already passed its grace period is only traced. Otherwise
     * the message is a warning only once the grace period has expired and the entity is
     * managed and active; in all other cases it is logged at debug. The grace period runs:
     * <ul>
     * <li>from the last success, for {@code config.getLogWarningGraceTime()}, if there has been
     *     a success and the entity is not starting, stopping or stopped;
     * <li>otherwise from the start of the current run of problems (or from now, if this is the
     *     first problem), for {@code config.getLogWarningGraceTimeOnStartup()}.
     * </ul>
     *
     * @param type a short label for the kind of problem, used in the log message
     *        (this class passes "failure" or "exception")
     * @param val the failing value or the exception; if it is a {@link Throwable} its trace is
     *        additionally logged at debug when a warning is emitted
     */
    protected void logProblem(String type, Object val) {
        // read each volatile field once, so that all decisions below are made on a consistent view
        boolean wasProblem = lastWasProblem;

        if (wasProblem && currentProblemLoggedAsWarning) {
            if (log.isTraceEnabled()) {
                log.trace("Recurring "+type+" reading "+this+" in "+getBriefDescription()+": "+val);
            }
        } else {
            long nowTime = System.currentTimeMillis();
            // get non-volatile values
            Long currentProblemStartTimeCache = currentProblemStartTime;
            Long lastSuccessTimeCache = lastSuccessTime;

            long expiryTime =
                    (lastSuccessTimeCache!=null && !isTransitioningOrStopped())
                    ? lastSuccessTimeCache+config.getLogWarningGraceTime().toMilliseconds()
                    : (currentProblemStartTimeCache != null)
                        ? currentProblemStartTimeCache+config.getLogWarningGraceTimeOnStartup().toMilliseconds()
                        : nowTime+config.getLogWarningGraceTimeOnStartup().toMilliseconds();

            if (!wasProblem) {
                // first problem after a success (or the first result of all)
                if (expiryTime <= nowTime) {
                    currentProblemLoggedAsWarning = true;
                    if (Entities.isManagedActive(entity)) {
                        log.warn("Read of " + getBriefDescription() + " gave " + type + ": " + val);
                        if (log.isDebugEnabled() && val instanceof Throwable) {
                            log.debug("Trace for "+type+" reading "+getBriefDescription()+": "+val, (Throwable)val);
                        }
                    } else {
                        log.debug("Read (unmanaged) of " + getBriefDescription() + " gave " + type + ": " + val);
                    }
                } else {
                    if (log.isDebugEnabled()) {
                        log.debug("Read of " + getBriefDescription() + " gave " + type + " (in grace period): " + val);
                    }
                }
                lastWasProblem = true;
                currentProblemStartTime = nowTime;
            } else {
                // problem is recurring but has not yet been reported as past its grace period
                if (expiryTime <= nowTime) {
                    currentProblemLoggedAsWarning = true;
                    if (Entities.isManagedActive(entity)) {
                        // the start time is expected to be set whenever a problem is recurring;
                        // guard the unboxing anyway so that logging can never itself fail
                        long problemMillis = (currentProblemStartTimeCache != null)
                                ? nowTime - currentProblemStartTimeCache
                                : 0L;
                        log.warn("Read of " + getBriefDescription() + " gave " + type +
                                " (grace period expired, occurring for " + Duration.millis(problemMillis) +
                                (config.hasExceptionHandler() ? "" : ", no exception handler set for sensor") +
                                ")" +
                                ": " + val);
                        if (log.isDebugEnabled() && val instanceof Throwable) {
                            log.debug("Trace for " + type + " reading " + getBriefDescription() + ": " + val, (Throwable) val);
                        }
                    } else {
                        if (log.isDebugEnabled()) {
                            log.debug("Read (unmanaged) of " + getBriefDescription() + " gave " + type + " (grace period expired): " + val);
                        }
                    }
                } else {
                    if (log.isDebugEnabled()) {
                        log.debug("Recurring {} reading {} in {} (still in grace period): {}", new Object[] {type, this, getBriefDescription(), val});
                    }
                }
            }
        }
    }

    /**
     * Reports whether the entity's expected service state is one in which problems are to be
     * judged against the start-up grace period rather than against the last success.
     *
     * @return true if the entity's {@link Attributes#SERVICE_STATE_EXPECTED} is set and its state
     *         is {@link Lifecycle#STARTING}, {@link Lifecycle#STOPPING} or {@link Lifecycle#STOPPED};
     *         false otherwise, including when the attribute is not set
     */
    protected boolean isTransitioningOrStopped() {
        Transition expected = entity.getAttribute(Attributes.SERVICE_STATE_EXPECTED);
        if (expected==null) return false;
        return (expected.getState()==Lifecycle.STARTING || expected.getState()==Lifecycle.STOPPING || expected.getState()==Lifecycle.STOPPED);
    }

    /**
     * Applies a (handler-transformed) value to the sensor.
     * <ul>
     * <li>{@link FeedConfig#UNCHANGED} leaves the sensor alone;
     * <li>{@link FeedConfig#REMOVE} removes the sensor from the entity and notifies the feed;
     * <li>if the sensor is {@link FeedConfig#NO_SENSOR} nothing is set;
     * <li>otherwise the value is coerced to the sensor's type and published, and the feed is
     *     notified, unless duplicates are being suppressed and the coerced value equals the
     *     entity's current value.
     * </ul>
     * If the entity is no longer managed, this returns without doing anything when the current
     * task is interrupted; otherwise it logs a warning and then proceeds as above.
     *
     * @param v the value to apply, or one of the {@link FeedConfig} marker values
     */
    @SuppressWarnings("unchecked")
    protected void setSensor(Object v) {
        if (Entities.isNoLongerManaged(entity)) {
            if (Tasks.isInterrupted()) return;
            log.warn(""+entity+" is not managed; feed "+this+" setting "+sensor+" to "+v+" at this time is not supported ("+Tasks.current()+")");
        }

        if (v == FeedConfig.UNCHANGED) {
            // nothing
        } else if (v == FeedConfig.REMOVE) {
            ((EntityInternal)entity).sensors().remove(sensor);
            feed.onRemoveSensor(sensor);
        } else if (sensor == FeedConfig.NO_SENSOR) {
            // nothing
        } else {
            Object coercedV = TypeCoercions.coerce(v, sensor.getType());
            if (suppressDuplicates && Objects.equal(coercedV, entity.getAttribute(sensor))) {
                // no change; nothing
            } else {
                entity.sensors().set(sensor, coercedV);
                feed.onPublishSensor(sensor, coercedV);
            }
        }
    }

    /** Returns the default object string followed by {@link #getDescription()} in square brackets. */
    @Override
    public String toString() {
        return super.toString()+"["+getDescription()+"]";
    }

    /** Returns a description of this handler, which is the string form of its feed config. */
    @Override
    public String getDescription() {
        return
                // we used to show the sensor name, but it was redundant with info on the FeedConfig
                //sensor.getName() + " " /*+entity.getId()*/ +" <- " +
                "" + config;
    }

    /**
     * Returns a short "entity-&gt;sensor" description for use in log messages; the sensor part is
     * "(dynamic sensors)" when the sensor is {@link FeedConfig#NO_SENSOR}.
     */
    protected String getBriefDescription() {
        return ""+entity+"->"+(sensor==FeedConfig.NO_SENSOR ? "(dynamic sensors)" : ""+sensor);
    }
}