blob: 721c3a038206b334f20f717387ea6e443868204c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.brooklyn.entity.software.base.lifecycle;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import javax.annotation.Nullable;
import org.apache.brooklyn.api.effector.Effector;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.location.Location;
import org.apache.brooklyn.api.location.MachineLocation;
import org.apache.brooklyn.api.location.MachineManagementMixins;
import org.apache.brooklyn.api.location.MachineManagementMixins.SuspendsMachines;
import org.apache.brooklyn.api.location.MachineProvisioningLocation;
import org.apache.brooklyn.api.location.NoMachinesAvailableException;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.api.sensor.Feed;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.config.Sanitizer;
import org.apache.brooklyn.core.effector.EffectorBody;
import org.apache.brooklyn.core.effector.Effectors;
import org.apache.brooklyn.core.entity.Attributes;
import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.entity.internal.AttributesInternal;
import org.apache.brooklyn.core.entity.internal.AttributesInternal.ProvisioningTaskState;
import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
import org.apache.brooklyn.core.entity.lifecycle.Lifecycle.Transition;
import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic;
import org.apache.brooklyn.core.entity.trait.Startable;
import org.apache.brooklyn.core.entity.trait.StartableMethods;
import org.apache.brooklyn.core.feed.ConfigToAttributes;
import org.apache.brooklyn.core.location.AbstractLocation;
import org.apache.brooklyn.core.location.Locations;
import org.apache.brooklyn.core.location.Machines;
import org.apache.brooklyn.core.location.cloud.CloudLocationConfig;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.mgmt.entitlement.Entitlements;
import org.apache.brooklyn.core.sensor.BasicAttributeSensor;
import org.apache.brooklyn.core.sensor.ReleaseableLatch;
import org.apache.brooklyn.entity.machine.MachineInitTasks;
import org.apache.brooklyn.entity.machine.ProvidesProvisioningFlags;
import org.apache.brooklyn.entity.software.base.SoftwareProcess;
import org.apache.brooklyn.entity.software.base.SoftwareProcess.RestartSoftwareParameters;
import org.apache.brooklyn.entity.software.base.SoftwareProcess.RestartSoftwareParameters.RestartMachineMode;
import org.apache.brooklyn.entity.software.base.SoftwareProcess.StopSoftwareParameters;
import org.apache.brooklyn.entity.software.base.SoftwareProcess.StopSoftwareParameters.StopMode;
import org.apache.brooklyn.entity.stock.EffectorStartableImpl.StartParameters;
import org.apache.brooklyn.location.localhost.LocalhostMachineProvisioningLocation;
import org.apache.brooklyn.location.ssh.CanResolveOnBoxDir;
import org.apache.brooklyn.location.ssh.SshMachineLocation;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.collections.MutableSet;
import org.apache.brooklyn.util.core.config.ConfigBag;
import org.apache.brooklyn.util.core.task.DynamicTasks;
import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.core.task.ValueResolverIterator;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.net.UserAndHostAndPort;
import org.apache.brooklyn.util.os.Os;
import org.apache.brooklyn.util.repeat.Repeater;
import org.apache.brooklyn.util.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.Beta;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.reflect.TypeToken;
/**
* Default skeleton for start/stop/restart tasks on machines.
* <p>
* Knows how to provision machines, making use of {@link ProvidesProvisioningFlags#obtainProvisioningFlags(MachineProvisioningLocation)},
* and provides hooks for injecting behaviour at common places.
* <p>
* Methods are designed for overriding, with the convention that *Async methods should queue (and not block).
* The following methods are commonly overridden (and you can safely queue tasks, block, or return immediately in them):
* <ul>
* <li> {@link #startProcessesAtMachine(Supplier)} (required)
* <li> {@link #stopProcessesAtMachine()} (required, but can be left blank if you assume the VM will be destroyed)
* <li> {@link #preStartCustom(MachineLocation)}
* <li> {@link #postStartCustom()}
* <li> {@link #preStopConfirmCustom()}
* <li> {@link #postStopCustom()}
* </ul>
* Note methods at this level typically look after the {@link Attributes#SERVICE_STATE_ACTUAL} sensor.
*
* @since 0.6.0
*/
@Beta
public abstract class MachineLifecycleEffectorTasks {
private static final Logger log = LoggerFactory.getLogger(MachineLifecycleEffectorTasks.class);
public static final ConfigKey<Boolean> ON_BOX_BASE_DIR_RESOLVED = ConfigKeys.newBooleanConfigKey(
"onbox.base.dir.resolved",
"Whether the on-box base directory has been resolved (for internal use)");
public static final ConfigKey<Collection<? extends Location>> LOCATIONS = StartParameters.LOCATIONS;
public static final ConfigKey<Duration> STOP_PROCESS_TIMEOUT = ConfigKeys.newDurationConfigKey(
"process.stop.timeout", "How long to wait for the processes to be stopped; use null to mean forever",
Duration.TWO_MINUTES);
@Beta
public static final ConfigKey<Duration> STOP_WAIT_PROVISIONING_TIMEOUT = ConfigKeys.newDurationConfigKey(
"stop.wait.provisioning.timeout",
"If stop is called on an entity while it is still provisioning the machine (such that "
+ "the provisioning cannot be safely interrupted), this is the length of time "
+ "to wait for the machine instance to become available so that it can be terminated. "
+ "If stop aborts before this point, the machine may be left running.",
Duration.minutes(10));
@Beta
public static final AttributeSensor<MachineLocation> INTERNAL_PROVISIONED_MACHINE = new BasicAttributeSensor<MachineLocation>(
TypeToken.of(MachineLocation.class),
"internal.provisioning.task.machine",
"Internal transient sensor (do not use) for tracking the machine being provisioned (to better handle aborting)",
AttributeSensor.SensorPersistenceMode.NONE);
protected final MachineInitTasks machineInitTasks = new MachineInitTasks();
/** Attaches lifecycle effectors (start, restart, stop) to the given entity post-creation. */
public void attachLifecycleEffectors(Entity entity) {
((EntityInternal) entity).getMutableEntityType().addEffector(newStartEffector());
((EntityInternal) entity).getMutableEntityType().addEffector(newRestartEffector());
((EntityInternal) entity).getMutableEntityType().addEffector(newStopEffector());
}
/**
* Return an effector suitable for setting in a {@code public static final} or attaching dynamically.
* <p>
* The effector overrides the corresponding effector from {@link Startable} with
* the behaviour in this lifecycle class instance.
*/
public Effector<Void> newStartEffector() {
return Effectors.effector(Startable.START).impl(newStartEffectorTask()).build();
}
/** @see {@link #newStartEffector()} */
public Effector<Void> newRestartEffector() {
return Effectors.effector(Startable.RESTART)
.parameter(RestartSoftwareParameters.RESTART_CHILDREN)
.parameter(RestartSoftwareParameters.RESTART_MACHINE)
.impl(newRestartEffectorTask())
.build();
}
/** @see {@link #newStartEffector()} */
public Effector<Void> newStopEffector() {
return Effectors.effector(Startable.STOP)
.parameter(StopSoftwareParameters.STOP_PROCESS_MODE)
.parameter(StopSoftwareParameters.STOP_MACHINE_MODE)
.impl(newStopEffectorTask())
.build();
}
/** @see {@link #newStartEffector()} */
public Effector<Void> newSuspendEffector() {
return Effectors.effector(Void.class, "suspend")
.description("Suspend the process/service represented by an entity")
.parameter(StopSoftwareParameters.STOP_PROCESS_MODE)
.parameter(StopSoftwareParameters.STOP_MACHINE_MODE)
.impl(newSuspendEffectorTask())
.build();
}
/**
* Returns the {@link EffectorBody} which supplies the implementation for the start effector.
* <p>
* Calls {@link #start(Collection)} in this class.
*/
public EffectorBody<Void> newStartEffectorTask() {
// TODO included anonymous inner class for backwards compatibility with persisted state.
new EffectorBody<Void>() {
@Override
public Void call(ConfigBag parameters) {
Collection<? extends Location> locations = null;
Object locationsRaw = parameters.getStringKey(LOCATIONS.getName());
locations = Locations.coerceToCollectionOfLocationsManaged(entity().getManagementContext(), locationsRaw);
if (locations==null) {
// null/empty will mean to inherit from parent
locations = Collections.emptyList();
}
start(locations);
return null;
}
};
return new StartEffectorBody();
}
private class StartEffectorBody extends EffectorBody<Void> {
@Override
public Void call(ConfigBag parameters) {
Collection<? extends Location> locations = null;
Object locationsRaw = parameters.getStringKey(LOCATIONS.getName());
locations = Locations.coerceToCollectionOfLocationsManaged(entity().getManagementContext(), locationsRaw);
if (locations == null) {
// null/empty will mean to inherit from parent
locations = Collections.emptyList();
}
start(locations);
return null;
}
}
/**
* Calls {@link #restart(ConfigBag)}.
*
* @see {@link #newStartEffectorTask()}
*/
public EffectorBody<Void> newRestartEffectorTask() {
// TODO included anonymous inner class for backwards compatibility with persisted state.
new EffectorBody<Void>() {
@Override
public Void call(ConfigBag parameters) {
restart(parameters);
return null;
}
};
return new RestartEffectorBody();
}
private class RestartEffectorBody extends EffectorBody<Void> {
@Override
public Void call(ConfigBag parameters) {
restart(parameters);
return null;
}
}
/**
* Calls {@link #stop(ConfigBag)}.
*
* @see {@link #newStartEffectorTask()}
*/
public EffectorBody<Void> newStopEffectorTask() {
// TODO included anonymous inner class for backwards compatibility with persisted state.
new EffectorBody<Void>() {
@Override
public Void call(ConfigBag parameters) {
stop(parameters);
return null;
}
};
return new StopEffectorBody();
}
private class StopEffectorBody extends EffectorBody<Void> {
@Override
public Void call(ConfigBag parameters) {
stop(parameters);
return null;
}
}
/**
* Calls {@link #suspend(ConfigBag)}.
*
* @see {@link #newStartEffectorTask()}
*/
public EffectorBody<Void> newSuspendEffectorTask() {
return new SuspendEffectorBody();
}
private class SuspendEffectorBody extends EffectorBody<Void> {
@Override
public Void call(ConfigBag parameters) {
suspend(parameters);
return null;
}
}
protected EntityInternal entity() {
return (EntityInternal) BrooklynTaskTags.getTargetOrContextEntity(Tasks.current());
}
protected Location getLocation(@Nullable Collection<? extends Location> locations) {
if (locations==null || locations.isEmpty()) locations = entity().getLocations();
if (locations.isEmpty()) {
MachineProvisioningLocation<?> provisioner = entity().getAttribute(SoftwareProcess.PROVISIONING_LOCATION);
if (provisioner!=null) locations = Arrays.<Location>asList(provisioner);
}
locations = Locations.getLocationsCheckingAncestors(locations, entity());
Maybe<MachineLocation> ml = Locations.findUniqueMachineLocation(locations);
if (ml.isPresent()) return ml.get();
if (locations.isEmpty())
throw new IllegalArgumentException("No locations specified when starting "+entity());
if (locations.size() != 1 || Iterables.getOnlyElement(locations)==null)
throw new IllegalArgumentException("Ambiguous locations detected when starting "+entity()+": "+locations);
return Iterables.getOnlyElement(locations);
}
/** runs the tasks needed to start, wrapped by setting {@link Attributes#SERVICE_STATE_EXPECTED} appropriately */
public void start(Collection<? extends Location> locations) {
ServiceStateLogic.setExpectedState(entity(), Lifecycle.STARTING);
try {
startInLocations(locations);
DynamicTasks.waitForLast();
ServiceStateLogic.setExpectedState(entity(), Lifecycle.RUNNING);
} catch (Throwable t) {
ServiceStateLogic.setExpectedState(entity(), Lifecycle.ON_FIRE);
throw Exceptions.propagate(t);
}
}
/** Asserts there is a single location and calls {@link #startInLocation(Location)} with that location. */
protected void startInLocations(Collection<? extends Location> locations) {
startInLocation(getLocation(locations));
}
/** Dispatches to the appropriate method(s) to start in the given location. */
protected void startInLocation(final Location location) {
Supplier<MachineLocation> locationS = null;
if (location instanceof MachineProvisioningLocation) {
Task<MachineLocation> machineTask = provisionAsync((MachineProvisioningLocation<?>)location);
locationS = Tasks.supplier(machineTask);
} else if (location instanceof MachineLocation) {
locationS = Suppliers.ofInstance((MachineLocation)location);
}
Preconditions.checkState(locationS != null, "Unsupported location "+location+", when starting "+entity());
final Supplier<MachineLocation> locationSF = locationS;
// Opportunity to block startup until other dependent components are available
try (CloseableLatch latch = waitForCloseableLatch(entity(), SoftwareProcess.START_LATCH)) {
preStartAtMachineAsync(locationSF);
DynamicTasks.queue("start (processes)", new StartProcessesAtMachineTask(locationSF));
postStartAtMachineAsync();
}
}
private class StartProcessesAtMachineTask implements Runnable {
private final Supplier<MachineLocation> machineSupplier;
private StartProcessesAtMachineTask(Supplier<MachineLocation> machineSupplier) {
this.machineSupplier = machineSupplier;
}
@Override
public void run() {
startProcessesAtMachine(machineSupplier);
}
}
/**
* Returns a queued {@link Task} which provisions a machine in the given location
* and returns that machine. The task can be used as a supplier to subsequent methods.
*/
protected Task<MachineLocation> provisionAsync(final MachineProvisioningLocation<?> location) {
return DynamicTasks.queue(Tasks.<MachineLocation>builder().displayName("provisioning (" + location.getDisplayName() + ")").body(
new ProvisionMachineTask(location)).build());
}
private class ProvisionMachineTask implements Callable<MachineLocation> {
final MachineProvisioningLocation<?> location;
private ProvisionMachineTask(MachineProvisioningLocation<?> location) {
this.location = location;
}
@Override
public MachineLocation call() throws Exception {
// Blocks if a latch was configured.
entity().getConfig(BrooklynConfigKeys.PROVISION_LATCH);
final Map<String, Object> flags = obtainProvisioningFlags(location);
if (!(location instanceof LocalhostMachineProvisioningLocation))
log.info("Starting {}, obtaining a new location instance in {} with ports {}", new Object[]{entity(), location, flags.get("inboundPorts")});
entity().sensors().set(SoftwareProcess.PROVISIONING_LOCATION, location);
Transition expectedState = entity().sensors().get(Attributes.SERVICE_STATE_EXPECTED);
// BROOKLYN-263: see corresponding code in doStop()
if (expectedState != null && (expectedState.getState() == Lifecycle.STOPPING || expectedState.getState() == Lifecycle.STOPPED)) {
throw new IllegalStateException("Provisioning aborted before even begun for "+entity()+" in "+location+" (presumably by a concurrent call to stop");
}
entity().sensors().set(AttributesInternal.INTERNAL_PROVISIONING_TASK_STATE, ProvisioningTaskState.RUNNING);
MachineLocation machine;
try {
machine = Tasks.withBlockingDetails("Provisioning machine in " + location, new ObtainLocationTask(location, flags));
entity().sensors().set(INTERNAL_PROVISIONED_MACHINE, machine);
} finally {
entity().sensors().remove(AttributesInternal.INTERNAL_PROVISIONING_TASK_STATE);
}
if (machine == null) {
throw new NoMachinesAvailableException("Failed to obtain machine in " + location.toString());
}
if (log.isDebugEnabled()) {
log.debug("While starting {}, obtained new location instance {}", entity(),
(machine instanceof SshMachineLocation
? machine + ", details " + ((SshMachineLocation) machine).getUser() + ":" + Sanitizer.sanitize(((SshMachineLocation) machine).config().getLocalBag())
: machine));
}
return machine;
}
}
private static class ObtainLocationTask implements Callable<MachineLocation> {
final MachineProvisioningLocation<?> location;
final Map<String, Object> flags;
private ObtainLocationTask(MachineProvisioningLocation<?> location, Map<String, Object> flags) {
this.flags = flags;
this.location = location;
}
@Override
public MachineLocation call() throws NoMachinesAvailableException {
return location.obtain(flags);
}
}
/**
* Wraps a call to {@link #preStartCustom(MachineLocation)}, after setting the hostname and address.
*/
protected void preStartAtMachineAsync(final Supplier<MachineLocation> machineS) {
DynamicTasks.queue("pre-start", new PreStartTask(machineS.get()));
}
private class PreStartTask implements Runnable {
final MachineLocation machine;
private PreStartTask(MachineLocation machine) {
this.machine = machine;
}
@Override
public void run() {
log.info("Starting {} on machine {}", entity(), machine);
Collection<Location> oldLocs = entity().getLocations();
if (!oldLocs.isEmpty()) {
List<MachineLocation> oldSshLocs = ImmutableList.copyOf(Iterables.filter(oldLocs, MachineLocation.class));
if (!oldSshLocs.isEmpty()) {
// check if existing locations are compatible
log.debug("Entity " + entity() + " had machine locations " + oldSshLocs + " when starting at " + machine + "; checking if they are compatible");
for (MachineLocation oldLoc : oldSshLocs) {
// machines are deemed compatible if hostname and address are the same, or they are localhost
// this allows a machine create by jclouds to then be defined with an ip-based spec
if (!"localhost".equals(machine.getConfig(AbstractLocation.ORIGINAL_SPEC))) {
checkLocationParametersCompatible(machine, oldLoc, "hostname",
oldLoc.getAddress().getHostName(), machine.getAddress().getHostName());
checkLocationParametersCompatible(machine, oldLoc, "address",
oldLoc.getAddress().getHostAddress(), machine.getAddress().getHostAddress());
}
}
log.debug("Entity " + entity() + " old machine locations " + oldSshLocs + " were compatible, removing them to start at " + machine);
entity().removeLocations(oldSshLocs);
}
}
entity().addLocations(ImmutableList.of((Location) machine));
// elsewhere we rely on (public) hostname being set _after_ subnet_hostname
// (to prevent the tiny possibility of races resulting in hostname being returned
// simply because subnet is still being looked up)
Maybe<String> lh = Machines.getSubnetHostname(machine);
Maybe<String> la = Machines.getSubnetIp(machine);
if (lh.isPresent() && entity().sensors().get(Attributes.SUBNET_HOSTNAME) == null) {
entity().sensors().set(Attributes.SUBNET_HOSTNAME, lh.get());
}
if (la.isPresent() && entity().sensors().get(Attributes.SUBNET_ADDRESS) == null) {
entity().sensors().set(Attributes.SUBNET_ADDRESS, la.get());
}
if (entity().sensors().get(Attributes.HOSTNAME) == null) {
entity().sensors().set(Attributes.HOSTNAME, machine.getAddress().getHostName());
}
if (entity().sensors().get(Attributes.ADDRESS) == null) {
entity().sensors().set(Attributes.ADDRESS, machine.getAddress().getHostAddress());
}
if (machine instanceof SshMachineLocation) {
SshMachineLocation sshMachine = (SshMachineLocation) machine;
UserAndHostAndPort sshAddress = UserAndHostAndPort.fromParts(
sshMachine.getUser(), sshMachine.getAddress().getHostName(), sshMachine.getPort());
// FIXME: Who or what is SSH_ADDRESS intended for? It's not necessarily the address that
// the SshMachineLocation is using for ssh connections (because it accepts SSH_HOST as an override).
entity().sensors().set(Attributes.SSH_ADDRESS, sshAddress);
}
if (Boolean.TRUE.equals(entity().getConfig(SoftwareProcess.OPEN_IPTABLES))) {
if (machine instanceof SshMachineLocation) {
@SuppressWarnings("unchecked")
Iterable<Integer> inboundPorts = (Iterable<Integer>) machine.config().get(CloudLocationConfig.INBOUND_PORTS);
machineInitTasks.openIptablesAsync(inboundPorts, (SshMachineLocation)machine);
} else {
log.warn("Ignoring flag OPEN_IPTABLES on non-ssh location {}", machine);
}
}
if (Boolean.TRUE.equals(entity().getConfig(SoftwareProcess.STOP_IPTABLES))) {
if (machine instanceof SshMachineLocation) {
machineInitTasks.stopIptablesAsync((SshMachineLocation)machine);
} else {
log.warn("Ignoring flag STOP_IPTABLES on non-ssh location {}", machine);
}
}
if (Boolean.TRUE.equals(entity().getConfig(SoftwareProcess.DONT_REQUIRE_TTY_FOR_SUDO))) {
if (machine instanceof SshMachineLocation) {
machineInitTasks.dontRequireTtyForSudoAsync((SshMachineLocation)machine);
} else {
log.warn("Ignoring flag DONT_REQUIRE_TTY_FOR_SUDO on non-ssh location {}", machine);
}
}
resolveOnBoxDir(entity(), machine);
preStartCustom(machine);
}
}
/**
* Resolves the on-box dir.
* <p>
* Initialize and pre-create the right onbox working dir, if an ssh machine location.
* Logs a warning if not.
*/
@SuppressWarnings("deprecation")
public static String resolveOnBoxDir(EntityInternal entity, MachineLocation machine) {
String base = entity.getConfig(BrooklynConfigKeys.ONBOX_BASE_DIR);
if (base==null) base = machine.getConfig(BrooklynConfigKeys.ONBOX_BASE_DIR);
if (base!=null && Boolean.TRUE.equals(entity.getConfig(ON_BOX_BASE_DIR_RESOLVED))) return base;
if (base==null) base = entity.getManagementContext().getConfig().getConfig(BrooklynConfigKeys.ONBOX_BASE_DIR);
if (base==null) base = entity.getConfig(BrooklynConfigKeys.BROOKLYN_DATA_DIR);
if (base==null) base = machine.getConfig(BrooklynConfigKeys.BROOKLYN_DATA_DIR);
if (base==null) base = entity.getManagementContext().getConfig().getConfig(BrooklynConfigKeys.BROOKLYN_DATA_DIR);
if (base==null) base = "~/brooklyn-managed-processes";
if (base.equals("~")) base=".";
if (base.startsWith("~/")) base = "."+base.substring(1);
String resolvedBase = null;
if (entity.getConfig(BrooklynConfigKeys.SKIP_ON_BOX_BASE_DIR_RESOLUTION) || machine.getConfig(BrooklynConfigKeys.SKIP_ON_BOX_BASE_DIR_RESOLUTION)) {
if (log.isDebugEnabled()) log.debug("Skipping on-box base dir resolution for "+entity+" at "+machine);
if (!Os.isAbsolutish(base)) base = "~/"+base;
resolvedBase = Os.tidyPath(base);
} else if (machine instanceof CanResolveOnBoxDir) {
resolvedBase = ((CanResolveOnBoxDir)machine).resolveOnBoxDirFor(entity, base);
}
if (resolvedBase==null) {
if (!Os.isAbsolutish(base)) base = "~/"+base;
resolvedBase = Os.tidyPath(base);
log.warn("Could not resolve on-box directory for "+entity+" at "+machine+"; using "+resolvedBase+", though this may not be accurate at the target (and may fail shortly)");
}
entity.config().set(BrooklynConfigKeys.ONBOX_BASE_DIR, resolvedBase);
entity.config().set(ON_BOX_BASE_DIR_RESOLVED, true);
return resolvedBase;
}
protected void checkLocationParametersCompatible(MachineLocation oldLoc, MachineLocation newLoc, String paramSummary,
Object oldParam, Object newParam) {
if (oldParam==null || newParam==null || !oldParam.equals(newParam))
throw new IllegalStateException("Cannot start "+entity()+" in "+newLoc+" as it has already been started with incompatible location "+oldLoc+" " +
"("+paramSummary+" not compatible: "+oldParam+" / "+newParam+"); "+newLoc+" may require manual removal.");
}
@Deprecated
protected void preStartCustom(MachineLocation machine) {
ConfigToAttributes.apply(entity());
}
protected Map<String, Object> obtainProvisioningFlags(final MachineProvisioningLocation<?> location) {
if (entity() instanceof ProvidesProvisioningFlags) {
return ((ProvidesProvisioningFlags)entity()).obtainProvisioningFlags(location).getAllConfig();
}
return MutableMap.<String, Object>of();
}
protected abstract String startProcessesAtMachine(final Supplier<MachineLocation> machineS);
protected void postStartAtMachineAsync() {
DynamicTasks.queue("post-start", new PostStartTask());
}
private class PostStartTask implements Runnable {
@Override
public void run() {
postStartCustom();
}
}
/**
* Default post-start hooks.
* <p>
* Can be extended by subclasses, and typically will wait for confirmation of start.
* The service not set to running until after this. Also invoked following a restart.
*/
protected void postStartCustom() {
// nothing by default
}
/**
* whether when 'auto' mode is specified, the machine should be stopped when the restart effector is called
* <p>
* with {@link MachineLifecycleEffectorTasks}, a machine will always get created on restart if there wasn't one already
* (unlike certain subclasses which might attempt a shortcut process-level restart)
* so there is no reason for default behaviour of restart to throw away a provisioned machine,
* hence default impl returns <code>false</code>.
* <p>
* if it is possible to tell that a machine is unhealthy, or if {@link #restart(ConfigBag)} is overridden,
* then it might be appropriate to return <code>true</code> here.
*/
protected boolean getDefaultRestartStopsMachine() {
return false;
}
/**
* Default restart implementation for an entity.
* <p>
* Stops processes if possible, then starts the entity again.
*/
public void restart(ConfigBag parameters) {
ServiceStateLogic.setExpectedState(entity(), Lifecycle.STOPPING);
RestartMachineMode isRestartMachine = parameters.get(RestartSoftwareParameters.RESTART_MACHINE_TYPED);
if (isRestartMachine==null)
isRestartMachine=RestartMachineMode.AUTO;
if (isRestartMachine==RestartMachineMode.AUTO)
isRestartMachine = getDefaultRestartStopsMachine() ? RestartMachineMode.TRUE : RestartMachineMode.FALSE;
// Calling preStopCustom without a corresponding postStopCustom invocation
// doesn't look right so use a separate callback pair; Also depending on the arguments
// stop() could be called which will call the {pre,post}StopCustom on its own.
DynamicTasks.queue("pre-restart", new PreRestartTask());
if (isRestartMachine==RestartMachineMode.FALSE) {
DynamicTasks.queue("stopping (process)", new StopProcessesAtMachineTask());
} else {
Map<String, Object> stopMachineFlags = MutableMap.of();
if (Entitlements.getEntitlementContext() != null) {
stopMachineFlags.put("tags", MutableSet.of(BrooklynTaskTags.tagForEntitlement(Entitlements.getEntitlementContext())));
}
Task<String> stopTask = Tasks.<String>builder()
.displayName("stopping (machine)")
.body(new StopMachineTask())
.flags(stopMachineFlags)
.build();
DynamicTasks.queue(stopTask);
}
DynamicTasks.queue("starting", new StartInLocationsTask());
restartChildren(parameters);
DynamicTasks.queue("post-restart", new PostRestartTask());
DynamicTasks.waitForLast();
ServiceStateLogic.setExpectedState(entity(), Lifecycle.RUNNING);
}
private class PreRestartTask implements Runnable {
@Override
public void run() {
preRestartCustom();
}
}
private class PostRestartTask implements Runnable {
@Override
public void run() {
postRestartCustom();
}
}
private class StartInLocationsTask implements Runnable {
@Override
public void run() {
// startInLocations will look up the location, and provision a machine if necessary
// (if it remembered the provisioning location)
ServiceStateLogic.setExpectedState(entity(), Lifecycle.STARTING);
startInLocations(null);
}
}
protected void restartChildren(ConfigBag parameters) {
// TODO should we consult ChildStartableMode?
Boolean isRestartChildren = parameters.get(RestartSoftwareParameters.RESTART_CHILDREN);
if (isRestartChildren==null || !isRestartChildren) {
return;
}
if (isRestartChildren) {
DynamicTasks.queue(StartableMethods.restartingChildren(entity(), parameters));
return;
}
throw new IllegalArgumentException("Invalid value '"+isRestartChildren+"' for "+RestartSoftwareParameters.RESTART_CHILDREN.getName());
}
/**
* Default stop implementation for an entity.
* <p>
* Aborts if already stopped, otherwise sets state {@link Lifecycle#STOPPING} then
* invokes {@link #preStopCustom()}, {@link #stopProcessesAtMachine()}, then finally
* {@link #stopAnyProvisionedMachines()} and sets state {@link Lifecycle#STOPPED}.
* If no errors were encountered call {@link #postStopCustom()} at the end.
*/
public void stop(ConfigBag parameters) {
doStopLatching(parameters, new StopAnyProvisionedMachinesTask());
}
/**
* As {@link #stop} but calling {@link #suspendAnyProvisionedMachines} rather than
* {@link #stopAnyProvisionedMachines}.
*/
public void suspend(ConfigBag parameters) {
doStopLatching(parameters, new SuspendAnyProvisionedMachinesTask());
}
protected void doStopLatching(ConfigBag parameters, Callable<StopMachineDetails<Integer>> stopTask) {
try (CloseableLatch latch = waitForCloseableLatch(entity(), SoftwareProcess.STOP_LATCH)) {
doStop(parameters, stopTask);
}
}
protected void doStop(ConfigBag parameters, Callable<StopMachineDetails<Integer>> stopTask) {
preStopConfirmCustom();
log.info("Stopping {} in {}", entity(), entity().getLocations());
StopMode stopMachineMode = getStopMachineMode(parameters);
StopMode stopProcessMode = parameters.get(StopSoftwareParameters.STOP_PROCESS_MODE);
DynamicTasks.queue("pre-stop", new PreStopCustomTask());
// BROOKLYN-263:
// With this change the stop effector will wait for Location to provision so it can terminate
// the machine, if a provisioning request is in-progress.
//
// The ProvisionMachineTask stores transient internal state in PROVISIONING_TASK_STATE and
// PROVISIONED_MACHINE: it records when the provisioning is running and when done; and it
// records the final machine. We record the machine in the internal sensor (rather than
// just relying on getLocations) because the latter is set much later in the start()
// process.
//
// This code is a big improvement (previously there was a several-minute window in some
// clouds where a call to stop() would leave the machine running).
//
// However, there are still races. If the start() code has not yet reached the call to
// location.obtain() then we won't wait, and the start() call won't know to abort. It's
// fiddly to get that right, because we need to cope with restart() - so we mustn't leave
// any state behind that will interfere with subsequent sequential calls to start().
// There is some attempt to handle it by ProvisionMachineTask checking if the expectedState
// is stopping/stopped.
Maybe<MachineLocation> machine = Machines.findUniqueMachineLocation(entity().getLocations());
ProvisioningTaskState provisioningState = entity().sensors().get(AttributesInternal.INTERNAL_PROVISIONING_TASK_STATE);
if (machine.isAbsent() && provisioningState == ProvisioningTaskState.RUNNING) {
Duration maxWait = entity().config().get(STOP_WAIT_PROVISIONING_TIMEOUT);
log.info("When stopping {}, waiting for up to {} for the machine to finish provisioning, before terminating it", entity(), maxWait);
boolean success = Repeater.create("Wait for a machine to appear")
.until(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
ProvisioningTaskState state = entity().sensors().get(AttributesInternal.INTERNAL_PROVISIONING_TASK_STATE);
return (state != ProvisioningTaskState.RUNNING);
}})
.backoffTo(Duration.FIVE_SECONDS)
.limitTimeTo(maxWait)
.run();
if (!success) {
log.warn("When stopping {}, timed out after {} waiting for the machine to finish provisioning - machine may we left running", entity(), maxWait);
}
machine = Maybe.ofDisallowingNull(entity().sensors().get(INTERNAL_PROVISIONED_MACHINE));
}
entity().sensors().remove(AttributesInternal.INTERNAL_PROVISIONING_TASK_STATE);
entity().sensors().remove(INTERNAL_PROVISIONED_MACHINE);
Task<List<?>> stoppingProcess = null;
if (canStop(stopProcessMode, entity())) {
stoppingProcess = Tasks.parallel("stopping",
Tasks.create("stopping (process)", new StopProcessesAtMachineTask()),
Tasks.create("stopping (feeds)", new StopFeedsAtMachineTask()));
DynamicTasks.queue(stoppingProcess);
}
Task<StopMachineDetails<Integer>> stoppingMachine = null;
if (canStop(stopMachineMode, machine.isAbsent())) {
// Release this machine (even if error trying to stop process - we rethrow that after)
Map<String, Object> stopMachineFlags = MutableMap.of();
if (Entitlements.getEntitlementContext() != null) {
stopMachineFlags.put("tags", MutableSet.of(BrooklynTaskTags.tagForEntitlement(Entitlements.getEntitlementContext())));
}
Task<StopMachineDetails<Integer>> stopMachineTask = Tasks.<StopMachineDetails<Integer>>builder()
.displayName("stopping (machine)")
.body(stopTask).flags(stopMachineFlags)
.build();
stoppingMachine = DynamicTasks.queue(stopMachineTask);
DynamicTasks.drain(entity().getConfig(STOP_PROCESS_TIMEOUT), false);
// shutdown the machine if stopping process fails or takes too long
synchronized (stoppingMachine) {
// task also used as mutex by DST when it submits it; ensure it only submits once!
if (!stoppingMachine.isSubmitted()) {
// force the stoppingMachine task to run by submitting it here
StringBuilder msg = new StringBuilder("Submitting machine stop early in background for ").append(entity());
if (stoppingProcess == null) {
msg.append(". Process stop skipped, pre-stop not finished?");
} else {
msg.append(" because process stop has ").append(
(stoppingProcess.isDone() ? "finished abnormally" : "not finished"));
}
log.warn(msg.toString());
Entities.submit(entity(), stoppingMachine);
}
}
}
try {
// This maintains previous behaviour of silently squashing any errors on the stoppingProcess task if the
// stoppingMachine exits with a nonzero value
boolean checkStopProcesses = (stoppingProcess != null && (stoppingMachine == null || stoppingMachine.get().value == 0));
if (checkStopProcesses) {
// TODO we should test for destruction above, not merely successful "stop", as things like localhost and ssh won't be destroyed
DynamicTasks.waitForLast();
if (machine.isPresent()) {
// throw early errors *only if* there is a machine and we have not destroyed it
stoppingProcess.get();
}
}
} catch (Throwable e) {
ServiceStateLogic.setExpectedState(entity(), Lifecycle.ON_FIRE);
Exceptions.propagate(e);
}
entity().sensors().set(SoftwareProcess.SERVICE_UP, false);
ServiceStateLogic.setExpectedState(entity(), Lifecycle.STOPPED);
DynamicTasks.queue("post-stop", new PostStopCustomTask());
if (log.isDebugEnabled()) log.debug("Stopped software process entity "+entity());
}
private class StopAnyProvisionedMachinesTask implements Callable<StopMachineDetails<Integer>> {
@Override
public StopMachineDetails<Integer> call() {
return stopAnyProvisionedMachines();
}
}
private class SuspendAnyProvisionedMachinesTask implements Callable<StopMachineDetails<Integer>> {
@Override
public StopMachineDetails<Integer> call() {
return suspendAnyProvisionedMachines();
}
}
private class StopProcessesAtMachineTask implements Callable<String> {
@Override
public String call() {
DynamicTasks.markInessential();
stopProcessesAtMachine();
DynamicTasks.waitForLast();
return "Stop processes completed with no errors.";
}
}
private class StopFeedsAtMachineTask implements Callable<String> {
@Override
public String call() {
DynamicTasks.markInessential();
for (Feed feed : entity().feeds().getFeeds()) {
if (feed.isActivated()) feed.stop();
}
DynamicTasks.waitForLast();
return "Stop feeds completed with no errors.";
}
}
private class StopMachineTask implements Callable<String> {
@Override
public String call() {
DynamicTasks.markInessential();
stop(ConfigBag.newInstance().configure(StopSoftwareParameters.STOP_MACHINE_MODE, StopMode.IF_NOT_STOPPED));
DynamicTasks.waitForLast();
return "Stop of machine completed with no errors.";
}
}
private class PreStopCustomTask implements Callable<String> {
@Override
public String call() {
if (entity().getAttribute(SoftwareProcess.SERVICE_STATE_ACTUAL) == Lifecycle.STOPPED) {
log.debug("Skipping stop of entity " + entity() + " when already stopped");
return "Already stopped";
}
ServiceStateLogic.setExpectedState(entity(), Lifecycle.STOPPING);
entity().sensors().set(SoftwareProcess.SERVICE_UP, false);
preStopCustom();
return null;
}
}
private class PostStopCustomTask implements Callable<Void> {
@Override
public Void call() {
postStopCustom();
return null;
}
}
public static StopMode getStopMachineMode(ConfigBag parameters) {
final StopMode stopMachineMode = parameters.get(StopSoftwareParameters.STOP_MACHINE_MODE);
return stopMachineMode;
}
public static boolean canStop(StopMode stopMode, Entity entity) {
boolean isEntityStopped = entity.getAttribute(SoftwareProcess.SERVICE_STATE_ACTUAL)==Lifecycle.STOPPED;
return canStop(stopMode, isEntityStopped);
}
protected static boolean canStop(StopMode stopMode, boolean isStopped) {
return stopMode == StopMode.ALWAYS ||
stopMode == StopMode.IF_NOT_STOPPED && !isStopped;
}
@Deprecated
protected void preStopConfirmCustom() {
}
protected void preStopCustom() {
// nothing needed here
}
protected void postStopCustom() {
// nothing needed here
}
protected void preRestartCustom() {
// nothing needed here
}
protected void postRestartCustom() {
// nothing needed here
}
public static class StopMachineDetails<T> implements Serializable {
private static final long serialVersionUID = 3256747214315895431L;
final String message;
final T value;
protected StopMachineDetails(String message, T value) {
this.message = message;
this.value = value;
}
@Override
public String toString() {
return message;
}
}
/**
* Return string message of result.
* <p>
* Can run synchronously or not, caller will submit/queue as needed, and will block on any submitted tasks.
*/
protected abstract String stopProcessesAtMachine();
/**
* Stop and release the {@link MachineLocation} the entity is provisioned at.
* <p>
* Can run synchronously or not, caller will submit/queue as needed, and will block on any submitted tasks.
*/
protected StopMachineDetails<Integer> stopAnyProvisionedMachines() {
@SuppressWarnings("unchecked")
MachineProvisioningLocation<MachineLocation> provisioner = entity().getAttribute(SoftwareProcess.PROVISIONING_LOCATION);
if (Iterables.isEmpty(entity().getLocations())) {
log.debug("No machine decommissioning necessary for "+entity()+" - no locations");
return new StopMachineDetails<Integer>("No machine decommissioning necessary - no locations", 0);
}
// Only release this machine if we ourselves provisioned it (e.g. it might be running other services)
if (provisioner==null) {
log.debug("No machine decommissioning necessary for "+entity()+" - did not provision");
return new StopMachineDetails<Integer>("No machine decommissioning necessary - did not provision", 0);
}
Location machine = getLocation(null);
if (!(machine instanceof MachineLocation)) {
log.debug("No decommissioning necessary for "+entity()+" - not a machine location ("+machine+")");
return new StopMachineDetails<Integer>("No machine decommissioning necessary - not a machine ("+machine+")", 0);
}
entity().sensors().set(AttributesInternal.INTERNAL_TERMINATION_TASK_STATE, ProvisioningTaskState.RUNNING);
try {
clearEntityLocationAttributes(machine);
provisioner.release((MachineLocation)machine);
} finally {
// TODO On exception, should we add the machine back to the entity (because it might not really be terminated)?
// Do we need a better exception hierarchy for that?
entity().sensors().remove(AttributesInternal.INTERNAL_TERMINATION_TASK_STATE);
}
return new StopMachineDetails<Integer>("Decommissioned "+machine, 1);
}
/**
* Suspend the {@link MachineLocation} the entity is provisioned at.
* <p>
* Expects the entity's {@link SoftwareProcess#PROVISIONING_LOCATION provisioner} to be capable of
* {@link SuspendsMachines suspending machines}.
*
* @throws java.lang.UnsupportedOperationException if the entity's provisioner cannot suspend machines.
* @see MachineManagementMixins.SuspendsMachines
*/
protected StopMachineDetails<Integer> suspendAnyProvisionedMachines() {
@SuppressWarnings("unchecked")
MachineProvisioningLocation<MachineLocation> provisioner = entity().getAttribute(SoftwareProcess.PROVISIONING_LOCATION);
if (Iterables.isEmpty(entity().getLocations())) {
log.debug("No machine decommissioning necessary for " + entity() + " - no locations");
return new StopMachineDetails<>("No machine suspend necessary - no locations", 0);
}
// Only release this machine if we ourselves provisioned it (e.g. it might be running other services)
if (provisioner == null) {
log.debug("No machine decommissioning necessary for " + entity() + " - did not provision");
return new StopMachineDetails<>("No machine suspend necessary - did not provision", 0);
}
Location machine = getLocation(null);
if (!(machine instanceof MachineLocation)) {
log.debug("No decommissioning necessary for " + entity() + " - not a machine location (" + machine + ")");
return new StopMachineDetails<>("No machine suspend necessary - not a machine (" + machine + ")", 0);
}
if (!(provisioner instanceof SuspendsMachines)) {
log.debug("Location provisioner ({}) cannot suspend machines", provisioner);
throw new UnsupportedOperationException("Location provisioner cannot suspend machines: " + provisioner);
}
clearEntityLocationAttributes(machine);
SuspendsMachines.class.cast(provisioner).suspendMachine(MachineLocation.class.cast(machine));
return new StopMachineDetails<>("Suspended " + machine, 1);
}
/**
* Nulls the attached entity's hostname, address, subnet hostname and subnet address sensors
* and removes the given machine from its locations.
*/
protected void clearEntityLocationAttributes(Location machine) {
entity().removeLocations(ImmutableList.of(machine));
entity().sensors().set(Attributes.HOSTNAME, null);
entity().sensors().set(Attributes.ADDRESS, null);
entity().sensors().set(Attributes.SUBNET_HOSTNAME, null);
entity().sensors().set(Attributes.SUBNET_ADDRESS, null);
}
// Removes the checked Exception from the method signature
public static class CloseableLatch implements AutoCloseable {
private Entity caller;
private ReleaseableLatch releaseableLatch;
public CloseableLatch(Entity caller, ReleaseableLatch releaseableLatch) {
this.caller = caller;
this.releaseableLatch = releaseableLatch;
}
@Override
public void close() {
DynamicTasks.drain(null, false);
releaseableLatch.release(caller);
}
}
public static CloseableLatch waitForCloseableLatch(Entity entity, ConfigKey<Boolean> configKey) {
ReleaseableLatch releaseableLatch = waitForLatch((EntityInternal)entity, configKey);
return new CloseableLatch(entity, releaseableLatch);
}
private static ReleaseableLatch waitForLatch(EntityInternal entity, ConfigKey<Boolean> configKey) {
Maybe<?> rawValue = entity.config().getRaw(configKey);
if (rawValue.isAbsent()) {
return ReleaseableLatch.NOP;
} else {
ValueResolverIterator<Boolean> iter = resolveLatchIterator(entity, rawValue.get(), configKey);
// The iterator is used to prevent coercion; the value should always be the last one, but iter.last() will return a coerced Boolean
Maybe<ReleaseableLatch> releasableLatchMaybe = iter.next(ReleaseableLatch.class);
if (releasableLatchMaybe.isPresent()) {
ReleaseableLatch latch = releasableLatchMaybe.get();
log.debug("{} finished waiting for {} (value {}); waiting to acquire the latch", new Object[] {entity, configKey, latch});
Tasks.setBlockingDetails("Acquiring " + configKey + " " + latch);
try {
latch.acquire(entity);
} finally {
Tasks.resetBlockingDetails();
}
log.debug("{} Acquired latch {} (value {}); continuing...", new Object[] {entity, configKey, latch});
return latch;
} else {
// If iter.next() above returned absent due to a resolve error next line will throw with the cause
Boolean val = iter.last().get();
if (rawValue != null) log.debug("{} finished waiting for {} (value {}); continuing...", new Object[] {entity, configKey, val});
return ReleaseableLatch.NOP;
}
}
}
private static ValueResolverIterator<Boolean> resolveLatchIterator(EntityInternal entity, Object val, ConfigKey<Boolean> key) {
return Tasks.resolving(val, Boolean.class)
.context(entity.getExecutionContext())
.description("config " + key.getName())
.iterator();
}
}