blob: cc4a5a8bec9abdb98aaf0a7548007006fb19d011 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.advisory.ConsumerEvent;
import org.apache.activemq.advisory.ConsumerEventSource;
import org.apache.activemq.advisory.ConsumerListener;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkBridge;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.MessageIdList;
import org.apache.activemq.util.Wait;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.Resource;
/**
* Test case support that allows the easy management and connection of several
* brokers.
*
*
*/
public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(JmsMultipleBrokersTestSupport.class);
public static final String AUTO_ASSIGN_TRANSPORT = "tcp://localhost:0";
public static int maxSetupTime = 5000;
protected Map<String, BrokerItem> brokers;
protected Map<String, Destination> destinations;
protected int messageSize = 1;
protected boolean persistentDelivery = true;
protected boolean verbose;
protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName) throws Exception {
return bridgeBrokers(localBrokerName, remoteBrokerName, false, 1, true);
}
protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName, boolean dynamicOnly) throws Exception {
BrokerService localBroker = brokers.get(localBrokerName).broker;
BrokerService remoteBroker = brokers.get(remoteBrokerName).broker;
return bridgeBrokers(localBroker, remoteBroker, dynamicOnly, 1, true, false);
}
protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName, boolean dynamicOnly, int networkTTL, boolean conduit) throws Exception {
BrokerService localBroker = brokers.get(localBrokerName).broker;
BrokerService remoteBroker = brokers.get(remoteBrokerName).broker;
return bridgeBrokers(localBroker, remoteBroker, dynamicOnly, networkTTL, conduit, false);
}
// Overwrite this method to specify how you want to bridge the two brokers
// By default, bridge them using add network connector of the local broker
// and the first connector of the remote broker
protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean conduit, boolean failover) throws Exception {
List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
URI remoteURI;
if (!transportConnectors.isEmpty()) {
remoteURI = transportConnectors.get(0).getConnectUri();
String uri = "static:(" + remoteURI + ")";
if (failover) {
uri = "static:(failover:(" + remoteURI + "))";
}
NetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri));
connector.setName("to-" + remoteBroker.getBrokerName());
connector.setDynamicOnly(dynamicOnly);
connector.setNetworkTTL(networkTTL);
connector.setConduitSubscriptions(conduit);
localBroker.addNetworkConnector(connector);
maxSetupTime = 2000;
return connector;
} else {
throw new Exception("Remote broker has no registered connectors.");
}
}
// This will interconnect all brokers using multicast
protected void bridgeAllBrokers() throws Exception {
bridgeAllBrokers("default", 1, false, false);
}
protected void bridgeAllBrokers(String groupName, int ttl, boolean suppressduplicateQueueSubs) throws Exception {
bridgeAllBrokers(groupName, ttl, suppressduplicateQueueSubs, false);
}
protected void bridgeAllBrokers(String groupName, int ttl, boolean suppressduplicateQueueSubs, boolean decreasePriority) throws Exception {
Collection<BrokerItem> brokerList = brokers.values();
for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
BrokerService broker = i.next().broker;
List<TransportConnector> transportConnectors = broker.getTransportConnectors();
if (transportConnectors.isEmpty()) {
broker.addConnector(new URI(AUTO_ASSIGN_TRANSPORT));
transportConnectors = broker.getTransportConnectors();
}
TransportConnector transport = transportConnectors.get(0);
transport.setDiscoveryUri(new URI("multicast://default?group=" + groupName));
NetworkConnector nc = broker.addNetworkConnector("multicast://default?group=" + groupName);
nc.setNetworkTTL(ttl);
nc.setSuppressDuplicateQueueSubscriptions(suppressduplicateQueueSubs);
nc.setDecreaseNetworkConsumerPriority(decreasePriority);
}
// Multicasting may take longer to setup
maxSetupTime = 8000;
}
protected void waitForBridgeFormation(final int min) throws Exception {
for (BrokerItem brokerItem : brokers.values()) {
final BrokerService broker = brokerItem.broker;
waitForBridgeFormation(broker, min, 0);
}
}
public boolean waitForBridgeFormation(final BrokerService broker, final int min, final int bridgeIndex) throws Exception {
return waitForBridgeFormation(broker, min, bridgeIndex, Wait.MAX_WAIT_MILLIS*2);
}
public boolean waitForBridgeFormation(final BrokerService broker, final int min, final int bridgeIndex, long wait) throws Exception {
boolean result = false;
if (!broker.getNetworkConnectors().isEmpty()) {
result = Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
int activeCount = 0;
for (NetworkBridge bridge : broker.getNetworkConnectors().get(bridgeIndex).activeBridges()) {
if (bridge.getRemoteBrokerName() != null) {
LOG.info("found bridge[" + bridge + "] to " + bridge.getRemoteBrokerName() + " on broker :" + broker.getBrokerName());
activeCount++;
}
}
return activeCount >= min;
}}, wait);
}
return result;
}
protected void waitForMinTopicRegionConsumerCount(final String name, final int count) throws Exception {
final BrokerService broker = brokers.get(name).broker;
final TopicRegion topicRegion = (TopicRegion) ((RegionBroker) broker.getRegionBroker()).getTopicRegion();
assertTrue("found expected consumers in topic region of" + name, Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.info("topic consumers: " + name +", " + topicRegion.getSubscriptions().toString());
return topicRegion.getSubscriptions().size() >= count;
}
}));
}
/**
* Timed wait for {@link #hasBridge(String, String)}.
*
* @see #hasBridge(String, String)
*
* @param localBrokerName
* - the name of the broker on the "local" side of the bridge
* @param remoteBrokerName
* - the name of the broker on the "remote" side of the bridge
* @param time
* - the maximum time to wait for the bridge to be established
* @param units
* - the units for <param>time</param>
* @throws InterruptedException
* - if the calling thread is interrupted
* @throws TimeoutException
* - if the bridge is not established within the time limit
* @throws Exception
* - some other unknown error occurs
*/
protected void waitForBridge(final String localBrokerName,
final String remoteBrokerName, long time, TimeUnit units)
throws InterruptedException, TimeoutException, Exception {
if (!Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() {
return hasBridge(localBrokerName, remoteBrokerName);
}
}, units.toMillis(time))) {
throw new TimeoutException("Bridge not established from broker "
+ localBrokerName + " to " + remoteBrokerName + " within "
+ units.toMillis(time) + " milliseconds.");
}
}
/**
* Determines whether a bridge has been established between the specified
* brokers.Establishment means that connections have been created and broker
* info has been exchanged. Due to the asynchronous nature of the
* connections, there is still a possibility that the bridge may fail
* shortly after establishment.
*
* @param localBrokerName
* - the name of the broker on the "local" side of the bridge
* @param remoteBrokerName
* - the name of the broker on the "remote" side of the bridge
*/
protected boolean hasBridge(String localBrokerName, String remoteBrokerName) {
final BrokerItem fromBroker = brokers.get(localBrokerName);
if (fromBroker == null) {
throw new IllegalArgumentException("Unknown broker: "
+ localBrokerName);
}
for (BrokerInfo peerInfo : fromBroker.broker.getRegionBroker()
.getPeerBrokerInfos()) {
if (peerInfo.getBrokerName().equals(remoteBrokerName)) {
return true;
}
}
return false;
}
protected void waitForBridgeFormation() throws Exception {
waitForBridgeFormation(1);
}
protected void startAllBrokers() throws Exception {
Collection<BrokerItem> brokerList = brokers.values();
for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
BrokerService broker = i.next().broker;
broker.start();
broker.waitUntilStarted();
}
Thread.sleep(maxSetupTime);
}
protected BrokerService createBroker(String brokerName) throws Exception {
BrokerService broker = new BrokerService();
broker.setBrokerName(brokerName);
brokers.put(brokerName, new BrokerItem(broker));
return broker;
}
protected BrokerService createBroker(URI brokerUri) throws Exception {
BrokerService broker = BrokerFactory.createBroker(brokerUri);
configureBroker(broker);
brokers.put(broker.getBrokerName(), new BrokerItem(broker));
return broker;
}
protected void configureBroker(BrokerService broker) {
}
protected BrokerService createBroker(Resource configFile) throws Exception {
BrokerFactoryBean brokerFactory = new BrokerFactoryBean(configFile);
brokerFactory.afterPropertiesSet();
BrokerService broker = brokerFactory.getBroker();
brokers.put(broker.getBrokerName(), new BrokerItem(broker));
return broker;
}
protected ConnectionFactory getConnectionFactory(String brokerName) throws Exception {
BrokerItem brokerItem = brokers.get(brokerName);
if (brokerItem != null) {
return brokerItem.factory;
}
return null;
}
protected Connection createConnection(String brokerName) throws Exception {
BrokerItem brokerItem = brokers.get(brokerName);
if (brokerItem != null) {
return brokerItem.createConnection();
}
return null;
}
protected MessageConsumer createSyncConsumer(String brokerName, Destination dest) throws Exception {
BrokerItem brokerItem = brokers.get(brokerName);
if (brokerItem != null) {
Connection con = brokerItem.createConnection();
con.start();
Session sess = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = sess.createConsumer(dest);
return consumer;
}
return null;
}
protected MessageConsumer createConsumer(String brokerName, Destination dest) throws Exception {
return createConsumer(brokerName, dest, null, null);
}
protected MessageConsumer createConsumer(String brokerName, Destination dest, String messageSelector) throws Exception {
return createConsumer(brokerName, dest, null, messageSelector);
}
protected MessageConsumer createConsumer(String brokerName, Destination dest, CountDownLatch latch) throws Exception {
return createConsumer(brokerName, dest, latch, null);
}
protected MessageConsumer createConsumer(String brokerName, Destination dest, CountDownLatch latch, String messageSelector) throws Exception {
BrokerItem brokerItem = brokers.get(brokerName);
if (brokerItem != null) {
return brokerItem.createConsumer(dest, latch, messageSelector);
}
return null;
}
protected QueueBrowser createBrowser(String brokerName, Destination dest) throws Exception {
BrokerItem brokerItem = brokers.get(brokerName);
if (brokerItem != null) {
return brokerItem.createBrowser(dest);
}
return null;
}
protected MessageConsumer createDurableSubscriber(String brokerName, Topic dest, String name) throws Exception {
BrokerItem brokerItem = brokers.get(brokerName);
if (brokerItem != null) {
return brokerItem.createDurableSubscriber(dest, name);
}
return null;
}
protected MessageIdList getBrokerMessages(String brokerName) {
BrokerItem brokerItem = brokers.get(brokerName);
if (brokerItem != null) {
return brokerItem.getAllMessages();
}
return null;
}
protected MessageIdList getConsumerMessages(String brokerName, MessageConsumer consumer) {
BrokerItem brokerItem = brokers.get(brokerName);
if (brokerItem != null) {
return brokerItem.getConsumerMessages(consumer);
}
return null;
}
protected void assertConsumersConnect(String brokerName, Destination destination, final int count, long timeout) throws Exception {
BrokerItem brokerItem = brokers.get(brokerName);
Connection conn = brokerItem.createConnection();
conn.start();
ConsumerEventSource ces = new ConsumerEventSource(conn, destination);
try {
final AtomicInteger actualConnected = new AtomicInteger();
final CountDownLatch latch = new CountDownLatch(1);
ces.setConsumerListener(new ConsumerListener(){
public void onConsumerEvent(ConsumerEvent event) {
if( actualConnected.get() < count ) {
actualConnected.set(event.getConsumerCount());
}
if( event.getConsumerCount() >= count ) {
latch.countDown();
}
}
});
ces.start();
latch.await(timeout, TimeUnit.MILLISECONDS);
assertTrue("Expected at least "+count+" consumers to connect, but only "+actualConnected.get()+" connectect within "+timeout+" ms", actualConnected.get() >= count);
} finally {
ces.stop();
conn.close();
brokerItem.connections.remove(conn);
}
}
protected void sendMessages(String brokerName, Destination destination, int count) throws Exception {
sendMessages(brokerName, destination, count, null);
}
protected void sendMessages(String brokerName, Destination destination, int count, HashMap<String, Object>properties) throws Exception {
BrokerItem brokerItem = brokers.get(brokerName);
Connection conn = brokerItem.createConnection();
conn.start();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = brokerItem.createProducer(destination, sess);
producer.setDeliveryMode(persistentDelivery ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
for (int i = 0; i < count; i++) {
TextMessage msg = createTextMessage(sess, conn.getClientID() + ": Message-" + i);
if (properties != null) {
for (String propertyName : properties.keySet()) {
msg.setObjectProperty(propertyName, properties.get(propertyName));
}
}
producer.send(msg);
onSend(i, msg);
}
producer.close();
sess.close();
conn.close();
brokerItem.connections.remove(conn);
}
protected void onSend(int i, TextMessage msg) {
}
protected TextMessage createTextMessage(Session session, String initText) throws Exception {
TextMessage msg = session.createTextMessage();
// Pad message text
if (initText.length() < messageSize) {
char[] data = new char[messageSize - initText.length()];
Arrays.fill(data, '*');
String str = new String(data);
msg.setText(initText + str);
// Do not pad message text
} else {
msg.setText(initText);
}
return msg;
}
protected ActiveMQDestination createDestination(String name, boolean topic) throws JMSException {
Destination dest;
if (topic) {
dest = new ActiveMQTopic(name);
destinations.put(name, dest);
return (ActiveMQDestination)dest;
} else {
dest = new ActiveMQQueue(name);
destinations.put(name, dest);
return (ActiveMQDestination)dest;
}
}
protected void setUp() throws Exception {
super.setUp();
brokers = new HashMap<String, BrokerItem>();
destinations = new HashMap<String, Destination>();
}
protected void tearDown() throws Exception {
destroyAllBrokers();
super.tearDown();
}
protected void destroyBroker(String brokerName) throws Exception {
BrokerItem brokerItem = brokers.remove(brokerName);
if (brokerItem != null) {
brokerItem.destroy();
}
}
protected void destroyAllBrokers() throws Exception {
for (Iterator<BrokerItem> i = brokers.values().iterator(); i.hasNext();) {
BrokerItem brokerItem = i.next();
brokerItem.destroy();
}
brokers.clear();
}
public String buildFailoverUriToAllBrokers() {
StringBuilder uriBuilder = new StringBuilder("failover:(");
int index = 1, size = brokers.size();
for (BrokerItem b : brokers.values()) {
uriBuilder.append(b.getConnectionUri());
if (index < size) {
uriBuilder.append(",");
index++;
}
}
uriBuilder.append(")");
return uriBuilder.toString();
}
// Class to group broker components together
public class BrokerItem {
public BrokerService broker;
public ActiveMQConnectionFactory factory;
public List<Connection> connections;
public Map<MessageConsumer, MessageIdList> consumers;
public MessageIdList allMessages = new MessageIdList();
public boolean persistent;
private IdGenerator id;
public BrokerItem(BrokerService broker) throws Exception {
this.broker = broker;
factory = new ActiveMQConnectionFactory(broker.getVmConnectorURI());
factory.setConnectionIDPrefix(broker.getBrokerName());
consumers = Collections.synchronizedMap(new HashMap<MessageConsumer, MessageIdList>());
connections = Collections.synchronizedList(new ArrayList<Connection>());
allMessages.setVerbose(verbose);
id = new IdGenerator(broker.getBrokerName() + ":");
}
public String getConnectionUri(){
return broker.getVmConnectorURI().toString();
}
public Connection createConnection() throws Exception {
Connection conn = factory.createConnection();
conn.setClientID(id.generateId());
connections.add(conn);
return conn;
}
public MessageConsumer createConsumer(Destination dest) throws Exception {
return createConsumer(dest, null, null);
}
public MessageConsumer createConsumer(Destination dest, String messageSelector) throws Exception {
return createConsumer(dest, null, messageSelector);
}
public MessageConsumer createConsumer(Destination dest, CountDownLatch latch, String messageSelector) throws Exception {
Connection c = createConnection();
c.start();
Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
return createConsumerWithSession(dest, s, latch, messageSelector);
}
public MessageConsumer createConsumerWithSession(Destination dest, Session sess) throws Exception {
return createConsumerWithSession(dest, sess, null, null);
}
public MessageConsumer createConsumerWithSession(Destination dest, Session sess, CountDownLatch latch, String messageSelector) throws Exception {
MessageConsumer client = sess.createConsumer(dest, messageSelector);
MessageIdList messageIdList = new MessageIdList();
messageIdList.setCountDownLatch(latch);
messageIdList.setParent(allMessages);
client.setMessageListener(messageIdList);
consumers.put(client, messageIdList);
return client;
}
public QueueBrowser createBrowser(Destination dest) throws Exception {
Connection c = createConnection();
c.start();
Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
return s.createBrowser((Queue)dest);
}
public MessageConsumer createDurableSubscriber(Topic dest, String name) throws Exception {
Connection c = createConnection();
c.start();
Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
return createDurableSubscriber(dest, s, name);
}
public MessageConsumer createDurableSubscriber(Topic dest, Session sess, String name) throws Exception {
MessageConsumer client = sess.createDurableSubscriber((Topic)dest, name);
MessageIdList messageIdList = new MessageIdList();
messageIdList.setParent(allMessages);
client.setMessageListener(messageIdList);
consumers.put(client, messageIdList);
return client;
}
public MessageIdList getAllMessages() {
return allMessages;
}
public MessageIdList getConsumerMessages(MessageConsumer consumer) {
return consumers.get(consumer);
}
public MessageProducer createProducer(Destination dest) throws Exception {
Connection c = createConnection();
c.start();
Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
return createProducer(dest, s);
}
public MessageProducer createProducer(Destination dest, Session sess) throws Exception {
MessageProducer client = sess.createProducer(dest);
client.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
return client;
}
public void destroy() throws Exception {
while (!connections.isEmpty()) {
Connection c = connections.remove(0);
try {
c.close();
} catch (ConnectionClosedException e) {
} catch (JMSException e) {
}
}
broker.stop();
broker.waitUntilStopped();
consumers.clear();
broker = null;
connections = null;
consumers = null;
factory = null;
}
}
}