| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.brooklyn.core.mgmt.internal; |
| |
| import static com.google.common.base.Preconditions.checkNotNull; |
| |
| import java.lang.reflect.Proxy; |
| import java.util.*; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.regex.Pattern; |
| |
| import javax.annotation.Nullable; |
| |
| import org.apache.brooklyn.api.entity.Application; |
| import org.apache.brooklyn.api.entity.Entity; |
| import org.apache.brooklyn.api.entity.EntitySpec; |
| import org.apache.brooklyn.api.entity.EntityTypeRegistry; |
| import org.apache.brooklyn.api.entity.Group; |
| import org.apache.brooklyn.api.location.Location; |
| import org.apache.brooklyn.api.mgmt.AccessController; |
| import org.apache.brooklyn.api.mgmt.Task; |
| import org.apache.brooklyn.api.policy.Policy; |
| import org.apache.brooklyn.api.policy.PolicySpec; |
| import org.apache.brooklyn.api.sensor.Enricher; |
| import org.apache.brooklyn.api.sensor.EnricherSpec; |
| import org.apache.brooklyn.core.BrooklynLogging; |
| import org.apache.brooklyn.core.entity.AbstractEntity; |
| import org.apache.brooklyn.core.entity.Entities; |
| import org.apache.brooklyn.core.entity.EntityInternal; |
| import org.apache.brooklyn.core.entity.EntityPredicates; |
| import org.apache.brooklyn.core.entity.trait.Startable; |
| import org.apache.brooklyn.core.internal.storage.BrooklynStorage; |
| import org.apache.brooklyn.core.mgmt.BrooklynTags; |
| import org.apache.brooklyn.core.mgmt.BrooklynTags.NamedStringTag; |
| import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; |
| import org.apache.brooklyn.core.objs.BasicEntityTypeRegistry; |
| import org.apache.brooklyn.core.objs.proxy.EntityProxy; |
| import org.apache.brooklyn.core.objs.proxy.EntityProxyImpl; |
| import org.apache.brooklyn.core.objs.proxy.InternalEntityFactory; |
| import org.apache.brooklyn.core.objs.proxy.InternalPolicyFactory; |
| import org.apache.brooklyn.util.collections.MutableSet; |
| import org.apache.brooklyn.util.collections.SetFromLiveMap; |
| import org.apache.brooklyn.util.core.task.Tasks; |
| import org.apache.brooklyn.util.exceptions.Exceptions; |
| import org.apache.brooklyn.util.time.CountdownTimer; |
| import org.apache.brooklyn.util.time.Duration; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.annotations.Beta; |
| import com.google.common.base.Function; |
| import com.google.common.base.Preconditions; |
| import com.google.common.base.Predicate; |
| import com.google.common.base.Predicates; |
| import com.google.common.collect.FluentIterable; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| |
| public class LocalEntityManager implements EntityManagerInternal { |
| |
/** Logger shared by all instances of this manager. */
private static final Logger log = LoggerFactory.getLogger(LocalEntityManager.class);

/**
 * Regex used for validating entity ids that are passed in, for use when creating an entity.
 * 
 * Only lower-case letters and digits; min 10 chars; max 63 chars. We are this extreme because
 * some existing entity implementations rely on the entity-id format for use in hostnames, etc.
 */
private static final Pattern ENTITY_ID_PATTERN = Pattern.compile("[a-z0-9]{10,63}");

/** Management context this manager belongs to; null-checked in the constructor. */
private final LocalManagementContext managementContext;
/** Type registry created in the constructor and handed to the entity factory. */
private final BasicEntityTypeRegistry entityTypeRegistry;
/** Factory used to instantiate entities from specs. */
private final InternalEntityFactory entityFactory;
/** Factory used to instantiate policies and enrichers from specs. */
private final InternalPolicyFactory policyFactory;

/** Entities that have been created, but have not yet begun to be managed (keyed by entity id) */
private final Map<String,Entity> preRegisteredEntitiesById = Collections.synchronizedMap(new WeakHashMap<String, Entity>());

/** Entities that are in the process of being managed, but where management is not yet complete (keyed by entity id) */
private final Map<String,Entity> preManagedEntitiesById = Collections.synchronizedMap(new WeakHashMap<String, Entity>());

/** Proxies of the managed entities (keyed by entity id) */
private final ConcurrentMap<String,Entity> entityProxiesById = Maps.newConcurrentMap();

/**
 * Real managed entities (keyed by entity id).
 *
 * NOTE(review): this is a plain LinkedHashMap; not every access in this class is made while
 * holding a lock -- confirm whether that is intentional before relying on it across threads.
 */
private final Map<String,Entity> entitiesById = Maps.newLinkedHashMap();

/** Management mode for each entity (keyed by entity id) */
private final Map<String,ManagementTransitionMode> entityModesById = Collections.synchronizedMap(Maps.<String,ManagementTransitionMode>newLinkedHashMap());

/**
 * Proxies of the managed entities.
 * 
 * Access to this is always done in a synchronized block (synchronizing on `this`).
 */
private final ObservableSet<Entity> entities = new ObservableSet<Entity>();

/** Proxies of the managed entities that are applications */
private final Set<Application> applications = Sets.newConcurrentHashSet();

/** Storage obtained from the management context; backs {@link #entityTypes} and {@link #applicationIds}. */
private final BrooklynStorage storage;
/** Storage-backed map ("entities") from entity id to the class name of the real entity. */
private final Map<String,String> entityTypes;
/** Storage-backed set (over the "applications" map) of ids of managed top-level applications. */
private final Set<String> applicationIds;
| |
/**
 * Creates an entity manager for the given management context, building its own
 * type registry and factories and looking up the storage-backed collections it uses.
 *
 * @param managementContext the owning management context; must not be null
 * @throws NullPointerException if {@code managementContext} is null
 */
public LocalEntityManager(LocalManagementContext managementContext) {
    this.managementContext = Preconditions.checkNotNull(managementContext, "managementContext");
    this.storage = managementContext.getStorage();

    // the entity factory needs both the registry and the policy factory, so those come first
    this.entityTypeRegistry = new BasicEntityTypeRegistry();
    this.policyFactory = new InternalPolicyFactory(managementContext);
    this.entityFactory = new InternalEntityFactory(managementContext, this.entityTypeRegistry, this.policyFactory);

    this.entityTypes = this.storage.getMap("entities");
    this.applicationIds = SetFromLiveMap.create(this.storage.<String,Boolean>getMap("applications"));
}
| |
| public InternalEntityFactory getEntityFactory() { |
| if (!isRunning()) throw new IllegalStateException("Management context no longer running"); |
| return entityFactory; |
| } |
| |
| public InternalPolicyFactory getPolicyFactory() { |
| if (!isRunning()) throw new IllegalStateException("Management context no longer running"); |
| return policyFactory; |
| } |
| |
| @Override |
| public EntityTypeRegistry getEntityTypeRegistry() { |
| if (!isRunning()) throw new IllegalStateException("Management context no longer running"); |
| return entityTypeRegistry; |
| } |
| |
| @Override |
| public <T extends Entity> T createEntity(EntitySpec<T> spec, EntityCreationOptions options) { |
| String entityId = options.getRequiredUniqueId(); |
| |
| if (entityId!=null) { |
| if (!ENTITY_ID_PATTERN.matcher(entityId).matches()) { |
| throw new IllegalArgumentException("Invalid entity id '"+entityId+"'"); |
| } |
| } |
| |
| try { |
| T entity = entityFactory.createEntity(spec, options); |
| |
| if (options.isDryRun()) { |
| unmanageDryRun(entity); |
| // also need to do this to remove tasks etc |
| managementContext.getGarbageCollector().onUnmanaged(entity); |
| return entity; |
| |
| } else { |
| Entity proxy = ((AbstractEntity)entity).getProxy(); |
| checkNotNull(proxy, "proxy for entity %s, spec %s", entity, spec); |
| manage(entity); |
| return (T) proxy; |
| } |
| |
| } catch (Throwable e) { |
| log.warn("Failed to create entity using spec "+spec+" (rethrowing)", e); |
| throw Exceptions.propagate(e); |
| } |
| } |
| |
| @Override |
| public <T extends Entity> T createEntity(Map<?,?> config, Class<T> type) { |
| return createEntity(EntitySpec.create(config, type)); |
| } |
| |
| @Override |
| public <T extends Policy> T createPolicy(PolicySpec<T> spec) { |
| try { |
| return policyFactory.createPolicy(spec); |
| } catch (Throwable e) { |
| log.warn("Failed to create policy using spec "+spec+" (rethrowing)", e); |
| throw Exceptions.propagate(e); |
| } |
| } |
| |
| @Override |
| public <T extends Enricher> T createEnricher(EnricherSpec<T> spec) { |
| try { |
| return policyFactory.createEnricher(spec); |
| } catch (Throwable e) { |
| log.warn("Failed to create enricher using spec "+spec+" (rethrowing)", e); |
| throw Exceptions.propagate(e); |
| } |
| } |
| |
| @Override |
| public Collection<Entity> getEntities() { |
| return ImmutableList.copyOf(entityProxiesById.values()); |
| } |
| |
| @Override |
| public Collection<String> getEntityIds() { |
| return ImmutableList.copyOf(entityProxiesById.keySet()); |
| } |
| |
| @Override |
| public Collection<Entity> getEntitiesInApplication(Application application) { |
| Predicate<Entity> predicate = EntityPredicates.applicationIdEqualTo(application.getId()); |
| return ImmutableList.copyOf(Iterables.filter(entityProxiesById.values(), predicate)); |
| } |
| |
| @Override |
| public Collection<Entity> findEntities(Predicate<? super Entity> filter) { |
| return ImmutableList.copyOf(Iterables.filter(entityProxiesById.values(), filter)); |
| } |
| |
| @Override |
| public Collection<Entity> findEntitiesInApplication(Application application, Predicate<? super Entity> filter) { |
| Predicate<Entity> predicate = Predicates.and(EntityPredicates.applicationIdEqualTo(application.getId()), filter); |
| return ImmutableList.copyOf(Iterables.filter(entityProxiesById.values(), predicate)); |
| } |
| |
| @Override |
| public Iterable<Entity> getAllEntitiesInApplication(Application application) { |
| // To fix https://issues.apache.org/jira/browse/BROOKLYN-352, we need to synchronize on |
| // preRegisteredEntitiesById and preManagedEntitiesById while iterating over them (because |
| // they are synchronizedMaps). entityProxiesById is a ConcurrentMap, so no need to |
| // synchronize on that. |
| // Only synchronize on one at a time, to avoid the risk of deadlock. |
| |
| Predicate<Entity> predicate = EntityPredicates.applicationIdEqualTo(application.getId()); |
| Set<Entity> result = Sets.newLinkedHashSet(); |
| |
| synchronized (preRegisteredEntitiesById) { |
| for (Entity entity : preRegisteredEntitiesById.values()) { |
| if (predicate.apply(entity)) { |
| result.add(entity); |
| } |
| } |
| } |
| synchronized (preManagedEntitiesById) { |
| for (Entity entity : preManagedEntitiesById.values()) { |
| if (predicate.apply(entity)) { |
| result.add(entity); |
| } |
| } |
| } |
| for (Entity entity : entityProxiesById.values()) { |
| if (predicate.apply(entity)) { |
| result.add(entity); |
| } |
| } |
| |
| return FluentIterable.from(result) |
| .transform(new Function<Entity, Entity>() { |
| @Override public Entity apply(Entity input) { |
| return Entities.proxy(input); |
| }}) |
| .toSet(); |
| } |
| |
| @Override |
| public Entity getEntity(String id) { |
| return entityProxiesById.get(id); |
| } |
| |
| Collection<Application> getApplications() { |
| return ImmutableList.copyOf(applications); |
| } |
| |
| @Override |
| public boolean isManaged(Entity e) { |
| // Confirm we know about this entity (by id), and that it is the same entity instance |
| // (rather than just a different unmanaged entity with the same id). |
| return (isRunning() && getEntity(e.getId()) != null) && (entitiesById.get(e.getId()) == deproxyIfNecessary(e)); |
| } |
| |
| boolean isPreRegistered(Entity e) { |
| return preRegisteredEntitiesById.containsKey(e.getId()); |
| } |
| |
| void prePreManage(Entity entity) { |
| if (isPreRegistered(entity)) { |
| log.warn(""+this+" redundant call to pre-pre-manage entity "+entity+"; skipping", |
| new Exception("source of duplicate pre-pre-manage of "+entity)); |
| return; |
| } |
| preRegisteredEntitiesById.put(entity.getId(), entity); |
| } |
| |
| @Override |
| public ManagementTransitionMode getLastManagementTransitionMode(String itemId) { |
| return entityModesById.get(itemId); |
| } |
| |
| @Override |
| public void setManagementTransitionMode(Entity item, ManagementTransitionMode mode) { |
| entityModesById.put(item.getId(), mode); |
| } |
| |
| // TODO synchronization issues here. We guard with isManaged(), but if another thread executing |
| // concurrently then the managed'ness could be set after our check but before we do |
| // onManagementStarting etc. However, we can't just synchronize because we're calling alien code |
| // (the user might override entity.onManagementStarting etc). |
| // |
| // TODO We need to do some check about isPreManaged - i.e. is there another thread (or is this a |
| // re-entrant call) where the entity is not yet full managed (i.e. isManaged==false) but we're in |
| // the middle of managing it. |
| // |
| // TODO Also see LocalLocationManager.manage(Entity), if fixing things here |
| @Override |
| public void manage(Entity e) { |
| if (isManaged(e)) { |
| log.warn(""+this+" redundant call to start management of entity (and descendants of) "+e+"; skipping", |
| new Exception("source of duplicate management of "+e)); |
| return; |
| } |
| manageRecursive(e, ManagementTransitionMode.guessing(BrooklynObjectManagementMode.NONEXISTENT, BrooklynObjectManagementMode.MANAGED_PRIMARY)); |
| } |
| |
| @Override |
| public void manageRebindedRoot(Entity item) { |
| ManagementTransitionMode mode = getLastManagementTransitionMode(item.getId()); |
| Preconditions.checkNotNull(mode, "Mode not set for rebinding %s", item); |
| manageRecursive(item, mode); |
| } |
| |
| protected void checkManagementAllowed(Entity item) { |
| AccessController.Response access = managementContext.getAccessController().canManageEntity(item); |
| if (!access.isAllowed()) { |
| throw new IllegalStateException("Access controller forbids management of "+item+": "+access.getMsg()); |
| } |
| } |
| |
| /* TODO we sloppily use "recursive" to ensure ordering of parent-first in many places |
| * (which may not be necessary but seems like a good idea), |
| * and also to collect many entities when doing a big rebind, |
| * ensuring all have #manageNonRecursive called before calling #onManagementStarted. |
| * |
| * it would be better to have a manageAll(Map<Entity,ManagementTransitionMode> items) |
| * method which did that in two phases, allowing us to selectively rebind, |
| * esp when we come to want supporting different modes and different brooklyn nodes. |
| * |
| * the impl of manageAll could sort them with parents before children, |
| * (and manageRecursive could simply populate a map and delegate to manageAll). |
| * |
| * manageRebindRoot would then go, and the (few) callers would construct the map. |
| * |
| * similarly we might want an unmanageAll(), |
| * although possibly all unmanagement should be recursive, if we assume an entity's ancestors are always at least proxied |
| * (and the non-recursive RO path here could maybe be dropped) |
| */ |
| |
| /** Applies management lifecycle callbacks (onManagementStarting, for all beforehand, then onManagementStopped, for all after) */ |
| protected void manageRecursive(Entity e, final ManagementTransitionMode initialMode) { |
| checkManagementAllowed(e); |
| |
| final List<EntityInternal> allEntities = Lists.newArrayList(); |
| Predicate<EntityInternal> manageEntity = new Predicate<EntityInternal>() { @Override public boolean apply(EntityInternal it) { |
| ManagementTransitionMode mode = getLastManagementTransitionMode(it.getId()); |
| if (mode==null) { |
| setManagementTransitionMode(it, mode = initialMode); |
| } |
| |
| Boolean isReadOnlyFromEntity = it.getManagementSupport().isReadOnlyRaw(); |
| if (isReadOnlyFromEntity==null) { |
| if (mode.isReadOnly()) { |
| // should have been marked by rebinder |
| log.warn("Read-only entity "+it+" not marked as such on call to manage; marking and continuing"); |
| } |
| it.getManagementSupport().setReadOnly(mode.isReadOnly()); |
| } else { |
| if (!isReadOnlyFromEntity.equals(mode.isReadOnly())) { |
| log.warn("Read-only status at entity "+it+" ("+isReadOnlyFromEntity+") not consistent with management mode "+mode); |
| } |
| } |
| |
| if (it.getManagementSupport().isDeployed()) { |
| if (mode.wasNotLoaded()) { |
| // silently bail out |
| return false; |
| } else { |
| if (mode.wasPrimary() && mode.isPrimary()) { |
| // active partial rebind; continue |
| } else if (mode.wasReadOnly() && mode.isReadOnly()) { |
| // reload in RO mode |
| } else { |
| // on initial non-RO rebind, should not have any deployed instances |
| log.warn("Already deployed "+it+" when managing "+mode+"/"+initialMode+"; ignoring this and all descendants"); |
| return false; |
| } |
| } |
| } |
| |
| // check RO status is consistent |
| boolean isNowReadOnly = Boolean.TRUE.equals( it.getManagementSupport().isReadOnly() ); |
| if (mode.isReadOnly()!=isNowReadOnly) { |
| throw new IllegalStateException("Read-only status mismatch for "+it+": "+mode+" / RO="+isNowReadOnly); |
| } |
| |
| allEntities.add(it); |
| preManageNonRecursive(it, mode); |
| it.getManagementSupport().onManagementStarting( new ManagementTransitionInfo(managementContext, mode) ); |
| return manageNonRecursive(it, mode); |
| } }; |
| boolean isRecursive = true; |
| if (initialMode.wasPrimary() && initialMode.isPrimary()) { |
| // already managed, so this shouldn't be recursive |
| // (in ActivePartialRebind we cheat, calling in to this method then skipping recursion). |
| // it also falls through to here when doing a redundant promotion, |
| // in that case we *should* be recursive; determine by checking whether a child exists and is preregistered. |
| // the TODO above removing manageRebindRoot in favour of explicit mgmt list would clean this up a lot! |
| Entity aChild = Iterables.getFirst(e.getChildren(), null); |
| if (aChild!=null && isPreRegistered(aChild)) { |
| log.debug("Managing "+e+" in mode "+initialMode+", doing this recursively because a child is preregistered"); |
| } else { |
| log.debug("Managing "+e+" but skipping recursion, as mode is "+initialMode); |
| isRecursive = false; |
| } |
| } |
| if (!isRecursive) { |
| manageEntity.apply( (EntityInternal)e ); |
| } else { |
| recursively(e, manageEntity); |
| } |
| |
| for (EntityInternal it : allEntities) { |
| if (!it.getManagementSupport().isFullyManaged()) { |
| ManagementTransitionMode mode = getLastManagementTransitionMode(it.getId()); |
| ManagementTransitionInfo info = new ManagementTransitionInfo(managementContext, mode); |
| |
| it.getManagementSupport().onManagementStarted(info); |
| managementContext.getRebindManager().getChangeListener().onManaged(it); |
| } |
| } |
| } |
| |
| @Override |
| public void unmanage(final Entity e) { |
| // TODO don't want to guess; should we inspect state of e ? or maybe it doesn't matter ? |
| unmanage(e, ManagementTransitionMode.guessing(BrooklynObjectManagementMode.MANAGED_PRIMARY, BrooklynObjectManagementMode.NONEXISTENT)); |
| } |
| |
| @Override |
| public void unmanage(final Entity e, final ManagementTransitionMode mode) { |
| unmanage(e, mode, false); |
| } |
| |
| @Override |
| public void discardPremanaged(final Entity e) { |
| if (e == null) return; |
| if (!isRunning()) return; |
| |
| Set<String> todiscard = new LinkedHashSet<>(); |
| Stack<Entity> tovisit = new Stack<>(); |
| Set<Entity> visited = new LinkedHashSet<>(); |
| |
| tovisit.push(e); |
| |
| while (!tovisit.isEmpty()) { |
| Entity next = tovisit.pop(); |
| visited.add(next); |
| for (Entity child : next.getChildren()) { |
| if (!visited.contains(child)) { |
| tovisit.push(child); |
| } |
| } |
| |
| if (isManaged(next)) { |
| throw new IllegalStateException("Cannot discard entity "+e+" because it or a descendent is already managed ("+next+")"); |
| } |
| Entity realNext = deproxyIfNecessary(next); |
| String id = next.getId(); |
| Entity realFound = preRegisteredEntitiesById.get(id); |
| if (realFound == null) preManagedEntitiesById.get(id); |
| |
| if (realFound != null && realFound != realNext) { |
| throw new IllegalStateException("Cannot discard pre-managed entity "+e+" because it or a descendent's id ("+id+") clashes with a different entity (given "+next+" but found "+realFound+")"); |
| } |
| |
| todiscard.add(id); |
| } |
| |
| for (String id : todiscard) { |
| preRegisteredEntitiesById.remove(id); |
| preManagedEntitiesById.remove(id); |
| } |
| } |
| |
| private void unmanageDryRun(final Entity e) { |
| final ManagementTransitionInfo info = new ManagementTransitionInfo(managementContext, |
| ManagementTransitionMode.transitioning(BrooklynObjectManagementMode.NONEXISTENT, BrooklynObjectManagementMode.NONEXISTENT)); |
| discardPremanaged(e); |
| |
| ((EntityInternal)e).getManagementSupport().onManagementStopping(info, true); |
| stopTasks(e); |
| ((EntityInternal)e).getManagementSupport().onManagementStopped(info, true); |
| } |
| |
| private void unmanage(final Entity e, ManagementTransitionMode mode, boolean hasBeenReplaced) { |
| if (shouldSkipUnmanagement(e)) return; |
| final ManagementTransitionInfo info = new ManagementTransitionInfo(managementContext, mode); |
| |
| if (hasBeenReplaced) { |
| // we are unmanaging an old instance after having replaced it |
| // don't unmanage or even clear its fields, because there might be references to it |
| |
| if (mode.wasReadOnly()) { |
| // if coming *from* read only; nothing needed |
| } else { |
| if (!mode.wasPrimary()) { |
| log.warn("Unexpected mode "+mode+" for unmanage-replace "+e+" (applying anyway)"); |
| } |
| // migrating away or in-place active partial rebind: |
| ((EntityInternal)e).getManagementSupport().onManagementStopping(info, false); |
| stopTasks(e); |
| ((EntityInternal)e).getManagementSupport().onManagementStopped(info, false); |
| } |
| // do not remove from maps below, bail out now |
| return; |
| |
| } else if (mode.wasReadOnly() && mode.isNoLongerLoaded()) { |
| // we are unmanaging an instance (secondary); either stopping here or primary destroyed elsewhere |
| ((EntityInternal)e).getManagementSupport().onManagementStopping(info, false); |
| unmanageNonRecursive(e); |
| stopTasks(e); |
| ((EntityInternal)e).getManagementSupport().onManagementStopped(info, false); |
| managementContext.getRebindManager().getChangeListener().onUnmanaged(e); |
| if (managementContext.getGarbageCollector() != null) managementContext.getGarbageCollector().onUnmanaged(e); |
| |
| } else if (mode.wasPrimary() && mode.isNoLongerLoaded()) { |
| // unmanaging a primary; currently this is done recursively |
| |
| /* TODO tidy up when it is recursive and when it isn't; if something is being unloaded or destroyed, |
| * that probably *is* recursive, but the old mode might be different if in some cases things are read-only. |
| * or maybe nothing needs to be recursive, we just make sure the callers (e.g. HighAvailabilityModeImpl.clearManagedItems) |
| * call in a good order |
| * |
| * see notes above about recursive/manage/All/unmanageAll |
| */ |
| |
| // Need to store all child entities as onManagementStopping removes a child from the parent entity |
| final List<EntityInternal> allEntities = Lists.newArrayList(); |
| recursively(e, new Predicate<EntityInternal>() { @Override public boolean apply(EntityInternal it) { |
| if (shouldSkipUnmanagement(it)) return false; |
| allEntities.add(it); |
| it.getManagementSupport().onManagementStopping(info, false); |
| return true; |
| } }); |
| |
| for (EntityInternal it : allEntities) { |
| if (shouldSkipUnmanagement(it)) continue; |
| unmanageNonRecursive(it); |
| stopTasks(it); |
| } |
| for (EntityInternal it : allEntities) { |
| it.getManagementSupport().onManagementStopped(info, false); |
| managementContext.getRebindManager().getChangeListener().onUnmanaged(it); |
| if (managementContext.getGarbageCollector() != null) managementContext.getGarbageCollector().onUnmanaged(e); |
| } |
| |
| } else { |
| log.warn("Invalid mode for unmanage: "+mode+" on "+e+" (ignoring)"); |
| } |
| |
| preRegisteredEntitiesById.remove(e.getId()); |
| preManagedEntitiesById.remove(e.getId()); |
| entityProxiesById.remove(e.getId()); |
| entitiesById.remove(e.getId()); |
| entityModesById.remove(e.getId()); |
| } |
| |
| private void stopTasks(Entity entity) { |
| stopTasks(entity, null); |
| } |
| |
| /** stops all tasks (apart from any current one or its descendants) on this entity, |
| * optionally -- if a timeout is given -- waiting for completion and warning on incomplete tasks */ |
| @Beta |
| public void stopTasks(Entity entity, @Nullable Duration timeout) { |
| CountdownTimer timeleft = timeout==null ? null : timeout.countdownTimer(); |
| // try forcibly interrupting tasks on managed entities |
| Collection<Exception> exceptions = MutableSet.of(); |
| try { |
| Set<Task<?>> tasksCancelled = MutableSet.of(); |
| for (Task<?> t: managementContext.getExecutionContext(entity).getTasks()) { |
| if (entity.equals(BrooklynTaskTags.getContextEntity(Tasks.current())) && hasTaskAsAncestor(t, Tasks.current())) { |
| // don't cancel if we are running inside a task on the target entity and |
| // the task being considered is one we have submitted -- e.g. on "stop" don't cancel ourselves! |
| // but if our current task is from another entity we probably do want to cancel them (we are probably invoking unmanage) |
| continue; |
| } |
| |
| if (!t.isDone()) { |
| try { |
| log.debug("Cancelling "+t+" on "+entity); |
| tasksCancelled.add(t); |
| t.cancel(true); |
| } catch (Exception e) { |
| Exceptions.propagateIfFatal(e); |
| log.debug("Error cancelling "+t+" on "+entity+" (will warn when all tasks are cancelled): "+e, e); |
| exceptions.add(e); |
| } |
| } |
| } |
| |
| if (timeleft!=null) { |
| Set<Task<?>> tasksIncomplete = MutableSet.of(); |
| // go through all tasks, not just cancelled ones, in case there are previously cancelled ones which are not complete |
| for (Task<?> t: managementContext.getExecutionContext(entity).getTasks()) { |
| if (hasTaskAsAncestor(t, Tasks.current())) |
| continue; |
| if (!Tasks.blockUntilInternalTasksEnded(t, timeleft.getDurationRemaining())) { |
| tasksIncomplete.add(t); |
| } |
| } |
| if (!tasksIncomplete.isEmpty()) { |
| log.warn("Incomplete tasks when stopping "+entity+": "+tasksIncomplete); |
| } |
| if (log.isTraceEnabled()) |
| log.trace("Cancelled "+tasksCancelled+" tasks for "+entity+", with "+ |
| timeleft.getDurationRemaining()+" remaining (of "+timeout+"): "+tasksCancelled); |
| } else { |
| if (log.isTraceEnabled()) |
| log.trace("Cancelled "+tasksCancelled+" tasks for "+entity+": "+tasksCancelled); |
| } |
| } catch (Exception e) { |
| Exceptions.propagateIfFatal(e); |
| log.warn("Error inspecting tasks to cancel on unmanagement: "+e, e); |
| } |
| if (!exceptions.isEmpty()) |
| log.warn("Error when cancelling tasks for "+entity+" on unmanagement: "+Exceptions.create(exceptions)); |
| } |
| |
| private boolean hasTaskAsAncestor(Task<?> t, Task<?> potentialAncestor) { |
| if (t==null || potentialAncestor==null) return false; |
| if (t.equals(potentialAncestor)) return true; |
| return hasTaskAsAncestor(t.getSubmittedByTask(), potentialAncestor); |
| } |
| |
| /** |
| * activates management when effector invoked, warning unless context is acceptable |
| * (currently only acceptable context is "start") |
| */ |
| void manageIfNecessary(Entity entity, Object context) { |
| if (!isRunning()) { |
| return; // TODO Still a race for terminate being called, and then isManaged below returning false |
| } else if (((EntityInternal)entity).getManagementSupport().wasDeployed()) { |
| return; |
| } else if (isManaged(entity)) { |
| return; |
| } else if (isPreManaged(entity)) { |
| return; |
| } else if (Boolean.TRUE.equals(((EntityInternal)entity).getManagementSupport().isReadOnly())) { |
| return; |
| } else { |
| Entity rootUnmanaged = entity; |
| while (true) { |
| Entity candidateUnmanagedParent = rootUnmanaged.getParent(); |
| if (candidateUnmanagedParent == null || isManaged(candidateUnmanagedParent) || isPreManaged(candidateUnmanagedParent)) |
| break; |
| rootUnmanaged = candidateUnmanagedParent; |
| } |
| if (context == Startable.START.getName()) |
| log.info("Activating local management for {} on start", rootUnmanaged); |
| else |
| log.warn("Activating local management for {} due to effector invocation on {}: {}", new Object[]{rootUnmanaged, entity, context}); |
| manage(rootUnmanaged); |
| } |
| } |
| |
| private void recursively(Entity e, Predicate<EntityInternal> action) { |
| Entity otherPreregistered = preRegisteredEntitiesById.get(e.getId()); |
| if (otherPreregistered!=null) { |
| // if something has been pre-registered, prefer it |
| // (e.g. if we recursing through children, we might have a proxy from previous iteration; |
| // the most recent will have been pre-registered) |
| e = otherPreregistered; |
| } |
| |
| boolean success = action.apply( (EntityInternal)e ); |
| if (!success) { |
| return; // Don't manage children if action false/unnecessary for parent |
| } |
| for (Entity child : e.getChildren()) { |
| recursively(child, action); |
| } |
| } |
| |
| /** |
| * Whether the entity is in the process of being managed. |
| */ |
| private synchronized boolean isPreManaged(Entity e) { |
| return preManagedEntitiesById.containsKey(e.getId()); |
| } |
| |
| /** |
| * Should ensure that the entity is now known about, but should not be accessible from other entities yet. |
| * |
| * Records that the given entity is about to be managed (used for answering {@link #isPreManaged(Entity)}. |
| * Note that refs to the given entity are stored in a a weak hashmap so if the subsequent management |
| * attempt fails then this reference to the entity will eventually be discarded (if no-one else holds |
| * a reference). |
| */ |
| private synchronized boolean preManageNonRecursive(Entity e, ManagementTransitionMode mode) { |
| Entity realE = toRealEntity(e); |
| |
| Object old = preManagedEntitiesById.put(e.getId(), realE); |
| preRegisteredEntitiesById.remove(e.getId()); |
| |
| if (old!=null && mode.wasNotLoaded()) { |
| if (old.equals(e)) { |
| log.warn("{} redundant call to pre-start management of entity {}, mode {}; ignoring", new Object[] { this, e, mode }); |
| } else { |
| throw new IllegalStateException("call to pre-manage entity "+e+" ("+mode+") but different entity "+old+" already known under that id at "+this); |
| } |
| return false; |
| } else { |
| if (log.isTraceEnabled()) log.trace("{} pre-start management of entity {}, mode {}", |
| new Object[] { this, e, mode }); |
| return true; |
| } |
| } |
| |
| /** |
| * Should ensure that the entity is now managed somewhere, and known about in all the lists. |
| * Returns true if the entity has now become managed; false if it was already managed (anything else throws exception) |
| */ |
| private synchronized boolean manageNonRecursive(Entity e, ManagementTransitionMode mode) { |
| Entity old = entitiesById.get(e.getId()); |
| |
| if (old!=null && mode.wasNotLoaded()) { |
| if (old == deproxyIfNecessary(e)) { |
| log.warn("{} redundant call to start management of entity {}; ignoring", this, e); |
| } else { |
| throw new IdAlreadyExistsException("call to manage entity "+e+" ("+mode+") but " |
| + "different entity "+old+" already known under that id '"+e.getId()+"' at "+this); |
| } |
| return false; |
| } |
| |
| BrooklynLogging.log(log, BrooklynLogging.levelDebugOrTraceIfReadOnly(e), |
| "{} starting management of entity {}", this, e); |
| Entity realE = toRealEntity(e); |
| |
| Entity oldProxy = entityProxiesById.get(e.getId()); |
| Entity proxyE; |
| if (oldProxy!=null) { |
| if (mode.wasNotLoaded()) { |
| throw new IdAlreadyExistsException("call to manage entity "+e+" from unloaded " |
| + "state ("+mode+") but already had proxy "+oldProxy+" already known " |
| + "under that id '"+e.getId()+"' at "+this); |
| } |
| // make the old proxy point at this new delegate |
| // (some other tricks done in the call below) |
| ((EntityProxyImpl)(Proxy.getInvocationHandler(oldProxy))).resetDelegate(oldProxy, oldProxy, realE); |
| proxyE = oldProxy; |
| } else { |
| proxyE = toProxyEntityIfAvailable(e); |
| } |
| entityProxiesById.put(e.getId(), proxyE); |
| entityTypes.put(e.getId(), realE.getClass().getName()); |
| entitiesById.put(e.getId(), realE); |
| |
| preManagedEntitiesById.remove(e.getId()); |
| if ((e instanceof Application) && (e.getParent()==null)) { |
| applications.add((Application)proxyE); |
| applicationIds.add(e.getId()); |
| } |
| if (!entities.contains(proxyE)) |
| entities.add(proxyE); |
| |
| if (old!=null && old!=e) { |
| // passing the transition info will ensure the right shutdown steps invoked for old instance |
| unmanage(old, mode, true); |
| } |
| |
| return true; |
| } |
| |
| /** |
| * Should ensure that the entity is no longer managed anywhere, remove from all lists. |
| * Returns true if the entity has been removed from management; false if it was not previously managed (anything else throws exception) |
| */ |
| private boolean unmanageNonRecursive(Entity e) { |
| /* |
| * When method is synchronized, hit deadlock: |
| * 1. thread called unmanage() on a member of a group, so we got the lock and called group.removeMember; |
| * this ties to synchronize on AbstractGroupImpl.members |
| * 2. another thread was doing AbstractGroupImpl.addMember, which is synchronized on AbstractGroupImpl.members; |
| * it tries to call Entities.manage(child) which calls LocalEntityManager.getEntity(), which is |
| * synchronized on this. |
| * |
| * We MUST NOT call alien code from within the management framework while holding locks. |
| * The AbstractGroup.removeMember is effectively alien because a user could override it, and because |
| * it is entity specific. |
| * |
| * TODO Does getting then removing from groups risk this entity being added to other groups while |
| * this is happening? Should abstractEntity.onManagementStopped or some such remove the entity |
| * from its groups? |
| */ |
| |
| if (!getLastManagementTransitionMode(e.getId()).isReadOnly()) { |
| e.clearParent(); |
| for (Group group : e.groups()) { |
| if (!Entities.isNoLongerManaged(group)) group.removeMember(e); |
| } |
| if (e instanceof Group) { |
| Collection<Entity> members = ((Group)e).getMembers(); |
| for (Entity member : members) { |
| if (!Entities.isNoLongerManaged(member)) ((EntityInternal)member).groups().remove((Group)e); |
| } |
| } |
| } else { |
| log.debug("No relations being updated on unmanage of read only {}", e); |
| } |
| |
| unmanageOwnedLocations(e); |
| |
| synchronized (this) { |
| Entity proxyE = toProxyEntityIfAvailable(e); |
| if (e instanceof Application) { |
| applications.remove(proxyE); |
| applicationIds.remove(e.getId()); |
| } |
| |
| entities.remove(proxyE); |
| entityProxiesById.remove(e.getId()); |
| entityModesById.remove(e.getId()); |
| |
| Object old = entitiesById.remove(e.getId()); |
| |
| entityTypes.remove(e.getId()); |
| if (old==null) { |
| log.warn("{} call to stop management of unknown entity (already unmanaged?) {}; ignoring", this, e); |
| return false; |
| } else if (!old.equals(e)) { |
| // shouldn't happen... |
| log.error("{} call to stop management of entity {} removed different entity {}", new Object[] { this, e, old }); |
| return true; |
| } else { |
| if (log.isDebugEnabled()) log.debug("{} stopped management of entity {}", this, e); |
| return true; |
| } |
| } |
| } |
| |
| private void unmanageOwnedLocations(Entity e) { |
| for (Location loc : e.getLocations()) { |
| NamedStringTag ownerEntityTag = BrooklynTags.findFirst(BrooklynTags.OWNER_ENTITY_ID, loc.tags().getTags()); |
| if (ownerEntityTag != null) { |
| if (e.getId().equals(ownerEntityTag.getContents())) { |
| managementContext.getLocationManager().unmanage(loc); |
| } else { |
| // A location is "owned" if it was created as part of the EntitySpec of an entity (by Brooklyn). |
| // To share a location between entities create it yourself and pass it to any entities that needs it. |
| log.debug("Unmanaging entity {}, which contains a location {} owned by another entity {}. " + |
| "Not automatically unmanaging the location (it will be unmanaged when its owning " + |
| "entity is unmanaged).", |
| new Object[] {e, loc, ownerEntityTag.getContents()}); |
| } |
| } |
| } |
| } |
| |
| void addEntitySetListener(CollectionChangeListener<Entity> listener) { |
| //must notify listener in a different thread to avoid deadlock (issue #378) |
| AsyncCollectionChangeAdapter<Entity> wrappedListener = new AsyncCollectionChangeAdapter<Entity>(managementContext.getExecutionManager(), listener); |
| entities.addListener(wrappedListener); |
| } |
| |
| void removeEntitySetListener(CollectionChangeListener<Entity> listener) { |
| AsyncCollectionChangeAdapter<Entity> wrappedListener = new AsyncCollectionChangeAdapter<Entity>(managementContext.getExecutionManager(), listener); |
| entities.removeListener(wrappedListener); |
| } |
| |
| private boolean shouldSkipUnmanagement(Entity e) { |
| if (e==null) { |
| log.warn(""+this+" call to unmanage null entity; skipping", |
| new IllegalStateException("source of null unmanagement call to "+this)); |
| return true; |
| } |
| if (!isManaged(e)) { |
| log.warn("{} call to stop management of unknown entity (already unmanaged?) {}; skipping, and all descendants", this, e); |
| return true; |
| } |
| return false; |
| } |
| |
| private Entity toProxyEntityIfAvailable(Entity e) { |
| checkNotNull(e, "entity"); |
| |
| if (e instanceof EntityProxy) { |
| return e; |
| } else if (e instanceof AbstractEntity) { |
| Entity result = ((AbstractEntity)e).getProxy(); |
| return (result == null) ? e : result; |
| } else { |
| // If we don't already know about the proxy, then use the real thing; presumably it's |
| // the legacy way of creating the entity so didn't get a preManage() call |
| |
| return e; |
| } |
| } |
| |
| private Entity toRealEntity(Entity e) { |
| checkNotNull(e, "entity"); |
| |
| if (e instanceof AbstractEntity) { |
| return e; |
| } else { |
| Entity result = toRealEntityOrNull(e.getId()); |
| if (result == null) { |
| throw new IllegalStateException("No concrete entity known for entity "+e+" ("+e.getId()+", "+e.getEntityType().getName()+")"); |
| } |
| return result; |
| } |
| } |
| |
| public boolean isKnownEntityId(String id) { |
| return entitiesById.containsKey(id) || preManagedEntitiesById.containsKey(id) || preRegisteredEntitiesById.containsKey(id); |
| } |
| |
| private Entity toRealEntityOrNull(String id) { |
| Entity result; |
| // prefer the preRegistered and preManaged entities, during hot proxying, they should be newer |
| result = preRegisteredEntitiesById.get(id); |
| if (result==null) |
| result = preManagedEntitiesById.get(id); |
| if (result==null) |
| entitiesById.get(id); |
| return result; |
| } |
| |
| private Entity deproxyIfNecessary(Entity e) { |
| return (e instanceof AbstractEntity) ? e : Entities.deproxy(e); |
| } |
| |
| |
| private boolean isRunning() { |
| return managementContext.isRunning(); |
| } |
| |
| } |