// blob: 74f6664decfc9bc6f9072b57d70531450672b671 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.io.File;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.Writer;
import java.lang.management.ManagementFactory;
import java.util.Date;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DurableSubscriberNonPersistentMessageTest extends TestCase {
// Logger used by the test method and by createConsumer().
private final Logger LOG = LoggerFactory.getLogger(DurableSubscriberNonPersistentMessageTest.class);
// Failover URL of the embedded broker; assigned in setUp() and used by the producer.
private String brokerURL;
// brokerURL plus a "jms.prefetchPolicy.all=100" query option; used by the durable consumer.
private String consumerBrokerURL;
// Number of receive attempts made by the first reconnecting consumer.
int initialMaxMsgs = 10;
// Number of receive attempts made by the second reconnecting consumer.
int cleanupMsgCount = 10;
// Number of messages published by the producer (sum of the two consumer batches).
int totalMsgCount = initialMaxMsgs + cleanupMsgCount;
// Running count of non-null messages received across all createConsumer() calls.
int totalMsgReceived = 0;
// Pause, in milliseconds, after every receive attempt in createConsumer().
int sleep = 500;
// Pause, in milliseconds, between the first and second reconnecting consumer.
int reconnectSleep = 2000;
// Timeout, in milliseconds, passed to each receive call.
int messageTimeout = 1000;
// Message size value handed to the Producer constructor.
int messageSize = 1024;
// Note: when ttl is 0 the producer does not set a time-to-live itself, so the
// default set by the broker (if any) is used; a value greater than 0 makes the
// producer set that ttl on the messages it sends.
long ttl = 0;
// JMS client id used for the durable subscriber's connection.
static String clientId = "Jason";
// Platform MBean server; assigned in setUp() and queried for broker/subscription attributes.
MBeanServer mbeanServer;
// Embedded broker started in setUp() and stopped in tearDown().
BrokerService broker;
/**
 * Starts an embedded broker backed by a KahaDB store in the "data" directory and
 * derives the producer and consumer connection URLs from its TCP connector.
 *
 * @throws Exception if the broker cannot be configured or started
 */
@Override
protected void setUp() throws Exception {
    super.setUp();
    broker = new BrokerService();
    // Port 0 lets the OS pick a free port; the real address is read back below.
    TransportConnector transportConnector = broker.addConnector("tcp://localhost:0");
    KahaDBStore store = new KahaDBStore();
    store.setDirectory(new File("data"));
    broker.setPersistenceAdapter(store);
    // Start from an empty store so state persisted by an earlier run
    // (e.g. an existing durable subscription) cannot influence this test.
    broker.setDeleteAllMessagesOnStartup(true);
    broker.start();
    // Make sure the connector is fully up before its address is published to clients.
    broker.waitUntilStarted();
    brokerURL = "failover:(" + transportConnector.getPublishableConnectString() + ")";
    consumerBrokerURL = brokerURL + "?jms.prefetchPolicy.all=100";
    mbeanServer = ManagementFactory.getPlatformMBeanServer();
}
/**
 * Stops the embedded broker and then runs the superclass tear-down.
 *
 * @throws Exception if stopping the broker or the superclass tear-down fails
 */
@Override
protected void tearDown() throws Exception {
    try {
        if (broker != null) {
            broker.stop();
            // Block until shutdown has completed so the store and port are released.
            broker.waitUntilStopped();
        }
    } finally {
        // Always run the superclass tear-down, even when stopping the broker fails.
        super.tearDown();
    }
}
/**
 * Constructs the test case, forwarding the given name to the JUnit
 * {@code TestCase} superclass.
 *
 * @param testName
 *            name of the test case
 */
public DurableSubscriberNonPersistentMessageTest(String testName) {
    super(testName);
}
/**
 * Builds a JUnit {@code TestSuite} from this test class.
 *
 * @return the suite of tests being tested
 */
public static Test suite() {
    final TestSuite allTests = new TestSuite(DurableSubscriberNonPersistentMessageTest.class);
    return allTests;
}
/**
 * Publishes non-persistent messages to a topic while its durable subscriber is
 * disconnected, lets the subscriber reconnect twice to consume them, and then
 * checks over JMX that the subscription's PendingQueueSize and CursorMemoryUsage
 * both reach zero and that every published message was received.
 */
public void testDurableSubscriberNonPersistentMessage() {
    String interest = "TEST";
    LOG.info("Starting DurableSubscriberNonPersistentMessageTest");
    try {
        // Create the durable topic subscription (zero receive attempts) and disconnect.
        createConsumer(interest, 0);
        Thread.sleep(1000);

        // Publish totalMsgCount messages to the topic while the subscriber is offline.
        Producer producer = new Producer(brokerURL, interest, messageSize, ttl);
        try {
            producer.sendMessages(totalMsgCount);
        } finally {
            // Release the producer connection even if sending fails.
            producer.close();
        }
        LOG.info(totalMsgCount + " messages sent");

        // The durable subscriber reconnects twice: first for initialMaxMsgs
        // receive attempts, then (after a pause) for cleanupMsgCount more.
        createConsumer(interest, initialMaxMsgs);
        Thread.sleep(reconnectSleep);
        createConsumer(interest, cleanupMsgCount);

        String brokerVersion = (String) mbeanServer.getAttribute(new ObjectName("org.apache.activemq:brokerName=localhost,type=Broker"), "BrokerVersion");
        LOG.info("Test run on: " + brokerVersion);

        // JMX name of the durable subscription; built once and reused by both polls.
        final ObjectName subscription = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost," +
                "endpoint=Consumer,destinationType=Topic,destinationName=TEST,clientId=Jason," +
                "consumerId=Durable(Jason_MyDurableTopic)");

        assertTrue("pendingQueueSize should be zero", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                Integer pendingQueueSize = (Integer) mbeanServer.getAttribute(subscription, "PendingQueueSize");
                LOG.info("pendingQueueSize = " + pendingQueueSize);
                return pendingQueueSize.intValue() == 0;
            }
        }));
        assertTrue("cursorMemoryUsage should be zero", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                Long cursorMemoryUsage = (Long) mbeanServer.getAttribute(subscription, "CursorMemoryUsage");
                LOG.info("cursorMemoryUsage = " + cursorMemoryUsage);
                return cursorMemoryUsage.longValue() == 0L;
            }
        }));

        // Not sure what the behavior should be here, if the messages
        // expired the received count shouldn't equal total message count
        int expectedReceived = initialMaxMsgs + cleanupMsgCount;
        assertTrue("expected " + expectedReceived + " messages but received " + totalMsgReceived,
                totalMsgReceived == expectedReceived);
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Preserve the interrupt for whoever is running the test.
            Thread.currentThread().interrupt();
        }
        LOG.error("Exception Executing DurableSubscriberNonPersistentMessageTest: " + getStackTrace(e));
        // Include the exception in the failure so the report shows the root cause.
        fail("Should not throw any exceptions: " + e);
    }
}
/**
 * Connects the durable topic subscriber, makes up to {@code maxMsgs} receive
 * attempts (pausing {@code sleep} ms after each one) and disconnects again.
 * Every non-null message increments {@code totalMsgReceived}. JMS failures are
 * logged rather than propagated.
 *
 * @param interest name of the topic to subscribe to
 * @param maxMsgs  maximum number of receive attempts before disconnecting
 */
