| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.brooklyn.entity.group; |
| |
| import java.util.Collection; |
| |
| import org.apache.brooklyn.api.entity.Entity; |
| import org.apache.brooklyn.api.mgmt.Task; |
| import org.apache.brooklyn.api.sensor.Sensor; |
| import org.apache.brooklyn.api.sensor.SensorEvent; |
| import org.apache.brooklyn.api.sensor.SensorEventListener; |
| import org.apache.brooklyn.core.BrooklynLogging; |
| import org.apache.brooklyn.core.BrooklynLogging.LoggingLevel; |
| import org.apache.brooklyn.core.entity.Entities; |
| import org.apache.brooklyn.core.entity.EntityPredicates; |
| import org.apache.brooklyn.core.mgmt.internal.CollectionChangeListener; |
| import org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal; |
| import org.apache.brooklyn.util.core.task.Tasks; |
| import org.apache.brooklyn.util.exceptions.Exceptions; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.base.Predicate; |
| import com.google.common.base.Predicates; |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.Sets; |
| |
| public class DynamicGroupImpl extends AbstractGroupImpl implements DynamicGroup { |
| |
| private static final Logger log = LoggerFactory.getLogger(DynamicGroupImpl.class); |
| |
| protected final Object memberChangeMutex = new Object(); |
| |
| private volatile MyEntitySetChangeListener setChangeListener = null; |
| |
| @Override |
| public void init() { |
| super.init(); |
| sensors().set(RUNNING, true); |
| } |
| |
| @Override |
| public void setEntityFilter(Predicate<? super Entity> filter) { |
| setConfigEvenIfOwned(ENTITY_FILTER, filter); |
| rescanEntities(); |
| } |
| |
| @Override |
| public Predicate<? super Entity> entityFilter() { |
| return getEntityFilter(); |
| } |
| |
| /** |
| * @return |
| * The filter configured in {@link #ENTITY_FILTER} ANDed with a check that the |
| * entity has the same application ID. |
| */ |
| protected Predicate<? super Entity> getEntityFilter() { |
| Predicate<? super Entity> entityFilter = getConfig(ENTITY_FILTER); |
| if (entityFilter == null) { |
| entityFilter = Predicates.alwaysFalse(); |
| } |
| Entity ancestor = getAncestorToScan(); |
| Predicate<Entity> ancestorFilter; |
| if (ancestor==null) ancestorFilter = EntityPredicates.applicationIdEqualTo(getApplicationId()); |
| else if (ancestor.getParent()==null) ancestorFilter = EntityPredicates.applicationIdEqualTo(ancestor.getId()); |
| else ancestorFilter = EntityPredicates.isDescendantOf(ancestor); |
| return Predicates.and( |
| ancestorFilter, |
| entityFilter); |
| } |
| |
| protected Entity getAncestorToScan() { |
| Entity ancestor = getConfig(ANCESTOR); |
| if (ancestor==null) return getApplication(); |
| return ancestor; |
| } |
| |
| private boolean isRunning() { |
| return Boolean.TRUE.equals(getAttribute(RUNNING)); |
| } |
| |
| @Override |
| public void stop() { |
| sensors().set(RUNNING, false); |
| if (setChangeListener != null) { |
| ((ManagementContextInternal) getManagementContext()).removeEntitySetListener(setChangeListener); |
| } |
| } |
| |
| @Override |
| public <T> void addSubscription(Entity producer, Sensor<T> sensor, final Predicate<? super SensorEvent<? super T>> filter) { |
| SensorEventListener<T> listener = new SensorEventListener<T>() { |
| @Override |
| public void onEvent(SensorEvent<T> event) { |
| if (filter.apply(event)) onEntityChanged(event.getSource()); |
| } |
| }; |
| subscriptions().subscribe(producer, sensor, listener); |
| } |
| |
| @Override |
| public <T> void addSubscription(Entity producer, Sensor<T> sensor) { |
| addSubscription(producer, sensor, Predicates.<SensorEvent<? super T>>alwaysTrue()); |
| } |
| |
| protected boolean acceptsEntity(Entity e) { |
| return entityFilter().apply(e); |
| } |
| |
| protected void onEntityAdded(Entity item) { |
| synchronized (memberChangeMutex) { |
| if (acceptsEntity(item)) { |
| if (log.isDebugEnabled()) log.debug("{} detected item add {}", this, item); |
| addMember(item); |
| } |
| } |
| } |
| |
| protected void onEntityRemoved(Entity item) { |
| synchronized (memberChangeMutex) { |
| if (removeMember(item)) |
| if (log.isDebugEnabled()) log.debug("{} detected item removal {}", this, item); |
| } |
| } |
| |
| protected void onEntityChanged(Entity item) { |
| synchronized (memberChangeMutex) { |
| boolean accepts = acceptsEntity(item); |
| boolean has = hasMember(item); |
| if (has && !accepts) { |
| removeMember(item); |
| if (log.isDebugEnabled()) log.debug("{} detected item removal on change of {}", this, item); |
| } else if (!has && accepts) { |
| if (log.isDebugEnabled()) log.debug("{} detected item add on change of {}", this, item); |
| addMember(item); |
| } |
| } |
| } |
| |
| private class MyEntitySetChangeListener implements CollectionChangeListener.ListenerWithErrorHandler<Entity> { |
| @Override |
| public void onItemAdded(Entity item) { onEntityAdded(item); } |
| @Override |
| public void onItemRemoved(Entity item) { onEntityRemoved(item); } |
| |
| @Override |
| public void onError(String msg, Throwable trace) { |
| // log debug if shutting down |
| BrooklynLogging.log(log, Entities.isManagedActive(DynamicGroupImpl.this) ? BrooklynLogging.LoggingLevel.WARN : BrooklynLogging.LoggingLevel.DEBUG, |
| msg, trace); |
| throw Exceptions.propagate(trace); |
| } |
| } |
| |
| @Override |
| public void onManagementBecomingMaster() { |
| if (setChangeListener != null) { |
| log.warn("{} becoming master twice", this); |
| return; |
| } |
| setChangeListener = new MyEntitySetChangeListener(); |
| ((ManagementContextInternal) getManagementContext()).addEntitySetListener(setChangeListener); |
| Task<Object> rescan = Tasks.builder().displayName("rescan entities").body( |
| new Runnable() { |
| @Override |
| public void run() { |
| try { |
| rescanEntities(); |
| } catch (Exception e) { |
| log.warn("Error rescanning entities on management of "+DynamicGroupImpl.this+"; may be a group set against an unknown entity: "+e); |
| log.debug("Trace for rescan entities error", e); |
| Exceptions.propagateIfFatal(e); |
| } |
| } |
| }).build(); |
| getExecutionContext().submit(rescan); |
| } |
| |
| @Override |
| public void onManagementNoLongerMaster() { |
| if (setChangeListener == null) { |
| log.warn("{} no longer master twice", this); |
| return; |
| } |
| ((ManagementContextInternal) getManagementContext()).removeEntitySetListener(setChangeListener); |
| setChangeListener = null; |
| } |
| |
| @Override |
| public void rescanEntities() { |
| rescanEntitiesInternal(true); |
| } |
| |
| public boolean prescanEntities() { |
| return rescanEntitiesInternal(false); |
| } |
| |
| protected boolean rescanEntitiesInternal(boolean makeChanges) { |
| synchronized (memberChangeMutex) { |
| if (!isRunning() || !getManagementSupport().isDeployed()) { |
| if (log.isDebugEnabled()) log.debug("{} not scanning for children: stopped", this); |
| return false; |
| } |
| if (getAncestorToScan() == null) { |
| BrooklynLogging.log(log, BrooklynLogging.levelDependingIfReadOnly(this, LoggingLevel.WARN, LoggingLevel.TRACE, LoggingLevel.TRACE), |
| "{} not (yet) scanning for children: no application defined", this); |
| return false; |
| } |
| boolean changed = false; |
| Collection<Entity> currentMembers = getMembers(); |
| Collection<Entity> toRemove = Sets.newLinkedHashSet(currentMembers); |
| |
| final Iterable<Entity> unfiltered = Entities.descendantsAndSelf(getAncestorToScan()); |
| log.debug("{} filtering {} with {}", new Object[]{this, unfiltered, entityFilter()}); |
| for (Entity it : Iterables.filter(unfiltered, entityFilter())) { |
| toRemove.remove(it); |
| if (!currentMembers.contains(it)) { |
| if (log.isDebugEnabled()) log.debug("{} rescan detected new item {}", this, it); |
| if (!makeChanges) return true; |
| addMember(it); |
| changed = true; |
| } |
| } |
| for (Entity it : toRemove) { |
| if (log.isDebugEnabled()) log.debug("{} rescan detected vanished item {}", this, it); |
| if (!makeChanges) return true; |
| removeMember(it); |
| changed = true; |
| } |
| if (changed && log.isDebugEnabled()) |
| log.debug("{} rescan complete, members now {}", this, getMembers()); |
| return changed; |
| } |
| } |
| |
| } |