blob: 55d2d3e745512867177181761ea33ef5356b7fba [file] [log] [blame]
package brooklyn.internal.rebind;
import static com.google.common.base.Preconditions.checkNotNull;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import brooklyn.entity.Application;
import brooklyn.entity.Entity;
import brooklyn.entity.basic.AbstractEntity;
import brooklyn.entity.basic.Entities;
import brooklyn.entity.proxying.EntitySpec;
import brooklyn.entity.proxying.EntitySpecs;
import brooklyn.entity.proxying.InternalEntityFactory;
import brooklyn.entity.proxying.InternalLocationFactory;
import brooklyn.entity.rebind.ChangeListener;
import brooklyn.entity.rebind.RebindContextImpl;
import brooklyn.internal.storage.BrooklynStorage;
import brooklyn.internal.storage.Reference;
import brooklyn.location.Location;
import brooklyn.location.basic.AbstractLocation;
import brooklyn.management.internal.LocalManagementContext;
import brooklyn.management.internal.ManagementContextInternal;
import brooklyn.mementos.BrooklynMementoPersister.Delta;
import brooklyn.mementos.EntityMemento;
import brooklyn.mementos.LocationMemento;
import brooklyn.mementos.PolicyMemento;
import brooklyn.policy.Policy;
import brooklyn.policy.basic.AbstractPolicy;
import brooklyn.util.collections.MutableMap;
import brooklyn.util.collections.SetFromLiveMap;
import brooklyn.util.exceptions.Exceptions;
import brooklyn.util.flags.FlagUtils;
import brooklyn.util.javalang.Reflections;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
public class RebindFromDatagridManagerImpl {
public static final Logger LOG = LoggerFactory.getLogger(RebindFromDatagridManagerImpl.class);
private volatile boolean running = true;
private final ManagementContextInternal managementContext;
private final BrooklynStorage storage;
private final Map<String,String> entityTypes;
private final Set<String> applicationIds;
private final Map<String,String> locationTypes;
private final Map<String,String> policyTypes;
public RebindFromDatagridManagerImpl(ManagementContextInternal managementContext) {
this.managementContext = managementContext;
storage = ((ManagementContextInternal)managementContext).getStorage();
entityTypes = storage.getMap("entities");
locationTypes = storage.getMap("locations");
applicationIds = SetFromLiveMap.create(storage.<String,Boolean>getMap("applications"));
policyTypes = storage.getMap("policies");
}
//@Override
public void stop() {
running = false;
}
//@Override
@VisibleForTesting
public void waitForPendingComplete(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
// FIXME
if (!running) return;
//storage.waitForWritesCompleted(timeout, unit);
}
//@Override
public List<Application> rebind() {
return rebind(getClass().getClassLoader());
}
//@Override
public List<Application> rebind(ClassLoader classLoader) {
checkNotNull(classLoader, "classLoader");
Reflections reflections = new Reflections(classLoader);
Map<String,Entity> entities = Maps.newLinkedHashMap();
Map<String,Location> locations = Maps.newLinkedHashMap();
Map<String,Policy> policies = Maps.newLinkedHashMap();
final RebindContextImpl rebindContext = new RebindContextImpl(classLoader);
// Instantiate locations
LOG.info("RebindManager instantiating locations: {}", locationTypes.keySet());
for (Map.Entry<String,String> entry : locationTypes.entrySet()) {
String id = entry.getKey();
String type = entry.getValue();
if (LOG.isTraceEnabled()) LOG.trace("RebindManager instantiating location {} ({})", id, type);
Location location = newLocation(id, type, reflections);
locations.put(id, location);
rebindContext.registerLocation(id, location);
((LocalManagementContext)managementContext).prePreManage(location); // FIXME
}
// Instantiate entities
LOG.info("RebindManager instantiating entities: {}", entityTypes.keySet());
for (Map.Entry<String, String> entry : entityTypes.entrySet()) {
String id = entry.getKey();
String type = entry.getValue();
if (LOG.isDebugEnabled()) LOG.debug("RebindManager instantiating entity {} ({})", id, type);
Entity entity = newEntity(id, type, reflections);
entities.put(id, entity);
rebindContext.registerEntity(id, entity);
((LocalManagementContext)managementContext).prePreManage(entity); // FIXME
}
// Instantiate policies
LOG.info("RebindManager instantiating policies: {}", policyTypes.keySet());
for (Map.Entry<String, String> entry : policyTypes.entrySet()) {
String id = entry.getKey();
String type = entry.getValue();
if (LOG.isDebugEnabled()) LOG.debug("RebindManager instantiating policy {} ({})", id, type);
Policy policy = newPolicy(id, type, reflections);
policies.put(id, policy);
rebindContext.registerPolicy(id, policy);
}
// Reconstruct locations
LOG.info("RebindManager reconstructing locations");
for (String id : locationTypes.keySet()) {
Location location = rebindContext.getLocation(id);
if (LOG.isDebugEnabled()) LOG.debug("RebindManager reconstructing location {}", id);
((AbstractLocation)location).setManagementContext(managementContext);
((AbstractLocation)location).reconstruct();
}
// Reconstruct policies
LOG.info("RebindManager reconstructing policies");
for (String id : policyTypes.keySet()) {
Policy policy = rebindContext.getPolicy(id);
if (LOG.isDebugEnabled()) LOG.debug("RebindManager reconstructing policy {}", id);
((AbstractPolicy)policy).reconstruct();
}
// Reconstruct entities
LOG.info("RebindManager reconstructing entities");
for (String id : entityTypes.keySet()) {
Entity entity = rebindContext.getEntity(id);
if (LOG.isDebugEnabled()) LOG.debug("RebindManager reconstructing entity {}", id);
((AbstractEntity)entity).setManagementContext(managementContext);
((AbstractEntity)entity).reconstruct();
}
// Manage the top-level locations (causing everything under them to become managed)
LOG.info("RebindManager managing locations");
for (Location location : findTopLevelLocations(locations.values())) {
Entities.manage(location, managementContext);
}
// Manage the top-level apps (causing everything under them to become managed)
LOG.info("RebindManager managing entities");
List<Application> apps = Lists.newArrayList();
for (String appId : applicationIds) {
Application app = (Application) rebindContext.getEntity(appId);
Entities.startManagement(app, managementContext);
apps.add(app);
}
LOG.info("RebindManager complete; return apps: {}", applicationIds);
return Collections.unmodifiableList(apps);
}
private List<Location> findTopLevelLocations(Collection<Location> locations) {
List<Location> result = new ArrayList<Location>();
for (Location contender : locations) {
if (contender.getParentLocation() == null) {
result.add(contender);
}
}
return result;
}
private Entity newEntity(String entityId, String entityType, Reflections reflections) {
Class<? extends Entity> entityClazz = (Class<? extends Entity>) reflections.loadClass(entityType);
Class<?>[] additionalInterfaces = entityClazz.getInterfaces();
try {
InternalEntityFactory entityFactory = ((ManagementContextInternal)managementContext).getEntityFactory();
Entity entity = entityFactory.constructEntity(entityClazz);
FlagUtils.setFieldsFromFlags(ImmutableMap.of("id", entityId), entity);
EntitySpec<Entity> entitySpec = EntitySpecs.spec(Entity.class).additionalInterfaces(additionalInterfaces);
((AbstractEntity)entity).setProxy(entityFactory.createEntityProxy(entitySpec, entity));
return entity;
} catch (Exception e) {
throw Exceptions.propagate(e);
}
}
/**
* Constructs a new location, passing to its constructor the location id and all of memento.getFlags().
*/
private Location newLocation(String locationId, String locationType, Reflections reflections) {
Class<? extends Location> locationClazz = (Class<? extends Location>) reflections.loadClass(locationType);
// TODO Move this code inside location?
// FIXME What about config that refers to other location objects? Those won't have been instantiated yet.
// Need to separate constructor from configure
Reference<String> locationDisplayName = storage.getReference("location-"+locationId+"-displayName");
Map<String,?> locationConfig = storage.getMap("location-"+locationId+"-config");
InternalLocationFactory locationFactory = ((ManagementContextInternal)managementContext).getLocationFactory();
Location location = locationFactory.constructLocation(locationClazz);
FlagUtils.setFieldsFromFlags(ImmutableMap.of("id", locationId), location);
return location;
}
/**
* Constructs a new location, passing to its constructor the location id and all of memento.getFlags().
*/
private Policy newPolicy(String policyId, String policyType, Reflections reflections) {
Class<?> policyClazz = reflections.loadClass(policyType);
// TODO Move this code inside location?
// FIXME What about config that refers to other location/entity/policy objects? Those won't have been instantiated yet.
// Need to separate constructor from configure
Reference<String> policyDisplayName = storage.getReference("policy-"+policyId+"-displayName");
Map<String,?> policyConfig = storage.getMap("policy-"+policyId+"-config");
Map<String, Object> flags = MutableMap.<String, Object>builder()
.put("id", policyId)
.putIfNotNull("displayName", policyDisplayName.get())
.putAll(policyConfig)
.build();
return (Policy) invokeConstructor(reflections, policyClazz, new Object[] {flags});
}
private <T> T invokeConstructor(Reflections reflections, Class<T> clazz, Object[]... possibleArgs) {
for (Object[] args : possibleArgs) {
Constructor<T> constructor = Reflections.findCallabaleConstructor(clazz, args);
if (constructor != null) {
constructor.setAccessible(true);
return reflections.loadInstance(constructor, args);
}
}
throw new IllegalStateException("Cannot instantiate instance of type "+clazz+"; expected constructor signature not found");
}
private static class DeltaImpl implements Delta {
Collection<LocationMemento> locations = Collections.emptyList();
Collection<EntityMemento> entities = Collections.emptyList();
Collection<PolicyMemento> policies = Collections.emptyList();
Collection <String> removedLocationIds = Collections.emptyList();
Collection <String> removedEntityIds = Collections.emptyList();
Collection <String> removedPolicyIds = Collections.emptyList();
@Override
public Collection<LocationMemento> locations() {
return locations;
}
@Override
public Collection<EntityMemento> entities() {
return entities;
}
@Override
public Collection<PolicyMemento> policies() {
return policies;
}
@Override
public Collection<String> removedLocationIds() {
return removedLocationIds;
}
@Override
public Collection<String> removedEntityIds() {
return removedEntityIds;
}
@Override
public Collection<String> removedPolicyIds() {
return removedPolicyIds;
}
}
/**
* Wraps a ChangeListener, to log and never propagate any exceptions that it throws.
*
* Catches Throwable, because really don't want a problem to propagate up to user code,
* to cause business-level operations to fail. For example, if there is a linkage error
* due to some problem in the serialization dependencies then just log it. For things
* more severe (e.g. OutOfMemoryError) then the catch+log means we'll report that we
* failed to persist, and we'd expect other threads to throw the OutOfMemoryError so
* we shouldn't lose anything.
*/
private static class SafeChangeListener implements ChangeListener {
private final ChangeListener delegate;
public SafeChangeListener(ChangeListener delegate) {
this.delegate = delegate;
}
@Override
public void onManaged(Entity entity) {
try {
delegate.onManaged(entity);
} catch (Throwable t) {
LOG.error("Error persisting mememento onManaged("+entity+"); continuing.", t);
}
}
@Override
public void onManaged(Location location) {
try {
delegate.onManaged(location);
} catch (Throwable t) {
LOG.error("Error persisting mememento onManaged("+location+"); continuing.", t);
}
}
@Override
public void onChanged(Entity entity) {
try {
delegate.onChanged(entity);
} catch (Throwable t) {
LOG.error("Error persisting mememento onChanged("+entity+"); continuing.", t);
}
}
@Override
public void onUnmanaged(Entity entity) {
try {
delegate.onUnmanaged(entity);
} catch (Throwable t) {
LOG.error("Error persisting mememento onUnmanaged("+entity+"); continuing.", t);
}
}
@Override
public void onUnmanaged(Location location) {
try {
delegate.onUnmanaged(location);
} catch (Throwable t) {
LOG.error("Error persisting mememento onUnmanaged("+location+"); continuing.", t);
}
}
@Override
public void onChanged(Location location) {
try {
delegate.onChanged(location);
} catch (Throwable t) {
LOG.error("Error persisting mememento onChanged("+location+"); continuing.", t);
}
}
@Override
public void onChanged(Policy policy) {
try {
delegate.onChanged(policy);
} catch (Throwable t) {
LOG.error("Error persisting mememento onChanged("+policy+"); continuing.", t);
}
}
}
}