blob: b755b1f3147fe725dfe49e88b44e51d27a930317 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.brooklyn.entity.group;
import java.util.Collection;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.sensor.Sensor;
import org.apache.brooklyn.api.sensor.SensorEvent;
import org.apache.brooklyn.api.sensor.SensorEventListener;
import org.apache.brooklyn.core.BrooklynLogging;
import org.apache.brooklyn.core.BrooklynLogging.LoggingLevel;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.entity.EntityPredicates;
import org.apache.brooklyn.core.mgmt.internal.CollectionChangeListener;
import org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal;
import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
/**
 * Group whose membership is derived from a filter rather than set explicitly.
 * <p>
 * Membership is kept up to date in three ways, all visible in this class:
 * <ul>
 * <li>an entity-set listener registered with the management context while this
 *     node is master, which reacts to entities being added or removed;
 * <li>sensor subscriptions added through {@code addSubscription}, which re-evaluate
 *     the source entity of each accepted event;
 * <li>explicit full rescans via {@link #rescanEntities()}.
 * </ul>
 * Every membership decision is made while holding {@link #memberChangeMutex}.
 */
public class DynamicGroupImpl extends AbstractGroupImpl implements DynamicGroup {

    private static final Logger log = LoggerFactory.getLogger(DynamicGroupImpl.class);

    /** Lock held around every add/remove/change/rescan membership decision in this class. */
    protected final Object memberChangeMutex = new Object();

    /**
     * Listener registered with the management context. Assigned in
     * {@link #onManagementBecomingMaster()} and cleared in
     * {@link #onManagementNoLongerMaster()}; {@link #stop()} deregisters it but
     * leaves this reference in place.
     */
    private volatile MyEntitySetChangeListener setChangeListener = null;

    /** Runs the superclass initialisation, then publishes {@code RUNNING = true}. */
    @Override
    public void init() {
        super.init();
        sensors().set(RUNNING, true);
    }

    /**
     * Stores the given filter under {@link #ENTITY_FILTER} and immediately rescans
     * so that membership reflects the new filter.
     *
     * @param filter the new membership filter
     */
    @Override
    public void setEntityFilter(Predicate<? super Entity> filter) {
        setConfigEvenIfOwned(ENTITY_FILTER, filter);
        rescanEntities();
    }

    /** @return the effective membership filter; see {@link #getEntityFilter()}. */
    @Override
    public Predicate<? super Entity> entityFilter() {
        return getEntityFilter();
    }

    /**
     * Builds the effective membership filter.
     *
     * @return
     *      The filter configured in {@link #ENTITY_FILTER} (or an always-false filter
     *      when none is configured) ANDed with a scope restriction derived from
     *      {@link #getAncestorToScan()}:
     *      <ul>
     *      <li>no ancestor: the entity's application ID must equal this group's application ID;
     *      <li>ancestor without a parent: the entity's application ID must equal the ancestor's ID;
     *      <li>otherwise: the entity must be a descendant of the ancestor.
     *      </ul>
     */
    protected Predicate<? super Entity> getEntityFilter() {
        Predicate<? super Entity> entityFilter = getConfig(ENTITY_FILTER);
        if (entityFilter == null) {
            // With no configured filter nothing is accepted.
            entityFilter = Predicates.alwaysFalse();
        }

        Entity ancestor = getAncestorToScan();
        Predicate<Entity> ancestorFilter;
        if (ancestor == null) {
            ancestorFilter = EntityPredicates.applicationIdEqualTo(getApplicationId());
        } else if (ancestor.getParent() == null) {
            // A parentless ancestor is matched by application ID using its own ID.
            ancestorFilter = EntityPredicates.applicationIdEqualTo(ancestor.getId());
        } else {
            ancestorFilter = EntityPredicates.isDescendantOf(ancestor);
        }

        return Predicates.and(
                ancestorFilter,
                entityFilter);
    }

    /**
     * @return the entity configured under {@link #ANCESTOR}, or, when that is unset,
     *      whatever {@code getApplication()} returns (callers in this class handle
     *      a {@code null} result).
     */
    protected Entity getAncestorToScan() {
        Entity ancestor = getConfig(ANCESTOR);
        if (ancestor == null) {
            return getApplication();
        }
        return ancestor;
    }

    /** @return {@code true} only when the {@code RUNNING} attribute is exactly {@code Boolean.TRUE}. */
    private boolean isRunning() {
        return Boolean.TRUE.equals(getAttribute(RUNNING));
    }

