// blob: 45cb8b69281f15b957243b99639073550d34e97b [file] [log] [blame]
package brooklyn.entity.group;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import brooklyn.entity.Entity;
import brooklyn.entity.Group;
import brooklyn.entity.basic.DynamicGroup;
import brooklyn.entity.trait.Startable;
import brooklyn.event.Sensor;
import brooklyn.event.SensorEvent;
import brooklyn.event.SensorEventListener;
import brooklyn.policy.basic.AbstractPolicy;
import brooklyn.util.flags.SetFromFlag;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
/** abstract class which helps track membership of a group, invoking (empty) methods in this class on MEMBER{ADDED,REMOVED} events, as well as SERVICE_UP {true,false} for those members. */
public abstract class AbstractMembershipTrackingPolicy extends AbstractPolicy {
private static final Logger LOG = LoggerFactory.getLogger(AbstractMembershipTrackingPolicy.class);
private Group group;
@SetFromFlag
private Set<Sensor<?>> sensorsToTrack;
public AbstractMembershipTrackingPolicy(Map<?,?> flags) {
super(flags);
if (sensorsToTrack == null) sensorsToTrack = Sets.newLinkedHashSet();
}
public AbstractMembershipTrackingPolicy() {
this(Collections.emptyMap());
}
/**
* Sets the group to be tracked; unsubscribes from any previous group, and subscribes to this group.
*
* Note this must be called *after* adding the policy to the entity.
*
* @param group
*/
public void setGroup(Group group) {
Preconditions.checkNotNull(group, "The group cannot be null");
unsubscribeFromGroup();
this.group = group;
subscribeToGroup();
}
/**
* Unsubscribes from the group.
*/
public void reset() {
unsubscribeFromGroup();
}
@Override
public void suspend() {
unsubscribeFromGroup();
super.suspend();
}
@Override
public void resume() {
super.resume();
if (group != null) {
subscribeToGroup();
}
}
// TODO having "subscribe to changes only" semantics as part of subscription would be much cleaner
// than this lightweight map
Map<String,Boolean> lastKnownServiceUpCache = new ConcurrentHashMap<String, Boolean>();
protected void subscribeToGroup() {
Preconditions.checkNotNull(group, "The group cannot be null");
LOG.debug("Subscribing to group "+group+", for memberAdded, memberRemoved, serviceUp, and {}", sensorsToTrack);
subscribe(group, DynamicGroup.MEMBER_ADDED, new SensorEventListener<Entity>() {
@Override public void onEvent(SensorEvent<Entity> event) {
onEntityEvent(EventType.ENTITY_ADDED, event.getValue());
}
});
subscribe(group, DynamicGroup.MEMBER_REMOVED, new SensorEventListener<Entity>() {
@Override public void onEvent(SensorEvent<Entity> event) {
lastKnownServiceUpCache.remove(event.getSource());
onEntityEvent(EventType.ENTITY_REMOVED, event.getValue());
}
});
subscribeToMembers(group, Startable.SERVICE_UP, new SensorEventListener<Boolean>() {
@Override public void onEvent(SensorEvent<Boolean> event) {
if (event.getValue() == lastKnownServiceUpCache.put(event.getSource().getId(), event.getValue()))
// ignore if value has not changed
return;
onEntityEvent(EventType.ENTITY_CHANGE, event.getSource());
}
});
for (Sensor<?> sensor : sensorsToTrack) {
subscribeToMembers(group, sensor, new SensorEventListener<Object>() {
@Override public void onEvent(SensorEvent<Object> event) {
onEntityEvent(EventType.ENTITY_CHANGE, event.getSource());
}
});
}
for (Entity it : group.getMembers()) { onEntityEvent(EventType.ENTITY_ADDED, it); }
// FIXME cluster may be remote, we need to make this retrieve the remote values, or store members in local mgmt node, or use children
}
protected void unsubscribeFromGroup() {
if (getSubscriptionTracker()!=null && group != null) unsubscribe(group);
}
public enum EventType { ENTITY_CHANGE, ENTITY_ADDED, ENTITY_REMOVED }
/** All entity events pass through this method. Default impl delegates to onEntityXxxx methods, whose default behaviours are no-op.
* Callers may override this to intercept all entity events in a single place, and to suppress subsequent processing if desired.
*/
protected void onEntityEvent(EventType type, Entity entity) {
switch (type) {
case ENTITY_CHANGE: onEntityChange(entity); break;
case ENTITY_ADDED: onEntityAdded(entity); break;
case ENTITY_REMOVED: onEntityRemoved(entity); break;
}
}
/**
* Called when a member's "up" sensor changes.
*/
protected void onEntityChange(Entity member) {}
/**
* Called when a member is added.
* Note that the change event may arrive before this event; implementations here should typically look at the last value.
*/
protected void onEntityAdded(Entity member) {}
/**
* Called when a member is removed.
* Note that entity change events may arrive after this event; they should typically be ignored.
*/
protected void onEntityRemoved(Entity member) {}
}