blob: d3d513be3cc71efb49dfbb46eaf909423723b04a [file] [log] [blame]
package brooklyn.entity.basic
import groovy.time.Duration;
import java.util.Collection
import java.util.Map
import java.util.Set
import java.util.concurrent.TimeUnit
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import brooklyn.entity.ConfigKey
import brooklyn.entity.Entity
import brooklyn.entity.basic.lifecycle.StartStopDriver
import brooklyn.entity.basic.lifecycle.StartStopSshDriver
import brooklyn.entity.trait.Startable
import brooklyn.event.AttributeSensor
import brooklyn.event.adapter.ConfigSensorAdapter
import brooklyn.event.adapter.SensorRegistry
import brooklyn.event.basic.BasicAttributeSensor
import brooklyn.event.basic.BasicAttributeSensorAndConfigKey
import brooklyn.event.basic.BasicConfigKey
import brooklyn.event.basic.PortAttributeSensorAndConfigKey
import brooklyn.location.Location
import brooklyn.location.MachineLocation
import brooklyn.location.MachineProvisioningLocation
import brooklyn.location.NoMachinesAvailableException
import brooklyn.location.PortRange
import brooklyn.location.basic.LocalhostMachineProvisioningLocation;
import brooklyn.location.basic.SshMachineLocation
import brooklyn.location.basic.jclouds.JcloudsLocation.JcloudsSshMachineLocation
import brooklyn.util.flags.SetFromFlag
import brooklyn.util.internal.Repeater
import com.google.common.base.Preconditions
import com.google.common.base.Predicate
import com.google.common.collect.Iterables
/**
* An {@link Entity} representing a piece of software which can be installed, run, and controlled.
* A single such entity can only run on a single {@link MachineLocation} at a time (you can have multiple on the machine).
* It typically takes config keys for suggested versions, filesystem locations to use, and environment variables to set.
* <p>
* It exposes sensors for service state (Lifecycle) and status (String), and for host info, log file location.
*/
public abstract class SoftwareProcessEntity extends AbstractEntity implements Startable {
private static final Logger log = LoggerFactory.getLogger(SoftwareProcessEntity.class)
@SetFromFlag("startLatch")
public static final ConfigKey<String> START_LATCH = ConfigKeys.START_LATCH
@SetFromFlag("installLatch")
public static final ConfigKey<String> INSTALL_LATCH = ConfigKeys.INSTALL_LATCH
@SetFromFlag("customizeLatch")
public static final ConfigKey<String> CUSTOMIZE_LATCH = ConfigKeys.CUSTOMIZE_LATCH
@SetFromFlag("launchLatch")
public static final ConfigKey<String> LAUNCH_LATCH = ConfigKeys.LAUNCH_LATCH
@SetFromFlag("version")
public static final ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.SUGGESTED_VERSION
@SetFromFlag("installDir")
public static final ConfigKey<String> SUGGESTED_INSTALL_DIR = ConfigKeys.SUGGESTED_INSTALL_DIR
@SetFromFlag("runDir")
public static final ConfigKey<String> SUGGESTED_RUN_DIR = ConfigKeys.SUGGESTED_RUN_DIR
@SetFromFlag("env")
public static final BasicConfigKey<Map> SHELL_ENVIRONMENT = [ Map, "shell.env", "Map of environment variables to pass to the runtime shell", [:] ]
public static final AttributeSensor<String> HOSTNAME = Attributes.HOSTNAME
public static final AttributeSensor<String> ADDRESS = Attributes.ADDRESS
public static final BasicAttributeSensor<Lifecycle> SERVICE_STATE = Attributes.SERVICE_STATE
private MachineProvisioningLocation provisioningLoc
private StartStopDriver driverLocal
protected transient SensorRegistry sensorRegistry
public SoftwareProcessEntity(Map properties=[:], Entity owner=null) {
super(properties, owner)
setAttribute(SERVICE_UP, false)
setAttribute(SERVICE_STATE, Lifecycle.CREATED)
}
protected void setProvisioningLocation(MachineProvisioningLocation val) {
if (provisioningLoc) throw new IllegalStateException("Cannot change provisioning location: existing="+provisioningLoc+"; new="+val)
provisioningLoc = val
}
protected MachineProvisioningLocation getProvisioningLocation() {
return provisioningLoc
}
public StartStopDriver getDriver() { driverLocal }
/**
* @deprecated will be deleted in 0.5. Refer to driver instead
*/
@Deprecated
public StartStopDriver getSetup() { driver }
protected abstract StartStopDriver newDriver(SshMachineLocation loc);
protected void preStart() {
if (!sensorRegistry) sensorRegistry = new SensorRegistry(this)
ConfigSensorAdapter.apply(this);
}
protected void postStart() {
connectSensors()
checkAllSensorsConnected()
}
protected void postActivation() {
}
/** lifecycle message for connecting sensors to registry;
* typically overridden by subclasses */
protected void connectSensors() {
}
protected void checkAllSensorsConnected() {
//TODO warn if we don't have sensors wired up to something
/*
what about sensors where it doesn't necessarily make sense to register them here, e.g.:
config -- easy to exclude (by type)
lifecycle, member added -- ??
enriched sensors -- could add the policies here
proposed solution:
- ignore if registered
- ignore is has a value
- ignore if manually excluded (registered with "manual provider"), e.g.
sensorRegistry.register(ManualSensorAdaptor).register(SOME_MANUAL_SENSOR)
those which are updated by a policy need to get recorded somehow
*/
}
public void waitForServiceUp() {
waitForServiceUp(60*TimeUnit.SECONDS)
}
public void waitForServiceUp(Duration duration) {
if (!Repeater.create(timeout:duration, description:"Waiting for SERVICE_UP on ${this}")
.rethrowException().repeat().every(1*TimeUnit.SECONDS)
.until() {
getAttribute(SERVICE_UP)
}
.run()) {
throw new IllegalStateException("Could not determine if ${this} is up");
}
log.info("Service ${this} is running")
}
public void checkModifiable() {
def state = getAttribute(SERVICE_STATE);
if (getAttribute(SERVICE_STATE) == Lifecycle.RUNNING) return;
if (getAttribute(SERVICE_STATE) == Lifecycle.STARTING) return;
// TODO this check may be redundant or even inappropriate
throw new IllegalStateException("Cannot configure entity ${this} in state ${state}")
}
protected void preStop() { }
@Override
public void start(Collection<? extends Location> locations) {
log.info("Starting software process entity "+this+" at "+locations);
setAttribute(SERVICE_STATE, Lifecycle.STARTING)
if (!sensorRegistry) sensorRegistry = new SensorRegistry(this)
startInLocation locations
postStart()
sensorRegistry.activateAdapters()
postActivation()
if (getAttribute(SERVICE_STATE) == Lifecycle.STARTING)
setAttribute(SERVICE_STATE, Lifecycle.RUNNING);
}
public void startInLocation(Collection<Location> locations) {
Preconditions.checkArgument locations.size() == 1
Location location = Iterables.getOnlyElement(locations)
startInLocation(location)
}
public void startInLocation(MachineProvisioningLocation location) {
Map<String,Object> flags = location.getProvisioningFlags([ getClass().getName() ])
if (!flags.inboundPorts) {
def ports = getRequiredOpenPorts();
if (ports) flags.inboundPorts = getRequiredOpenPorts()
}
if (!(location in LocalhostMachineProvisioningLocation))
LOG.info("SoftwareProcessEntity {} obtaining a new location instance in {} with ports {}", this, location, flags.inboundPorts)
provisioningLoc = location
SshMachineLocation machine = location.obtain(flags)
if (machine == null) throw new NoMachinesAvailableException(location)
if (!(location in LocalhostMachineProvisioningLocation))
LOG.info("SoftwareProcessEntity {} obtained a new location instance {}, now preparing process there", this, machine)
startInLocation(machine)
}
/** returns the ports that this entity wants to use;
* default implementation returns 22 plus first value for each PortAttributeSensorAndConfigKey config key PortRange.
*/
protected Collection<Integer> getRequiredOpenPorts() {
Set<Integer> ports = [22]
for (ConfigKey k: getConfigKeys().values()) {
if (PortRange.class.isAssignableFrom(k.getType())) {
PortRange p = getConfig(k);
if (p != null && !p.isEmpty()) ports += p.iterator().next()
}
}
log.debug("getRequiredOpenPorts detected default ${ports} for ${this}")
ports
}
public String getLocalHostname() {
Location where = Iterables.getFirst(locations, null)
String hostname = null
if (where in JcloudsSshMachineLocation)
hostname = ((JcloudsSshMachineLocation) where).getSubnetHostname()
if (!hostname && where in SshMachineLocation)
hostname = ((SshMachineLocation) where).getAddress()?.hostAddress
log.debug("computed hostname ${hostname} for ${this}")
if (!hostname)
throw new IllegalStateException("Cannot find hostname for ${this} at location ${where}")
return hostname
}
public void startInLocation(SshMachineLocation machine) {
locations.add(machine)
initDriver(machine)
// Note: must only apply config-sensors after adding to locations and creating driver;
// otherwise can't do things like acquire free port from location, or allowing driver to set up ports
// TODO use different method naming/overriding pattern, so as we have more things than SshMachineLocation they all still get called?
ConfigSensorAdapter.apply(this);
setAttribute(HOSTNAME, machine.address.hostName)
setAttribute(ADDRESS, machine.address.hostAddress)
// Opportunity to block startup until other dependent components are available
Object val = getConfig(START_LATCH)
if (val != null) LOG.debug("{} finished waiting for start-latch; continuing...", this, val)
if (driver) {
preStart();
driver.start()
waitForEntityStart()
} else {
throw new UnsupportedOperationException("cannot start ${this} on ${machine}: no setup class found");
}
}
protected void initDriver(SshMachineLocation machine) {
if (driver!=null) {
if ((driver in StartStopSshDriver) && ( ((StartStopSshDriver)driver).location==machine)) {
//just reuse
} else {
log.warn("driver/location change for {} is untested: cannot start ${this} on ${machine}: driver already created");
driverLocal = newDriver(machine)
}
} else {
driverLocal = newDriver(machine)
}
}
// TODO Find a better way to detect early death of process.
public void waitForEntityStart() throws IllegalStateException {
if (log.isDebugEnabled()) log.debug "waiting to ensure $this doesn't abort prematurely"
long startTime = System.currentTimeMillis()
long waitTime = startTime + 75000 // FIXME magic number; should be config key with default value?
boolean isRunningResult = false;
while (!isRunningResult && System.currentTimeMillis() < waitTime) {
Thread.sleep 1000 // FIXME magic number; should be config key with default value?
isRunningResult = driver.isRunning()
if (log.isDebugEnabled()) log.debug "checked {}, is running returned: {}", this, isRunningResult
}
if (!isRunningResult) {
log.warn("Software process entity ${this} did not appear to start; setting state to indicate problem; consult logs for more details")
setAttribute(SERVICE_STATE, Lifecycle.ON_FIRE)
}
}
public void stop() {
// TODO There is a race where we set SERVICE_UP=false while sensor-adapter threads may still be polling.
// The other thread might reset SERVICE_UP to true immediately after we set it to false here.
// Deactivating adapters before setting SERVICE_UP reduces the race, and it is reduced further by setting
// SERVICE_UP to false at the end of stop as well.
if (getAttribute(SERVICE_STATE)==Lifecycle.STOPPED) {
log.warn("Skipping stop of software process entity "+this+" when already stopped");
return;
}
log.info("Stopping software process entity "+this);
setAttribute(SERVICE_STATE, Lifecycle.STOPPING)
if (sensorRegistry!=null) sensorRegistry.deactivateAdapters();
setAttribute(SERVICE_UP, false)
preStop()
MachineLocation machine = removeFirstMatchingLocation({ it in MachineLocation })
if (machine) {
stopInLocation(machine)
}
setAttribute(SERVICE_UP, false)
setAttribute(SERVICE_STATE, Lifecycle.STOPPED)
if (log.isDebugEnabled()) log.debug("Stopped software process entity "+this);
}
Location removeFirstMatchingLocation(Closure matcher) {
synchronized (locations) {
Location loc = locations.find(matcher)
if (loc) locations.remove(loc)
return loc
}
return removeFirstMatchingLocation(matcher as Predicate)
}
Location removeFirstMatchingLocation(Predicate<Location> matcher) {
return removeFirstMatchingLocation({ return matcher.apply(it) })
}
public void stopInLocation(MachineLocation machine) {
if (sensorRegistry) sensorRegistry.close()
if (driver) driver.stop()
// Only release this machine if we ourselves provisioned it (e.g. it might be running other services)
provisioningLoc?.release(machine)
driverLocal = null;
}
public void restart() {
if (driver) driver.restart()
else throw new IllegalStateException("entity "+this+" not set up for operations (restart)")
//if successfully restarts
setAttribute(SERVICE_STATE, Lifecycle.RUNNING);
}
}
public interface UsesJava {
public static final BasicConfigKey<Map<String, String>> JAVA_OPTIONS = [ Map, "java.options", "Java command line options", [:] ]
}
public interface UsesJmx extends UsesJava {
public static final int DEFAULT_JMX_PORT = 1099; // RMI port?
@SetFromFlag("jmxPort")
public static final PortAttributeSensorAndConfigKey JMX_PORT = Attributes.JMX_PORT
@SetFromFlag("rmiPort")
public static final PortAttributeSensorAndConfigKey RMI_PORT = Attributes.RMI_PORT
@SetFromFlag("jmxContext")
public static final BasicAttributeSensorAndConfigKey<String> JMX_CONTEXT = Attributes.JMX_CONTEXT
public static final BasicAttributeSensor<String> JMX_URL = [ String, "jmx.url", "JMX URL" ]
}
public interface UsesJavaMXBeans {
@SetFromFlag("mxbeanStatsEnabled")
public static final BasicConfigKey<Boolean> MXBEAN_STATS_ENABLED = [ Boolean, "java.metrics.mxbeanStatsEnabled", "Enables collection of JVM stats from the MXBeans, such as memory and thread usage (default is true)", true ]
public static final BasicAttributeSensor<Long> USED_HEAP_MEMORY = [ Long, "java.metrics.heap.used", "current heap size in bytes" ]
public static final BasicAttributeSensor<Long> INIT_HEAP_MEMORY = [ Long, "java.metrics.heap.init", "initial heap size in bytes" ]
public static final BasicAttributeSensor<Long> COMMITTED_HEAP_MEMORY = [ Long, "java.metrics.heap.committed", "commited heap size in bytes" ]
public static final BasicAttributeSensor<Long> MAX_HEAP_MEMORY = [ Long, "java.metrics.heap.max", "max heap size in bytes" ]
public static final BasicAttributeSensor<Long> NON_HEAP_MEMORY_USAGE = [ Long, "java.metrics.nonheap.used", "current non-heap size in bytes" ]
public static final BasicAttributeSensor<Integer> CURRENT_THREAD_COUNT = [ Integer, "java.metrics.threads.current", "current number of threads" ]
public static final BasicAttributeSensor<Integer> PEAK_THREAD_COUNT = [ Integer, "java.metrics.threads.max", "peak number of threads" ]
//operating system attributes
public static final BasicAttributeSensor<Long> START_TIME = [ Long, "java.metrics.starttime", "start time" ]
public static final BasicAttributeSensor<Long> UP_TIME = [ Long, "java.metrics.uptime", "the uptime" ]
public static final BasicAttributeSensor<Integer> AVAILABLE_PROCESSORS = [ Integer, "java.metrics.processors.available", "number of processors available to the Java virtual machine" ]
public static final BasicAttributeSensor<Double> SYSTEM_LOAD_AVERAGE = [ Double, "java.metrics.systemload.average", "average system load" ]
public static final BasicAttributeSensor<Long> TOTAL_PHYSICAL_MEMORY_SIZE = [ Long, "java.metrics.physicalmemory.total", "The physical memory available to the operating system" ]
public static final BasicAttributeSensor<Long> FREE_PHYSICAL_MEMORY_SIZE = [ Long, "java.metrics.physicalmemory.free", "The free memory available to the operating system" ]
public static final BasicAttributeSensor<Long> GARBAGE_COLLECTION_TIME = [ Map, "java.metrics.gc.time", "garbage collection time" ]
}