| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.brooklyn.core.mgmt.internal; |
| |
| import static com.google.common.base.Preconditions.checkNotNull; |
| |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.apache.brooklyn.api.effector.Effector; |
| import org.apache.brooklyn.api.entity.Application; |
| import org.apache.brooklyn.api.entity.Entity; |
| import org.apache.brooklyn.api.mgmt.ExecutionContext; |
| import org.apache.brooklyn.api.mgmt.ManagementContext; |
| import org.apache.brooklyn.api.mgmt.SubscriptionContext; |
| import org.apache.brooklyn.api.mgmt.entitlement.EntitlementManager; |
| import org.apache.brooklyn.api.policy.Policy; |
| import org.apache.brooklyn.api.sensor.AttributeSensor; |
| import org.apache.brooklyn.api.sensor.Enricher; |
| import org.apache.brooklyn.api.sensor.Feed; |
| import org.apache.brooklyn.api.sensor.AttributeSensor.SensorPersistenceMode; |
| import org.apache.brooklyn.config.ConfigKey; |
| import org.apache.brooklyn.core.entity.AbstractEntity; |
| import org.apache.brooklyn.core.entity.Entities; |
| import org.apache.brooklyn.core.entity.EntityInternal; |
| import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; |
| import org.apache.brooklyn.core.mgmt.entitlement.Entitlements; |
| import org.apache.brooklyn.core.mgmt.entitlement.Entitlements.EntityAndItem; |
| import org.apache.brooklyn.core.mgmt.entitlement.Entitlements.StringAndArgument; |
| import org.apache.brooklyn.core.mgmt.internal.NonDeploymentManagementContext.NonDeploymentManagementContextMode; |
| import org.apache.brooklyn.util.core.task.Tasks; |
| import org.apache.brooklyn.util.exceptions.Exceptions; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.annotations.Beta; |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.common.base.Stopwatch; |
| |
| /** |
| * Encapsulates management activities at an entity. |
| * <p> |
| * On entity deployment, ManagementContext.manage(entity) causes |
| * <p> |
| * * onManagementStarting(ManagementContext) |
| * * onManagementStartingSubscriptions() |
| * * onManagementStartingSensorEmissions() |
| * * onManagementStartingExecutions() |
| * * onManagementStarted() - when all the above is said and done |
| * * onManagementStartingHere(); |
| * <p> |
| * on unmanage it hits onManagementStoppingHere() then onManagementStopping(). |
| * <p> |
| * When an entity's management migrates, it invokes onManagementStoppingHere() at the old location, |
| * then onManagementStartingHere() at the new location. |
| */ |
| public class EntityManagementSupport { |
| |
| private static final Logger log = LoggerFactory.getLogger(EntityManagementSupport.class); |
| |
| public EntityManagementSupport(AbstractEntity entity) { |
| this.entity = entity; |
| nonDeploymentManagementContext = new NonDeploymentManagementContext(entity, NonDeploymentManagementContextMode.PRE_MANAGEMENT); |
| } |
| |
| protected transient AbstractEntity entity; |
| NonDeploymentManagementContext nonDeploymentManagementContext; |
| |
| protected transient ManagementContext initialManagementContext; |
| protected transient ManagementContext managementContext; |
| protected transient volatile SubscriptionContext subscriptionContext; |
| protected transient volatile ExecutionContext executionContext; |
| |
| protected final AtomicBoolean managementContextUsable = new AtomicBoolean(false); |
| protected final AtomicBoolean currentlyDeployed = new AtomicBoolean(false); |
| protected final AtomicBoolean everDeployed = new AtomicBoolean(false); |
| protected Boolean readOnly = null; |
| protected final AtomicBoolean managementFailed = new AtomicBoolean(false); |
| |
| private volatile EntityChangeListener entityChangeListener = EntityChangeListener.NOOP; |
| |
| /** |
| * Whether this entity is managed (i.e. "onManagementStarting" has been called, so the framework knows about it, |
| * and it has not been unmanaged). |
| */ |
| public boolean isDeployed() { |
| return currentlyDeployed.get(); |
| } |
| |
| public boolean isNoLongerManaged() { |
| return wasDeployed() && !isDeployed(); |
| } |
| |
| /** whether entity has ever been deployed (managed) */ |
| public boolean wasDeployed() { |
| return everDeployed.get(); |
| } |
| |
| @Beta |
| public void setReadOnly(boolean isReadOnly) { |
| if (isDeployed()) |
| throw new IllegalStateException("Cannot set read only after deployment"); |
| this.readOnly = isReadOnly; |
| } |
| |
| /** Whether the entity and its adjuncts should be treated as read-only; |
| * may be null briefly when initializing if RO status is unknown. */ |
| @Beta |
| public Boolean isReadOnlyRaw() { |
| return readOnly; |
| } |
| |
| /** Whether the entity and its adjuncts should be treated as read-only; |
| * error if initializing and RO status is unknown. */ |
| @Beta |
| public boolean isReadOnly() { |
| Preconditions.checkNotNull(readOnly, "Read-only status of %s not yet known", entity); |
| return readOnly; |
| } |
| |
| /** |
| * Whether the entity's management lifecycle is complete (i.e. both "onManagementStarting" and "onManagementStarted" have |
| * been called, and it is has not been unmanaged). |
| */ |
| public boolean isFullyManaged() { |
| return (nonDeploymentManagementContext == null) && currentlyDeployed.get(); |
| } |
| |
| public synchronized void setManagementContext(ManagementContextInternal val) { |
| if (initialManagementContext != null) { |
| throw new IllegalStateException("Initial management context is already set for "+entity+"; cannot change"); |
| } |
| if (managementContext != null && !managementContext.equals(val)) { |
| throw new IllegalStateException("Management context is already set for "+entity+"; cannot change"); |
| } |
| |
| this.initialManagementContext = checkNotNull(val, "managementContext"); |
| if (nonDeploymentManagementContext != null) { |
| nonDeploymentManagementContext.setManagementContext(val); |
| } |
| } |
| |
| public void onRebind(ManagementTransitionInfo info) { |
| nonDeploymentManagementContext.setMode(NonDeploymentManagementContextMode.MANAGEMENT_REBINDING); |
| } |
| |
| public void onManagementStarting(ManagementTransitionInfo info) { |
| info.getManagementContext().getExecutionContext(entity).get( Tasks.builder().displayName("Management starting") |
| .dynamic(false) |
| .tag(BrooklynTaskTags.TRANSIENT_TASK_TAG) |
| .body(() -> { try { synchronized (this) { |
| boolean alreadyManaging = isDeployed(); |
| |
| if (alreadyManaging) { |
| log.warn("Already managed: "+entity+" ("+nonDeploymentManagementContext+"); onManagementStarting is no-op"); |
| } else if (nonDeploymentManagementContext == null || !nonDeploymentManagementContext.getMode().isPreManaged()) { |
| throw new IllegalStateException("Not in expected pre-managed state: "+entity+" ("+nonDeploymentManagementContext+")"); |
| } |
| if (managementContext != null && !managementContext.equals(info.getManagementContext())) { |
| throw new IllegalStateException("Already has management context: "+managementContext+"; can't set "+info.getManagementContext()); |
| } |
| if (initialManagementContext != null && !initialManagementContext.equals(info.getManagementContext())) { |
| throw new IllegalStateException("Already has different initial management context: "+initialManagementContext+"; can't set "+info.getManagementContext()); |
| } |
| if (alreadyManaging) { |
| return; |
| } |
| |
| this.managementContext = info.getManagementContext(); |
| nonDeploymentManagementContext.setMode(NonDeploymentManagementContextMode.MANAGEMENT_STARTING); |
| |
| if (!isReadOnly()) { |
| nonDeploymentManagementContext.getSubscriptionManager().setDelegate((AbstractSubscriptionManager) managementContext.getSubscriptionManager()); |
| nonDeploymentManagementContext.getSubscriptionManager().startDelegatingForSubscribing(); |
| } |
| |
| managementContextUsable.set(true); |
| currentlyDeployed.set(true); |
| everDeployed.set(true); |
| |
| entityChangeListener = new EntityChangeListenerImpl(); |
| } |
| |
| /* |
| * TODO framework starting events - phase 1, including rebind |
| * - establish hierarchy (child, groups, etc; construction if necessary on rebind) |
| * - set location |
| * - set local config values |
| * - set saved sensor values |
| * - register subscriptions -- BUT nothing is allowed to execute |
| * [these operations may be done before we invoke starting also; above can happen in any order; |
| * sensor _publications_ and executor submissions are queued] |
| * then: set the management context and the entity is "managed" from the perspective of external viewers (ManagementContext.isManaged(entity) returns true) |
| */ |
| |
| if (!isReadOnly()) { |
| entity.onManagementStarting(); |
| } |
| } catch (Throwable t) { |
| managementFailed.set(true); |
| throw Exceptions.propagate(t); |
| }}).build() ); |
| } |
| |
| @SuppressWarnings("deprecation") |
| public void onManagementStarted(ManagementTransitionInfo info) { |
| info.getManagementContext().getExecutionContext(entity).get( Tasks.builder().displayName("Management started") |
| .dynamic(false) |
| .tag(BrooklynTaskTags.TRANSIENT_TASK_TAG) |
| .body(() -> { try { synchronized (this) { |
| boolean alreadyManaged = isFullyManaged(); |
| |
| if (alreadyManaged) { |
| log.warn("Already managed: "+entity+" ("+nonDeploymentManagementContext+"); onManagementStarted is no-op"); |
| } else if (nonDeploymentManagementContext == null || nonDeploymentManagementContext.getMode() != NonDeploymentManagementContextMode.MANAGEMENT_STARTING) { |
| throw new IllegalStateException("Not in expected \"management starting\" state: "+entity+" ("+nonDeploymentManagementContext+")"); |
| } |
| if (managementContext != info.getManagementContext()) { |
| throw new IllegalStateException("Already has management context: "+managementContext+"; can't set "+info.getManagementContext()); |
| } |
| if (alreadyManaged) { |
| return; |
| } |
| |
| nonDeploymentManagementContext.setMode(NonDeploymentManagementContextMode.MANAGEMENT_STARTED); |
| |
| } |
| |
| /* on start, we want to: |
| * - set derived/inherited config values (not needed, the specs should have taken care of that?) |
| * - publish all queued sensors (done below) |
| * - start all queued executions |
| * (e.g. subscription delivery - done below? are there others and if so how are they unlocked? |
| * curious where the "start queued tasks" logic is; must be somewhere as it all seems to have been working fine (Aug 2016)) |
| * [in exactly this order, at each entity] |
| * then subsequent sensor events and executions occur directly (no queueing) |
| * |
| * NOTE: should happen out of synch block in case something is potentially long running; |
| * should happen quickly tough, state might get messy and errors occur if stopped while starting! |
| */ |
| |
| if (!isReadOnly()) { |
| nonDeploymentManagementContext.getSubscriptionManager().startDelegatingForPublishing(); |
| } |
| |
| if (!isReadOnly()) { |
| entity.onManagementBecomingMaster(); |
| entity.onManagementStarted(); |
| } |
| |
| synchronized (this) { |
| nonDeploymentManagementContext = null; |
| } |
| } catch (Throwable t) { |
| managementFailed.set(true); |
| throw Exceptions.propagate(t); |
| }}).build() ); |
| } |
| |
| @SuppressWarnings("deprecation") |
| public void onManagementStopping(ManagementTransitionInfo info) { |
| synchronized (this) { |
| if (managementContext != info.getManagementContext()) { |
| throw new IllegalStateException("onManagementStopping encountered different management context for "+entity+ |
| (!wasDeployed() ? " (wasn't deployed)" : !isDeployed() ? " (no longer deployed)" : "")+ |
| ": "+managementContext+"; expected "+info.getManagementContext()+" (may be a pre-registered entity which was never properly managed)"); |
| } |
| Stopwatch startTime = Stopwatch.createStarted(); |
| while (!managementFailed.get() && nonDeploymentManagementContext!=null && |
| nonDeploymentManagementContext.getMode()==NonDeploymentManagementContextMode.MANAGEMENT_STARTING) { |
| // still becoming managed |
| try { |
| if (startTime.elapsed(TimeUnit.SECONDS) > 30) { |
| // emergency fix, 30s timeout for management starting |
| log.error("Management stopping event "+info+" in "+this+" timed out waiting for start; proceeding to stopping"); |
| break; |
| } |
| wait(100); |
| } catch (InterruptedException e) { |
| Exceptions.propagate(e); |
| } |
| } |
| if (nonDeploymentManagementContext==null) { |
| nonDeploymentManagementContext = new NonDeploymentManagementContext(entity, NonDeploymentManagementContextMode.MANAGEMENT_STOPPING); |
| } else { |
| // already stopped? or not started? |
| nonDeploymentManagementContext.setMode(NonDeploymentManagementContextMode.MANAGEMENT_STOPPING); |
| } |
| } |
| // TODO custom stopping activities |
| // TODO framework stopping events - no more sensors, executions, etc |
| // (elaborate or remove ^^^ ? -AH, Sept 2014) |
| |
| if (!isReadOnly() && info.getMode().isDestroying()) { |
| // if we support remote parent of local child, the following call will need to be properly remoted |
| if (entity.getParent()!=null) entity.getParent().removeChild(entity.getProxyIfAvailable()); |
| } |
| // new subscriptions will be queued / not allowed |
| nonDeploymentManagementContext.getSubscriptionManager().stopDelegatingForSubscribing(); |
| // new publications will be queued / not allowed |
| nonDeploymentManagementContext.getSubscriptionManager().stopDelegatingForPublishing(); |
| |
| if (!isReadOnly()) { |
| entity.onManagementNoLongerMaster(); |
| entity.onManagementStopped(); |
| } |
| } |
| |
| public void onManagementStopped(ManagementTransitionInfo info) { |
| synchronized (this) { |
| if (managementContext == null && nonDeploymentManagementContext.getMode() == NonDeploymentManagementContextMode.MANAGEMENT_STOPPED) { |
| return; |
| } |
| if (managementContext != info.getManagementContext()) { |
| throw new IllegalStateException("Has different management context: "+managementContext+"; expected "+info.getManagementContext()); |
| } |
| getSubscriptionContext().unsubscribeAll(); |
| entityChangeListener = EntityChangeListener.NOOP; |
| managementContextUsable.set(false); |
| currentlyDeployed.set(false); |
| executionContext = null; |
| subscriptionContext = null; |
| } |
| |
| // TODO framework stopped activities, e.g. serialize state ? |
| entity.invalidateReferences(); |
| |
| synchronized (this) { |
| managementContext = null; |
| nonDeploymentManagementContext.setMode(NonDeploymentManagementContextMode.MANAGEMENT_STOPPED); |
| } |
| } |
| |
| @VisibleForTesting |
| @Beta |
| public boolean isManagementContextReal() { |
| return managementContextUsable.get(); |
| } |
| |
| public synchronized ManagementContext getManagementContext() { |
| return (managementContextUsable.get()) ? managementContext : nonDeploymentManagementContext; |
| } |
| |
| public ExecutionContext getExecutionContext() { |
| if (executionContext!=null) return executionContext; |
| if (managementContextUsable.get()) { |
| synchronized (this) { |
| if (executionContext!=null) return executionContext; |
| executionContext = managementContext.getExecutionContext(entity); |
| return executionContext; |
| } |
| } |
| return nonDeploymentManagementContext.getExecutionContext(entity); |
| } |
| public SubscriptionContext getSubscriptionContext() { |
| if (subscriptionContext!=null) return subscriptionContext; |
| if (managementContextUsable.get()) { |
| synchronized (this) { |
| if (subscriptionContext!=null) return subscriptionContext; |
| subscriptionContext = managementContext.getSubscriptionContext(entity); |
| return subscriptionContext; |
| } |
| } |
| return nonDeploymentManagementContext.getSubscriptionContext(entity); |
| } |
| public synchronized EntitlementManager getEntitlementManager() { |
| return getManagementContext().getEntitlementManager(); |
| } |
| |
| public void attemptLegacyAutodeployment(String effectorName) { |
| synchronized (this) { |
| if (managementContext != null) { |
| log.warn("Autodeployment suggested but not required for " + entity + "." + effectorName); |
| return; |
| } |
| if (entity instanceof Application) { |
| log.warn("Autodeployment with new management context triggered for " + entity + "." + effectorName + " -- will not be supported in future. Explicit manage call required."); |
| if (initialManagementContext != null) { |
| initialManagementContext.getEntityManager().manage(entity); |
| } else { |
| Entities.startManagement(entity); |
| } |
| return; |
| } |
| } |
| if ("start".equals(effectorName)) { |
| Entity e=entity; |
| if (e.getParent()!=null && ((EntityInternal)e.getParent()).getManagementSupport().isDeployed()) { |
| log.warn("Autodeployment in parent's management context triggered for "+entity+"."+effectorName+" -- will not be supported in future. Explicit manage call required."); |
| ((EntityInternal)e.getParent()).getManagementContext().getEntityManager().manage(entity); |
| return; |
| } |
| } |
| log.warn("Autodeployment not available for "+entity+"."+effectorName); |
| } |
| |
| public EntityChangeListener getEntityChangeListener() { |
| return entityChangeListener; |
| } |
| |
| private class EntityChangeListenerImpl implements EntityChangeListener { |
| @Override |
| public void onChanged() { |
| getManagementContext().getRebindManager().getChangeListener().onChanged(entity); |
| } |
| @Override |
| public void onChildrenChanged() { |
| getManagementContext().getRebindManager().getChangeListener().onChanged(entity); |
| } |
| @Override |
| public void onLocationsChanged() { |
| getManagementContext().getRebindManager().getChangeListener().onChanged(entity); |
| } |
| @Override |
| public void onTagsChanged() { |
| getManagementContext().getRebindManager().getChangeListener().onChanged(entity); |
| } |
| @Override |
| public void onMembersChanged() { |
| getManagementContext().getRebindManager().getChangeListener().onChanged(entity); |
| } |
| @Override |
| public void onPolicyAdded(Policy policy) { |
| getManagementContext().getRebindManager().getChangeListener().onChanged(entity); |
| getManagementContext().getRebindManager().getChangeListener().onManaged(policy); |
| } |
| @Override |
| public void onEnricherAdded(Enricher enricher) { |
| getManagementContext().getRebindManager().getChangeListener().onChanged(entity); |
| getManagementContext().getRebindManager().getChangeListener().onManaged(enricher); |
| } |
| @Override |
| public void onFeedAdded(Feed feed) { |
| getManagementContext().getRebindManager().getChangeListener().onChanged(entity); |
| getManagementContext().getRebindManager().getChangeListener().onManaged(feed); |
| } |
| @Override |
| public void onPolicyRemoved(Policy policy) { |
| getManagementContext().getRebindManager().getChangeListener().onChanged(entity); |
| getManagementContext().getRebindManager().getChangeListener().onUnmanaged(policy); |
| } |
| @Override |
| public void onEnricherRemoved(Enricher enricher) { |
| getManagementContext().getRebindManager().getChangeListener().onChanged(entity); |
| getManagementContext().getRebindManager().getChangeListener().onUnmanaged(enricher); |
| } |
| @Override |
| public void onFeedRemoved(Feed feed) { |
| getManagementContext().getRebindManager().getChangeListener().onChanged(entity); |
| getManagementContext().getRebindManager().getChangeListener().onUnmanaged(feed); |
| } |
| @Override |
| public void onAttributeChanged(AttributeSensor<?> attribute) { |
| if (attribute.getPersistenceMode() != SensorPersistenceMode.NONE) { |
| getManagementContext().getRebindManager().getChangeListener().onChanged(entity); |
| } |
| } |
| @Override |
| public void onConfigChanged(ConfigKey<?> key) { |
| getManagementContext().getRebindManager().getChangeListener().onChanged(entity); |
| } |
| @Override |
| public void onEffectorStarting(Effector<?> effector, Object parameters) { |
| Entitlements.checkEntitled(getEntitlementManager(), Entitlements.INVOKE_EFFECTOR, EntityAndItem.of(entity, StringAndArgument.of(effector.getName(), parameters))); |
| } |
| @Override |
| public void onEffectorCompleted(Effector<?> effector) { |
| getManagementContext().getRebindManager().getChangeListener().onChanged(entity); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return super.toString()+"["+(entity==null ? "null" : entity.getId())+"]"; |
| } |
| } |