blob: 957b12c6ddb6d202d73fccaceac64a13e3ad29ed [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.security.SecureRandom;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.ConnectorViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
import org.apache.activemq.broker.jmx.TopicViewMBean;
import org.apache.activemq.spring.SpringSslContext;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.transport.amqp.protocol.AmqpConnection;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AmqpTestSupport {
public static final String MESSAGE_NUMBER = "MessageNumber";
public static final String KAHADB_DIRECTORY = "target/activemq-data/";
@Rule public TestName name = new TestName();
protected static final Logger LOG = LoggerFactory.getLogger(AmqpTestSupport.class);
protected ExecutorService testService = Executors.newSingleThreadExecutor();
protected BrokerService brokerService;
protected Vector<Throwable> exceptions = new Vector<Throwable>();
protected int numberOfMessages;
protected URI amqpURI;
protected int amqpPort;
protected URI amqpSslURI;
protected int amqpSslPort;
protected URI amqpNioURI;
protected int amqpNioPort;
protected URI amqpNioPlusSslURI;
protected int amqpNioPlusSslPort;
protected URI openwireURI;
protected int openwirePort;
@Before
public void setUp() throws Exception {
LOG.info("========== start " + getTestName() + " ==========");
exceptions.clear();
startBroker();
this.numberOfMessages = 2000;
}
protected void createBroker(boolean deleteAllMessages) throws Exception {
brokerService = new BrokerService();
brokerService.setPersistent(isPersistent());
brokerService.setDeleteAllMessagesOnStartup(deleteAllMessages);
if (isPersistent()) {
KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(new File(KAHADB_DIRECTORY + getTestName()));
brokerService.setPersistenceAdapter(kaha);
}
brokerService.setSchedulerSupport(isSchedulerEnabled());
brokerService.setAdvisorySupport(false);
brokerService.setUseJmx(isUseJmx());
brokerService.getManagementContext().setCreateConnector(false);
performAdditionalConfiguration(brokerService);
SSLContext ctx = SSLContext.getInstance("TLS");
ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
SSLContext.setDefault(ctx);
// Setup SSL context...
final File classesDir = new File(AmqpConnection.class.getProtectionDomain().getCodeSource().getLocation().getFile());
File keystore = new File(classesDir, "../../src/test/resources/keystore");
final SpringSslContext sslContext = new SpringSslContext();
sslContext.setKeyStore(keystore.getCanonicalPath());
sslContext.setKeyStorePassword("password");
sslContext.setTrustStore(keystore.getCanonicalPath());
sslContext.setTrustStorePassword("password");
sslContext.afterPropertiesSet();
brokerService.setSslContext(sslContext);
System.setProperty("javax.net.ssl.trustStore", keystore.getCanonicalPath());
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
System.setProperty("javax.net.ssl.keyStore", keystore.getCanonicalPath());
System.setProperty("javax.net.ssl.keyStorePassword", "password");
System.setProperty("javax.net.ssl.keyStoreType", "jks");
addTranportConnectors();
}
protected void performAdditionalConfiguration(BrokerService brokerService) throws Exception {
}
protected void addTranportConnectors() throws Exception {
TransportConnector connector = null;
if (isUseOpenWireConnector()) {
connector = brokerService.addConnector(
"tcp://0.0.0.0:" + openwirePort);
openwirePort = connector.getConnectUri().getPort();
openwireURI = connector.getPublishableConnectURI();
LOG.debug("Using openwire port " + openwirePort);
}
if (isUseTcpConnector()) {
connector = brokerService.addConnector(
"amqp://0.0.0.0:" + amqpPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
amqpPort = connector.getConnectUri().getPort();
amqpURI = connector.getPublishableConnectURI();
LOG.debug("Using amqp port " + amqpPort);
}
if (isUseSslConnector()) {
connector = brokerService.addConnector(
"amqp+ssl://0.0.0.0:" + amqpSslPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
amqpSslPort = connector.getConnectUri().getPort();
amqpSslURI = connector.getPublishableConnectURI();
LOG.debug("Using amqp+ssl port " + amqpSslPort);
}
if (isUseNioConnector()) {
connector = brokerService.addConnector(
"amqp+nio://0.0.0.0:" + amqpNioPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
amqpNioPort = connector.getConnectUri().getPort();
amqpNioURI = connector.getPublishableConnectURI();
LOG.debug("Using amqp+nio port " + amqpNioPort);
}
if (isUseNioPlusSslConnector()) {
connector = brokerService.addConnector(
"amqp+nio+ssl://0.0.0.0:" + amqpNioPlusSslPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
amqpNioPlusSslPort = connector.getConnectUri().getPort();
amqpNioPlusSslURI = connector.getPublishableConnectURI();
LOG.debug("Using amqp+nio+ssl port " + amqpNioPlusSslPort);
}
}
protected boolean isPersistent() {
return false;
}
protected boolean isUseJmx() {
return true;
}
protected boolean isSchedulerEnabled() {
return false;
}
protected boolean isUseOpenWireConnector() {
return false;
}
protected boolean isUseTcpConnector() {
return true;
}
protected boolean isUseSslConnector() {
return false;
}
protected boolean isUseNioConnector() {
return false;
}
protected boolean isUseNioPlusSslConnector() {
return false;
}
protected String getAmqpTransformer() {
return "jms";
}
protected String getAdditionalConfig() {
return "";
}
public void startBroker() throws Exception {
if (brokerService != null) {
throw new IllegalStateException("Broker is already created.");
}
createBroker(true);
brokerService.start();
brokerService.waitUntilStarted();
}
public void restartBroker() throws Exception {
restartBroker(false);
}
public void restartBroker(boolean deleteAllOnStartup) throws Exception {
stopBroker();
createBroker(deleteAllOnStartup);
brokerService.start();
brokerService.waitUntilStarted();
}
public void stopBroker() throws Exception {
LOG.debug("entering AmqpTestSupport.stopBroker");
if (brokerService != null) {
brokerService.stop();
brokerService.waitUntilStopped();
brokerService = null;
}
LOG.debug("exiting AmqpTestSupport.stopBroker");
}
@After
public void tearDown() throws Exception {
stopBroker();
LOG.info("========== tearDown " + getTestName() + " ==========");
}
public Connection createJMSConnection() throws JMSException {
if (!isUseOpenWireConnector()) {
throw new javax.jms.IllegalStateException("OpenWire TransportConnector was not configured.");
}
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(openwireURI);
return factory.createConnection();
}
public void sendMessages(String destinationName, int count, boolean topic) throws Exception {
Connection connection = createJMSConnection();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = null;
if (topic) {
destination = session.createTopic(destinationName);
} else {
destination = session.createQueue(destinationName);
}
sendMessages(connection, destination, count);
} finally {
connection.close();
}
}
public void sendMessages(Connection connection, Destination destination, int count) throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
try {
MessageProducer p = session.createProducer(destination);
for (int i = 1; i <= count; i++) {
TextMessage message = session.createTextMessage();
message.setText("TextMessage: " + i);
message.setIntProperty(MESSAGE_NUMBER, i);
p.send(message);
}
} finally {
session.close();
}
}
public String getTestName() {
return name.getMethodName();
}
protected BrokerViewMBean getProxyToBroker() throws MalformedObjectNameException, JMSException {
ObjectName brokerViewMBean = new ObjectName(
"org.apache.activemq:type=Broker,brokerName=localhost");
BrokerViewMBean proxy = (BrokerViewMBean) brokerService.getManagementContext()
.newProxyInstance(brokerViewMBean, BrokerViewMBean.class, true);
return proxy;
}
protected ConnectorViewMBean getProxyToConnectionView(String connectionType) throws Exception {
ObjectName connectorQuery = new ObjectName(
"org.apache.activemq:type=Broker,brokerName=localhost,connector=clientConnectors,connectorName="+connectionType+"_//*");
Set<ObjectName> results = brokerService.getManagementContext().queryNames(connectorQuery, null);
if (results == null || results.isEmpty() || results.size() > 1) {
throw new Exception("Unable to find the exact Connector instance.");
}
ConnectorViewMBean proxy = (ConnectorViewMBean) brokerService.getManagementContext()
.newProxyInstance(results.iterator().next(), ConnectorViewMBean.class, true);
return proxy;
}
protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException {
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name);
QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext()
.newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
return proxy;
}
protected SubscriptionViewMBean getProxyToQueueSubscriber(String name) throws MalformedObjectNameException, JMSException, IOException {
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name);
QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext()
.newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
SubscriptionViewMBean subscription = null;
for (ObjectName subscriber : proxy.getSubscriptions()) {
subscription = (SubscriptionViewMBean) brokerService.getManagementContext()
.newProxyInstance(subscriber, SubscriptionViewMBean.class, true);
}
return subscription;
}
protected TopicViewMBean getProxyToTopic(String name) throws MalformedObjectNameException, JMSException {
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName="+name);
TopicViewMBean proxy = (TopicViewMBean) brokerService.getManagementContext()
.newProxyInstance(queueViewMBeanName, TopicViewMBean.class, true);
return proxy;
}
}