public void createConsumer(String interest, int maxMsgs) {
    int messageReceived = 0;
    int messagesNotReceived = 0;
    LOG.info("Starting DurableSubscriber");
    Consumer consumer = null;
    try {
        consumer = new Consumer(consumerBrokerURL, interest, clientId);
        for (int i = 0; i < maxMsgs; i++) {
            Message msg = consumer.getMessage(messageTimeout);
            if (msg != null) {
                LOG.debug("Received Message: {}", msg);
                messageReceived++;
                totalMsgReceived++;
            } else {
                LOG.debug("message {} not received", i);
                messagesNotReceived++;
            }
            try {
                Thread.sleep(sleep);
            } catch (InterruptedException ie) {
                LOG.debug("Exception: " + ie);
                // Restore the interrupt flag and stop polling instead of
                // silently carrying on with an interrupted thread.
                Thread.currentThread().interrupt();
                break;
            }
        }
        LOG.info("Consumer Finished");
        LOG.info("Received " + messageReceived);
        LOG.info("Not Received " + messagesNotReceived);
    } catch (JMSException e) {
        LOG.error("Exception Executing SimpleConsumer: " + getStackTrace(e));
    } finally {
        // Always disconnect: a connection left open would keep holding the
        // client id and block the next subscriber from connecting with it.
        if (consumer != null) {
            try {
                consumer.close();
            } catch (JMSException e) {
                LOG.error("Exception Executing SimpleConsumer: " + getStackTrace(e));
            }
        }
    }
}
/**
 * Renders the stack trace of a throwable as a string.
 *
 * @param aThrowable the throwable whose stack trace is printed
 * @return the text produced by {@code aThrowable.printStackTrace(...)}
 */
public String getStackTrace(Throwable aThrowable) {
    StringWriter traceText = new StringWriter();
    PrintWriter sink = new PrintWriter(traceText);
    aThrowable.printStackTrace(sink);
    sink.flush();
    return traceText.toString();
}
/**
 * Test helper that opens its own connection and publishes NON_PERSISTENT text
 * messages to a topic.
 */
public class Producer {
    protected ConnectionFactory factory;
    protected transient Connection connection;
    protected transient Session session;
    protected transient MessageProducer producer;
    /** Default payload length, in characters, used when no positive size is requested. */
    protected static final int messageSize = 1024;
    /** Payload length, in characters, of the messages built by {@link #sendMessages(int)}. */
    private final int payloadSize;

