blob: 1f460053285be834acd2661b8d01b453bb7be2f6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.perf;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import junit.framework.Assert;
import junit.framework.AssertionFailedError;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*/
public class InactiveDurableTopicTest extends TestCase {
private static final transient Logger LOG = LoggerFactory.getLogger(InactiveDurableTopicTest.class);
private static final int MESSAGE_COUNT = 2000;
private static final String DEFAULT_PASSWORD = "";
private static final String USERNAME = "testuser";
private static final String CLIENTID = "mytestclient";
private static final String TOPIC_NAME = "testevent";
private static final String SUBID = "subscription1";
private static final int DELIVERY_MODE = javax.jms.DeliveryMode.PERSISTENT;
private static final int DELIVERY_PRIORITY = javax.jms.Message.DEFAULT_PRIORITY;
private Connection connection;
private MessageProducer publisher;
private TopicSubscriber subscriber;
private Topic topic;
private Session session;
private ActiveMQConnectionFactory connectionFactory;
private BrokerService broker;
@Override
protected void setUp() throws Exception {
super.setUp();
broker = new BrokerService();
//broker.setPersistenceAdapter(new KahaPersistenceAdapter());
/*
* JournalPersistenceAdapterFactory factory = new
* JournalPersistenceAdapterFactory();
* factory.setDataDirectoryFile(broker.getDataDirectory());
* factory.setTaskRunnerFactory(broker.getTaskRunnerFactory());
* factory.setUseJournal(false); broker.setPersistenceFactory(factory);
*/
broker.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
broker.start();
connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
/*
* Doesn't matter if you enable or disable these, so just leaving them
* out for this test case connectionFactory.setAlwaysSessionAsync(true);
* connectionFactory.setAsyncDispatch(true);
*/
connectionFactory.setUseAsyncSend(true);
}
@Override
protected void tearDown() throws Exception {
super.tearDown();
broker.stop();
}
public void test1CreateSubscription() throws Exception {
try {
/*
* Step 1 - Establish a connection with a client id and create a
* durable subscription
*/
connection = connectionFactory.createConnection(USERNAME, DEFAULT_PASSWORD);
assertNotNull(connection);
connection.setClientID(CLIENTID);
connection.start();
session = connection.createSession(false, javax.jms.Session.CLIENT_ACKNOWLEDGE);
assertNotNull(session);
topic = session.createTopic(TOPIC_NAME);
assertNotNull(topic);
subscriber = session.createDurableSubscriber(topic, SUBID, "", false);
assertNotNull(subscriber);
subscriber.close();
session.close();
connection.close();
} catch (JMSException ex) {
try {
connection.close();
} catch (Exception ignore) {
}
throw new AssertionFailedError("Create Subscription caught: " + ex);
}
}
public void test2ProducerTestCase() {
/*
* Step 2 - Establish a connection without a client id and create a
* producer and start pumping messages. We will get hung
*/
try {
connection = connectionFactory.createConnection(USERNAME, DEFAULT_PASSWORD);
assertNotNull(connection);
session = connection.createSession(false, javax.jms.Session.CLIENT_ACKNOWLEDGE);
assertNotNull(session);
topic = session.createTopic(TOPIC_NAME);
assertNotNull(topic);
publisher = session.createProducer(topic);
assertNotNull(publisher);
MapMessage msg = session.createMapMessage();
assertNotNull(msg);
msg.setString("key1", "value1");
int loop;
for (loop = 0; loop < MESSAGE_COUNT; loop++) {
msg.setInt("key2", loop);
publisher.send(msg, DELIVERY_MODE, DELIVERY_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
if (loop % 5000 == 0) {
LOG.info("Sent " + loop + " messages");
}
}
Assert.assertEquals(loop, MESSAGE_COUNT);
publisher.close();
session.close();
connection.stop();
connection.stop();
} catch (JMSException ex) {
try {
connection.close();
} catch (Exception ignore) {
}
throw new AssertionFailedError("Create Subscription caught: " + ex);
}
}
public void test3CreateSubscription() throws Exception {
try {
/*
* Step 1 - Establish a connection with a client id and create a
* durable subscription
*/
connection = connectionFactory.createConnection(USERNAME, DEFAULT_PASSWORD);
assertNotNull(connection);
connection.setClientID(CLIENTID);
connection.start();
session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
assertNotNull(session);
topic = session.createTopic(TOPIC_NAME);
assertNotNull(topic);
subscriber = session.createDurableSubscriber(topic, SUBID, "", false);
assertNotNull(subscriber);
int loop;
for (loop = 0; loop < MESSAGE_COUNT; loop++) {
Message msg = subscriber.receive();
if (loop % 500 == 0) {
LOG.debug("Received " + loop + " messages");
}
}
this.assertEquals(loop, MESSAGE_COUNT);
subscriber.close();
session.close();
connection.close();
} catch (JMSException ex) {
try {
connection.close();
} catch (Exception ignore) {
}
throw new AssertionFailedError("Create Subscription caught: " + ex);
}
}
}