    /**
     * Publishes {@code RUNNING = false} and deregisters the entity-set listener,
     * if one is currently held.
     */
    @Override
    public void stop() {
        sensors().set(RUNNING, false);
        // Read the volatile field once so the null check and the deregistration
        // operate on the same reference even if it is cleared concurrently.
        MyEntitySetChangeListener listener = setChangeListener;
        if (listener != null) {
            ((ManagementContextInternal) getManagementContext()).removeEntitySetListener(listener);
        }
    }

    /**
     * Subscribes to {@code sensor} on {@code producer}; each event accepted by
     * {@code filter} causes the event's source entity to be re-evaluated for
     * membership via {@link #onEntityChanged(Entity)}.
     *
     * @param producer entity to subscribe to
     * @param sensor   sensor to subscribe to
     * @param filter   events for which this returns {@code false} are ignored
     */
    @Override
    public <T> void addSubscription(Entity producer, Sensor<T> sensor, final Predicate<? super SensorEvent<? super T>> filter) {
        SensorEventListener<T> listener = new SensorEventListener<T>() {
            @Override
            public void onEvent(SensorEvent<T> event) {
                if (filter.apply(event)) {
                    onEntityChanged(event.getSource());
                }
            }
        };
        subscriptions().subscribe(producer, sensor, listener);
    }

    /**
     * As {@link #addSubscription(Entity, Sensor, Predicate)} with a filter that
     * accepts every event.
     */
    @Override
    public <T> void addSubscription(Entity producer, Sensor<T> sensor) {
        addSubscription(producer, sensor, Predicates.<SensorEvent<? super T>>alwaysTrue());
    }

    /** @return whether {@link #entityFilter()} accepts the given entity. */
    protected boolean acceptsEntity(Entity e) {
        return entityFilter().apply(e);
    }

    /** Adds {@code item} as a member if the filter accepts it. */
    protected void onEntityAdded(Entity item) {
        synchronized (memberChangeMutex) {
            if (acceptsEntity(item)) {
                if (log.isDebugEnabled()) {
                    log.debug("{} detected item add {}", this, item);
                }
                addMember(item);
            }
        }
    }

    /** Removes {@code item} from the members; logs only when something was actually removed. */
    protected void onEntityRemoved(Entity item) {
        synchronized (memberChangeMutex) {
            if (removeMember(item)) {
                if (log.isDebugEnabled()) {
                    log.debug("{} detected item removal {}", this, item);
                }
            }
        }
    }

    /**
     * Re-evaluates {@code item} against the filter and reconciles membership:
     * a member that is no longer accepted is removed, an accepted non-member is
     * added, and nothing happens when membership already matches.
     */
    protected void onEntityChanged(Entity item) {
        synchronized (memberChangeMutex) {
            boolean accepts = acceptsEntity(item);
            boolean has = hasMember(item);
            if (has && !accepts) {
                removeMember(item);
                if (log.isDebugEnabled()) {
                    log.debug("{} detected item removal on change of {}", this, item);
                }
            } else if (!has && accepts) {
                if (log.isDebugEnabled()) {
                    log.debug("{} detected item add on change of {}", this, item);
                }
                addMember(item);
            }
        }
    }

    /**
     * Bridges entity-set notifications from the management context to the
     * enclosing group's {@code onEntityAdded} / {@code onEntityRemoved} handlers.
     */
    private class MyEntitySetChangeListener implements CollectionChangeListener.ListenerWithErrorHandler<Entity> {
        @Override
        public void onItemAdded(Entity item) { onEntityAdded(item); }

        @Override
        public void onItemRemoved(Entity item) { onEntityRemoved(item); }

        /**
         * Logs the error (at WARN while the group is actively managed, otherwise
         * at DEBUG) and then rethrows it.
         */
        @Override
        public void onError(String msg, Throwable trace) {
            // log debug if shutting down
            BrooklynLogging.log(log, Entities.isManagedActive(DynamicGroupImpl.this) ? BrooklynLogging.LoggingLevel.WARN : BrooklynLogging.LoggingLevel.DEBUG,
                    msg, trace);
            throw Exceptions.propagate(trace);
        }
    }

