blob: 8d3117b73f13b1052bfe41d4d64571e2d53d18fa [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR ONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.util.Vector;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.transport.InactivityIOException;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DurableSubProcessMultiRestartTest {
private static final Logger LOG = LoggerFactory.getLogger(DurableSubProcessMultiRestartTest.class);
public static final long RUNTIME = 1 * 60 * 1000;
private BrokerService broker;
private ActiveMQTopic topic;
private final ReentrantReadWriteLock processLock = new ReentrantReadWriteLock(true);
private int restartCount = 0;
private final int SUBSCRIPTION_ID = 1;
static final Vector<Throwable> exceptions = new Vector<Throwable>();
/**
* The test creates a durable subscriber and producer with a broker that is
* continually restarted.
*
* Producer creates a message every .5 seconds -creates a new connection for
* each message
*
* durable subscriber - comes online for 10 seconds, - then goes offline for
* a "moment" - repeats the cycle
*
* approx every 10 seconds the broker restarts. Subscriber and Producer
* connections will be closed BEFORE the restart.
*
* The Durable subscriber is "unsubscribed" before the the end of the test.
*
* checks for number of kahaDB files left on filesystem.
*
* @throws Exception
*/
@Test
public void testProcess() throws Exception {
DurableSubscriber durableSubscriber = new DurableSubscriber(SUBSCRIPTION_ID);
MsgProducer msgProducer = new MsgProducer();
try {
// register the subscription & start messages
durableSubscriber.start();
msgProducer.start();
long endTime = System.currentTimeMillis() + RUNTIME;
while (endTime > System.currentTimeMillis()) {
Thread.sleep(10000);
restartBroker();
}
} catch (Throwable e) {
exit("ProcessTest.testProcess failed.", e);
}
// wait for threads to finish
try {
msgProducer.join();
durableSubscriber.join();
} catch (InterruptedException e) {
e.printStackTrace(System.out);
}
// restart broker one last time
restartBroker();
assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
final KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
assertTrue("only less than two journal files should be left: " + pa.getStore().getJournal().getFileMap().size(),
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return pa.getStore().getJournal().getFileMap().size() <= 2;
}
}, TimeUnit.MINUTES.toMillis(3))
);
LOG.info("DONE.");
}
private void restartBroker() throws Exception {
LOG.info("Broker restart: waiting for components.");
processLock.writeLock().lock();
try {
destroyBroker();
startBroker(false);
restartCount++;
LOG.info("Broker restarted. count: " + restartCount);
} finally {
processLock.writeLock().unlock();
}
}
/**
* Producers messages
*
*/
final class MsgProducer extends Thread {
String url = "vm://" + DurableSubProcessMultiRestartTest.getName();
final ConnectionFactory cf = new ActiveMQConnectionFactory(url);
private long msgCount;
int messageRover = 0;
public MsgProducer() {
super("MsgProducer");
setDaemon(true);
}
@Override
public void run() {
long endTime = RUNTIME + System.currentTimeMillis();
try {
while (endTime > System.currentTimeMillis()) {
Thread.sleep(500);
processLock.readLock().lock();
try {
send();
} finally {
processLock.readLock().unlock();
}
LOG.info("MsgProducer msgCount=" + msgCount);
}
} catch (Throwable e) {
exit("Server.run failed", e);
}
}
public void send() throws JMSException {
LOG.info("Sending ... ");
Connection con = cf.createConnection();
Session sess = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod = sess.createProducer(null);
Message message = sess.createMessage();
message.setIntProperty("ID", ++messageRover);
message.setBooleanProperty("COMMIT", true);
prod.send(topic, message);
msgCount++;
LOG.info("Message Sent.");
sess.close();
con.close();
}
}
/**
* Consumes massages from a durable subscription. Goes online/offline
* periodically.
*/
private final class DurableSubscriber extends Thread {
String url = "tcp://localhost:61656";
final ConnectionFactory cf = new ActiveMQConnectionFactory(url);
public static final String SUBSCRIPTION_NAME = "subscription";
private final int id;
private final String conClientId;
private long msgCount;
public DurableSubscriber(int id) throws JMSException {
super("DurableSubscriber" + id);
setDaemon(true);
this.id = id;
conClientId = "cli" + id;
subscribe();
}
@Override
public void run() {
long end = System.currentTimeMillis() + RUNTIME;
try {
// while (true) {
while (end > System.currentTimeMillis()) {
processLock.readLock().lock();
try {
process(5000);
} finally {
processLock.readLock().unlock();
}
}
unsubscribe();
} catch (JMSException maybe) {
if (maybe.getCause() instanceof IOException) {
// ok on broker shutdown;
} else {
exit(toString() + " failed with JMSException", maybe);
}
} catch (Throwable e) {
exit(toString() + " failed.", e);
}
LOG.info(toString() + " DONE. MsgCout=" + msgCount);
}
private void process(long duration) throws JMSException {
LOG.info(toString() + " ONLINE.");
Connection con = openConnection();
Session sess = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = sess.createDurableSubscriber(topic, SUBSCRIPTION_NAME);
long end = System.currentTimeMillis() + duration;
try {
while (end > System.currentTimeMillis()) {
Message message = consumer.receive(100);
if (message != null) {
LOG.info(toString() + "received message...");
msgCount++;
}
}
} finally {
sess.close();
con.close();
LOG.info(toString() + " OFFLINE.");
}
}
private Connection openConnection() throws JMSException {
Connection con = cf.createConnection();
con.setClientID(conClientId);
con.start();
return con;
}
private void subscribe() throws JMSException {
Connection con = openConnection();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, SUBSCRIPTION_NAME);
LOG.info(toString() + " SUBSCRIBED");
session.close();
con.close();
}
private void unsubscribe() throws JMSException {
Connection con = openConnection();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.unsubscribe(SUBSCRIPTION_NAME);
LOG.info(toString() + " UNSUBSCRIBED");
session.close();
con.close();
}
@Override
public String toString() {
return "DurableSubscriber[id=" + id + "]";
}
}
// -------- helper methods -----------
public static void exit(String message) {
exit(message, null);
}
public static void exit(String message, Throwable e) {
Throwable cause = new RuntimeException(message, e);
LOG.error(message, cause);
exceptions.add(cause);
fail(cause.toString());
}
@Before
public void setUp() throws Exception {
topic = new ActiveMQTopic("TopicT");
startBroker();
}
@After
public void tearDown() throws Exception {
destroyBroker();
}
private void startBroker() throws Exception {
startBroker(true);
}
private void startBroker(boolean deleteAllMessages) throws Exception {
if (broker != null)
return;
broker = BrokerFactory.createBroker("broker:(vm://" + getName() + ")");
broker.setBrokerName(getName());
broker.setAdvisorySupport(false);
broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
broker.setKeepDurableSubsActive(true);
File kahadbData = new File("activemq-data/" + getName() + "-kahadb");
if (deleteAllMessages)
delete(kahadbData);
broker.setPersistent(true);
KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter();
kahadb.setDirectory(kahadbData);
kahadb.setJournalMaxFileLength(20 * 1024);
broker.setPersistenceAdapter(kahadb);
broker.addConnector("tcp://localhost:61656");
broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024);
broker.getSystemUsage().getTempUsage().setLimit(256 * 1024 * 1024);
broker.getSystemUsage().getStoreUsage().setLimit(256 * 1024 * 1024);
broker.start();
}
protected static String getName() {
return "DurableSubProcessMultiRestartTest";
}
private static boolean delete(File path) {
if (path == null)
return true;
if (path.isDirectory()) {
for (File file : path.listFiles()) {
delete(file);
}
}
return path.delete();
}
private void destroyBroker() throws Exception {
if (broker == null)
return;
broker.stop();
broker = null;
}
}