blob: 55da21b7978a40cc397bff6f9dff9ca06cf6b353 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test for spec compliance for durable subscriptions that change the noLocal flag.
*/
@RunWith(Parameterized.class)
public class DurableSubscriptionWithNoLocalTest {
private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionWithNoLocalTest.class);
private final int MSG_COUNT = 10;
private final String KAHADB_DIRECTORY = "target/activemq-data/";
@Rule public TestName name = new TestName();
private BrokerService brokerService;
private String connectionUri;
private ActiveMQConnectionFactory factory;
private final boolean keepDurableSubsActive;
@Parameters(name="keepDurableSubsActive={0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{true},
{false}
});
}
public DurableSubscriptionWithNoLocalTest(final boolean keepDurableSubsActive) {
this.keepDurableSubsActive = keepDurableSubsActive;
}
@Before
public void setUp() throws Exception {
createBroker(true);
}
@After
public void tearDown() throws Exception {
brokerService.stop();
brokerService.waitUntilStopped();
}
/**
* Make sure that NoLocal works for connection started/stopped
*
* @throws JMSException
*/
@Test(timeout = 60000)
public void testNoLocalStillWorkWithConnectionRestart() throws Exception {
ActiveMQConnection connection = null;
try {
connection = (ActiveMQConnection) factory.createConnection();
connection.setClientID("test-client");
connection.start();
test(connection, "test message 1");
connection.stop();
connection.start();
test(connection, "test message 2");
} finally {
if (connection != null) {
connection.close();
}
}
}
/**
* Make sure that NoLocal works for multiple connections to the same subscription
*
* @throws JMSException
*/
@Test(timeout = 60000)
public void testNoLocalStillWorksNewConnection() throws Exception {
ActiveMQConnection connection = null;
try {
connection = (ActiveMQConnection) factory.createConnection();
connection.setClientID("test-client");
connection.start();
test(connection, "test message 1");
} finally {
if (connection != null) {
connection.close();
}
}
try {
connection = (ActiveMQConnection) factory.createConnection();
connection.setClientID("test-client");
connection.start();
test(connection, "test message 2");
} finally {
if (connection != null) {
connection.close();
}
}
}
/**
* Make sure that NoLocal works after restart
*
* @throws JMSException
*/
@Test(timeout = 60000)
public void testNoLocalStillWorksRestartBroker() throws Exception {
ActiveMQConnection connection = null;
try {
connection = (ActiveMQConnection) factory.createConnection();
connection.setClientID("test-client");
connection.start();
test(connection, "test message 1");
} finally {
if (connection != null) {
connection.close();
}
}
tearDown();
createBroker(false);
try {
connection = (ActiveMQConnection) factory.createConnection();
connection.setClientID("test-client");
connection.start();
test(connection, "test message 2");
} finally {
if (connection != null) {
connection.close();
}
}
}
void test(final ActiveMQConnection connection, final String body) throws Exception {
Session incomingMessagesSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = incomingMessagesSession.createTopic("test.topic");
TopicSubscriber consumer = incomingMessagesSession.createDurableSubscriber(topic, "test-subscription", null, true);
Session outgoingMessagesSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = outgoingMessagesSession.createTopic("test.topic");
MessageProducer producer = outgoingMessagesSession.createProducer(destination);
TextMessage textMessage = outgoingMessagesSession.createTextMessage(body);
producer.send(textMessage);
producer.close();
System.out.println("message sent: " + textMessage.getJMSMessageID() + "; body: " + textMessage.getText());
outgoingMessagesSession.close();
assertNull(consumer.receive(2000));
consumer.close();
incomingMessagesSession.close();
}
@Test(timeout = 60000)
public void testDurableSubWithNoLocalChange() throws Exception {
TopicConnection connection = factory.createTopicConnection();
connection.setClientID(getClientId());
connection.start();
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(getDestinationName());
TopicPublisher publisher = session.createPublisher(topic);
LOG.debug("Create DurableSubscriber with noLocal = true");
TopicSubscriber subscriber = session.createSubscriber(topic);
TopicSubscriber durableSub = session.createDurableSubscriber(topic, getSubscriptionName(), null, true);
LOG.debug("Sending " + MSG_COUNT + " messages to topic");
for (int i = 0; i < MSG_COUNT; i++) {
publisher.publish(session.createMessage());
}
LOG.info("Attempting to receive messages from non-durable subscriber");
for (int i = 0; i < MSG_COUNT; i++) {
assertNotNull(subscriber.receive(500));
}
LOG.info("Attempting to receive messages from (noLocal=true) subscriber");
assertNull(durableSub.receive(500));
LOG.debug("Sending " + MSG_COUNT + " messages to topic");
for (int i = 0; i < MSG_COUNT; i++) {
publisher.publish(session.createMessage());
}
LOG.debug("Close DurableSubscriber with noLocal=true");
durableSub.close();
LOG.debug("Create DurableSubscriber with noLocal=false");
durableSub = session.createDurableSubscriber(topic, getSubscriptionName(), null, false);
LOG.info("Attempting to receive messages from reconnected (noLocal=false) subscription");
assertNull(durableSub.receive(500));
LOG.debug("Sending " + MSG_COUNT + " messages to topic");
for (int i = 0; i < MSG_COUNT; i++) {
publisher.publish(session.createMessage());
}
LOG.info("Attempting to receive messages from (noLocal=false) durable subscriber");
for (int i = 0; i < MSG_COUNT; i++) {
assertNotNull(durableSub.receive(500));
}
// Should be empty now
assertNull(durableSub.receive(100));
}
@Test(timeout = 60000)
public void testInvertedDurableSubWithNoLocalChange() throws Exception {
TopicConnection connection = factory.createTopicConnection();
connection.setClientID(getClientId());
connection.start();
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(getDestinationName());
TopicPublisher publisher = session.createPublisher(topic);
LOG.debug("Create DurableSubscriber with noLocal = true");
TopicSubscriber subscriber = session.createSubscriber(topic);
TopicSubscriber durableSub = session.createDurableSubscriber(topic, getSubscriptionName(), null, false);
LOG.debug("Sending " + MSG_COUNT + " messages to topic");
for (int i = 0; i < MSG_COUNT; i++) {
publisher.publish(session.createMessage());
}
LOG.info("Attempting to receive messages from non-durable subscriber");
for (int i = 0; i < MSG_COUNT; i++) {
assertNotNull(subscriber.receive(500));
}
LOG.info("Attempting to receive messages from (noLocal=false) durable subscriber");
for (int i = 0; i < MSG_COUNT; i++) {
assertNotNull(durableSub.receive(500));
}
LOG.debug("Sending " + MSG_COUNT + " messages to topic");
for (int i = 0; i < MSG_COUNT; i++) {
publisher.publish(session.createMessage());
}
LOG.debug("Close DurableSubscriber with noLocal=true");
durableSub.close();
LOG.debug("Create DurableSubscriber with noLocal=false");
durableSub = session.createDurableSubscriber(topic, getSubscriptionName(), null, true);
LOG.info("Attempting to receive messages from reconnected (noLocal=true) subscription");
assertNull(durableSub.receive(500));
LOG.debug("Sending " + MSG_COUNT + " messages to topic");
for (int i = 0; i < MSG_COUNT; i++) {
publisher.publish(session.createMessage());
}
LOG.info("Attempting to receive messages from reconnected (noLocal=true) subscription");
assertNull(durableSub.receive(500));
// Should be empty now
assertNull(durableSub.receive(100));
}
@Test(timeout = 60000)
public void testDurableSubWithNoLocalChangeAfterRestart() throws Exception {
TopicConnection connection = factory.createTopicConnection();
connection.setClientID(getClientId());
connection.start();
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(getDestinationName());
TopicPublisher publisher = session.createPublisher(topic);
LOG.debug("Create DurableSubscriber with noLocal = true");
TopicSubscriber subscriber = session.createSubscriber(topic);
TopicSubscriber durableSub = session.createDurableSubscriber(topic, getSubscriptionName(), null, true);
LOG.debug("Sending " + MSG_COUNT + " messages to topic");
for (int i = 0; i < MSG_COUNT; i++) {
publisher.publish(session.createMessage());
}
LOG.info("Attempting to receive messages from non-durable subscriber");
for (int i = 0; i < MSG_COUNT; i++) {
assertNotNull(subscriber.receive(500));
}
LOG.info("Attempting to receive messages from (noLocal=true) subscriber");
assertNull(durableSub.receive(500));
LOG.debug("Sending " + MSG_COUNT + " messages to topic");
for (int i = 0; i < MSG_COUNT; i++) {
publisher.publish(session.createMessage());
}
tearDown();
createBroker(false);
connection = factory.createTopicConnection();
connection.setClientID(getClientId());
connection.start();
session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
topic = session.createTopic(getDestinationName());
publisher = session.createPublisher(topic);
LOG.debug("Create DurableSubscriber with noLocal=false");
durableSub = session.createDurableSubscriber(topic, getSubscriptionName(), null, false);
LOG.info("Attempting to receive messages from reconnected (noLocal=false) subscription");
assertNull(durableSub.receive(500));
LOG.debug("Sending " + MSG_COUNT + " messages to topic");
for (int i = 0; i < MSG_COUNT; i++) {
publisher.publish(session.createMessage());
}
LOG.info("Attempting to receive messages from (noLocal=false) durable subscriber");
for (int i = 0; i < MSG_COUNT; i++) {
assertNotNull(durableSub.receive(500));
}
// Should be empty now
assertNull(durableSub.receive(100));
}
@Test(timeout = 60000)
public void testInvertedDurableSubWithNoLocalChangeAfterRestart() throws Exception {
TopicConnection connection = factory.createTopicConnection();
connection.setClientID(getClientId());
connection.start();
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(getDestinationName());
TopicPublisher publisher = session.createPublisher(topic);
LOG.debug("Create DurableSubscriber with noLocal = true");
TopicSubscriber subscriber = session.createSubscriber(topic);
TopicSubscriber durableSub = session.createDurableSubscriber(topic, getSubscriptionName(), null, false);
LOG.debug("Sending " + MSG_COUNT + " messages to topic");
for (int i = 0; i < MSG_COUNT; i++) {
publisher.publish(session.createMessage());
}
LOG.info("Attempting to receive messages from non-durable subscriber");
for (int i = 0; i < MSG_COUNT; i++) {
assertNotNull(subscriber.receive(500));
}
LOG.info("Attempting to receive messages from (noLocal=false) durable subscriber");
for (int i = 0; i < MSG_COUNT; i++) {
assertNotNull(durableSub.receive(500));
}
LOG.debug("Sending " + MSG_COUNT + " messages to topic");
for (int i = 0; i < MSG_COUNT; i++) {
publisher.publish(session.createMessage());
}
tearDown();
createBroker(false);
connection = factory.createTopicConnection();
connection.setClientID(getClientId());
connection.start();
session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
topic = session.createTopic(getDestinationName());
publisher = session.createPublisher(topic);
LOG.debug("Create DurableSubscriber with noLocal=true");
durableSub = session.createDurableSubscriber(topic, getSubscriptionName(), null, true);
LOG.info("Attempting to receive messages from (noLocal=true) subscriber");
assertNull(durableSub.receive(500));
LOG.debug("Sending " + MSG_COUNT + " messages to topic");
for (int i = 0; i < MSG_COUNT; i++) {
publisher.publish(session.createMessage());
}
LOG.info("Attempting to receive messages from (noLocal=true) subscriber");
assertNull(durableSub.receive(500));
// Should be empty now
assertNull(durableSub.receive(100));
}
private void createBroker(boolean deleteAllMessages) throws Exception {
KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(new File(KAHADB_DIRECTORY + "-" + name.getMethodName()));
brokerService = new BrokerService();
brokerService.setPersistent(true);
brokerService.setPersistenceAdapter(kaha);
brokerService.setStoreOpenWireVersion(CommandTypes.PROTOCOL_VERSION);
brokerService.setUseJmx(false);
brokerService.setDeleteAllMessagesOnStartup(deleteAllMessages);
brokerService.setKeepDurableSubsActive(keepDurableSubsActive);
TransportConnector connector = brokerService.addConnector("tcp://0.0.0.0:0");
brokerService.start();
brokerService.waitUntilStarted();
connectionUri = connector.getPublishableConnectString();
factory = new ActiveMQConnectionFactory(connectionUri);
}
private String getDestinationName() {
return name.getMethodName();
}
private String getClientId() {
return name.getMethodName() + "-Client";
}
private String getSubscriptionName() {
return name.getMethodName() + "-Subscription";
}
}