blob: 4fb32e52308b40fa363ca491e7fb9ecdb9630d08 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.brooklyn.core.mgmt.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.brooklyn.util.JavaGroovyEquivalents.elvis;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.brooklyn.api.effector.Effector;
import org.apache.brooklyn.api.entity.Application;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.location.Location;
import org.apache.brooklyn.api.mgmt.AccessController;
import org.apache.brooklyn.api.mgmt.ExecutionContext;
import org.apache.brooklyn.api.mgmt.ExecutionManager;
import org.apache.brooklyn.api.mgmt.ManagementContext;
import org.apache.brooklyn.api.mgmt.SubscriptionManager;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.mgmt.TaskAdaptable;
import org.apache.brooklyn.api.mgmt.ha.ManagementNodeState;
import org.apache.brooklyn.core.BrooklynFeatureEnablement;
import org.apache.brooklyn.core.effector.Effectors;
import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.entity.drivers.downloads.BasicDownloadsManager;
import org.apache.brooklyn.core.internal.BrooklynInitialization;
import org.apache.brooklyn.core.internal.BrooklynProperties;
import org.apache.brooklyn.core.internal.BrooklynProperties.Factory.Builder;
import org.apache.brooklyn.core.mgmt.entitlement.Entitlements;
import org.apache.brooklyn.core.mgmt.ha.OsgiManager;
import org.apache.brooklyn.core.objs.proxy.InternalEntityFactory;
import org.apache.brooklyn.core.objs.proxy.InternalLocationFactory;
import org.apache.brooklyn.core.objs.proxy.InternalPolicyFactory;
import org.apache.brooklyn.util.core.task.BasicExecutionContext;
import org.apache.brooklyn.util.core.task.BasicExecutionManager;
import org.apache.brooklyn.util.core.task.DynamicTasks;
import org.apache.brooklyn.util.core.task.TaskTags;
import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.text.Strings;
import org.apache.brooklyn.util.time.CountdownTimer;
import org.apache.brooklyn.util.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
/**
* A local (single node) implementation of the {@link ManagementContext} API.
*/
public class LocalManagementContext extends AbstractManagementContext {
private static final Logger log = LoggerFactory.getLogger(LocalManagementContext.class);
// Any usage of Brooklyn must first create a management context. Therefore do our
// initialisation here.
// TODO We could delete our other calls to BrooklynInitialization.initAll(), to rely on
// just one (e.g. AbstractEntity should not need to do it; and that is insufficient if a test
// just deals with locations but not entities).
static {
BrooklynInitialization.initAll();
}
private static final Set<LocalManagementContext> INSTANCES = Collections.synchronizedSet(Collections.newSetFromMap(new WeakHashMap<LocalManagementContext, Boolean>()));
private final Builder builder;
private final List<ManagementContext.PropertiesReloadListener> reloadListeners = new CopyOnWriteArrayList<ManagementContext.PropertiesReloadListener>();
@VisibleForTesting
static Set<LocalManagementContext> getInstances() {
synchronized (INSTANCES) {
return ImmutableSet.copyOf(INSTANCES);
}
}
// Note also called reflectively by BrooklynLeakListener
public static void logAll(Logger logger){
for (LocalManagementContext context : getInstances()) {
logger.warn("Management Context "+context+" running, creation stacktrace:\n" + Throwables.getStackTraceAsString(context.constructionStackTrace));
}
}
/** terminates all (best effort); returns count of sessions closed; if exceptions thrown, returns negative number.
* semantics might change, particular in dealing with interminable mgmt contexts. */
// Note also called reflectively by BrooklynLeakListener
@Beta
public static int terminateAll() {
int closed=0,dangling=0;
for (LocalManagementContext context : getInstances()) {
try {
context.terminate();
closed++;
}catch (Throwable t) {
Exceptions.propagateIfFatal(t);
log.warn("Failed to terminate management context", t);
dangling++;
}
}
if (dangling>0) return -dangling;
return closed;
}
private final AtomicBoolean terminated = new AtomicBoolean(false);
private String managementPlaneId;
private BasicExecutionManager execution;
private SubscriptionManager subscriptions;
private LocalEntityManager entityManager;
private final LocalLocationManager locationManager;
private final LocalAccessManager accessManager;
private final LocalUsageManager usageManager;
private OsgiManager osgiManager;
public final Throwable constructionStackTrace = new Throwable("for construction stacktrace").fillInStackTrace();
private final Map<String, Object> brooklynAdditionalProperties;
/**
* Creates a LocalManagement with default BrooklynProperties.
*/
public LocalManagementContext() {
this(BrooklynProperties.Factory.builderDefault());
}
/**
* Creates a new LocalManagementContext.
*
* @param brooklynProperties the BrooklynProperties.
*/
@VisibleForTesting
public LocalManagementContext(BrooklynProperties brooklynProperties) {
this(Builder.fromProperties(brooklynProperties));
}
public LocalManagementContext(Builder builder) {
this(builder, null);
}
public LocalManagementContext(Builder builder, Map<String, Object> brooklynAdditionalProperties) {
super(builder.build());
checkNotNull(configMap, "brooklynProperties");
this.builder = builder;
this.brooklynAdditionalProperties = brooklynAdditionalProperties;
if (brooklynAdditionalProperties != null)
configMap.addFromMap(brooklynAdditionalProperties);
BrooklynFeatureEnablement.init(configMap);
this.locationManager = new LocalLocationManager(this);
this.accessManager = new LocalAccessManager();
this.usageManager = new LocalUsageManager(this);
if (configMap.getConfig(OsgiManager.USE_OSGI)) {
this.osgiManager = new OsgiManager(this);
osgiManager.start();
}
INSTANCES.add(this);
log.debug("Created management context "+this);
}
@Override
@Deprecated
public String getManagementPlaneId() {
return managementPlaneId;
}
@Override
public Maybe<String> getManagementPlaneIdMaybe() {
return Maybe.ofDisallowingNull(managementPlaneId);
}
public void setManagementPlaneId(String newPlaneId) {
if (managementPlaneId != null && !managementPlaneId.equals(newPlaneId)) {
log.warn("Management plane ID at {} {} changed from {} to {} (can happen on concurrent startup of multiple nodes)", new Object[] { getManagementNodeId(), getHighAvailabilityManager().getNodeState(), managementPlaneId, newPlaneId });
log.debug("Management plane ID at {} {} changed from {} to {} (can happen on concurrent startup of multiple nodes)", new Object[] {getManagementNodeId(), getHighAvailabilityManager().getNodeState(), managementPlaneId, newPlaneId, new RuntimeException("Stack trace for setManagementPlaneId")});
}
this.managementPlaneId = newPlaneId;
}
public void generateManagementPlaneId() {
if (this.managementPlaneId != null) {
throw new IllegalStateException("Request to generate a management plane ID for node "+getManagementNodeId()+" but one already exists (" + managementPlaneId + ")");
}
this.managementPlaneId = Strings.makeRandomId(8);
}
@Override
public void prePreManage(Entity entity) {
getEntityManager().prePreManage(entity);
}
@Override
public void prePreManage(Location location) {
getLocationManager().prePreManage(location);
}
@Override
public synchronized Collection<Application> getApplications() {
return getEntityManager().getApplications();
}
@Override
public void addEntitySetListener(CollectionChangeListener<Entity> listener) {
getEntityManager().addEntitySetListener(listener);
}
@Override
public void removeEntitySetListener(CollectionChangeListener<Entity> listener) {
getEntityManager().removeEntitySetListener(listener);
}
@Override
protected void manageIfNecessary(Entity entity, Object context) {
getEntityManager().manageIfNecessary(entity, context);
}
@Override
public synchronized LocalEntityManager getEntityManager() {
if (!isRunning()) {
throw new IllegalStateException("Management context no longer running");
}
if (entityManager == null) {
entityManager = new LocalEntityManager(this);
}
return entityManager;
}
@Override
public InternalEntityFactory getEntityFactory() {
return getEntityManager().getEntityFactory();
}
@Override
public InternalLocationFactory getLocationFactory() {
return getLocationManager().getLocationFactory();
}
@Override
public InternalPolicyFactory getPolicyFactory() {
return getEntityManager().getPolicyFactory();
}
@Override
public synchronized LocalLocationManager getLocationManager() {
if (!isRunning()) throw new IllegalStateException("Management context no longer running");
return locationManager;
}
@Override
public synchronized LocalAccessManager getAccessManager() {
if (!isRunning()) throw new IllegalStateException("Management context no longer running");
return accessManager;
}
@Override
public synchronized LocalUsageManager getUsageManager() {
if (!isRunning()) throw new IllegalStateException("Management context no longer running");
return usageManager;
}
@Override
public synchronized Maybe<OsgiManager> getOsgiManager() {
if (!isRunning()) throw new IllegalStateException("Management context no longer running");
if (osgiManager==null) return Maybe.absent("OSGi not available in this instance");
return Maybe.of(osgiManager);
}
@Override
public synchronized AccessController getAccessController() {
return getAccessManager().getAccessController();
}
@Override
public synchronized SubscriptionManager getSubscriptionManager() {
if (!isRunning()) throw new IllegalStateException("Management context no longer running");
if (subscriptions == null) {
subscriptions = new LocalSubscriptionManager(getExecutionManager());
}
return subscriptions;
}
@Override
public synchronized ExecutionManager getExecutionManager() {
if (!isRunning()) throw new IllegalStateException("Management context no longer running");
if (execution == null) {
execution = new BasicExecutionManager(getManagementNodeId());
gc = new BrooklynGarbageCollector(configMap, execution, getStorage());
}
return execution;
}
@Override
public void terminate() {
if (terminated.getAndSet(true)) {
log.trace("Already terminated management context "+this);
// no harm in doing it twice, but it makes logs ugly!
return;
}
log.debug("Terminating management context "+this);
INSTANCES.remove(this);
super.terminate();
if (usageManager != null) usageManager.terminate();
if (execution != null) execution.shutdownNow();
if (gc != null) gc.shutdownNow();
if (osgiManager!=null) {
osgiManager.stop();
osgiManager = null;
}
log.debug("Terminated management context "+this);
}
@Override
protected void finalize() {
terminate();
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
protected <T> Task<T> runAtEntity(Map flags, Entity entity, Callable<T> c) {
manageIfNecessary(entity, elvis(Arrays.asList(flags.get("displayName"), flags.get("description"), flags, c)));
return runAtEntity(entity, Tasks.<T>builder().dynamic(true).body(c).flags(flags).build());
}
protected <T> Task<T> runAtEntity(Entity entity, TaskAdaptable<T> task) {
((EntityInternal)entity).getExecutionContext().submit(task);
if (DynamicTasks.getTaskQueuingContext()!=null) {
// put it in the queueing context so it appears in the GUI
// mark it inessential as this is being invoked from code,
// the caller will do 'get' to handle errors
TaskTags.markInessential(task);
DynamicTasks.getTaskQueuingContext().queue(task.asTask());
}
return task.asTask();
}
@Override
protected <T> Task<T> runAtEntity(final Entity entity, final Effector<T> eff, @SuppressWarnings("rawtypes") final Map parameters) {
manageIfNecessary(entity, eff);
// prefer to submit this from the current execution context so it sets up correct cross-context chaining
// ExecutionContext ec = BasicExecutionContext.getCurrentExecutionContext();
// if (ec == null) {
// log.debug("Top-level effector invocation: {} on {}", eff, entity);
// ec = getExecutionContext(entity);
// }
return runAtEntity(entity, Effectors.invocation(entity, eff, parameters));
}
@Override
public boolean isManagedLocally(Entity e) {
return true;
}
@Override
public String toString() {
String planeId = MoreObjects.firstNonNull(managementPlaneId, "?");
return LocalManagementContext.class.getSimpleName()+"["+planeId+"-"+getManagementNodeId()+"]";
}
@Override
public void reloadBrooklynProperties() {
log.info("Reloading brooklyn properties from " + builder);
if (builder.hasDelegateOriginalProperties())
log.warn("When reloading, mgmt context "+this+" properties are fixed, so reload will be of limited utility");
BrooklynProperties properties = builder.build();
configMap = new DeferredBrooklynProperties(properties, this);
if (brooklynAdditionalProperties != null && !brooklynAdditionalProperties.isEmpty()) {
log.info("Reloading additional brooklyn properties from " + brooklynAdditionalProperties);
configMap.addFromMap(brooklynAdditionalProperties);
}
this.downloadsManager = BasicDownloadsManager.newDefault(configMap);
this.entitlementManager = Entitlements.newManager(this, configMap);
clearLocationRegistry();
BrooklynFeatureEnablement.init(configMap);
// Notify listeners that properties have been reloaded
for (PropertiesReloadListener listener : reloadListeners) {
listener.reloaded();
}
}
@VisibleForTesting
public void clearLocationRegistry() {
// Force reload of location registry
this.locationRegistry = null;
}
@Override
public void addPropertiesReloadListener(PropertiesReloadListener listener) {
reloadListeners.add(checkNotNull(listener, "listener"));
}
@Override
public void removePropertiesReloadListener(PropertiesReloadListener listener) {
reloadListeners.remove(listener);
}
private Object startupSynchObject = new Object();
public void noteStartupComplete() {
synchronized (startupSynchObject) {
startupComplete = true;
startupSynchObject.notifyAll();
}
}
/** Exposed for services to advertise that startup tasks are again occurring */
public void noteStartupTransitioning() {
synchronized (startupSynchObject) {
startupComplete = false;
startupSynchObject.notifyAll();
}
}
@Override
public boolean isStartupComplete() {
synchronized (startupSynchObject) {
return startupComplete;
}
}
@Override public void waitForManagementStartupComplete(Duration timeout) {
if (timeout==null) timeout = Duration.minutes(5);
CountdownTimer timer = CountdownTimer.newInstanceStarted(timeout);
try {
Tasks.withBlockingDetails("Waiting on management plane to completely start", () -> {
while (true) {
synchronized (startupSynchObject) {
if (isStartupComplete()) return null;
if (!isRunning()) {
throw new IllegalStateException("Management context transitioned to not running before startup detected as completed");
}
if (!timer.waitOnForExpiryUnchecked(startupSynchObject)) {
throw Exceptions.propagate(new TimeoutException("Timeout waiting for management context to start"));
}
}
}
});
} catch (Exception e) {
throw Exceptions.propagate(e);
}
}
@Override
public ManagementNodeState getNodeState() {
synchronized (startupSynchObject) {
if (!startupComplete) return ManagementNodeState.INITIALIZING;
if (!errors().isEmpty()) return ManagementNodeState.FAILED;
return getHighAvailabilityManager().getNodeState();
}
}
}