blob: 847d76d2e679b76e8e1dab786063123106a51a16 [file] [log] [blame]
package brooklyn.entity.messaging.qpid
import java.util.Collection
import java.util.Map
import javax.management.ObjectName
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import brooklyn.entity.Entity
import brooklyn.entity.basic.Attributes
import brooklyn.entity.basic.SoftwareProcessEntity
import brooklyn.entity.basic.UsesJmx
import brooklyn.entity.messaging.Queue
import brooklyn.entity.messaging.Topic
import brooklyn.entity.messaging.amqp.AmqpExchange
import brooklyn.entity.messaging.amqp.AmqpServer;
import brooklyn.entity.messaging.jms.JMSBroker;
import brooklyn.entity.messaging.jms.JMSDestination;
import brooklyn.event.adapter.JmxHelper
import brooklyn.event.adapter.JmxSensorAdapter
import brooklyn.event.adapter.SensorRegistry
import brooklyn.event.basic.BasicAttributeSensorAndConfigKey
import brooklyn.event.basic.BasicConfigKey
import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
import brooklyn.location.PortRange
import brooklyn.location.basic.SshMachineLocation
import brooklyn.util.flags.SetFromFlag
/**
* An {@link brooklyn.entity.Entity} that represents a single Qpid broker instance, using AMQP 0-10.
*/
public class QpidBroker extends JMSBroker<QpidQueue, QpidTopic> implements UsesJmx, AmqpServer {
private static final Logger log = LoggerFactory.getLogger(QpidBroker.class)
/* Qpid runtime file locations for convenience. */
public static final String CONFIG_XML = "etc/config.xml"
public static final String VIRTUALHOSTS_XML = "etc/virtualhosts.xml"
public static final String PASSWD = "etc/passwd"
@SetFromFlag("version")
public static final BasicConfigKey<String> SUGGESTED_VERSION = [ SoftwareProcessEntity.SUGGESTED_VERSION, "0.14" ]
@SetFromFlag("amqpPort")
public static final PortAttributeSensorAndConfigKey AMQP_PORT = AmqpServer.AMQP_PORT
@SetFromFlag("virtualHost")
public static final BasicAttributeSensorAndConfigKey<String> VIRTUAL_HOST_NAME = AmqpServer.VIRTUAL_HOST_NAME
@SetFromFlag("amqpVersion")
public static final BasicAttributeSensorAndConfigKey<String> AMQP_VERSION = [ AmqpServer.AMQP_VERSION, AmqpServer.AMQP_0_10 ]
/** Files to be copied to the server, map of "subpath/file.name": "classpath://foo/file.txt" (or other url) */
@SetFromFlag("runtimeFiles")
public static final BasicConfigKey<Map> RUNTIME_FILES = [ Map, "qpid.files.runtime", "Map of files to be copied, keyed by destination name relative to runDir" ]
//TODO if this is included, AbstractEntity complains about multiple sensors;
// //should be smart enough to exclude;
// //also, we'd prefer to hide this from being configurable full stop
// /** not configurable; must be 100 more than JMX port */
// public static final PortAttributeSensorAndConfigKey RMI_PORT = [ UsesJmx.RMI_PORT, 9101 ]
public String getVirtualHost() { return getAttribute(VIRTUAL_HOST_NAME) }
public String getAmqpVersion() { return getAttribute(AMQP_VERSION) }
public Integer getAmqpPort() { return getAttribute(AMQP_PORT) }
public QpidBroker(Map properties=[:], Entity owner=null) {
super(properties, owner)
// TODO test, then change keys to be jmxUser, jmxPassword, configurable on the keys themselves
setConfigIfValNonNull(Attributes.JMX_USER, properties.user ?: "admin")
setConfigIfValNonNull(Attributes.JMX_PASSWORD, properties.password ?: "admin")
}
public void setBrokerUrl() {
String urlFormat = "amqp://guest:guest@/%s?brokerlist='tcp://%s:%d'"
setAttribute(BROKER_URL, String.format(urlFormat, getAttribute(VIRTUAL_HOST_NAME), getAttribute(HOSTNAME), getAttribute(AMQP_PORT)))
}
public QpidQueue createQueue(Map properties) {
return new QpidQueue(properties, this)
}
public QpidTopic createTopic(Map properties) {
return new QpidTopic(properties, this)
}
public QpidSshDriver newDriver(SshMachineLocation machine) {
return new QpidSshDriver(this, machine)
}
@Override
protected Collection<Integer> getRequiredOpenPorts() {
Set<Integer> ports = super.getRequiredOpenPorts() + getAttribute(AMQP_PORT)
Integer jmx = getAttribute(JMX_PORT)
if (jmx) ports += (jmx + 100)
log.debug("getRequiredOpenPorts detected expanded (qpid) ports ${ports} for ${this}")
ports
}
@Override
protected void preStart() {
super.preStart();
// NOTE difference of 100 hard-coded in Qpid - RMI port ignored
setAttribute(RMI_PORT, getAttribute(JMX_PORT) + 100)
}
transient JmxSensorAdapter jmxAdapter;
@Override
protected void connectSensors() {
jmxAdapter = sensorRegistry.register(new JmxSensorAdapter())
jmxAdapter.objectName("org.apache.qpid:type=ServerInformation,name=ServerInformation")
.attribute("ProductVersion")
.subscribe(SERVICE_UP) {
if (it == null) return false
if (it == getConfig(SUGGESTED_VERSION)) return true
log.warn("ProductVersion is ${it}, requested version is {}", getConfig(SUGGESTED_VERSION))
return false
}
jmxAdapter.activateAdapter()
setAttribute(Attributes.JMX_USER)
setAttribute(Attributes.JMX_PASSWORD)
}
@Override
public Collection<String> toStringFieldsToInclude() {
return super.toStringFieldsToInclude() + [ 'amqpPort' ]
}
}
public abstract class QpidDestination extends JMSDestination implements AmqpExchange {
public static final Logger log = LoggerFactory.getLogger(QpidDestination.class);
@SetFromFlag
String virtualHost
protected ObjectName virtualHostManager
protected ObjectName exchange
protected transient SensorRegistry sensorRegistry
protected transient JmxSensorAdapter jmxAdapter
public QpidDestination(Map properties=[:], Entity owner=null) {
super(properties, owner)
}
public void init() {
if (!virtualHost) virtualHost = getConfig(QpidBroker.VIRTUAL_HOST_NAME)
setAttribute(QpidBroker.VIRTUAL_HOST_NAME, virtualHost)
virtualHostManager = new ObjectName("org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=\"${virtualHost}\"")
if (!sensorRegistry) sensorRegistry = new SensorRegistry(this)
def helper = new JmxHelper(owner)
helper.connect();
jmxAdapter = sensorRegistry.register(new JmxSensorAdapter(helper));
}
public void create() {
jmxAdapter.helper.operation(virtualHostManager, "createNewQueue", name, getOwner().getAttribute(Attributes.JMX_USER), true)
jmxAdapter.helper.operation(exchange, "createNewBinding", name, name)
connectSensors()
sensorRegistry.activateAdapters()
}
public void delete() {
jmxAdapter.helper.operation(exchange, "removeBinding", name, name)
jmxAdapter.helper.operation(virtualHostManager, "deleteQueue", name)
sensorRegistry.deactivateAdapters()
}
/**
* Return the AMQP name for the queue.
*/
public String getQueueName() {
if (getOwner().amqpVersion == AmqpServer.AMQP_0_10) {
return String.format("'%s'/'%s'; { assert: never }", exchangeName, name)
} else {
return name
}
}
@Override
public Collection<String> toStringFieldsToInclude() {
return super.toStringFieldsToInclude() + ['name']
}
}
public class QpidQueue extends QpidDestination implements Queue {
public QpidQueue(Map properties=[:], Entity owner=null) {
super(properties, owner)
}
@Override
public void init() {
setAttribute QUEUE_NAME, name
super.init()
exchange = new ObjectName("org.apache.qpid:type=VirtualHost.Exchange,VirtualHost=\"${virtualHost}\",name=\"${exchangeName}\",ExchangeType=direct")
}
public void connectSensors() {
String queue = "org.apache.qpid:type=VirtualHost.Queue,VirtualHost=\"${virtualHost}\",name=\"${name}\""
jmxAdapter.objectName(queue).with {
attribute("QueueDepth").poll(QUEUE_DEPTH_BYTES)
attribute("MessageCount").poll(QUEUE_DEPTH_MESSAGES)
}
}
/** {@inheritDoc} */
public String getExchangeName() { AmqpExchange.DIRECT }
}
public class QpidTopic extends QpidDestination implements Topic {
public QpidTopic(Map properties=[:], Entity owner=null) {
super(properties, owner)
}
// TODO sensors
public void connectSensors() { }
@Override
public void init() {
setAttribute TOPIC_NAME, name
super.init()
exchange = new ObjectName("org.apache.qpid:type=VirtualHost.Exchange,VirtualHost=\"${virtualHost}\",name=\"${exchangeName}\",ExchangeType=topic")
}
/** {@inheritDoc} */
public String getExchangeName() { AmqpExchange.TOPIC }
public String getTopicName() { queueName }
}