    /**
     * Registers the entity-set listener with the management context and submits a
     * task that performs an initial rescan. A second call while a listener is
     * already held only logs a warning.
     */
    @Override
    public void onManagementBecomingMaster() {
        if (setChangeListener != null) {
            log.warn("{} becoming master twice", this);
            return;
        }
        // Register through a local so the instance stored and the instance
        // registered are guaranteed to be the same object.
        MyEntitySetChangeListener listener = new MyEntitySetChangeListener();
        setChangeListener = listener;
        ((ManagementContextInternal) getManagementContext()).addEntitySetListener(listener);

        Task<Object> rescan = Tasks.builder().displayName("rescan entities").body(
                new Runnable() {
                    @Override
                    public void run() {
                        try {
                            rescanEntities();
                        } catch (Exception e) {
                            // Non-fatal failures are logged and swallowed; only fatal ones propagate.
                            log.warn("Error rescanning entities on management of "+DynamicGroupImpl.this+"; may be a group set against an unknown entity: "+e);
                            log.debug("Trace for rescan entities error", e);
                            Exceptions.propagateIfFatal(e);
                        }
                    }
                }).build();
        getExecutionContext().submit(rescan);
    }

    /**
     * Deregisters the entity-set listener and clears the stored reference. A call
     * while no listener is held only logs a warning.
     */
    @Override
    public void onManagementNoLongerMaster() {
        // Single volatile read: the reference checked is the reference deregistered.
        MyEntitySetChangeListener listener = setChangeListener;
        if (listener == null) {
            log.warn("{} no longer master twice", this);
            return;
        }
        ((ManagementContextInternal) getManagementContext()).removeEntitySetListener(listener);
        setChangeListener = null;
    }

    /** Performs a full rescan, adding and removing members so that they match the filter. */
    @Override
    public void rescanEntities() {
        rescanEntitiesInternal(true);
    }

    /**
     * Dry-run variant of {@link #rescanEntities()}.
     *
     * @return {@code true} if a rescan would add or remove at least one member;
     *      membership is not modified.
     */
    public boolean prescanEntities() {
        return rescanEntitiesInternal(false);
    }

    /**
     * Compares the current members with the entities under {@link #getAncestorToScan()}
     * that pass {@link #entityFilter()}.
     * <p>
     * Nothing is scanned (and {@code false} is returned) when the group is not
     * running, is not deployed, or has no ancestor to scan.
     *
     * @param makeChanges when {@code true}, missing entities are added and vanished
     *      ones removed; when {@code false}, the method returns {@code true} at the
     *      first detected difference without modifying membership
     * @return whether membership changed (or, for a dry run, would change)
     */
    protected boolean rescanEntitiesInternal(boolean makeChanges) {
        synchronized (memberChangeMutex) {
            if (!isRunning() || !getManagementSupport().isDeployed()) {
                if (log.isDebugEnabled()) {
                    log.debug("{} not scanning for children: stopped", this);
                }
                return false;
            }

            // Resolve the ancestor once: the value that is null-checked must be
            // the value that is scanned.
            final Entity ancestor = getAncestorToScan();
            if (ancestor == null) {
                BrooklynLogging.log(log, BrooklynLogging.levelDependingIfReadOnly(this, LoggingLevel.WARN, LoggingLevel.TRACE, LoggingLevel.TRACE),
                        "{} not (yet) scanning for children: no application defined", this);
                return false;
            }

            boolean changed = false;
            Collection<Entity> currentMembers = getMembers();
            // Start by assuming every current member has vanished; matches are struck off below.
            Collection<Entity> toRemove = Sets.newLinkedHashSet(currentMembers);

            final Iterable<Entity> unfiltered = Entities.descendantsAndSelf(ancestor);
            // Build the filter once so the predicate that is logged is the one that is applied.
            final Predicate<? super Entity> filter = entityFilter();
            if (log.isDebugEnabled()) {
                log.debug("{} filtering {} with {}", new Object[]{this, unfiltered, filter});
            }

            for (Entity it : Iterables.filter(unfiltered, filter)) {
                toRemove.remove(it);
                if (!currentMembers.contains(it)) {
                    if (log.isDebugEnabled()) {
                        log.debug("{} rescan detected new item {}", this, it);
                    }
                    if (!makeChanges) {
                        return true;
                    }
                    addMember(it);
                    changed = true;
                }
            }

            for (Entity it : toRemove) {
                if (log.isDebugEnabled()) {
                    log.debug("{} rescan detected vanished item {}", this, it);
                }
                if (!makeChanges) {
                    return true;
                }
                removeMember(it);
                changed = true;
            }

            if (changed && log.isDebugEnabled()) {
                log.debug("{} rescan complete, members now {}", this, getMembers());
            }
            return changed;
        }
    }
}