blob: 27ed767c42c25b592438860e2f7825d9d6786ee8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.failover;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.util.DestinationPathSeparatorBroker;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.SocketProxy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TransactionRolledBackException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.Stack;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
// see https://issues.apache.org/activemq/browse/AMQ-2473
// https://issues.apache.org/activemq/browse/AMQ-2590
public class FailoverTransactionTest extends TestSupport {
private static final Logger LOG = LoggerFactory.getLogger(FailoverTransactionTest.class);
private static final String QUEUE_NAME = "Failover.WithTx";
private static final String TRANSPORT_URI = "tcp://localhost:0";
private String url;
BrokerService broker;
final Random random = new Random();
public static Test suite() {
return suite(FailoverTransactionTest.class);
}
public void setUp() throws Exception {
super.setMaxTestTime(2 * 60 * 1000); // some boxes can be real slow
super.setAutoFail(true);
super.setUp();
}
public void tearDown() throws Exception {
super.tearDown();
stopBroker();
}
public void stopBroker() throws Exception {
if (broker != null) {
broker.stop();
}
}
private void startCleanBroker() throws Exception {
startBroker(true);
}
public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
broker = createBroker(deleteAllMessagesOnStartup);
broker.start();
}
public void startBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
broker = createBroker(deleteAllMessagesOnStartup, bindAddress);
broker.start();
}
public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
return createBroker(deleteAllMessagesOnStartup, TRANSPORT_URI);
}
public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
broker = new BrokerService();
broker.setUseJmx(false);
broker.setAdvisorySupport(false);
broker.addConnector(bindAddress);
broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
PolicyMap policyMap = new PolicyMap();
PolicyEntry defaultEntry = new PolicyEntry();
defaultEntry.setUsePrefetchExtension(false);
policyMap.setDefaultEntry(defaultEntry);
broker.setDestinationPolicy(policyMap);
url = broker.getTransportConnectors().get(0).getConnectUri().toString();
return broker;
}
public void configureConnectionFactory(ActiveMQConnectionFactory factory) {
// nothing to do
}
public void testFailoverProducerCloseBeforeTransaction() throws Exception {
startCleanBroker();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(destination);
produceMessage(session, destination);
// restart to force failover and connection state recovery before the commit
broker.stop();
startBroker(false, url);
session.commit();
assertNotNull("we got the message", consumer.receive(20000));
session.commit();
connection.close();
}
public void initCombosForTestFailoverCommitReplyLost() {
String osName = System.getProperty("os.name");
Object[] persistenceAdapters;
if (!osName.equalsIgnoreCase("AIX") && !osName.equalsIgnoreCase("SunOS")) {
persistenceAdapters = new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC};
} else {
persistenceAdapters = new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC};
}
addCombinationValues("defaultPersistenceAdapter",persistenceAdapters);
}
@SuppressWarnings("unchecked")
public void testFailoverCommitReplyLost() throws Exception {
broker = createBroker(true);
setDefaultPersistenceAdapter(broker);
broker.setPlugins(new BrokerPlugin[]{
new BrokerPluginSupport() {
@Override
public void commitTransaction(ConnectionContext context,
TransactionId xid, boolean onePhase) throws Exception {
super.commitTransaction(context, xid, onePhase);
// so commit will hang as if reply is lost
context.setDontSendReponse(true);
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
LOG.info("Stopping broker post commit...");
try {
broker.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
});
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
Connection connection = cf.createConnection();
connection.start();
final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(destination);
produceMessage(session, destination);
final CountDownLatch commitDoneLatch = new CountDownLatch(1);
// broker will die on commit reply so this will hang till restart
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
LOG.info("doing async commit...");
try {
session.commit();
} catch (JMSException e) {
assertTrue(e instanceof TransactionRolledBackException);
LOG.info("got commit exception: ", e);
}
commitDoneLatch.countDown();
LOG.info("done async commit");
}
});
// will be stopped by the plugin
broker.waitUntilStopped();
broker = createBroker(false, url);
setDefaultPersistenceAdapter(broker);
broker.start();
assertTrue("tx committed through failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
// new transaction
Message msg = consumer.receive(20000);
LOG.info("Received: " + msg);
assertNotNull("we got the message", msg);
assertNull("we got just one message", consumer.receive(2000));
session.commit();
consumer.close();
connection.close();
// ensure no dangling messages with fresh broker etc
broker.stop();
broker.waitUntilStopped();
LOG.info("Checking for remaining/hung messages..");
broker = createBroker(false, url);
setDefaultPersistenceAdapter(broker);
broker.start();
// after restart, ensure no dangling messages
cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
connection = cf.createConnection();
connection.start();
Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session2.createConsumer(destination);
msg = consumer.receive(1000);
LOG.info("Received: " + msg);
assertNull("no messges left dangling but got: " + msg, msg);
connection.close();
}
@SuppressWarnings("unchecked")
public void testFailoverCommitReplyLostWithDestinationPathSeparator() throws Exception {
broker = createBroker(true);
setDefaultPersistenceAdapter(broker);
broker.setPlugins(new BrokerPlugin[]{
new DestinationPathSeparatorBroker(),
new BrokerPluginSupport() {
@Override
public void commitTransaction(ConnectionContext context,
TransactionId xid, boolean onePhase) throws Exception {
super.commitTransaction(context, xid, onePhase);
// so commit will hang as if reply is lost
context.setDontSendReponse(true);
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
LOG.info("Stopping broker post commit...");
try {
broker.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
});
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
Connection connection = cf.createConnection();
connection.start();
final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(QUEUE_NAME.replace('.','/') + "?consumer.prefetchSize=0");
MessageConsumer consumer = session.createConsumer(destination);
produceMessage(session, destination);
final CountDownLatch commitDoneLatch = new CountDownLatch(1);
// broker will die on commit reply so this will hang till restart
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
LOG.info("doing async commit...");
try {
session.commit();
} catch (JMSException e) {
assertTrue(e instanceof TransactionRolledBackException);
LOG.info("got commit exception: ", e);
}
commitDoneLatch.countDown();
LOG.info("done async commit");
}
});
// will be stopped by the plugin
broker.waitUntilStopped();
broker = createBroker(false, url);
setDefaultPersistenceAdapter(broker);
broker.setPlugins(new BrokerPlugin[]{new DestinationPathSeparatorBroker()});
broker.start();
assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
// new transaction
Message msg = consumer.receive(20000);
LOG.info("Received: " + msg);
assertNotNull("we got the message", msg);
assertNull("we got just one message", consumer.receive(2000));
session.commit();
consumer.close();
connection.close();
// ensure no dangling messages with fresh broker etc
broker.stop();
broker.waitUntilStopped();
LOG.info("Checking for remaining/hung messages..");
broker = createBroker(false, url);
setDefaultPersistenceAdapter(broker);
broker.setPlugins(new BrokerPlugin[]{new DestinationPathSeparatorBroker()});
broker.start();
// after restart, ensure no dangling messages
cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
connection = cf.createConnection();
connection.start();
Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session2.createConsumer(destination);
msg = consumer.receive(1000);
LOG.info("Received: " + msg);
assertNull("no messges left dangling but got: " + msg, msg);
connection.close();
ActiveMQDestination[] destinations = broker.getRegionBroker().getDestinations();
for (ActiveMQDestination dest : destinations) {
LOG.info("Destinations list: " + dest);
}
assertEquals("Only one destination", 1, broker.getRegionBroker().getDestinations().length);
}
public void initCombosForTestFailoverSendReplyLost() {
addCombinationValues("defaultPersistenceAdapter",
new Object[]{PersistenceAdapterChoice.KahaDB,
PersistenceAdapterChoice.JDBC
// not implemented for AMQ store or PersistenceAdapterChoice.LevelDB
});
}
@SuppressWarnings("unchecked")
public void testFailoverSendReplyLost() throws Exception {
broker = createBroker(true);
setDefaultPersistenceAdapter(broker);
broker.setPlugins(new BrokerPlugin[]{
new BrokerPluginSupport() {
@Override
public void send(ProducerBrokerExchange producerExchange,
org.apache.activemq.command.Message messageSend)
throws Exception {
// so send will hang as if reply is lost
super.send(producerExchange, messageSend);
producerExchange.getConnectionContext().setDontSendReponse(true);
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
LOG.info("Stopping broker post send...");
try {
broker.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
});
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.watchTopicAdvisories=false");
configureConnectionFactory(cf);
Connection connection = cf.createConnection();
connection.start();
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue destination = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(destination);
final CountDownLatch sendDoneLatch = new CountDownLatch(1);
// broker will die on send reply so this will hang till restart
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
LOG.info("doing async send...");
try {
produceMessage(session, destination);
} catch (JMSException e) {
//assertTrue(e instanceof TransactionRolledBackException);
LOG.error("got send exception: ", e);
fail("got unexpected send exception" + e);
}
sendDoneLatch.countDown();
LOG.info("done async send");
}
});
// will be stopped by the plugin
broker.waitUntilStopped();
broker = createBroker(false, url);
setDefaultPersistenceAdapter(broker);
LOG.info("restarting....");
broker.start();
assertTrue("message sent through failover", sendDoneLatch.await(30, TimeUnit.SECONDS));
// new transaction
Message msg = consumer.receive(20000);
LOG.info("Received: " + msg);
assertNotNull("we got the message", msg);
assertNull("we got just one message", consumer.receive(2000));
consumer.close();
connection.close();
// verify stats
assertEquals("no newly queued messages", 0, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
assertEquals("1 dequeue", 1, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount());
// ensure no dangling messages with fresh broker etc
broker.stop();
broker.waitUntilStopped();
LOG.info("Checking for remaining/hung messages with second restart..");
broker = createBroker(false, url);
setDefaultPersistenceAdapter(broker);
broker.start();
// after restart, ensure no dangling messages
cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
connection = cf.createConnection();
connection.start();
Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session2.createConsumer(destination);
msg = consumer.receive(1000);
LOG.info("Received: " + msg);
assertNull("no messges left dangling but got: " + msg, msg);
connection.close();
}
public void initCombosForTestFailoverConnectionSendReplyLost() {
addCombinationValues("defaultPersistenceAdapter",
new Object[]{PersistenceAdapterChoice.KahaDB,
PersistenceAdapterChoice.JDBC
// last producer message id store feature not implemented for AMQ store
// or PersistenceAdapterChoice.LevelDB
});
}
@SuppressWarnings("unchecked")
public void testFailoverConnectionSendReplyLost() throws Exception {
broker = createBroker(true);
PersistenceAdapter store = setDefaultPersistenceAdapter(broker);
if (store instanceof KahaDBPersistenceAdapter) {
// duplicate checker not updated on canceled tasks, even it
// it was, recovery of the audit would fail as the message is
// not recorded in the store and the audit may not be up to date.
// So if duplicate messages are a absolute no no after restarts,
// ConcurrentStoreAndDispatchQueues must be disabled
((KahaDBPersistenceAdapter) store).setConcurrentStoreAndDispatchQueues(false);
}
final SocketProxy proxy = new SocketProxy();
broker.setPlugins(new BrokerPlugin[]{
new BrokerPluginSupport() {
private boolean firstSend = true;
@Override
public void send(ProducerBrokerExchange producerExchange,
org.apache.activemq.command.Message messageSend)
throws Exception {
// so send will hang as if reply is lost
super.send(producerExchange, messageSend);
if (firstSend) {
firstSend = false;
producerExchange.getConnectionContext().setDontSendReponse(true);
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
LOG.info("Stopping connection post send...");
try {
proxy.close();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
}
});
broker.start();
proxy.setTarget(new URI(url));
proxy.open();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + proxy.getUrl().toASCIIString() + ")?jms.watchTopicAdvisories=false");
configureConnectionFactory(cf);
Connection connection = cf.createConnection();
connection.start();
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue destination = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(destination);
final CountDownLatch sendDoneLatch = new CountDownLatch(1);
// proxy connection will die on send reply so this will hang on failover reconnect till open
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
LOG.info("doing async send...");
try {
produceMessage(session, destination);
} catch (JMSException e) {
//assertTrue(e instanceof TransactionRolledBackException);
LOG.info("got send exception: ", e);
}
sendDoneLatch.countDown();
LOG.info("done async send");
}
});
// will be closed by the plugin
assertTrue("proxy was closed", proxy.waitUntilClosed(30));
LOG.info("restarting proxy");
proxy.open();
assertTrue("message sent through failover", sendDoneLatch.await(30, TimeUnit.SECONDS));
Message msg = consumer.receive(20000);
LOG.info("Received: " + msg);
assertNotNull("we got the message", msg);
assertNull("we got just one message", consumer.receive(2000));
consumer.close();
connection.close();
// verify stats, connection dup suppression means dups don't get to broker
assertEquals("one queued message", 1, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
// ensure no dangling messages with fresh broker etc
broker.stop();
broker.waitUntilStopped();
LOG.info("Checking for remaining/hung messages with restart..");
broker = createBroker(false, url);
setDefaultPersistenceAdapter(broker);
broker.start();
// after restart, ensure no dangling messages
cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
connection = cf.createConnection();
connection.start();
Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session2.createConsumer(destination);
msg = consumer.receive(1000);
LOG.info("Received: " + msg);
assertNull("no messges left dangling but got: " + msg, msg);
connection.close();
}
public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception {
startCleanBroker();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?trackTransactionProducers=false");
configureConnectionFactory(cf);
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(destination);
produceMessage(session, destination);
// restart to force failover and connection state recovery before the commit
broker.stop();
startBroker(false, url);
try {
session.commit();
fail("expect ex for rollback only on async exc");
} catch (JMSException expected) {
}
// without tracking producers, message will not be replayed on recovery
assertNull("we got the message", consumer.receive(5000));
session.commit();
connection.close();
}
public void testFailoverMultipleProducerCloseBeforeTransaction() throws Exception {
startCleanBroker();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(destination);
MessageProducer producer;
TextMessage message;
final int count = 10;
for (int i = 0; i < count; i++) {
producer = session.createProducer(destination);
message = session.createTextMessage("Test message: " + count);
producer.send(message);
producer.close();
}
// restart to force failover and connection state recovery before the commit
broker.stop();
startBroker(false, url);
session.commit();
for (int i = 0; i < count; i++) {
assertNotNull("we got all the message: " + count, consumer.receive(20000));
}
session.commit();
connection.close();
}
// https://issues.apache.org/activemq/browse/AMQ-2772
public void testFailoverWithConnectionConsumer() throws Exception {
startCleanBroker();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue("testFailoverWithConnectionConsumer");
final CountDownLatch connectionConsumerGotOne = new CountDownLatch(1);
final Session poolSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.createConnectionConsumer(destination, null, new ServerSessionPool() {
public ServerSession getServerSession() throws JMSException {
return new ServerSession() {
public Session getSession() throws JMSException {
return poolSession;
}
public void start() throws JMSException {
connectionConsumerGotOne.countDown();
poolSession.run();
}
};
}
}, 1);
MessageConsumer consumer = session.createConsumer(destination);
MessageProducer producer;
TextMessage message;
final int count = 10;
for (int i = 0; i < count; i++) {
producer = session.createProducer(destination);
message = session.createTextMessage("Test message: " + count);
producer.send(message);
producer.close();
}
// restart to force failover and connection state recovery before the commit
broker.stop();
startBroker(false, url);
session.commit();
for (int i = 0; i < count - 1; i++) {
assertNotNull("Failed to get message: " + count, consumer.receive(20000));
}
session.commit();
connection.close();
assertTrue("connectionconsumer did not get a message", connectionConsumerGotOne.await(10, TimeUnit.SECONDS));
}
public void testFailoverConsumerAckLost() throws Exception {
// as failure depends on hash order of state tracker recovery, do a few times
for (int i = 0; i < 3; i++) {
try {
LOG.info("Iteration: " + i);
doTestFailoverConsumerAckLost(i);
} finally {
stopBroker();
}
}
}
@SuppressWarnings("unchecked")
public void doTestFailoverConsumerAckLost(final int pauseSeconds) throws Exception {
broker = createBroker(true);
setDefaultPersistenceAdapter(broker);
broker.setPlugins(new BrokerPlugin[]{
new BrokerPluginSupport() {
// broker is killed on delivered ack as prefetch is 1
@Override
public void acknowledge(
ConsumerBrokerExchange consumerExchange,
final MessageAck ack) throws Exception {
consumerExchange.getConnectionContext().setDontSendReponse(true);
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
LOG.info("Stopping broker on ack: " + ack);
try {
broker.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
});
broker.start();
Vector<Connection> connections = new Vector<Connection>();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
Connection connection = cf.createConnection();
connection.start();
connections.add(connection);
final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
connection = cf.createConnection();
connection.start();
connections.add(connection);
final Session consumerSession1 = connection.createSession(true, Session.SESSION_TRANSACTED);
connection = cf.createConnection();
connection.start();
connections.add(connection);
final Session consumerSession2 = connection.createSession(true, Session.SESSION_TRANSACTED);
final MessageConsumer consumer1 = consumerSession1.createConsumer(destination);
final MessageConsumer consumer2 = consumerSession2.createConsumer(destination);
produceMessage(producerSession, destination);
produceMessage(producerSession, destination);
final Vector<Message> receivedMessages = new Vector<Message>();
final CountDownLatch commitDoneLatch = new CountDownLatch(1);
final AtomicBoolean gotTransactionRolledBackException = new AtomicBoolean(false);
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
LOG.info("doing async commit after consume...");
try {
Message msg = consumer1.receive(20000);
LOG.info("consumer1 first attempt got message: " + msg);
receivedMessages.add(msg);
// give some variance to the runs
TimeUnit.SECONDS.sleep(random.nextInt(5));
// should not get a second message as there are two messages and two consumers
// and prefetch=1, but with failover and unordered connection restore it can get the second
// message.
// For the transaction to complete it needs to get the same one or two messages
// again so that the acks line up.
// If redelivery order is different, the commit should fail with an ex
//
msg = consumer1.receive(5000);
LOG.info("consumer1 second attempt got message: " + msg);
if (msg != null) {
receivedMessages.add(msg);
}
LOG.info("committing consumer1 session: " + receivedMessages.size() + " messsage(s)");
try {
consumerSession1.commit();
} catch (TransactionRolledBackException expected) {
LOG.info("got exception ex on commit", expected);
gotTransactionRolledBackException.set(true);
// ok, message one was not replayed so we expect the rollback
}
commitDoneLatch.countDown();
LOG.info("done async commit");
} catch (Exception e) {
e.printStackTrace();
}
}
});
// will be stopped by the plugin
broker.waitUntilStopped();
broker = createBroker(false, url);
setDefaultPersistenceAdapter(broker);
broker.start();
assertTrue("tx committed through failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
LOG.info("received message count: " + receivedMessages.size());
// new transaction to get both messages from either consumer
for (int i=0; i<2; i++) {
Message msg = consumer1.receive(5000);
LOG.info("post: from consumer1 received: " + msg);
consumerSession1.commit();
if (msg == null) {
msg = consumer2.receive(10000);
LOG.info("post: from consumer2 received: " + msg);
consumerSession2.commit();
}
assertNotNull("got message [" + i + "]", msg);
}
for (Connection c : connections) {
c.close();
}
// ensure no dangling messages with fresh broker etc
broker.stop();
broker.waitUntilStopped();
LOG.info("Checking for remaining/hung messages..");
broker = createBroker(false, url);
setDefaultPersistenceAdapter(broker);
broker.start();
// after restart, ensure no dangling messages
cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
connection = cf.createConnection();
connection.start();
Session sweeperSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer sweeper = sweeperSession.createConsumer(destination);
Message msg = sweeper.receive(1000);
LOG.info("Sweep received: " + msg);
assertNull("no messges left dangling but got: " + msg, msg);
connection.close();
}
public void testPoolingNConsumesAfterReconnect() throws Exception {
broker = createBroker(true);
setDefaultPersistenceAdapter(broker);
broker.setPlugins(new BrokerPlugin[]{
new BrokerPluginSupport() {
int count = 0;
@Override
public void removeConsumer(ConnectionContext context, final ConsumerInfo info) throws Exception {
if (count++ == 1) {
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
LOG.info("Stopping broker on removeConsumer: " + info);
try {
broker.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
}
});
broker.start();
Vector<Connection> connections = new Vector<Connection>();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
Connection connection = cf.createConnection();
connection.start();
connections.add(connection);
Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
produceMessage(producerSession, destination);
connection.close();
connection = cf.createConnection();
connection.start();
connections.add(connection);
final Session consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
final int sessionCount = 10;
final Stack<Session> sessions = new Stack<Session>();
for (int i = 0; i < sessionCount; i++) {
sessions.push(connection.createSession(false, Session.AUTO_ACKNOWLEDGE));
}
final int consumerCount = 1000;
final Deque<MessageConsumer> consumers = new ArrayDeque<MessageConsumer>();
for (int i = 0; i < consumerCount; i++) {
consumers.push(consumerSession.createConsumer(destination));
}
final ExecutorService executorService = Executors.newCachedThreadPool();
final FailoverTransport failoverTransport = ((ActiveMQConnection) connection).getTransport().narrow(FailoverTransport.class);
final TransportListener delegate = failoverTransport.getTransportListener();
failoverTransport.setTransportListener(new TransportListener() {
@Override
public void onCommand(Object command) {
delegate.onCommand(command);
}
@Override
public void onException(IOException error) {
delegate.onException(error);
}
@Override
public void transportInterupted() {
LOG.error("Transport interrupted: " + failoverTransport, new RuntimeException("HERE"));
for (int i = 0; i < consumerCount && !consumers.isEmpty(); i++) {
executorService.execute(new Runnable() {
public void run() {
MessageConsumer localConsumer = null;
try {
synchronized (delegate) {
localConsumer = consumers.pop();
}
localConsumer.receive(1);
LOG.info("calling close() " + ((ActiveMQMessageConsumer) localConsumer).getConsumerId());
localConsumer.close();
} catch (NoSuchElementException nse) {
} catch (Exception ignored) {
LOG.error("Ex on: " + ((ActiveMQMessageConsumer) localConsumer).getConsumerId(), ignored);
}
}
});
}
delegate.transportInterupted();
}
@Override
public void transportResumed() {
delegate.transportResumed();
}
});
MessageConsumer consumer = null;
synchronized (delegate) {
consumer = consumers.pop();
}
LOG.info("calling close to trigger broker stop " + ((ActiveMQMessageConsumer) consumer).getConsumerId());
consumer.close();
// will be stopped by the plugin
broker.waitUntilStopped();
broker = createBroker(false, url);
setDefaultPersistenceAdapter(broker);
broker.start();
consumer = consumerSession.createConsumer(destination);
LOG.info("finally consuming message: " + ((ActiveMQMessageConsumer) consumer).getConsumerId());
Message msg = null;
for (int i = 0; i < 4 && msg == null; i++) {
msg = consumer.receive(1000);
}
LOG.info("post: from consumer1 received: " + msg);
assertNotNull("got message after failover", msg);
msg.acknowledge();
for (Connection c : connections) {
c.close();
}
}
public void testAutoRollbackWithMissingRedeliveries() throws Exception {
broker = createBroker(true);
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
Connection connection = cf.createConnection();
connection.start();
final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = consumerSession.createConsumer(destination);
produceMessage(producerSession, destination);
Message msg = consumer.receive(20000);
assertNotNull(msg);
broker.stop();
broker = createBroker(false, url);
// use empty jdbc store so that default wait(0) for redeliveries will timeout after failover
setPersistenceAdapter(broker, PersistenceAdapterChoice.JDBC);
broker.start();
try {
consumerSession.commit();
fail("expected transaciton rolledback ex");
} catch (TransactionRolledBackException expected) {
}
broker.stop();
broker = createBroker(false, url);
broker.start();
assertNotNull("should get rolledback message from original restarted broker", consumer.receive(20000));
connection.close();
}
public void testWaitForMissingRedeliveries() throws Exception {
LOG.info("testWaitForMissingRedeliveries()");
broker = createBroker(true);
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.consumerFailoverRedeliveryWaitPeriod=30000");
configureConnectionFactory(cf);
Connection connection = cf.createConnection();
connection.start();
final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue destination = producerSession.createQueue(QUEUE_NAME);
final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = consumerSession.createConsumer(destination);
produceMessage(producerSession, destination);
Message msg = consumer.receive(20000);
if (msg == null) {
AutoFailTestSupport.dumpAllThreads("missing-");
}
assertNotNull("got message just produced", msg);
broker.stop();
broker = createBroker(false, url);
// use empty jdbc store so that wait for re-deliveries occur when failover resumes
setPersistenceAdapter(broker, PersistenceAdapterChoice.JDBC);
broker.start();
final CountDownLatch commitDone = new CountDownLatch(1);
final CountDownLatch gotException = new CountDownLatch(1);
// will block pending re-deliveries
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
LOG.info("doing async commit...");
try {
consumerSession.commit();
} catch (JMSException ignored) {
ignored.printStackTrace();
gotException.countDown();
} finally {
commitDone.countDown();
}
}
});
broker.stop();
broker = createBroker(false, url);
broker.start();
assertTrue("commit was successful", commitDone.await(30, TimeUnit.SECONDS));
assertTrue("got exception on commit", gotException.await(30, TimeUnit.SECONDS));
assertNotNull("should get failed committed message", consumer.receive(5000));
connection.close();
}
public void testReDeliveryWhilePending() throws Exception {
LOG.info("testReDeliveryWhilePending()");
broker = createBroker(true);
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.consumerFailoverRedeliveryWaitPeriod=10000");
configureConnectionFactory(cf);
Connection connection = cf.createConnection();
connection.start();
final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=0");
final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
final Session secondConsumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = consumerSession.createConsumer(destination);
produceMessage(producerSession, destination);
Message msg = consumer.receive(20000);
if (msg == null) {
AutoFailTestSupport.dumpAllThreads("missing-");
}
assertNotNull("got message just produced", msg);
// add another consumer into the mix that may get the message after restart
MessageConsumer consumer2 = secondConsumerSession.createConsumer(consumerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1"));
broker.stop();
broker = createBroker(false, url);
broker.start();
final CountDownLatch commitDone = new CountDownLatch(1);
final CountDownLatch gotRollback = new CountDownLatch(1);
final Vector<Exception> exceptions = new Vector<Exception>();
// commit will fail due to failover with outstanding ack
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
LOG.info("doing async commit...");
try {
consumerSession.commit();
} catch (TransactionRolledBackException ex) {
gotRollback.countDown();
} catch (JMSException ex) {
exceptions.add(ex);
} finally {
commitDone.countDown();
}
}
});
assertTrue("commit completed ", commitDone.await(15, TimeUnit.SECONDS));
assertTrue("got Rollback", gotRollback.await(15, TimeUnit.SECONDS));
assertTrue("no other exceptions", exceptions.isEmpty());
// consumer replay is hashmap order dependent on a failover connection state recover so need to deal with both cases
// consume message from one of the consumers
Message message = consumer2.receive(2000);
if (message == null) {
message = consumer.receive(2000);
}
consumerSession.commit();
secondConsumerSession.commit();
assertNotNull("got message after rollback", message);
// no message should be in dlq
MessageConsumer dlqConsumer = consumerSession.createConsumer(consumerSession.createQueue("ActiveMQ.DLQ"));
assertNull("nothing in the dlq", dlqConsumer.receive(2000));
connection.close();
}
private void produceMessage(final Session producerSession, Queue destination)
throws JMSException {
MessageProducer producer = producerSession.createProducer(destination);
TextMessage message = producerSession.createTextMessage("Test message");
producer.send(message);
producer.close();
}
}