blob: 6b9dcde3bc854f68f1a4b2ff6c799038bd2bfc5b [file] [log] [blame]
package brooklyn.management.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import brooklyn.config.ConfigKey;
import brooklyn.entity.Application;
import brooklyn.entity.Effector;
import brooklyn.entity.Entity;
import brooklyn.entity.basic.AbstractEntity;
import brooklyn.entity.basic.Entities;
import brooklyn.entity.basic.EntityInternal;
import brooklyn.event.AttributeSensor;
import brooklyn.management.ExecutionContext;
import brooklyn.management.ManagementContext;
import brooklyn.management.SubscriptionContext;
import brooklyn.management.internal.NonDeploymentManagementContext.NonDeploymentManagementContextMode;
import brooklyn.util.exceptions.Exceptions;
import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
/**
* Encapsulates management activities at an entity.
* <p>
* On entity deployment, ManagementContext.manage(entity) causes
* <p>
* * onManagementStarting(ManagementContext)
* * onManagementStartingSubscriptions()
* * onManagementStartingSensorEmissions()
* * onManagementStartingExecutions()
* * onManagementStarted() - when all the above is said and done
* * onManagementStartingHere();
* <p>
* on unmanage it hits onManagementStoppingHere() then onManagementStopping().
* <p>
* When an entity's management migrates, it invoked onManagementStoppingHere() at the old location,
* then onManagementStartingHere() at the new location.
*/
public class EntityManagementSupport {
private static final Logger log = LoggerFactory.getLogger(EntityManagementSupport.class);
public EntityManagementSupport(AbstractEntity entity) {
this.entity = entity;
nonDeploymentManagementContext = new NonDeploymentManagementContext(entity, NonDeploymentManagementContextMode.PRE_MANAGEMENT);
}
protected transient AbstractEntity entity;
NonDeploymentManagementContext nonDeploymentManagementContext;
protected transient ManagementContext initialManagementContext;
protected transient ManagementContext managementContext;
protected transient SubscriptionContext subscriptionContext;
protected transient ExecutionContext executionContext;
// TODO the application
protected final AtomicBoolean managementContextUsable = new AtomicBoolean(false);
protected final AtomicBoolean currentlyDeployed = new AtomicBoolean(false);
protected final AtomicBoolean everDeployed = new AtomicBoolean(false);
protected final AtomicBoolean managementFailed = new AtomicBoolean(false);
private volatile EntityChangeListener entityChangeListener = EntityChangeListener.NOOP;
/**
* Whether this entity is managed (i.e. "onManagementStarting" has been called, so the framework knows about it,
* and it has not been unmanaged).
*/
public boolean isDeployed() { return currentlyDeployed.get(); }
public boolean isNoLongerManaged() {
return wasDeployed() && !isDeployed();
}
public boolean wasDeployed() { return everDeployed.get(); }
/**
* Whether the entity's management lifecycle is complete (i.e. both "onManagementStarting" and "onManagementStarted" have
* been called, and it is has not been unmanaged).
*/
public boolean isFullyManaged() {
return (nonDeploymentManagementContext == null) && currentlyDeployed.get();
}
public synchronized void setManagementContext(ManagementContextInternal val) {
if (initialManagementContext != null) {
throw new IllegalStateException("Initial management context is already set for "+entity+"; cannot change");
}
if (managementContext != null && !managementContext.equals(val)) {
throw new IllegalStateException("Management context is already set for "+entity+"; cannot change");
}
this.initialManagementContext = checkNotNull(val, "managementContext");
if (nonDeploymentManagementContext != null) {
nonDeploymentManagementContext.setManagementContext(val);
}
}
public void onRebind(ManagementTransitionInfo info) {
nonDeploymentManagementContext.setMode(NonDeploymentManagementContext.NonDeploymentManagementContextMode.MANAGEMENT_REBINDING);
}
public void onManagementStarting(ManagementTransitionInfo info) {
try {
synchronized (this) {
boolean alreadyManaging = isDeployed();
if (alreadyManaging) {
log.warn("Already managed: "+entity+" ("+nonDeploymentManagementContext+"); onManagementStarted is no-op");
} else if (nonDeploymentManagementContext == null || !nonDeploymentManagementContext.getMode().isPreManaged()) {
throw new IllegalStateException("Not in expected pre-managed state: "+entity+" ("+nonDeploymentManagementContext+")");
}
if (managementContext != null && !managementContext.equals(info.getManagementContext())) {
throw new IllegalStateException("Already has management context: "+managementContext+"; can't set "+info.getManagementContext());
}
if (initialManagementContext != null && !initialManagementContext.equals(info.getManagementContext())) {
throw new IllegalStateException("Already has different initial management context: "+initialManagementContext+"; can't set "+info.getManagementContext());
}
if (alreadyManaging) {
return;
}
this.managementContext = info.getManagementContext();
nonDeploymentManagementContext.setMode(NonDeploymentManagementContext.NonDeploymentManagementContextMode.MANAGEMENT_STARTING);
nonDeploymentManagementContext.getSubscriptionManager().setDelegate((AbstractSubscriptionManager) managementContext.getSubscriptionManager());
nonDeploymentManagementContext.getSubscriptionManager().startDelegatingForSubscribing();
managementContextUsable.set(true);
currentlyDeployed.set(true);
everDeployed.set(true);
entityChangeListener = new EntityChangeListenerImpl();
}
/*
* TODO framework starting events - phase 1, including rebind
* - establish hierarchy (child, groups, etc; construction if necessary on rebind)
* - set location
* - set local config values
* - set saved sensor values
* - register subscriptions -- BUT nothing is allowed to execute
* [these operations may be done before we invoke starting also; above can happen in any order;
* sensor _publications_ and executor submissions are queued]
* then: set the management context and the entity is "managed" from the perspective of external viewers (ManagementContext.isManaged(entity) returns true)
*/
entity.onManagementStarting();
} catch (Throwable t) {
managementFailed.set(true);
throw Exceptions.propagate(t);
}
}
public void onManagementStarted(ManagementTransitionInfo info) {
try {
synchronized (this) {
boolean alreadyManaged = isFullyManaged();
if (alreadyManaged) {
log.warn("Already managed: "+entity+" ("+nonDeploymentManagementContext+"); onManagementStarted is no-op");
} else if (nonDeploymentManagementContext == null || nonDeploymentManagementContext.getMode() != NonDeploymentManagementContextMode.MANAGEMENT_STARTING) {
throw new IllegalStateException("Not in expected \"management starting\" state: "+entity+" ("+nonDeploymentManagementContext+")");
}
if (managementContext != info.getManagementContext()) {
throw new IllegalStateException("Already has management context: "+managementContext+"; can't set "+info.getManagementContext());
}
if (alreadyManaged) {
return;
}
nonDeploymentManagementContext.setMode(NonDeploymentManagementContext.NonDeploymentManagementContextMode.MANAGEMENT_STARTED);
/*
* - set derived/inherited config values
* - publish all queued sensors
* - start all queued executions (e.g. subscription delivery)
* [above happens in exactly this order, at each entity]
* then: the entity internally knows it fully managed (ManagementSupport.isManaged() returns true -- though not sure we need that);
* subsequent sensor events and executions occur directly (no queueing)
*/
nonDeploymentManagementContext.getSubscriptionManager().startDelegatingForPublishing();
//TODO more of the above
// TODO custom started activities
}
entity.onManagementBecomingMaster();
entity.onManagementStarted();
synchronized (this) {
nonDeploymentManagementContext = null;
}
} catch (Throwable t) {
managementFailed.set(true);
throw Exceptions.propagate(t);
}
}
public void onManagementStopping(ManagementTransitionInfo info) {
synchronized (this) {
if (managementContext != info.getManagementContext()) {
throw new IllegalStateException("Has different management context: "+managementContext+"; expected "+info.getManagementContext());
}
Stopwatch startTime = Stopwatch.createStarted();
while (!managementFailed.get() && nonDeploymentManagementContext!=null &&
nonDeploymentManagementContext.getMode()==NonDeploymentManagementContextMode.MANAGEMENT_STARTING) {
// still becoming managed
try {
if (startTime.elapsed(TimeUnit.SECONDS) > 30) {
// emergency fix, 30s timeout for management starting
log.error("Management stopping event "+info+" in "+this+" timed out waiting for start; proceeding to stopping");
break;
}
wait(100);
} catch (InterruptedException e) {
Exceptions.propagate(e);
}
}
if (nonDeploymentManagementContext==null) {
nonDeploymentManagementContext = new NonDeploymentManagementContext(entity, NonDeploymentManagementContextMode.MANAGEMENT_STOPPING);
} else {
// already stopped? or not started?
nonDeploymentManagementContext.setMode(NonDeploymentManagementContext.NonDeploymentManagementContextMode.MANAGEMENT_STOPPING);
}
}
// TODO custom stopping activities
// TODO framework stopping events - no more sensors, executions, etc
if (entity.getParent()!=null) entity.getParent().removeChild(entity.getProxyIfAvailable());
// new subscriptions will be queued / not allowed
nonDeploymentManagementContext.getSubscriptionManager().stopDelegatingForSubscribing();
// new publications will be queued / not allowed
nonDeploymentManagementContext.getSubscriptionManager().stopDelegatingForPublishing();
entity.onManagementNoLongerMaster();
entity.onManagementStopped();
}
public void onManagementStopped(ManagementTransitionInfo info) {
synchronized (this) {
if (managementContext != info.getManagementContext()) {
throw new IllegalStateException("Has different management context: "+managementContext+"; expected "+info.getManagementContext());
}
if (subscriptionContext != null) subscriptionContext.unsubscribeAll();
entityChangeListener = EntityChangeListener.NOOP;
managementContextUsable.set(false);
currentlyDeployed.set(false);
executionContext = null;
subscriptionContext = null;
}
// TODO framework stopped activities, e.g. serialize state ?
entity.invalidateReferences();
synchronized (this) {
managementContext = null;
nonDeploymentManagementContext.setMode(NonDeploymentManagementContext.NonDeploymentManagementContextMode.MANAGEMENT_STOPPED);
}
}
@VisibleForTesting
@Beta
public boolean isManagementContextReal() {
return managementContextUsable.get();
}
public synchronized ManagementContext getManagementContext() {
return (managementContextUsable.get()) ? managementContext : nonDeploymentManagementContext;
}
public synchronized ExecutionContext getExecutionContext() {
if (executionContext!=null) return executionContext;
if (managementContextUsable.get()) {
executionContext = managementContext.getExecutionContext(entity);
return executionContext;
}
return nonDeploymentManagementContext.getExecutionContext(entity);
}
public synchronized SubscriptionContext getSubscriptionContext() {
if (subscriptionContext!=null) return subscriptionContext;
if (managementContextUsable.get()) {
subscriptionContext = managementContext.getSubscriptionContext(entity);
return subscriptionContext;
}
return nonDeploymentManagementContext.getSubscriptionContext(entity);
}
public synchronized void attemptLegacyAutodeployment(String effectorName) {
if (managementContext!=null) {
log.warn("Autodeployment suggested but not required for "+entity+"."+effectorName);
return;
}
if (entity instanceof Application) {
log.warn("Autodeployment with new management context triggered for "+entity+"."+effectorName+" -- will not be supported in future. Explicit manage call required.");
if (initialManagementContext != null) {
initialManagementContext.getEntityManager().manage(entity);
} else {
Entities.startManagement(entity);
}
return;
}
if ("start".equals(effectorName)) {
Entity e=entity;
if (e.getParent()!=null && ((EntityInternal)e.getParent()).getManagementSupport().isDeployed()) {
log.warn("Autodeployment in parent's management context triggered for "+entity+"."+effectorName+" -- will not be supported in future. Explicit manage call required.");
((EntityInternal)e.getParent()).getManagementContext().getEntityManager().manage(entity);
return;
}
}
log.warn("Autodeployment not available for "+entity+"."+effectorName);
}
public EntityChangeListener getEntityChangeListener() {
return entityChangeListener;
}
private class EntityChangeListenerImpl implements EntityChangeListener {
@Override
public void onChanged() {
getManagementContext().getRebindManager().getChangeListener().onChanged(entity);
}
@Override
public void onChildrenChanged() {
getManagementContext().getRebindManager().getChangeListener().onChanged(entity);
}
@Override
public void onLocationsChanged() {
getManagementContext().getRebindManager().getChangeListener().onChanged(entity);
}
@Override
public void onMembersChanged() {
getManagementContext().getRebindManager().getChangeListener().onChanged(entity);
}
@Override
public void onPoliciesChanged() {
getManagementContext().getRebindManager().getChangeListener().onChanged(entity);
}
@Override
public void onAttributeChanged(AttributeSensor<?> attribute) {
// TODO Could make this more efficient by inspecting the attribute to decide if needs persisted
// immediately, or not important, or transient (e.g. do we really need to persist
// request-per-second count for rebind purposes?!)
getManagementContext().getRebindManager().getChangeListener().onChanged(entity);
}
@Override
public void onConfigChanged(ConfigKey<?> key) {
getManagementContext().getRebindManager().getChangeListener().onChanged(entity);
}
@Override
public void onEffectorStarting(Effector<?> effector) {
// ignore
}
@Override
public void onEffectorCompleted(Effector<?> effector) {
getManagementContext().getRebindManager().getChangeListener().onChanged(entity);
}
}
}