blob: 770ba6ea375e85d1005594d88745e4d8d7c45fd3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.network;
import java.io.File;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
import org.apache.activemq.util.Wait.Condition;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
@RunWith(Parameterized.class)
public class ForceDurableNetworkBridgeTest extends DynamicNetworkTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(ForceDurableNetworkBridgeTest.class);
protected String testTopicName2 = "include.nonforced.bar";
protected String staticTopic = "include.static.bar";
protected String staticTopic2 = "include.static.nonforced.bar";
public static enum FLOW {FORWARD, REVERSE};
private BrokerService broker1;
private BrokerService broker2;
private Session session1;
private final FLOW flow;
@Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{FLOW.FORWARD},
{FLOW.REVERSE}
});
}
public ForceDurableNetworkBridgeTest(final FLOW flow) {
this.flow = flow;
}
@Before
public void setUp() throws Exception {
doSetUp(true, tempFolder.newFolder(), tempFolder.newFolder());
}
@After
public void tearDown() throws Exception {
doTearDown();
}
@Test
public void testForceDurableSubscriptionStatic() throws Exception {
final ActiveMQTopic topic = new ActiveMQTopic(staticTopic);
assertNCDurableSubsCount(broker2, topic, 1);
assertConsumersCount(broker2, topic, 1);
//Static so consumers stick around
assertNCDurableSubsCount(broker2, topic, 1);
assertConsumersCount(broker2, topic, 1);
}
@Test
public void testConsumerNotForceDurableSubscriptionStatic() throws Exception {
final ActiveMQTopic topic = new ActiveMQTopic(staticTopic2);
assertConsumersCount(broker2, topic, 1);
assertNCDurableSubsCount(broker2, topic, 0);
}
@Test
public void testConsumerNotForceDurableSubscription() throws Exception {
final ActiveMQTopic topic = new ActiveMQTopic(testTopicName2);
MessageConsumer sub1 = session1.createConsumer(topic);
assertConsumersCount(broker2, topic, 1);
assertNCDurableSubsCount(broker2, topic, 0);
sub1.close();
assertNCDurableSubsCount(broker2, topic, 0);
assertConsumersCount(broker2, topic, 0);
}
@Test
public void testConsumerNotForceDurableWithAnotherDurable() throws Exception {
final ActiveMQTopic topic = new ActiveMQTopic(testTopicName2);
TopicSubscriber durSub = session1.createDurableSubscriber(topic, subName);
session1.createConsumer(topic);
//1 consumer because of conduit
//1 durable sub
assertConsumersCount(broker2, topic, 1);
assertNCDurableSubsCount(broker2, topic, 1);
//Remove the sub
durSub.close();
Thread.sleep(1000);
removeSubscription(broker1, topic, subName);
//The durable should be gone even though there is a consumer left
//since we are not forcing durable subs
assertNCDurableSubsCount(broker2, topic, 0);
//consumers count ends up being 0 here, even though there is a non-durable consumer left,
//because the durable sub is destroyed and it is a conduit subscription
//this is another good reason to want to enable forcing of durables
assertConsumersCount(broker2, topic, 0);
}
@Test
public void testForceDurableSubscription() throws Exception {
final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
MessageConsumer sub1 = session1.createConsumer(topic);
assertNCDurableSubsCount(broker2, topic, 1);
assertConsumersCount(broker2, topic, 1);
sub1.close();
assertNCDurableSubsCount(broker2, topic, 0);
assertConsumersCount(broker2, topic, 0);
}
@Test
public void testForceDurableMultiSubscriptions() throws Exception {
final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
MessageConsumer sub1 = session1.createConsumer(topic);
MessageConsumer sub2 = session1.createConsumer(topic);
MessageConsumer sub3 = session1.createConsumer(topic);
assertNCDurableSubsCount(broker2, topic, 1);
assertConsumersCount(broker2, topic, 1);
sub1.close();
sub2.close();
assertNCDurableSubsCount(broker2, topic, 1);
assertConsumersCount(broker2, topic, 1);
sub3.close();
assertNCDurableSubsCount(broker2, topic, 0);
assertConsumersCount(broker2, topic, 0);
}
@Test
public void testForceDurableSubWithDurableCreatedFirst() throws Exception {
final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
TopicSubscriber durSub = session1.createDurableSubscriber(topic, subName);
durSub.close();
assertNCDurableSubsCount(broker2, topic, 1);
MessageConsumer sub1 = session1.createConsumer(topic);
Thread.sleep(1000);
assertNCDurableSubsCount(broker2, topic, 1);
sub1.close();
Thread.sleep(1000);
assertNCDurableSubsCount(broker2, topic, 1);
removeSubscription(broker1, topic, subName);
assertNCDurableSubsCount(broker2, topic, 0);
}
@Test
public void testForceDurableSubWithNonDurableCreatedFirst() throws Exception {
final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
MessageConsumer sub1 = session1.createConsumer(topic);
assertNCDurableSubsCount(broker2, topic, 1);
TopicSubscriber durSub = session1.createDurableSubscriber(topic, subName);
durSub.close();
Thread.sleep(1000);
assertNCDurableSubsCount(broker2, topic, 1);
removeSubscription(broker1, topic, subName);
Thread.sleep(1000);
assertConsumersCount(broker2, topic, 1);
assertNCDurableSubsCount(broker2, topic, 1);
sub1.close();
assertNCDurableSubsCount(broker2, topic, 0);
}
@Test
public void testDurableSticksAroundOnConsumerClose() throws Exception {
final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
//Create the normal consumer first
MessageConsumer sub1 = session1.createConsumer(topic);
assertNCDurableSubsCount(broker2, topic, 1);
TopicSubscriber durSub = session1.createDurableSubscriber(topic, subName);
durSub.close();
sub1.close();
Thread.sleep(1000);
//Both consumer and durable are closed but the durable should stick around
assertConsumersCount(broker2, topic, 1);
assertNCDurableSubsCount(broker2, topic, 1);
removeSubscription(broker1, topic, subName);
assertConsumersCount(broker2, topic, 0);
assertNCDurableSubsCount(broker2, topic, 0);
}
protected void restartBrokers() throws Exception {
doTearDown();
doSetUp(false, localBroker.getDataDirectoryFile(), remoteBroker.getDataDirectoryFile());
}
protected void doSetUp(boolean deleteAllMessages, File localDataDir,
File remoteDataDir) throws Exception {
included = new ActiveMQTopic(testTopicName);
doSetUpRemoteBroker(deleteAllMessages, remoteDataDir);
doSetUpLocalBroker(deleteAllMessages, localDataDir);
//Give time for advisories to propagate
Thread.sleep(1000);
}
protected void doSetUpLocalBroker(boolean deleteAllMessages, File dataDir) throws Exception {
localBroker = createLocalBroker(dataDir);
localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
localBroker.start();
localBroker.waitUntilStarted();
URI localURI = localBroker.getVmConnectorURI();
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
fac.setAlwaysSyncSend(true);
fac.setDispatchAsync(false);
localConnection = fac.createConnection();
localConnection.setClientID("clientId");
localConnection.start();
Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1;
}
}, 10000, 500);
localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
if (flow.equals(FLOW.FORWARD)) {
broker1 = localBroker;
session1 = localSession;
} else {
broker2 = localBroker;
}
}
protected void doSetUpRemoteBroker(boolean deleteAllMessages, File dataDir) throws Exception {
remoteBroker = createRemoteBroker(dataDir);
remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
remoteBroker.start();
remoteBroker.waitUntilStarted();
URI remoteURI = remoteBroker.getVmConnectorURI();
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(remoteURI);
remoteConnection = fac.createConnection();
remoteConnection.setClientID("clientId");
remoteConnection.start();
remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
if (flow.equals(FLOW.FORWARD)) {
broker2 = remoteBroker;
} else {
broker1 = remoteBroker;
session1 = remoteSession;
}
}
protected BrokerService createLocalBroker(File dataDir) throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setMonitorConnectionSplits(true);
brokerService.setDataDirectoryFile(dataDir);
brokerService.setBrokerName("localBroker");
brokerService.addNetworkConnector(configureLocalNetworkConnector());
brokerService.addConnector("tcp://localhost:0");
brokerService.setDestinations(new ActiveMQDestination[] {
new ActiveMQTopic(testTopicName),
new ActiveMQTopic(testTopicName2),
new ActiveMQTopic(excludeTopicName)});
return brokerService;
}
protected NetworkConnector configureLocalNetworkConnector() throws Exception {
List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
URI remoteURI = transportConnectors.get(0).getConnectUri();
String uri = "static:(" + remoteURI + ")";
NetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri));
connector.setName("networkConnector");
connector.setDynamicOnly(false);
connector.setDecreaseNetworkConsumerPriority(false);
connector.setConduitSubscriptions(true);
connector.setDuplex(true);
connector.setStaticBridge(false);
connector.setStaticallyIncludedDestinations(Lists.<ActiveMQDestination>newArrayList(
new ActiveMQTopic(staticTopic + "?forceDurable=true"),
new ActiveMQTopic(staticTopic2)));
connector.setDynamicallyIncludedDestinations(
Lists.<ActiveMQDestination>newArrayList(
new ActiveMQTopic("include.test.>?forceDurable=true"),
new ActiveMQTopic(testTopicName2)));
connector.setExcludedDestinations(
Lists.<ActiveMQDestination>newArrayList(new ActiveMQTopic(excludeTopicName)));
return connector;
}
protected BrokerService createRemoteBroker(File dataDir) throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setBrokerName("remoteBroker");
brokerService.setUseJmx(false);
brokerService.setDataDirectoryFile(dataDir);
brokerService.addConnector("tcp://localhost:0");
brokerService.setDestinations(new ActiveMQDestination[] {
new ActiveMQTopic(testTopicName),
new ActiveMQTopic(testTopicName2),
new ActiveMQTopic(excludeTopicName)});
return brokerService;
}
}