blob: 0c042f482eb528c0cc84faee0f786ea6133000ad [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import static org.junit.Assert.assertTrue;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Test;
public class RequestReplyToTopicViaThreeNetworkHopsTest {
protected static final int CONCURRENT_CLIENT_COUNT = 5;
protected static final int CONCURRENT_SERVER_COUNT = 5;
protected static final int TOTAL_CLIENT_ITER = 10;
protected static int Next_broker_num = 0;
protected EmbeddedTcpBroker edge1;
protected EmbeddedTcpBroker edge2;
protected EmbeddedTcpBroker core1;
protected EmbeddedTcpBroker core2;
protected boolean testError = false;
protected boolean fatalTestError = false;
protected int echoResponseFill = 0; // Number of "filler" response messages per request
protected static Log LOG;
public boolean duplex = true;
static {
LOG = LogFactory.getLog(RequestReplyToTopicViaThreeNetworkHopsTest.class);
}
public RequestReplyToTopicViaThreeNetworkHopsTest() throws Exception {
edge1 = new EmbeddedTcpBroker("edge", 1);
edge2 = new EmbeddedTcpBroker("edge", 2);
core1 = new EmbeddedTcpBroker("core", 1);
core2 = new EmbeddedTcpBroker("core", 2);
// duplex is necessary to serialise sends with consumer/destination creation
edge1.coreConnectTo(core1, duplex);
edge2.coreConnectTo(core2, duplex);
core1.coreConnectTo(core2, duplex);
}
public void logMessage(String msg) {
System.out.println(msg);
System.out.flush();
}
public void testMessages(Session sess, MessageProducer req_prod, Destination resp_dest, int num_msg) throws Exception {
MessageConsumer resp_cons;
TextMessage msg;
MessageClient cons_client;
int cur;
int tot_expected;
resp_cons = sess.createConsumer(resp_dest);
cons_client = new MessageClient(resp_cons, num_msg);
cons_client.start();
cur = 0;
while ((cur < num_msg) && (!fatalTestError)) {
msg = sess.createTextMessage("MSG AAAA " + cur);
msg.setIntProperty("SEQ", 100 + cur);
msg.setStringProperty("TEST", "TOPO");
msg.setJMSReplyTo(resp_dest);
if (cur == (num_msg - 1))
msg.setBooleanProperty("end-of-response", true);
sendWithRetryOnDeletedDest(req_prod, msg);
LOG.debug("Sent:" + msg);
cur++;
}
//
// Give the consumer some time to receive the response.
//
cons_client.waitShutdown(5000);
//
// Now shutdown the consumer if it's still running.
//
if (cons_client.shutdown())
LOG.debug("Consumer client shutdown complete");
else
LOG.debug("Consumer client shutdown incomplete!!!");
//
// Check that the correct number of messages was received.
//
tot_expected = num_msg * (echoResponseFill + 1);
if (cons_client.getNumMsgReceived() == tot_expected) {
LOG.debug("Have " + tot_expected + " messages, as-expected");
} else {
testError = true;
if (cons_client.getNumMsgReceived() == 0)
fatalTestError = true;
LOG.error("Have " + cons_client.getNumMsgReceived() + " messages; expected " + tot_expected + " on destination " + resp_dest);
}
resp_cons.close();
}
protected void sendWithRetryOnDeletedDest(MessageProducer prod, Message msg) throws JMSException {
try {
if (LOG.isDebugEnabled())
LOG.debug("SENDING REQUEST message " + msg);
prod.send(msg);
} catch (JMSException jms_exc) {
System.out.println("AAA: " + jms_exc.getMessage());
throw jms_exc;
}
}
/**
* Test one destination between the given "producer broker" and "consumer broker" specified.
*/
public void testOneDest(Connection conn, Session sess, Destination cons_dest, int num_msg) throws Exception {
Destination prod_dest;
MessageProducer msg_prod;
//
// Create the Producer to the echo request Queue
//
LOG.trace("Creating echo queue and producer");
prod_dest = sess.createQueue("echo");
msg_prod = sess.createProducer(prod_dest);
//
// Pass messages around.
//
testMessages(sess, msg_prod, cons_dest, num_msg);
msg_prod.close();
}
/**
* TEST TEMPORARY TOPICS
*/
public void testTempTopic(String prod_broker_url, String cons_broker_url) throws Exception {
Connection conn;
Session sess;
Destination cons_dest;
int num_msg;
num_msg = 5;
LOG.debug("TESTING TEMP TOPICS " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg + " messages)");
//
// Connect to the bus.
//
conn = createConnection(cons_broker_url);
conn.start();
sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
//
// Create the destination on which messages are being tested.
//
LOG.trace("Creating destination");
cons_dest = sess.createTemporaryTopic();
testOneDest(conn, sess, cons_dest, num_msg);
//
// Cleanup
//
sess.close();
conn.close();
}
/**
* TEST TOPICS
*/
public void testTopic(String prod_broker_url, String cons_broker_url) throws Exception {
int num_msg;
Connection conn;
Session sess;
String topic_name;
Destination cons_dest;
num_msg = 5;
LOG.info("TESTING TOPICS " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg + " messages)");
conn = createConnection(cons_broker_url);
conn.start();
sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
//
// Create the destination on which messages are being tested.
//
topic_name = "topotest2.perm.topic";
LOG.trace("Removing existing Topic");
removeTopic(conn, topic_name);
LOG.trace("Creating Topic, " + topic_name);
cons_dest = sess.createTopic(topic_name);
testOneDest(conn, sess, cons_dest, num_msg);
//
// Cleanup
//
removeTopic(conn, topic_name);
sess.close();
conn.close();
}
/**
* TEST TEMPORARY QUEUES
*/
public void testTempQueue(String prod_broker_url, String cons_broker_url) throws Exception {
int num_msg;
Connection conn;
Session sess;
Destination cons_dest;
num_msg = 5;
LOG.info("TESTING TEMP QUEUES " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg + " messages)");
//
// Connect to the bus.
//
conn = createConnection(cons_broker_url);
conn.start();
sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
//
// Create the destination on which messages are being tested.
//
LOG.trace("Creating destination");
cons_dest = sess.createTemporaryQueue();
testOneDest(conn, sess, cons_dest, num_msg);
//
// Cleanup
//
sess.close();
conn.close();
}
/**
* TEST QUEUES
*/
public void testQueue(String prod_broker_url, String cons_broker_url) throws Exception {
int num_msg;
Connection conn;
Session sess;
String queue_name;
Destination cons_dest;
num_msg = 5;
LOG.info("TESTING QUEUES " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg + " messages)");
conn = createConnection(cons_broker_url);
conn.start();
sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
//
// Create the destination on which messages are being tested.
//
queue_name = "topotest2.perm.queue";
LOG.trace("Removing existing Queue");
removeQueue(conn, queue_name);
LOG.trace("Creating Queue, " + queue_name);
cons_dest = sess.createQueue(queue_name);
testOneDest(conn, sess, cons_dest, num_msg);
removeQueue(conn, queue_name);
sess.close();
conn.close();
}
@Test
public void runWithTempTopicReplyTo() throws Exception {
EchoService echo_svc;
TopicTrafficGenerator traffic_gen;
Thread start1;
Thread start2;
Thread start3;
Thread start4;
ThreadPoolExecutor clientExecPool;
final CountDownLatch clientCompletionLatch;
int iter;
fatalTestError = false;
testError = false;
//
// Execute up to 20 clients at a time to simulate that load.
//
clientExecPool = new ThreadPoolExecutor(CONCURRENT_CLIENT_COUNT, CONCURRENT_CLIENT_COUNT, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10000));
clientCompletionLatch = new CountDownLatch(TOTAL_CLIENT_ITER);
// Use threads to avoid startup deadlock since the first broker started waits until
// it knows the name of the remote broker before finishing its startup, which means
// the remote must already be running.
start1 = new Thread() {
@Override
public void run() {
try {
edge1.start();
} catch (Exception ex) {
LOG.error(null, ex);
}
}
};
start2 = new Thread() {
@Override
public void run() {
try {
edge2.start();
} catch (Exception ex) {
LOG.error(null, ex);
}
}
};
start3 = new Thread() {
@Override
public void run() {
try {
core1.start();
} catch (Exception ex) {
LOG.error(null, ex);
}
}
};
start4 = new Thread() {
@Override
public void run() {
try {
core2.start();
} catch (Exception ex) {
LOG.error(null, ex);
}
}
};
start1.start();
start2.start();
start3.start();
start4.start();
start1.join();
start2.join();
start3.join();
start4.join();
traffic_gen = new TopicTrafficGenerator(edge1.getConnectionUrl(), edge2.getConnectionUrl());
traffic_gen.start();
//
// Now start the echo service with that queue.
//
echo_svc = new EchoService("echo", edge1.getConnectionUrl());
echo_svc.start();
//
// Run the tests on Temp Topics.
//
LOG.info("** STARTING TEMP TOPIC TESTS");
iter = 0;
while ((iter < TOTAL_CLIENT_ITER) && (!fatalTestError)) {
clientExecPool.execute(new Runnable() {
@Override
public void run() {
try {
RequestReplyToTopicViaThreeNetworkHopsTest.this.testTempTopic(edge1.getConnectionUrl(), edge2.getConnectionUrl());
} catch (Exception exc) {
LOG.error("test exception", exc);
fatalTestError = true;
testError = true;
}
clientCompletionLatch.countDown();
}
});
iter++;
}
boolean allDoneOnTime = clientCompletionLatch.await(20, TimeUnit.MINUTES);
LOG.info("** FINISHED TEMP TOPIC TESTS AFTER " + iter + " ITERATIONS, testError:" + testError + ", fatal: " + fatalTestError + ", onTime:"
+ allDoneOnTime);
Thread.sleep(100);
echo_svc.shutdown();
traffic_gen.shutdown();
shutdown();
assertTrue("test completed in time", allDoneOnTime);
assertTrue("no errors", !testError);
}
public void shutdown() throws Exception {
edge1.stop();
edge2.stop();
core1.stop();
core2.stop();
}
protected Connection createConnection(String url) throws Exception {
return org.apache.activemq.ActiveMQConnection.makeConnection(url);
}
protected static void removeQueue(Connection conn, String dest_name) throws java.lang.Exception {
org.apache.activemq.command.ActiveMQDestination dest;
if (conn instanceof org.apache.activemq.ActiveMQConnection) {
dest = org.apache.activemq.command.ActiveMQDestination.createDestination(dest_name, org.apache.activemq.command.ActiveMQDestination.QUEUE_TYPE);
((org.apache.activemq.ActiveMQConnection) conn).destroyDestination(dest);
}
}
protected static void removeTopic(Connection conn, String dest_name) throws java.lang.Exception {
org.apache.activemq.command.ActiveMQDestination dest;
if (conn instanceof org.apache.activemq.ActiveMQConnection) {
dest = org.apache.activemq.command.ActiveMQDestination.createDestination(dest_name, org.apache.activemq.command.ActiveMQDestination.TOPIC_TYPE);
((org.apache.activemq.ActiveMQConnection) conn).destroyDestination(dest);
}
}
public static String fmtMsgInfo(Message msg) throws Exception {
StringBuilder msg_desc;
String prop;
Enumeration<?> prop_enum;
msg_desc = new StringBuilder();
msg_desc = new StringBuilder();
if (msg instanceof TextMessage) {
msg_desc.append(((TextMessage) msg).getText());
} else {
msg_desc.append("[");
msg_desc.append(msg.getClass().getName());
msg_desc.append("]");
}
prop_enum = msg.getPropertyNames();
while (prop_enum.hasMoreElements()) {
prop = (String) prop_enum.nextElement();
msg_desc.append("; ");
msg_desc.append(prop);
msg_desc.append("=");
msg_desc.append(msg.getStringProperty(prop));
}
return msg_desc.toString();
}
protected class EmbeddedTcpBroker {
protected BrokerService brokerSvc;
protected int brokerNum;
protected String brokerName;
protected String brokerId;
protected int port;
protected String tcpUrl;
protected String fullUrl;
public EmbeddedTcpBroker(String name, int number) throws Exception {
brokerSvc = new BrokerService();
synchronized (this.getClass()) {
brokerNum = Next_broker_num;
Next_broker_num++;
}
brokerName = name + number;
brokerId = brokerName;
brokerSvc.setBrokerName(brokerName);
brokerSvc.setBrokerId(brokerId);
brokerSvc.setPersistent(false);
brokerSvc.setUseJmx(false);
port = 60000 + (brokerNum * 10);
tcpUrl = "tcp://127.0.0.1:" + Integer.toString(port);
fullUrl = tcpUrl + "?jms.watchTopicAdvisories=false";
brokerSvc.addConnector(tcpUrl);
}
public Connection createConnection() throws URISyntaxException, JMSException {
Connection result;
result = org.apache.activemq.ActiveMQConnection.makeConnection(this.fullUrl);
return result;
}
public String getConnectionUrl() {
return this.fullUrl;
}
public void coreConnectTo(EmbeddedTcpBroker other, boolean duplex_f) throws Exception {
this.makeConnectionTo(other, duplex_f, true);
this.makeConnectionTo(other, duplex_f, false);
if (!duplex_f) {
other.makeConnectionTo(this, duplex_f, true);
other.makeConnectionTo(this, duplex_f, false);
}
}
public void start() throws Exception {
brokerSvc.start();
brokerSvc.waitUntilStarted();
}
public void stop() throws Exception {
brokerSvc.stop();
}
protected void makeConnectionTo(EmbeddedTcpBroker other, boolean duplex_f, boolean queue_f) throws Exception {
NetworkConnector nw_conn;
String prefix;
ActiveMQDestination excl_dest;
ArrayList<ActiveMQDestination> excludes;
nw_conn = new DiscoveryNetworkConnector(new URI("static:(" + other.tcpUrl + ")"));
nw_conn.setDuplex(duplex_f);
if (queue_f)
nw_conn.setConduitSubscriptions(false);
else
nw_conn.setConduitSubscriptions(true);
nw_conn.setNetworkTTL(3);
nw_conn.setSuppressDuplicateQueueSubscriptions(true);
nw_conn.setDecreaseNetworkConsumerPriority(true);
nw_conn.setBridgeTempDestinations(queue_f);
if (queue_f) {
prefix = "queue";
excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.TOPIC_TYPE);
} else {
prefix = "topic";
excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.QUEUE_TYPE);
}
excludes = new ArrayList<ActiveMQDestination>();
excludes.add(excl_dest);
nw_conn.setExcludedDestinations(excludes);
if (duplex_f)
nw_conn.setName(this.brokerId + "<-" + prefix + "->" + other.brokerId);
else
nw_conn.setName(this.brokerId + "-" + prefix + "->" + other.brokerId);
brokerSvc.addNetworkConnector(nw_conn);
}
}
protected class MessageClient extends java.lang.Thread {
protected MessageConsumer msgCons;
protected boolean shutdownInd;
protected int expectedCount;
protected int lastSeq = 0;
protected int msgCount = 0;
protected boolean haveFirstSeq;
protected CountDownLatch shutdownLatch;
public MessageClient(MessageConsumer cons, int num_to_expect) {
msgCons = cons;
expectedCount = (num_to_expect * (echoResponseFill + 1));
shutdownLatch = new CountDownLatch(1);
}
@Override
public void run() {
CountDownLatch latch;
try {
synchronized (this) {
latch = shutdownLatch;
}
shutdownInd = false;
processMessages();
latch.countDown();
} catch (Exception exc) {
LOG.error("message client error", exc);
}
}
public void waitShutdown(long timeout) {
CountDownLatch latch;
try {
synchronized (this) {
latch = shutdownLatch;
}
if (latch != null)
latch.await(timeout, TimeUnit.MILLISECONDS);
else
LOG.info("echo client shutdown: client does not appear to be active");
} catch (InterruptedException int_exc) {
LOG.warn("wait for message client shutdown interrupted", int_exc);
}
}
public boolean shutdown() {
boolean down_ind;
if (!shutdownInd) {
shutdownInd = true;
}
waitShutdown(200);
synchronized (this) {
if ((shutdownLatch == null) || (shutdownLatch.getCount() == 0))
down_ind = true;
else
down_ind = false;
}
return down_ind;
}
public int getNumMsgReceived() {
return msgCount;
}
protected void processMessages() throws Exception {
Message in_msg;
haveFirstSeq = false;
//
// Stop at shutdown time or after any test error is detected.
//
while ((!shutdownInd) && (!fatalTestError)) {
in_msg = msgCons.receive(100);
if (in_msg != null) {
msgCount++;
checkMessage(in_msg);
}
}
msgCons.close();
}
protected void checkMessage(Message in_msg) throws Exception {
int seq;
LOG.debug("received message " + fmtMsgInfo(in_msg) + " from " + in_msg.getJMSDestination());
//
// Only check messages with a sequence number.
//
if (in_msg.propertyExists("SEQ")) {
seq = in_msg.getIntProperty("SEQ");
if ((haveFirstSeq) && (seq != (lastSeq + 1))) {
LOG.error("***ERROR*** incorrect sequence number; expected " + Integer.toString(lastSeq + 1) + " but have " + Integer.toString(seq));
testError = true;
}
lastSeq = seq;
if (msgCount > expectedCount) {
LOG.error("*** have more messages than expected; have " + msgCount + "; expect " + expectedCount);
testError = true;
}
}
if (in_msg.propertyExists("end-of-response")) {
LOG.trace("received end-of-response message");
}
}
}
/**
*
*/
protected class EchoService extends java.lang.Thread {
protected String destName;
protected Connection jmsConn;
protected Session sess;
protected MessageConsumer msg_cons;
protected boolean Shutdown_ind;
protected Destination req_dest;
protected CountDownLatch waitShutdown;
protected ThreadPoolExecutor processorPool;
public EchoService(String dest, Connection broker_conn) throws Exception {
destName = dest;
jmsConn = broker_conn;
Shutdown_ind = false;
sess = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
req_dest = sess.createQueue(destName);
msg_cons = sess.createConsumer(req_dest);
jmsConn.start();
waitShutdown = new CountDownLatch(1);
processorPool = new ThreadPoolExecutor(CONCURRENT_SERVER_COUNT, CONCURRENT_SERVER_COUNT, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(
10000));
}
public EchoService(String dest, String broker_url) throws Exception {
this(dest, ActiveMQConnection.makeConnection(broker_url));
}
@Override
public void run() {
Message req;
try {
LOG.info("STARTING ECHO SERVICE");
while (!Shutdown_ind) {
req = msg_cons.receive(100);
if (req != null) {
processorPool.execute(new EchoRequestProcessor(sess, req));
}
}
} catch (Exception ex) {
LOG.error("error processing echo service requests", ex);
} finally {
LOG.info("shutting down test echo service");
try {
jmsConn.stop();
} catch (javax.jms.JMSException jms_exc) {
LOG.warn("error on shutting down JMS connection", jms_exc);
}
synchronized (this) {
waitShutdown.countDown();
}
}
}
/**
* Shut down the service, waiting up to 3 seconds for the service to terminate.
*/
public void shutdown() {
CountDownLatch wait_l;
synchronized (this) {
wait_l = waitShutdown;
}
Shutdown_ind = true;
try {
if (wait_l != null) {
if (wait_l.await(3000, TimeUnit.MILLISECONDS))
LOG.info("echo service shutdown complete");
else
LOG.warn("timeout waiting for echo service shutdown");
} else {
LOG.info("echo service shutdown: service does not appear to be active");
}
} catch (InterruptedException int_exc) {
LOG.warn("interrupted while waiting for echo service shutdown");
}
}
}
/**
*
*/
protected class EchoRequestProcessor implements Runnable {
protected Session session;
protected Destination resp_dest;
protected MessageProducer msg_prod;
protected Message request;
public EchoRequestProcessor(Session sess, Message req) throws Exception {
this.session = sess;
this.request = req;
this.resp_dest = req.getJMSReplyTo();
if (resp_dest == null) {
throw new Exception("invalid request: no reply-to destination given");
}
this.msg_prod = session.createProducer(this.resp_dest);
}
@Override
public void run() {
try {
this.processRequest(this.request);
} catch (Exception ex) {
LOG.error("Failed to process request", ex);
}
}
/**
* Process one request for the Echo Service.
*/
protected void processRequest(Message req) throws Exception {
if (LOG.isDebugEnabled())
LOG.debug("ECHO request message " + req.toString());
resp_dest = req.getJMSReplyTo();
if (resp_dest != null) {
msg_prod = session.createProducer(resp_dest);
LOG.debug("SENDING ECHO RESPONSE to:" + resp_dest);
msg_prod.send(req);
LOG.debug((((ActiveMQSession) session).getConnection()).getBrokerName() + " SENT ECHO RESPONSE to " + resp_dest);
msg_prod.close();
msg_prod = null;
} else {
LOG.warn("invalid request: no reply-to destination given");
}
}
}
protected class TopicTrafficGenerator extends java.lang.Thread {
protected Connection conn1;
protected Connection conn2;
protected Session sess1;
protected Session sess2;
protected Destination dest;
protected MessageProducer prod;
protected MessageConsumer cons;
protected boolean Shutdown_ind;
protected int send_count;
public TopicTrafficGenerator(String url1, String url2) throws Exception {
conn1 = createConnection(url1);
conn2 = createConnection(url2);
sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn1.start();
conn2.start();
dest = sess1.createTopic("traffic");
prod = sess1.createProducer(dest);
dest = sess2.createTopic("traffic");
cons = sess2.createConsumer(dest);
}
public void shutdown() {
Shutdown_ind = true;
}
@Override
public void run() {
Message msg;
try {
LOG.info("Starting Topic Traffic Generator");
while (!Shutdown_ind) {
msg = sess1.createTextMessage("TRAFFIC");
prod.send(msg);
send_count++;
//
// Time out the receipt; early messages may not make it.
//
msg = cons.receive(250);
}
} catch (JMSException jms_exc) {
LOG.warn("traffic generator failed on jms exception", jms_exc);
} finally {
LOG.info("Shutdown of Topic Traffic Generator; send count = " + send_count);
if (conn1 != null) {
try {
conn1.stop();
} catch (JMSException jms_exc) {
LOG.warn("failed to shutdown connection", jms_exc);
}
}
if (conn2 != null) {
try {
conn2.stop();
} catch (JMSException jms_exc) {
LOG.warn("failed to shutdown connection", jms_exc);
}
}
}
}
}
}