blob: c90b8c36828f449cb6e5c00bebc55a25104634f8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.junit;
import static org.apache.activemq.command.ActiveMQDestination.QUEUE_TYPE;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.plugin.StatisticsBrokerPlugin;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.junit.rules.ExternalResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A JUnit Rule that embeds an ActiveMQ broker into a test.
*/
public class EmbeddedActiveMQBroker extends ExternalResource {
Logger log = LoggerFactory.getLogger(this.getClass());
BrokerService brokerService;
InternalClient internalClient;
/**
* Create an embedded ActiveMQ broker using defaults
* <p>
* The defaults are:
* - the broker name is 'embedded-broker'
* - JMX is enable but no management connector is created.
* - Persistence is disabled
*/
public EmbeddedActiveMQBroker() {
brokerService = new BrokerService();
brokerService.setUseJmx(true);
brokerService.getManagementContext().setCreateConnector(false);
brokerService.setUseShutdownHook(false);
brokerService.setPersistent(false);
brokerService.setBrokerName("embedded-broker");
}
/**
* Create an embedded ActiveMQ broker using a configuration URI
*/
public EmbeddedActiveMQBroker(String configurationURI) {
try {
brokerService = BrokerFactory.createBroker(configurationURI);
} catch (Exception ex) {
throw new RuntimeException("Exception encountered creating embedded ActiveMQ broker from configuration URI: " + configurationURI, ex);
}
}
/**
* Create an embedded ActiveMQ broker using a configuration URI
*/
public EmbeddedActiveMQBroker(URI configurationURI) {
try {
brokerService = BrokerFactory.createBroker(configurationURI);
} catch (Exception ex) {
throw new RuntimeException("Exception encountered creating embedded ActiveMQ broker from configuration URI: " + configurationURI, ex);
}
}
public static void setMessageProperties(Message message, Map<String, Object> properties) {
if (properties != null && properties.size() > 0) {
for (Map.Entry<String, Object> property : properties.entrySet()) {
try {
message.setObjectProperty(property.getKey(), property.getValue());
} catch (JMSException jmsEx) {
throw new EmbeddedActiveMQBrokerException(String.format("Failed to set property {%s = %s}", property.getKey(), property.getValue().toString()), jmsEx);
}
}
}
}
/**
* Customize the configuration of the embedded ActiveMQ broker
* <p>
* This method is called before the embedded ActiveMQ broker is started, and can
* be overridden to this method to customize the broker configuration.
*/
protected void configure() {
}
/**
* Start the embedded ActiveMQ broker, blocking until the broker has successfully started.
* <p/>
* The broker will normally be started by JUnit using the before() method. This method allows the broker to
* be started manually to support advanced testing scenarios.
*/
public void start() {
try {
this.configure();
brokerService.start();
internalClient = new InternalClient();
internalClient.start();
} catch (Exception ex) {
throw new RuntimeException("Exception encountered starting embedded ActiveMQ broker: {}" + this.getBrokerName(), ex);
}
brokerService.waitUntilStarted();
}
/**
* Stop the embedded ActiveMQ broker, blocking until the broker has stopped.
* <p/>
* The broker will normally be stopped by JUnit using the after() method. This method allows the broker to
* be stopped manually to support advanced testing scenarios.
*/
public void stop() {
if (internalClient != null) {
internalClient.stop();
internalClient = null;
}
if (!brokerService.isStopped()) {
try {
brokerService.stop();
} catch (Exception ex) {
log.warn("Exception encountered stopping embedded ActiveMQ broker: {}" + this.getBrokerName(), ex);
}
}
brokerService.waitUntilStopped();
}
/**
* Start the embedded ActiveMQ Broker
* <p/>
* Invoked by JUnit to setup the resource
*/
@Override
protected void before() throws Throwable {
log.info("Starting embedded ActiveMQ broker: {}", this.getBrokerName());
this.start();
super.before();
}
/**
* Stop the embedded ActiveMQ Broker
* <p/>
* Invoked by JUnit to tear down the resource
*/
@Override
protected void after() {
log.info("Stopping Embedded ActiveMQ Broker: {}", this.getBrokerName());
super.after();
this.stop();
}
/**
* Create an ActiveMQConnectionFactory for the embedded ActiveMQ Broker
*
* @return a new ActiveMQConnectionFactory
*/
public ActiveMQConnectionFactory createConnectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(getVmURL());
return connectionFactory;
}
/**
* Create an PooledConnectionFactory for the embedded ActiveMQ Broker
*
* @return a new PooledConnectionFactory
*/
public PooledConnectionFactory createPooledConnectionFactory() {
ActiveMQConnectionFactory connectionFactory = createConnectionFactory();
PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(connectionFactory);
return pooledConnectionFactory;
}
/**
* Get the BrokerService for the embedded ActiveMQ broker.
* <p/>
* This may be required for advanced configuration of the BrokerService.
*
* @return the embedded ActiveMQ broker
*/
public BrokerService getBrokerService() {
return brokerService;
}
/**
* Get the failover VM URL for the embedded ActiveMQ Broker
* <p/>
* NOTE: The create=false option is appended to the URL to avoid the automatic creation of brokers
* and the resulting duplicate broker errors
*
* @return the VM URL for the embedded broker
*/
public String getVmURL() {
return getVmURL(true);
}
/**
* Get the VM URL for the embedded ActiveMQ Broker
* <p/>
* NOTE: The create=false option is appended to the URL to avoid the automatic creation of brokers
* and the resulting duplicate broker errors
*
* @param failoverURL if true a failover URL will be returned
* @return the VM URL for the embedded broker
*/
public String getVmURL(boolean failoverURL) {
if (failoverURL) {
return String.format("failover:(%s?create=false)", brokerService.getVmConnectorURI().toString());
}
return brokerService.getVmConnectorURI().toString() + "?create=false";
}
/**
* Get the failover VM URI for the embedded ActiveMQ Broker
* <p/>
* NOTE: The create=false option is appended to the URI to avoid the automatic creation of brokers
* and the resulting duplicate broker errors
*
* @return the VM URI for the embedded broker
*/
public URI getVmURI() {
return getVmURI(true);
}
/**
* Get the VM URI for the embedded ActiveMQ Broker
* <p/>
* NOTE: The create=false option is appended to the URI to avoid the automatic creation of brokers
* and the resulting duplicate broker errors
*
* @param failoverURI if true a failover URI will be returned
* @return the VM URI for the embedded broker
*/
public URI getVmURI(boolean failoverURI) {
URI result;
try {
result = new URI(getVmURL(failoverURI));
} catch (URISyntaxException uriEx) {
throw new RuntimeException("Unable to create failover URI", uriEx);
}
return result;
}
/**
* Get the name of the embedded ActiveMQ Broker
*
* @return name of the embedded broker
*/
public String getBrokerName() {
return brokerService.getBrokerName();
}
public void setBrokerName(String brokerName) {
brokerService.setBrokerName(brokerName);
}
public boolean isStatisticsPluginEnabled() {
BrokerPlugin[] plugins = brokerService.getPlugins();
if (null != plugins) {
for (BrokerPlugin plugin : plugins) {
if (plugin instanceof StatisticsBrokerPlugin) {
return true;
}
}
}
return false;
}
public void enableStatisticsPlugin() {
if (!isStatisticsPluginEnabled()) {
BrokerPlugin[] newPlugins;
BrokerPlugin[] currentPlugins = brokerService.getPlugins();
if (null != currentPlugins && 0 < currentPlugins.length) {
newPlugins = new BrokerPlugin[currentPlugins.length + 1];
System.arraycopy(currentPlugins, 0, newPlugins, 0, currentPlugins.length);
} else {
newPlugins = new BrokerPlugin[1];
}
newPlugins[newPlugins.length - 1] = new StatisticsBrokerPlugin();
brokerService.setPlugins(newPlugins);
}
}
public void disableStatisticsPlugin() {
if (isStatisticsPluginEnabled()) {
BrokerPlugin[] currentPlugins = brokerService.getPlugins();
if (1 < currentPlugins.length) {
BrokerPlugin[] newPlugins = new BrokerPlugin[currentPlugins.length - 1];
int i = 0;
for (BrokerPlugin plugin : currentPlugins) {
if (!(plugin instanceof StatisticsBrokerPlugin)) {
newPlugins[i++] = plugin;
}
}
brokerService.setPlugins(newPlugins);
} else {
brokerService.setPlugins(null);
}
}
}
public boolean isAdvisoryForDeliveryEnabled() {
return getDefaultPolicyEntry().isAdvisoryForDelivery();
}
public void enableAdvisoryForDelivery() {
getDefaultPolicyEntry().setAdvisoryForDelivery(true);
}
public void disableAdvisoryForDelivery() {
getDefaultPolicyEntry().setAdvisoryForDelivery(false);
}
public boolean isAdvisoryForConsumedEnabled() {
return getDefaultPolicyEntry().isAdvisoryForConsumed();
}
public void enableAdvisoryForConsumed() {
getDefaultPolicyEntry().setAdvisoryForConsumed(true);
}
public void disableAdvisoryForConsumed() {
getDefaultPolicyEntry().setAdvisoryForConsumed(false);
}
public boolean isAdvisoryForDiscardingMessagesEnabled() {
return getDefaultPolicyEntry().isAdvisoryForDiscardingMessages();
}
public void enableAdvisoryForDiscardingMessages() {
getDefaultPolicyEntry().setAdvisoryForDiscardingMessages(true);
}
public void disableAdvisoryForDiscardingMessages() {
getDefaultPolicyEntry().setAdvisoryForDiscardingMessages(false);
}
public boolean isAdvisoryForFastProducersEnabled() {
return getDefaultPolicyEntry().isAdvisoryForFastProducers();
}
public void enableAdvisoryForFastProducers() {
getDefaultPolicyEntry().setAdvisoryForFastProducers(true);
}
public void disableAdvisoryForFastProducers() {
getDefaultPolicyEntry().setAdvisoryForFastProducers(false);
}
public boolean isAdvisoryForSlowConsumersEnabled() {
return getDefaultPolicyEntry().isAdvisoryForSlowConsumers();
}
public void enableAdvisoryForSlowConsumers() {
getDefaultPolicyEntry().setAdvisoryForSlowConsumers(true);
}
public void disableAdvisoryForSlowConsumers() {
getDefaultPolicyEntry().setAdvisoryForSlowConsumers(false);
}
/**
* Get the number of messages in a specific JMS Destination.
* <p/>
* The full name of the JMS destination including the prefix should be provided - i.e. queue://myQueue
* or topic://myTopic. If the destination type prefix is not included in the destination name, a prefix
* of "queue://" is assumed.
*
* @param destinationName the full name of the JMS Destination
* @return the number of messages in the JMS Destination
*/
public long getMessageCount(String destinationName) {
if (null == brokerService) {
throw new IllegalStateException("BrokerService has not yet been created - was before() called?");
}
// TODO: Figure out how to do this for Topics
Destination destination = getDestination(destinationName);
if (destination == null) {
throw new RuntimeException("Failed to find destination: " + destinationName);
}
// return destination.getMessageStore().getMessageCount();
return destination.getDestinationStatistics().getMessages().getCount();
}
/**
* Get the ActiveMQ destination
* <p/>
* The full name of the JMS destination including the prefix should be provided - i.e. queue://myQueue
* or topic://myTopic. If the destination type prefix is not included in the destination name, a prefix
* of "queue://" is assumed.
*
* @param destinationName the full name of the JMS Destination
* @return the ActiveMQ destination, null if not found
*/
public Destination getDestination(String destinationName) {
if (null == brokerService) {
throw new IllegalStateException("BrokerService has not yet been created - was before() called?");
}
Destination destination = null;
try {
destination = brokerService.getDestination(ActiveMQDestination.createDestination(destinationName, QUEUE_TYPE));
} catch (RuntimeException runtimeEx) {
throw runtimeEx;
} catch (Exception ex) {
throw new EmbeddedActiveMQBrokerException("Unexpected exception getting destination from broker", ex);
}
return destination;
}
private PolicyEntry getDefaultPolicyEntry() {
PolicyMap destinationPolicy = brokerService.getDestinationPolicy();
if (null == destinationPolicy) {
destinationPolicy = new PolicyMap();
brokerService.setDestinationPolicy(destinationPolicy);
}
PolicyEntry defaultEntry = destinationPolicy.getDefaultEntry();
if (null == defaultEntry) {
defaultEntry = new PolicyEntry();
destinationPolicy.setDefaultEntry(defaultEntry);
}
return defaultEntry;
}
public BytesMessage createBytesMessage() {
return internalClient.createBytesMessage();
}
public TextMessage createTextMessage() {
return internalClient.createTextMessage();
}
public MapMessage createMapMessage() {
return internalClient.createMapMessage();
}
public ObjectMessage createObjectMessage() {
return internalClient.createObjectMessage();
}
public StreamMessage createStreamMessage() {
return internalClient.createStreamMessage();
}
public BytesMessage createMessage(byte[] body) {
return this.createMessage(body, null);
}
public TextMessage createMessage(String body) {
return this.createMessage(body, null);
}
public MapMessage createMessage(Map<String, Object> body) {
return this.createMessage(body, null);
}
public ObjectMessage createMessage(Serializable body) {
return this.createMessage(body, null);
}
public BytesMessage createMessage(byte[] body, Map<String, Object> properties) {
BytesMessage message = this.createBytesMessage();
if (body != null) {
try {
message.writeBytes(body);
} catch (JMSException jmsEx) {
throw new EmbeddedActiveMQBrokerException(String.format("Failed to set body {%s} on BytesMessage", new String(body)), jmsEx);
}
}
setMessageProperties(message, properties);
return message;
}
public TextMessage createMessage(String body, Map<String, Object> properties) {
TextMessage message = this.createTextMessage();
if (body != null) {
try {
message.setText(body);
} catch (JMSException jmsEx) {
throw new EmbeddedActiveMQBrokerException(String.format("Failed to set body {%s} on TextMessage", body), jmsEx);
}
}
setMessageProperties(message, properties);
return message;
}
public MapMessage createMessage(Map<String, Object> body, Map<String, Object> properties) {
MapMessage message = this.createMapMessage();
if (body != null) {
for (Map.Entry<String, Object> entry : body.entrySet()) {
try {
message.setObject(entry.getKey(), entry.getValue());
} catch (JMSException jmsEx) {
throw new EmbeddedActiveMQBrokerException(String.format("Failed to set body entry {%s = %s} on MapMessage", entry.getKey(), entry.getValue().toString()), jmsEx);
}
}
}
setMessageProperties(message, properties);
return message;
}
public ObjectMessage createMessage(Serializable body, Map<String, Object> properties) {
ObjectMessage message = this.createObjectMessage();
if (body != null) {
try {
message.setObject(body);
} catch (JMSException jmsEx) {
throw new EmbeddedActiveMQBrokerException(String.format("Failed to set body {%s} on ObjectMessage", body.toString()), jmsEx);
}
}
setMessageProperties(message, properties);
return message;
}
public void pushMessage(String destinationName, Message message) {
if (destinationName == null) {
throw new IllegalArgumentException("pushMessage failure - destination name is required");
} else if (message == null) {
throw new IllegalArgumentException("pushMessage failure - a Message is required");
}
ActiveMQDestination destination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
internalClient.pushMessage(destination, message);
}
public BytesMessage pushMessage(String destinationName, byte[] body) {
BytesMessage message = createMessage(body, null);
pushMessage(destinationName, message);
return message;
}
public TextMessage pushMessage(String destinationName, String body) {
TextMessage message = createMessage(body, null);
pushMessage(destinationName, message);
return message;
}
public MapMessage pushMessage(String destinationName, Map<String, Object> body) {
MapMessage message = createMessage(body, null);
pushMessage(destinationName, message);
return message;
}
public ObjectMessage pushMessage(String destinationName, Serializable body) {
ObjectMessage message = createMessage(body, null);
pushMessage(destinationName, message);
return message;
}
public BytesMessage pushMessageWithProperties(String destinationName, byte[] body, Map<String, Object> properties) {
BytesMessage message = createMessage(body, properties);
pushMessage(destinationName, message);
return message;
}
public TextMessage pushMessageWithProperties(String destinationName, String body, Map<String, Object> properties) {
TextMessage message = createMessage(body, properties);
pushMessage(destinationName, message);
return message;
}
public MapMessage pushMessageWithProperties(String destinationName, Map<String, Object> body, Map<String, Object> properties) {
MapMessage message = createMessage(body, properties);
pushMessage(destinationName, message);
return message;
}
public ObjectMessage pushMessageWithProperties(String destinationName, Serializable body, Map<String, Object> properties) {
ObjectMessage message = createMessage(body, properties);
pushMessage(destinationName, message);
return message;
}
public Message peekMessage(String destinationName) {
if (null == brokerService) {
throw new NullPointerException("peekMessage failure - BrokerService is null");
}
if (destinationName == null) {
throw new IllegalArgumentException("peekMessage failure - destination name is required");
}
ActiveMQDestination destination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
Destination brokerDestination = null;
try {
brokerDestination = brokerService.getDestination(destination);
} catch (Exception ex) {
throw new EmbeddedActiveMQBrokerException("peekMessage failure - unexpected exception getting destination from BrokerService", ex);
}
if (brokerDestination == null) {
throw new IllegalStateException(String.format("peekMessage failure - destination %s not found in broker %s", destination.toString(), brokerService.getBrokerName()));
}
org.apache.activemq.command.Message[] messages = brokerDestination.browse();
if (messages != null && messages.length > 0) {
return (Message) messages[0];
}
return null;
}
public BytesMessage peekBytesMessage(String destinationName) {
return (BytesMessage) peekMessage(destinationName);
}
public TextMessage peekTextMessage(String destinationName) {
return (TextMessage) peekMessage(destinationName);
}
public MapMessage peekMapMessage(String destinationName) {
return (MapMessage) peekMessage(destinationName);
}
public ObjectMessage peekObjectMessage(String destinationName) {
return (ObjectMessage) peekMessage(destinationName);
}
public StreamMessage peekStreamMessage(String destinationName) {
return (StreamMessage) peekMessage(destinationName);
}
public static class EmbeddedActiveMQBrokerException extends RuntimeException {
public EmbeddedActiveMQBrokerException(String message) {
super(message);
}
public EmbeddedActiveMQBrokerException(String message, Exception cause) {
super(message, cause);
}
}
private class InternalClient {
ActiveMQConnectionFactory connectionFactory;
Connection connection;
Session session;
MessageProducer producer;
public InternalClient() {
}
void start() {
connectionFactory = createConnectionFactory();
try {
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(null);
connection.start();
} catch (JMSException jmsEx) {
throw new EmbeddedActiveMQBrokerException("Internal Client creation failure", jmsEx);
}
}
void stop() {
if (null != connection) {
try {
connection.close();
} catch (JMSException jmsEx) {
log.warn("JMSException encounter closing InternalClient connection - ignoring", jmsEx);
}
}
}
public BytesMessage createBytesMessage() {
checkSession();
try {
return session.createBytesMessage();
} catch (JMSException jmsEx) {
throw new EmbeddedActiveMQBrokerException("Failed to create BytesMessage", jmsEx);
}
}
public TextMessage createTextMessage() {
checkSession();
try {
return session.createTextMessage();
} catch (JMSException jmsEx) {
throw new EmbeddedActiveMQBrokerException("Failed to create TextMessage", jmsEx);
}
}
public MapMessage createMapMessage() {
checkSession();
try {
return session.createMapMessage();
} catch (JMSException jmsEx) {
throw new EmbeddedActiveMQBrokerException("Failed to create MapMessage", jmsEx);
}
}
public ObjectMessage createObjectMessage() {
checkSession();
try {
return session.createObjectMessage();
} catch (JMSException jmsEx) {
throw new EmbeddedActiveMQBrokerException("Failed to create ObjectMessage", jmsEx);
}
}
public StreamMessage createStreamMessage() {
checkSession();
try {
return session.createStreamMessage();
} catch (JMSException jmsEx) {
throw new EmbeddedActiveMQBrokerException("Failed to create StreamMessage", jmsEx);
}
}
public void pushMessage(ActiveMQDestination destination, Message message) {
if (producer == null) {
throw new IllegalStateException("JMS MessageProducer is null - has the InternalClient been started?");
}
try {
producer.send(destination, message);
} catch (JMSException jmsEx) {
throw new EmbeddedActiveMQBrokerException(String.format("Failed to push %s to %s", message.getClass().getSimpleName(), destination.toString()), jmsEx);
}
}
void checkSession() {
if (session == null) {
throw new IllegalStateException("JMS Session is null - has the InternalClient been started?");
}
}
}
}