    /**
     * Connects to the broker and prepares a non-persistent producer for the topic.
     *
     * @param brokerURL   URL handed to the connection factory
     * @param interest    name of the topic to publish to
     * @param messageSize payload length in characters; non-positive values fall back
     *                    to the default of 1024
     * @param ttl         time-to-live in milliseconds; only applied when greater than 0
     * @throws JMSException if the connection, session or producer cannot be created
     */
    public Producer(String brokerURL, String interest, int messageSize, long ttl) throws JMSException {
        // Honour the requested size instead of always using the class constant.
        this.payloadSize = messageSize > 0 ? messageSize : Producer.messageSize;
        factory = new ActiveMQConnectionFactory(brokerURL);
        connection = factory.createConnection();
        boolean ready = false;
        try {
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            producer = session.createProducer(session.createTopic(interest));
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            if (ttl > 0) {
                producer.setTimeToLive(ttl);
            }
            ready = true;
        } finally {
            if (!ready) {
                // Construction failed part-way: release the connection so it does not leak.
                try {
                    connection.close();
                } catch (JMSException ignored) {
                    // The original failure is the one worth reporting; it propagates.
                }
            }
        }
    }

    /**
     * Closes the underlying connection, if one was created.
     *
     * @throws JMSException if closing the connection fails
     */
    public void close() throws JMSException {
        if (connection != null) {
            connection.close();
        }
    }

    /**
     * Sends a single message with the fixed text "test message".
     *
     * @throws JMSException if the message cannot be created or sent
     */
    protected void sendMessage() throws JMSException {
        TextMessage textMessage = session.createTextMessage("test message");
        producer.send(textMessage);
    }

    /**
     * Sends {@code count} generated text messages.
     *
     * @param count number of messages to send
     * @throws JMSException if a message cannot be created or sent
     */
    protected void sendMessages(int count) throws JMSException {
        for (int i = 0; i < count; i++) {
            TextMessage textMessage = session.createTextMessage(createMessageText(i));
            producer.send(textMessage);
        }
    }

    /**
     * Builds the message body: a header containing the index and the current
     * date, truncated or space-padded to exactly the configured payload length.
     */
    private String createMessageText(int index) {
        StringBuilder buffer = new StringBuilder(payloadSize);
        buffer.append("Message: " + index + " sent at: " + new Date());
        if (buffer.length() > payloadSize) {
            return buffer.substring(0, payloadSize);
        }
        for (int i = buffer.length(); i < payloadSize; i++) {
            buffer.append(' ');
        }
        return buffer.toString();
    }

    /**
     * Commits the session.
     * NOTE(review): the session is created non-transacted in the constructor, so
     * this call is presumably expected to fail if ever used -- confirm before relying on it.
     *
     * @throws JMSException if the commit fails
     */
    protected void commitTransaction() throws JMSException {
        session.commit();
    }
}
/**
 * Test helper that opens its own connection with a fixed client id and attaches
 * to the durable topic subscription named "MyDurableTopic".
 */
public class Consumer {
    private final ConnectionFactory factory;
    private final ActiveMQConnection connection;
    private final Session session;
    private final MessageConsumer messageConsumer;

    /**
     * Connects to the broker and creates the durable subscriber.
     *
     * @param brokerURL URL handed to the connection factory
     * @param interest  name of the topic to subscribe to
     * @param clientId  JMS client id set on the connection
     * @throws JMSException if the connection, session or subscriber cannot be created
     */
    public Consumer(String brokerURL, String interest, String clientId) throws JMSException {
        factory = new ActiveMQConnectionFactory(brokerURL);
        connection = (ActiveMQConnection) factory.createConnection();
        Session newSession = null;
        MessageConsumer newConsumer = null;
        boolean ready = false;
        try {
            connection.setClientID(clientId);
            connection.start();
            // NOTE(review): this appears to supersede any prefetch value supplied
            // through the broker URL -- confirm that 15 is the intended setting.
            connection.getPrefetchPolicy().setAll(15);
            newSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = newSession.createTopic(interest);
            newConsumer = newSession.createDurableSubscriber((Topic) destination, "MyDurableTopic");
            ready = true;
        } finally {
            if (!ready) {
                // Construction failed part-way: close the connection so the
                // client id is not left claimed by a half-built consumer.
                try {
                    connection.close();
                } catch (JMSException ignored) {
                    // The original failure is the one worth reporting; it propagates.
                }
            }
        }
        session = newSession;
        messageConsumer = newConsumer;
    }

    /**
     * Receives and discards messages until a 500 ms receive returns nothing.
     *
     * @throws JMSException if a receive fails
     */
    public void deleteAllMessages() throws JMSException {
        while (getMessage(500) != null) {
            // empty queue
        }
    }

    /**
     * Waits for the next message.
     *
     * @param timeout receive timeout in milliseconds
     * @return the received message, or {@code null} if none arrived within the timeout
     * @throws JMSException if the receive fails
     */
    public Message getMessage(int timeout) throws JMSException {
        return messageConsumer.receive(timeout);
    }

    /**
     * Closes the subscriber, the session and the connection, in that order. Each
     * later resource is still closed when closing an earlier one fails.
     *
     * @throws JMSException if closing any of the resources fails
     */
    public void close() throws JMSException {
        try {
            messageConsumer.close();
        } finally {
            try {
                session.close();
            } finally {
                connection.close();
            }
        }
    }

    /**
     * @return the session this consumer was created on
     */
    public Session getSession() {
        return session;
    }
}
}