blob: 75dcb0bd5d8af3de6a211b231003ea9b78c74499 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.brooklyn.core.catalog.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.brooklyn.api.catalog.BrooklynCatalog;
import org.apache.brooklyn.api.catalog.CatalogItem;
import org.apache.brooklyn.api.catalog.CatalogItem.CatalogItemType;
import org.apache.brooklyn.api.entity.EntitySpec;
import org.apache.brooklyn.api.internal.AbstractBrooklynObjectSpec;
import org.apache.brooklyn.api.mgmt.ManagementContext;
import org.apache.brooklyn.api.mgmt.ha.ManagementNodeState;
import org.apache.brooklyn.api.mgmt.rebind.RebindExceptionHandler;
import org.apache.brooklyn.api.objs.BrooklynObjectType;
import org.apache.brooklyn.api.typereg.BrooklynTypeRegistry.RegisteredTypeKind;
import org.apache.brooklyn.api.typereg.ManagedBundle;
import org.apache.brooklyn.api.typereg.RegisteredType;
import org.apache.brooklyn.core.mgmt.ManagementContextInjectable;
import org.apache.brooklyn.core.mgmt.ha.OsgiBundleInstallationResult;
import org.apache.brooklyn.core.mgmt.ha.OsgiManager;
import org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal;
import org.apache.brooklyn.core.mgmt.rebind.RebindManagerImpl;
import org.apache.brooklyn.core.objs.BrooklynTypes;
import org.apache.brooklyn.core.server.BrooklynServerConfig;
import org.apache.brooklyn.core.typereg.BundleUpgradeParser;
import org.apache.brooklyn.core.typereg.BundleUpgradeParser.CatalogUpgrades;
import org.apache.brooklyn.core.typereg.RegisteredTypePredicates;
import org.apache.brooklyn.core.typereg.RegisteredTypes;
import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.collections.MutableSet;
import org.apache.brooklyn.util.core.ResourceUtils;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.exceptions.FatalRuntimeException;
import org.apache.brooklyn.util.exceptions.PropagatedRuntimeException;
import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
import org.apache.brooklyn.util.exceptions.UserFacingException;
import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.javalang.JavaClassNames;
import org.apache.brooklyn.util.os.Os;
import org.apache.brooklyn.util.osgi.VersionedName;
import org.apache.brooklyn.util.text.Strings;
import org.apache.brooklyn.util.time.Duration;
import org.apache.commons.lang3.tuple.Pair;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
import org.osgi.framework.BundleException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
@Beta
public class CatalogInitialization implements ManagementContextInjectable {
/*
A1) if not persisting, go to B1
A2) if there is a persisted catalog, read it and go to C1
A3) go to B1
B1) look for --catalog-initial, if so read it, then go to C1
B2) look for BrooklynServerConfig.BROOKLYN_CATALOG_URL, if so, read it, supporting YAML, then go to C1
B3) look for ~/.brooklyn/catalog.bom, if exists, read it then go to C1
B4) read all classpath://brooklyn/default.catalog.bom items, if they exist (and for now they will)
B5) go to C1
C1) if persisting, read the rest of the persisted items (entities etc)
*/
private static final Logger log = LoggerFactory.getLogger(CatalogInitialization.class);
private String initialUri;
/** has run the initial catalog initialization */
private boolean hasRunInitialCatalogInitialization = false;
/** has run an official initialization, but it is not a permanent one (e.g. during a hot standby mode, or a run failed) */
private boolean hasRunPersistenceInitialization = false;
/** has run an official initialization which is permanent (node is master, and the new catalog is now set) */
private boolean hasRunFinalInitialization = false;
private ManagementContextInternal managementContext;
private CatalogUpgradeScanner catalogUpgradeScanner;
private boolean isStartingUp = false;
private boolean failOnStartupErrors = false;
/** is running a populate method; used to prevent recursive loops */
private boolean isPopulatingInitial = false;
private final Object populatingCatalogMutex = new Object();
public CatalogInitialization() {
this(null);
}
public CatalogInitialization(String initialUri) {
this.initialUri = initialUri;
}
@Override
public void setManagementContext(ManagementContext managementContext) {
checkNotNull(managementContext, "management context");
if (this.managementContext!=null && managementContext!=this.managementContext)
throw new IllegalStateException("Cannot switch management context, from "+this.managementContext+" to "+managementContext);
this.managementContext = (ManagementContextInternal) managementContext;
catalogUpgradeScanner = new CatalogUpgradeScanner(this.managementContext,
BundleUpgradeParser::parseBundleManifestForCatalogUpgrades,
RegisteredTypePredicates::containingBundle,
RegisteredTypePredicates::containingBundle);
}
/** Called by the framework to set true while starting up, and false afterwards,
* in order to assist in appropriate logging and error handling. */
public void setStartingUp(boolean isStartingUp) {
this.isStartingUp = isStartingUp;
}
public void setFailOnStartupErrors(boolean startupFailOnCatalogErrors) {
this.failOnStartupErrors = startupFailOnCatalogErrors;
}
public ManagementContextInternal getManagementContext() {
return checkNotNull(managementContext, "management context has not been injected into "+this);
}
/** Returns true if the canonical initialization has completed,
* that is, an initialization which is done when a node is rebinded as master
* (or an initialization done by the startup routines when not running persistence);
* see also {@link #hasRunAnyInitialization()}. */
private boolean hasRunFinalInitialization() {
return hasRunFinalInitialization;
}
private boolean hasRunInitialCatalogInitialization() {
return hasRunFinalInitialization || hasRunInitialCatalogInitialization;
}
/**
* Returns true if we have added catalog bundles/items from persisted state.
*/
private boolean hasRunPersistenceInitialization() {
return hasRunFinalInitialization || hasRunPersistenceInitialization;
}
/**
* Returns true if the initializer has run at all.
*/
@VisibleForTesting
@Beta
public boolean hasRunAnyInitialization() {
return hasRunFinalInitialization || hasRunInitialCatalogInitialization || hasRunPersistenceInitialization;
}
/**
* Populates the initial catalog (i.e. from the initial .bom file).
*
* Expected to be called exactly once at startup.
*/
public void populateInitialCatalogOnly() {
if (log.isDebugEnabled()) {
log.debug("Populating only the initial catalog; from "+JavaClassNames.callerNiceClassAndMethod(1));
}
synchronized (populatingCatalogMutex) {
if (hasRunInitialCatalogInitialization()) {
throw new IllegalStateException("Catalog initialization called to populate only initial, even though it has already run it");
}
populateInitialCatalogImpl(true);
onFinalCatalog();
}
}
/**
* Clears all record of the brooklyn-managed-bundles (so use with care!).
*
* Used when promoting from HOT_STANDBY to MASTER. Previous actions performed as HOT_STANDBY
* will have been done in read-only mode. When we rebind in anger as master, we want to do this
* without a previous cache of managed bundles.
*/
public void clearBrooklynManagedBundles() {
Maybe<OsgiManager> osgiManager = managementContext.getOsgiManager();
if (osgiManager.isPresent()) {
osgiManager.get().clearManagedBundles();
}
}
/**
* Adds the given persisted catalog items.
*
* Can be called multiple times, e.g.:
* <ul>
* <li>if "hot-standby" then will be called repeatedly, as we rebind to the persisted state
* <li> if being promoted to master then we will be called (and may already have been called for "hot-standby").
* </ul>
*/
public void populateInitialAndPersistedCatalog(ManagementNodeState mode, PersistedCatalogState persistedState, RebindExceptionHandler exceptionHandler, RebindLogger rebindLogger) {
if (log.isDebugEnabled()) {
String message = "Add persisted catalog for "+mode+", persistedBundles="+persistedState.getBundles().size()+", legacyItems="+persistedState.getLegacyCatalogItems().size()+"; from "+JavaClassNames.callerNiceClassAndMethod(1);
if (!ManagementNodeState.isHotProxy(mode)) {
log.debug(message);
} else {
// in hot modes, make this message trace so we don't get too much output then
log.trace(message);
}
}
synchronized (populatingCatalogMutex) {
if (hasRunFinalInitialization()) {
log.warn("Catalog initialization called to add persisted catalog, even though it has already run the final 'master' initialization; mode="+mode, new Throwable("Source of duplicate catalog initialization"));
hasRunFinalInitialization = false;
}
if (hasRunPersistenceInitialization()) {
// Multiple calls; will need to reset (only way to clear out the previous persisted state's catalog)
if (log.isDebugEnabled()) {
String message = "Catalog initialization repeated call to add persisted catalog, resetting catalog (including initial) to start from clean slate; mode="+mode;
log.debug(message);
}
} else if (hasRunInitialCatalogInitialization()) {
throw new IllegalStateException("Catalog initialization already run for initial catalog by mechanism other than populating persisted state; mode="+mode);
}
installPersistedBundles(persistedState, () -> {
// now we install and start the bundles from the catalog;
// 2021-12-03 now this only will look for classes in active bundles, so it won't resolve persisted bundles
// and we can safely filter them out later
populateInitialCatalogImpl(true);
}, exceptionHandler, rebindLogger);
if (mode == ManagementNodeState.MASTER) {
// TODO ideally this would remain false until it has *persisted* the changed catalog;
// if there is a subsequent startup failure the forced additions will not be persisted,
// but nor will they be loaded on a subsequent run.
// callers will have to restart a brooklyn, or reach into this class to change this field,
// or (recommended) manually adjust the catalog.
// TODO also, if a node comes up in standby, the addition might not take effector for a while
//
// however since these options are mainly for use on the very first brooklyn run, it's not such a big deal;
// once up and running the typical way to add items is via the REST API
onFinalCatalog();
}
}
}
/** shared routine between above "normal" initialization, and special addition via ServerResource.import */
@Beta
public void installPersistedBundles(PersistedCatalogState persistedState, Runnable beforeDeferredStartAndSetRunPersistence, RebindExceptionHandler exceptionHandler, RebindLogger rebindLogger) {
// Always install the bundles from persisted state; installed (but not started) prior to catalog,
// so that OSGi unique IDs might be picked up when initial catalog is populated
Map<InstallableManagedBundle, OsgiBundleInstallationResult> persistenceInstalls = installPersistedBundlesDontStart(persistedState.getBundles(), exceptionHandler, rebindLogger);
if (beforeDeferredStartAndSetRunPersistence!=null) beforeDeferredStartAndSetRunPersistence.run();
final Maybe<OsgiManager> maybesOsgiManager = managementContext.getOsgiManager();
if (maybesOsgiManager.isAbsent()) {
// Can't find any bundles to tell if there are upgrades. Could be running tests; do no filtering.
CatalogUpgrades.storeInManagementContext(CatalogUpgrades.EMPTY, managementContext);
} else {
final OsgiManager osgiManager = maybesOsgiManager.get();
final BundleContext bundleContext = osgiManager.getFramework().getBundleContext();
final CatalogUpgrades catalogUpgrades = catalogUpgradeScanner.scan(osgiManager, bundleContext, rebindLogger);
CatalogUpgrades.storeInManagementContext(catalogUpgrades, managementContext);
}
PersistedCatalogState filteredPersistedState = filterBundlesAndCatalogInPersistedState(persistedState, rebindLogger);
// 2021-09-14 previously we effectively installed here, after populating; but now we do it earlier and then uninstall if needed, to preserve IDs
// Map<InstallableManagedBundle, OsgiBundleInstallationResult> persistenceInstalls = installPersistedBundlesDontStart(filteredPersistedState.getBundles(), exceptionHandler, rebindLogger);
try {
startPersistedBundles(filteredPersistedState, persistenceInstalls, exceptionHandler, rebindLogger);
BrooklynCatalog catalog = managementContext.getCatalog();
catalog.addCatalogLegacyItemsOnRebind(filteredPersistedState.getLegacyCatalogItems());
} finally {
if (beforeDeferredStartAndSetRunPersistence!=null) hasRunPersistenceInitialization = true;
}
}
/**
* Populates the initial catalog, but not via an official code-path.
*
* Expected to be called only during tests, where the test has not gone through the same
* management-context lifecycle as is done in BasicLauncher.
*
* Subsequent calls will fail to things like {@link #populateInitialCatalogOnly()} or
* {@link #populateInitialAndPersistedCatalog(ManagementNodeState, PersistedCatalogState, RebindExceptionHandler, RebindLogger)}.
*/
@VisibleForTesting
@Beta
public void unofficialPopulateInitialCatalog() {
if (log.isDebugEnabled()) {
log.debug("Unofficially populate initial catalog; should be used only by tests! Called from "+JavaClassNames.callerNiceClassAndMethod(1));
}
synchronized (populatingCatalogMutex) {
if (hasRunInitialCatalogInitialization()) {
return;
}
populateInitialCatalogImpl(true);
}
}
public void handleException(Throwable throwable, Object details) {
if (throwable instanceof InterruptedException)
throw new RuntimeInterruptedException((InterruptedException) throwable);
if (throwable instanceof RuntimeInterruptedException)
throw (RuntimeInterruptedException) throwable;
if (details instanceof CatalogItem) {
if (((CatalogItem<?,?>)details).getCatalogItemId() != null) {
details = ((CatalogItem<?,?>)details).getCatalogItemId();
}
}
PropagatedRuntimeException wrap = new PropagatedRuntimeException("Error loading catalog item "+details, throwable);
log.warn(Exceptions.collapseText(wrap));
log.debug("Trace for: "+wrap, wrap);
getManagementContext().errors().add(wrap);
if (isStartingUp && failOnStartupErrors) {
throw new FatalRuntimeException("Unable to load catalog item "+details, wrap);
}
}
private void confirmCatalog() {
// Force load of catalog (so web console is up to date)
Stopwatch time = Stopwatch.createStarted();
Iterable<RegisteredType> all = getManagementContext().getTypeRegistry().getAll();
int errors = 0;
for (RegisteredType rt: all) {
if (RegisteredTypes.isTemplate(rt)) {
// skip validation of templates, they might contain instructions,
// and additionally they might contain multiple items in which case
// the validation below won't work anyway (you need to go via a deployment plan)
} else {
if (rt.getKind()==RegisteredTypeKind.UNRESOLVED) {
errors++;
handleException(new UserFacingException("Unresolved type in catalog"), rt);
}
}
}
// and force resolution of legacy items
BrooklynCatalog catalog = getManagementContext().getCatalog();
Iterable<CatalogItem<Object, Object>> items = catalog.getCatalogItemsLegacy();
for (CatalogItem<Object, Object> item: items) {
try {
if (item.getCatalogItemType()==CatalogItemType.TEMPLATE) {
// skip validation of templates, they might contain instructions,
// and additionally they might contain multiple items in which case
// the validation below won't work anyway (you need to go via a deployment plan)
} else {
AbstractBrooklynObjectSpec<?, ?> spec = catalog.peekSpec(item);
if (spec instanceof EntitySpec) {
BrooklynTypes.getDefinedEntityType(((EntitySpec<?>)spec).getType());
}
log.debug("Catalog loaded spec "+spec+" for item "+item);
}
} catch (Throwable throwable) {
handleException(throwable, item);
}
}
log.debug("Catalog (size "+Iterables.size(all)+", of which "+Iterables.size(items)+" legacy) confirmed in "+Duration.of(time)+(errors>0 ? ", errors found ("+errors+")" : ""));
// nothing else added here
}
private void populateInitialCatalogImpl(boolean reset) {
assert Thread.holdsLock(populatingCatalogMutex);
if (isPopulatingInitial) {
// Avoid recursively loops, where getCatalog() calls unofficialPopulateInitialCatalog(), but populateInitialCatalogImpl() also calls getCatalog()
return;
}
isPopulatingInitial = true;
try {
BasicBrooklynCatalog catalog = (BasicBrooklynCatalog) managementContext.getCatalog();
if (!catalog.getCatalog().isLoaded()) {
catalog.load();
} else {
if (reset) {
catalog.reset(ImmutableList.<CatalogItem<?,?>>of());
}
}
populateViaInitialBomImpl(catalog);
} catch (Throwable e) {
if (!Thread.currentThread().isInterrupted() && !isRebindReadOnlyShuttingDown(getManagementContext())) {
// normal on interruption, esp during tests; only worth remarking here otherwise (we rethrow in any case)
log.warn("Error populating catalog (rethrowing): "+e, e);
}
throw Exceptions.propagate(e);
} finally {
isPopulatingInitial = false;
hasRunInitialCatalogInitialization = true;
}
}
private void onFinalCatalog() {
assert Thread.holdsLock(populatingCatalogMutex);
hasRunFinalInitialization = true;
confirmCatalog();
}
public void clearForSubsequentCatalogInit() {
hasRunInitialCatalogInitialization = false;
hasRunPersistenceInitialization = false;
hasRunFinalInitialization = false;
}
private void populateViaInitialBomImpl(BasicBrooklynCatalog catalog) {
// B1) look for --catalog-initial, if so read it, then go to C1
// B2) look for BrooklynServerConfig.BROOKLYN_CATALOG_URL, if so, read it, supporting YAML, then go to C1
// B3) look for ~/.brooklyn/catalog.bom, if exists, read it then go to C1
// B4) read all classpath://brooklyn/default.catalog.bom items, if they exist (and for now they will)
// B5) go to C1
if (initialUri!=null) {
populateInitialFromUri(catalog, initialUri);
return;
}
String catalogUrl = managementContext.getConfig().getConfig(BrooklynServerConfig.BROOKLYN_CATALOG_URL);
if (Strings.isNonBlank(catalogUrl)) {
populateInitialFromUri(catalog, catalogUrl);
return;
}
catalogUrl = Os.mergePaths(BrooklynServerConfig.getMgmtBaseDir( managementContext.getConfig() ), "catalog.bom");
if (new File(catalogUrl).exists()) {
populateInitialFromUri(catalog, new File(catalogUrl).toURI().toString());
return;
}
// otherwise look for for classpath:/brooklyn/default.catalog.bom --
// there is one on the classpath which says to scan, and provides a few templates;
// if one is supplied by user in the conf/ dir that will override the item from the classpath
// (TBD - we might want to scan for all such bom's?)
catalogUrl = "classpath:/brooklyn/default.catalog.bom";
if (new ResourceUtils(this).doesUrlExist(catalogUrl)) {
populateInitialFromUri(catalog, catalogUrl);
return;
}
log.info("No catalog found on classpath or specified; catalog will not be initialized.");
return;
}
private void populateInitialFromUri(BasicBrooklynCatalog catalog, String catalogUrl) {
log.debug("Loading initial catalog from {}", catalogUrl);
try {
String contents = new ResourceUtils(this).getResourceAsString(catalogUrl);
catalog.reset(MutableList.<CatalogItem<?,?>>of());
Object result = catalog.addItems(contents, true, false, true);
log.debug("Loaded initial catalog from {}: {}", catalogUrl, result);
} catch (Exception e) {
Exceptions.propagateIfFatal(e);
if (isRebindReadOnlyShuttingDown(getManagementContext())) {
throw Exceptions.propagate(e);
}
log.warn("Error importing catalog from " + catalogUrl + ": " + e, e);
}
}
@Beta
public static boolean isRebindReadOnlyShuttingDown(ManagementContext mgmt) {
if (mgmt!=null && mgmt.getRebindManager() instanceof RebindManagerImpl) {
if (((RebindManagerImpl)mgmt.getRebindManager()).isReadOnlyStopping()) {
return true;
}
}
return false;
}
private Map<InstallableManagedBundle, OsgiBundleInstallationResult> installPersistedBundlesDontStart(Map<VersionedName, InstallableManagedBundle> bundles, RebindExceptionHandler exceptionHandler, RebindLogger rebindLogger) {
Map<InstallableManagedBundle, OsgiBundleInstallationResult> installs = MutableMap.of();
// Install the bundles
Map<VersionedName, InstallableManagedBundle> remaining = MutableMap.copyOf(bundles);
Set<Pair<Entry<VersionedName, InstallableManagedBundle>, Exception>> errors = MutableSet.of();
while (!remaining.isEmpty()) {
int installed = 0;
for (Entry<VersionedName, InstallableManagedBundle> entry : MutableSet.copyOf(remaining.entrySet())) {
rebindLogger.debug("RebindManager installing bundle {}", entry.getKey());
try {
installs.put(entry.getValue(), installBundle(entry.getValue().getManagedBundle(), entry.getValue().getInputStreamSource()));
remaining.remove(entry.getKey());
installed++;
} catch (Exception e) {
rebindLogger.debug("Unable to install bundle " + entry.getKey() + ", but may re-try in case it has a dependency on another bundle (" + e + ")");
errors.add(Pair.of(entry, e));
}
}
if (installed == 0) {
// keep trying until either nothing is installed or nothing is left to install
break;
}
}
rebindLogger.debug("RebindManager installed bundles {}, {} errors", installs.keySet(), errors.size());
errors.forEach(err -> exceptionHandler.onCreateFailed(BrooklynObjectType.MANAGED_BUNDLE,
err.getLeft().getKey().toString(), err.getLeft().getValue().getManagedBundle().getSymbolicName(), err.getRight()));
return installs;
}
private void startPersistedBundles(PersistedCatalogState filteredPersistedState, Map<InstallableManagedBundle, OsgiBundleInstallationResult> installs, RebindExceptionHandler exceptionHandler, RebindLogger rebindLogger) {
// Start the bundles (now that we've installed them all)
// start order is: OSGi and not catalog; then OSGi and catalog; then not catalog nor OSGi; then catalog and not OSGi
// (we need OSGi and not catalog to start first; the others are less important)
Set<OsgiBundleInstallationResult> bundlesInOrder = MutableSet.of();
Set<OsgiBundleInstallationResult> bundlesToRemove = MutableSet.of();
installs.values().stream().forEach(candidate -> {
if (filteredPersistedState.getBundles().containsKey(candidate.getVersionedName())) {
candidate.setRebinding(true);
bundlesInOrder.add(candidate);
} else {
log.debug("Skipping start of persisted bundle "+candidate+" due to catalog upgrade metadata instructions");
bundlesToRemove.add(candidate);
}
});
bundlesToRemove.forEach(b -> {
log.debug("Considering to uninstall: "+b+" / "+getManagementContext().getOsgiManager().get().getManagedBundle(b.getVersionedName()));
if (b.getBundle()!=null && b.getBundle().getState() >= Bundle.INSTALLED && b.getBundle().getState() < Bundle.STARTING) {
// we installed it, catalog did not start it, so let's uninstall it
OsgiBundleInstallationResult result = getManagementContext().getOsgiManager().get().uninstallUploadedBundle(b.getMetadata());
log.debug("Result of uninstalling "+b+" due to catalog upgrade metadata instructions: "+result);
}
});
MutableSet.copyOf(bundlesInOrder).stream().filter(b -> b.getBundle() != null && b.getBundle().getResource("/catalog.bom") != null).forEach(b -> {
bundlesInOrder.remove(b);
bundlesInOrder.add(b); // then move catalog.bom items to the end
});
MutableSet.copyOf(bundlesInOrder).stream().filter(b -> b.getBundle() != null && b.getBundle().getResource("/META-INF/MANIFEST.MF") == null).forEach(b -> {
bundlesInOrder.remove(b);
bundlesInOrder.add(b); // move non-osgi items to the end
});
if (!bundlesInOrder.isEmpty()) {
log.debug("Rebind bundle start order is: " + bundlesInOrder);
}
Set<RegisteredType> installedTypes = MutableSet.of();
for (OsgiBundleInstallationResult br : bundlesInOrder) {
try {
startBundle(br);
Iterables.addAll(installedTypes, managementContext.getTypeRegistry().getMatching(
RegisteredTypePredicates.containingBundle(br.getVersionedName())));
} catch (Exception e) {
exceptionHandler.onCreateFailed(BrooklynObjectType.MANAGED_BUNDLE, br.getMetadata().getId(), br.getMetadata().getSymbolicName(), e);
}
}
// Validate that they all started successfully
if (!installedTypes.isEmpty()) {
validateAllTypes(installedTypes, exceptionHandler);
}
for (Map.Entry<InstallableManagedBundle, OsgiBundleInstallationResult> entry : installs.entrySet()) {
ManagedBundle bundle = entry.getKey().getManagedBundle();
OsgiBundleInstallationResult result = entry.getValue();
if (result.getCode() == OsgiBundleInstallationResult.ResultCode.IGNORING_BUNDLE_AREADY_INSTALLED
&& !result.getMetadata().getId().equals(bundle.getId())) {
// Bundle was already installed as a "Brooklyn managed bundle" (with different id),
// and will thus be persisted with that id.
// For example, can happen if it is in the "initial catalog" and also in persisted state.
// Delete this copy from the persisted state as it is a duplicate.
managementContext.getRebindManager().getChangeListener().onUnmanaged(bundle);
}
}
}
private void validateAllTypes(Set<RegisteredType> installedTypes, RebindExceptionHandler exceptionHandler) {
Stopwatch sw = Stopwatch.createStarted();
log.debug("Getting catalog to validate all types");
final BrooklynCatalog catalog = this.managementContext.getCatalog();
log.debug("Got catalog in {} now validate", sw.toString());
sw.reset(); sw.start();
Map<RegisteredType, Collection<Throwable>> validationErrors = catalog.validateTypes( installedTypes );
log.debug("Validation done in {}", sw.toString());
if (!validationErrors.isEmpty()) {
Map<VersionedName, Map<RegisteredType,Collection<Throwable>>> errorsByBundle = MutableMap.of();
for (RegisteredType t: validationErrors.keySet()) {
VersionedName vn = VersionedName.fromString(t.getContainingBundle());
Map<RegisteredType, Collection<Throwable>> errorsInBundle = errorsByBundle.get(vn);
if (errorsInBundle==null) {
errorsInBundle = MutableMap.of();
errorsByBundle.put(vn, errorsInBundle);
}
errorsInBundle.put(t, validationErrors.get(t));
}
for (VersionedName vn: errorsByBundle.keySet()) {
Map<RegisteredType, Collection<Throwable>> errorsInBundle = errorsByBundle.get(vn);
ManagedBundle b = managementContext.getOsgiManager().get().getManagedBundle(vn);
String id = b!=null ? b.getId() : /* just in case it was uninstalled concurrently somehow */ vn.toString();
exceptionHandler.onCreateFailed(BrooklynObjectType.MANAGED_BUNDLE,
id,
vn.getSymbolicName(),
Exceptions.create("Failed to install "+vn+", types "+errorsInBundle.keySet()+" gave errors",
Iterables.concat(errorsInBundle.values())));
}
}
}
/** install the bundles into brooklyn and osgi, but do not start nor validate;
* caller (rebind) will do that manually, doing each step across all bundles before proceeding
* to prevent reference errors */
private OsgiBundleInstallationResult installBundle(ManagedBundle bundle, Supplier<InputStream> zipInput) {
return getManagementContext().getOsgiManager().get().installDeferredStart(bundle, zipInput, false).get();
}
private void startBundle(OsgiBundleInstallationResult br) throws BundleException {
if (br.getDeferredStart()!=null) {
br.getDeferredStart().run();
}
}
private PersistedCatalogState filterBundlesAndCatalogInPersistedState(PersistedCatalogState persistedState, RebindLogger rebindLogger) {
CatalogUpgrades catalogUpgrades = CatalogUpgrades.getFromManagementContext(managementContext);
if (catalogUpgrades.isEmpty()) {
return persistedState;
} else {
rebindLogger.debug("Filtering out persisted catalog: removedBundles="+catalogUpgrades.getRemovedBundles()+"; removedLegacyItems="+catalogUpgrades.getRemovedLegacyItems());
}
Map<VersionedName, InstallableManagedBundle> bundles = new LinkedHashMap<>();
for (Map.Entry<VersionedName, InstallableManagedBundle> entry : persistedState.getBundles().entrySet()) {
if (catalogUpgrades.isBundleRemoved(entry.getKey())) {
rebindLogger.debug("Filtering out persisted bundle "+entry.getKey());
// will skip persistence while not yet installed, and will remove when uninstalled
} else {
bundles.put(entry.getKey(), entry.getValue());
}
}
List<CatalogItem<?, ?>> legacyCatalogItems = new ArrayList<>();
for (CatalogItem<?, ?> legacyCatalogItem : persistedState.getLegacyCatalogItems()) {
if (catalogUpgrades.isLegacyItemRemoved(legacyCatalogItem)) {
rebindLogger.debug("Filtering out persisted legacy catalog item "+legacyCatalogItem.getId());
// will skip persistence while not yet installed, and will remove when uninstalled
} else {
legacyCatalogItems.add(legacyCatalogItem);
}
}
return new PersistedCatalogState(bundles, legacyCatalogItems);
}
public interface RebindLogger {
void debug(String message, Object... args);
void info(String message, Object... args);
}
public interface InstallableManagedBundle {
public ManagedBundle getManagedBundle();
/** The caller is responsible for closing each stream. */
public Supplier<InputStream> getInputStreamSource() throws IOException;
}
public static class PersistedCatalogState {
private final Map<VersionedName, InstallableManagedBundle> bundles;
private final Collection<CatalogItem<?, ?>> legacyCatalogItems;
public PersistedCatalogState(Map<VersionedName, InstallableManagedBundle> bundles, Collection<CatalogItem<?, ?>> legacyCatalogItems) {
this.bundles = checkNotNull(bundles, "bundles");
this.legacyCatalogItems = checkNotNull(legacyCatalogItems, "legacyCatalogItems");
}
/**
* The persisted bundles (from the {@code /bundles} sub-directory of the persisted state).
*/
public Map<VersionedName, InstallableManagedBundle> getBundles() {
return bundles;
}
/**
* The persisted catalog items (from the {@code /catalog} sub-directory of the persisted state).
*/
public Collection<CatalogItem<?,?>> getLegacyCatalogItems() {
return legacyCatalogItems;
}
}
}