blob: a415e6e54722f8afcf738c77fb6fea6b4bfd5759 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.network;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeNotNull;
import java.io.File;
import java.net.MalformedURLException;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TopicSubscriber;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class NetworkBrokerDetachTest {
// Name given to the broker built by createBroker().
private final static String BROKER_NAME = "broker";
// Name given to the broker built by createNetworkedBroker().
private final static String REM_BROKER_NAME = "networkedBroker";
// Physical destination name; used for the queue in the detach test and for the topic in the durable test.
private final static String DESTINATION_NAME = "testQ";
// Number of consumers the detach test creates on the networked broker.
private final static int NUM_CONSUMERS = 1;
protected static final Logger LOG = LoggerFactory.getLogger(NetworkBrokerDetachTest.class);
// NOTE(review): not referenced anywhere in this class; presumably kept for subclasses - confirm before removing.
protected final int numRestarts = 3;
// Applied to every network connector by configureNetworkConnector().
protected final int networkTTL = 2;
// Applied to every network connector by configureNetworkConnector().
protected final boolean dynamicOnly = false;
// Broker created by createBroker(); started in init() and stopped in cleanup().
protected BrokerService broker;
// Broker created by createNetworkedBroker(); started in init(), restarted by the durable test, stopped in cleanup().
protected BrokerService networkedBroker;
/**
 * Builds, without starting, the broker named {@link #BROKER_NAME}.
 *
 * The broker accepts clients on tcp://localhost:61617 and carries a static
 * network connector pointing at tcp://localhost:62617, the address used by
 * {@link #createNetworkedBroker()}.
 *
 * @return the configured, not yet started broker
 * @throws Exception if any configuration step fails
 */
protected BrokerService createBroker() throws Exception {
    final BrokerService localBroker = new BrokerService();
    localBroker.setBrokerName(BROKER_NAME);
    configureBroker(localBroker);
    localBroker.addConnector("tcp://localhost:61617");
    // Bridge towards the networked broker; the connector gets the shared TTL/duplex/dynamicOnly settings.
    configureNetworkConnector(localBroker.addNetworkConnector(
            "static:(tcp://localhost:62617?wireFormat.maxInactivityDuration=500)?useExponentialBackOff=false"));
    return localBroker;
}
/**
 * Builds, without starting, the broker named {@link #REM_BROKER_NAME}.
 *
 * The broker accepts clients on tcp://localhost:62617 and carries a static
 * network connector pointing back at tcp://localhost:61617, the address used
 * by {@link #createBroker()}. Its management context is told not to create a
 * JMX connector.
 *
 * @return the configured, not yet started broker
 * @throws Exception if any configuration step fails
 */
protected BrokerService createNetworkedBroker() throws Exception {
    final BrokerService remoteBroker = new BrokerService();
    remoteBroker.setBrokerName(REM_BROKER_NAME);
    configureBroker(remoteBroker);
    remoteBroker.getManagementContext().setCreateConnector(false);
    remoteBroker.addConnector("tcp://localhost:62617");
    // Bridge back towards the first broker with the same shared connector settings.
    configureNetworkConnector(remoteBroker.addNetworkConnector(
            "static:(tcp://localhost:61617?wireFormat.maxInactivityDuration=500)?useExponentialBackOff=false"));
    return remoteBroker;
}
/**
 * Applies the settings shared by both bridges: the test's network TTL and
 * dynamicOnly flag, with duplex switched off.
 *
 * @param networkConnector the connector to configure
 */
private void configureNetworkConnector(NetworkConnector networkConnector) {
    networkConnector.setNetworkTTL(networkTTL);
    networkConnector.setDynamicOnly(dynamicOnly);
    // Each broker declares its own connector to the other, so the bridges are one-way.
    networkConnector.setDuplex(false);
}
// variants for each store....
/**
 * Gives the broker a KahaDB persistence adapter whose directory is derived
 * from the broker's name. Being protected, it can be overridden to plug in
 * a different store.
 *
 * @param broker the broker to configure; its name must already be set
 * @throws Exception declared for overriding implementations
 */
protected void configureBroker(BrokerService broker) throws Exception {
    // The broker name is part of the path, so the two brokers get separate store directories.
    final File storeDirectory =
            new File("target/activemq-data/kahadb/" + broker.getBrokerName() + "NetworBrokerDetatchTest");
    final KahaDBPersistenceAdapter kahaDb = new KahaDBPersistenceAdapter();
    kahaDb.setDirectory(storeDirectory);
    broker.setPersistenceAdapter(kahaDb);
}
/**
 * Creates and starts both brokers, each with its message store wiped on
 * startup. The local broker is started first, then the networked one.
 *
 * @throws Exception if a broker cannot be created or started
 */
@Before
public void init() throws Exception {
    // Assign the fields before starting so a failed start still leaves them visible to teardown.
    this.broker = createBroker();
    this.broker.setDeleteAllMessagesOnStartup(true);
    this.broker.start();

    this.networkedBroker = createNetworkedBroker();
    this.networkedBroker.setDeleteAllMessagesOnStartup(true);
    this.networkedBroker.start();
}
/**
 * Stops both brokers, networked broker first, and waits for each to finish
 * stopping.
 *
 * JUnit runs {@code @After} methods even when {@code @Before} failed, so
 * either field may still be null here; null brokers are skipped. The local
 * broker is stopped in a finally block so that a failure while stopping the
 * networked broker cannot leave it running.
 *
 * @throws Exception if stopping a broker fails
 */
@After
public void cleanup() throws Exception {
    try {
        if (networkedBroker != null) {
            networkedBroker.stop();
            networkedBroker.waitUntilStopped();
        }
    } finally {
        if (broker != null) {
            broker.stop();
            broker.waitUntilStopped();
        }
    }
}
/**
 * Checks that a queue consumer on the networked broker shows up in the local
 * broker's consumer count, and that the count drops back to zero once the
 * consumer's connection is closed.
 *
 * @throws Exception on any JMS or JMX failure
 */
@Test
public void testNetworkedBrokerDetach() throws Exception {
    LOG.info("Creating Consumer on the networked broker ...");
    // Create a consumer on the networked broker
    ConnectionFactory consFactory = createConnectionFactory(networkedBroker);
    Connection consConn = consFactory.createConnection();
    try {
        Session consSession = consConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        ActiveMQDestination destination = (ActiveMQDestination) consSession.createQueue(DESTINATION_NAME);
        for (int i = 0; i < NUM_CONSUMERS; i++) {
            consSession.createConsumer(destination);
        }

        // The remote consumer should be visible as one consumer on the local broker's queue.
        assertTrue("got expected consumer count from mbean within time limit",
                verifyConsumerCount(1, destination, broker));

        LOG.info("Stopping Consumer on the networked broker ...");
        // Closing the connection will also close the consumer
        consConn.close();

        // We should have 0 consumer for the queue on the local broker
        assertTrue("got expected 0 count from mbean within time limit",
                verifyConsumerCount(0, destination, broker));
    } finally {
        // Release the connection when an assertion above fails; closing an
        // already closed JMS connection is permitted and does nothing.
        consConn.close();
    }
}
/**
 * Registers a durable topic subscriber on each broker, verifies that a
 * message sent to the local broker reaches both, then restarts the networked
 * broker, re-registers its subscriber and verifies the same delivery again
 * with no inactive durable subscriptions left on either broker.
 *
 * @throws Exception on any broker, JMS or JMX failure
 */
@Test
public void testNetworkedBrokerDurableSubAfterRestart() throws Exception {
    final String localCountMessage = "got expected consumer count from local broker mbean within time limit";
    final String networkCountMessage = "got expected consumer count from network broker mbean within time limit";

    // One shared listener counts deliveries across both subscribers.
    final AtomicInteger received = new AtomicInteger(0);
    MessageListener countingListener = new MessageListener() {
        public void onMessage(Message message) {
            received.incrementAndGet();
        }
    };

    LOG.info("Creating durable consumer on each broker ...");
    ActiveMQTopic topic = registerDurableConsumer(networkedBroker, countingListener);
    registerDurableConsumer(broker, countingListener);

    // NOTE(review): 2 per broker is presumably the local subscriber plus the bridged one - confirm.
    assertTrue(localCountMessage, verifyConsumerCount(2, topic, broker));
    assertTrue(networkCountMessage, verifyConsumerCount(2, topic, networkedBroker));

    sendMessageTo(topic, broker);
    assertTrue("Got one message on each", verifyMessageCount(2, received));

    LOG.info("Stopping brokerTwo...");
    networkedBroker.stop();
    networkedBroker.waitUntilStopped();

    LOG.info("restarting broker Two...");
    // No deleteAllMessagesOnStartup this time: the restarted broker reuses its existing store.
    networkedBroker = createNetworkedBroker();
    networkedBroker.start();

    LOG.info("Recreating durable Consumer on the broker after restart...");
    registerDurableConsumer(networkedBroker, countingListener);

    // give advisories a chance to percolate
    TimeUnit.SECONDS.sleep(5);

    sendMessageTo(topic, broker);

    // expect similar after restart
    assertTrue(localCountMessage, verifyConsumerCount(2, topic, broker));

    // a durable sub is auto bridged on restart unless dynamicOnly=true
    assertTrue(networkCountMessage, verifyConsumerCount(2, topic, networkedBroker));

    assertTrue("got no inactive subs on broker", verifyDurableConsumerCount(0, broker));
    assertTrue("got no inactive subs on other broker", verifyDurableConsumerCount(0, networkedBroker));

    assertTrue("Got two more messages after restart", verifyMessageCount(4, received));
    // Wait a little longer and re-check, to catch late duplicate deliveries.
    TimeUnit.SECONDS.sleep(1);
    assertTrue("still Got just two more messages", verifyMessageCount(4, received));
}
/**
 * Polls, via {@code Wait.waitFor}, until the counter equals the expected value.
 *
 * @param i     the exact number of messages expected
 * @param count the counter incremented by the message listener
 * @return the result of {@code Wait.waitFor} for that condition
 * @throws Exception if the wait itself fails
 */
private boolean verifyMessageCount(final int i, final AtomicInteger count) throws Exception {
    final Wait.Condition countReached = new Wait.Condition() {
        public boolean isSatisified() throws Exception {
            return count.get() == i;
        }
    };
    return Wait.waitFor(countReached);
}
/**
 * Opens a started connection with client id "DurableOne" to the given broker
 * and attaches the listener to a durable subscription on the test topic.
 * The subscription name embeds the broker name, so it differs per broker.
 *
 * NOTE(review): the connection is neither returned nor closed here.
 *
 * @param brokerService the broker to connect to
 * @param listener      receives the messages delivered to the subscription
 * @return the topic the subscription was created on
 * @throws Exception on any JMS failure
 */
private ActiveMQTopic registerDurableConsumer(
        BrokerService brokerService, MessageListener listener) throws Exception {
    // unique to a broker
    final String subscriptionName = "SubOne" + brokerService.getBrokerName();

    Connection durableConnection = createConnectionFactory(brokerService).createConnection();
    durableConnection.setClientID("DurableOne");
    durableConnection.start();

    Session durableSession = durableConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    ActiveMQTopic topic = (ActiveMQTopic) durableSession.createTopic(DESTINATION_NAME);
    TopicSubscriber subscriber = durableSession.createDurableSubscriber(topic, subscriptionName);
    subscriber.setMessageListener(listener);
    return topic;
}
/**
 * Sends a single text message ("Hi") to the destination through a short-lived
 * connection to the given broker.
 *
 * The connection is closed in a finally block so it is released even when
 * session creation or the send fails.
 *
 * @param destination   the topic to publish to
 * @param brokerService the broker to connect to
 * @throws Exception on any JMS failure
 */
private void sendMessageTo(ActiveMQTopic destination, BrokerService brokerService) throws Exception {
    ConnectionFactory factory = createConnectionFactory(brokerService);
    Connection conn = factory.createConnection();
    try {
        conn.start();
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createProducer(destination).send(session.createTextMessage("Hi"));
    } finally {
        // Closing the connection also closes the session and producer created from it.
        conn.close();
    }
}
/**
 * Builds a connection factory that targets the connect URI of the broker's
 * first transport connector, with fixed client options and prefetch limits
 * (queue 100, topic 1000).
 *
 * @param broker the broker whose first transport connector is used
 * @return a configured ActiveMQ connection factory
 * @throws Exception if the connector's connect URI cannot be obtained
 */
protected ConnectionFactory createConnectionFactory(final BrokerService broker) throws Exception {
    final TransportConnector firstConnector = (TransportConnector) broker.getTransportConnectors().get(0);
    final String connectUri = firstConnector.getServer().getConnectURI().toString();

    final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectUri);
    factory.setOptimizedMessageDispatch(true);
    factory.setCopyMessageOnSend(false);
    factory.setUseCompression(false);
    factory.setDispatchAsync(false);
    factory.setUseAsyncSend(false);
    factory.setOptimizeAcknowledge(false);
    factory.setWatchTopicAdvisories(true);

    final ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
    prefetch.setQueuePrefetch(100);
    prefetch.setTopicPrefetch(1000);
    factory.setPrefetchPolicy(prefetch);

    factory.setAlwaysSyncSend(true);
    return factory;
}
// JMX Helper Methods
/**
 * Polls the broker's management context until the destination's
 * "ConsumerCount" attribute equals the expected value.
 *
 * A failed lookup counts as "not yet" and is retried on the next poll; the
 * cause is logged at debug level instead of being discarded.
 *
 * @param expectedCount the consumer count to wait for
 * @param destination   the queue or topic whose MBean is read
 * @param broker        the broker whose management context is queried
 * @return the result of {@code Wait.waitFor} for that condition
 * @throws Exception if the wait itself fails
 */
private boolean verifyConsumerCount(final long expectedCount, final ActiveMQDestination destination, final BrokerService broker) throws Exception {
    return Wait.waitFor(new Wait.Condition() {
        public boolean isSatisified() throws Exception {
            try {
                ObjectName destinationName = getObjectName(
                        broker.getBrokerName(),
                        destination.isQueue() ? "Queue" : "Topic",
                        "Destination=" + destination.getPhysicalName());
                Object consumers = broker.getManagementContext().getAttribute(destinationName, "ConsumerCount");
                if (consumers != null) {
                    LOG.info("Consumers for " + destination.getPhysicalName() + " on " + broker + " : " + consumers);
                    // Accept any numeric attribute type rather than insisting on Long.
                    return expectedCount == ((Number) consumers).longValue();
                }
            } catch (Exception ignoreAndRetry) {
                // Best effort: the attribute may not be readable yet (e.g. the MBean is not registered); poll again.
                LOG.debug("Consumer count for " + destination.getPhysicalName() + " on " + broker
                        + " not available, will retry", ignoreAndRetry);
            }
            return false;
        }
    });
}
/**
 * Polls the broker's management context until the number of inactive
 * subscription MBeans (pattern "active=false,*") equals the expected value.
 *
 * JMX reachability is checked once up front instead of on every poll:
 * {@link #getMBeanServerConnection()} opens a new connector on each call and
 * skips the test through a JUnit assumption when none is available, so
 * repeating it inside the poll loop only accumulated open connectors.
 *
 * @param expectedCount the number of inactive subscriptions to wait for
 * @param broker        the broker whose management context is queried
 * @return the result of {@code Wait.waitFor} for that condition
 * @throws Exception if the object name is invalid or the query fails
 */
private boolean verifyDurableConsumerCount(final long expectedCount, final BrokerService broker) throws Exception {
    // Fails the JUnit assumption (skipping the test) when no JMX connection can be made.
    getMBeanServerConnection();

    // The query pattern does not change between polls, so build it once.
    final ObjectName inactiveSubscriptions =
            getObjectName(broker.getBrokerName(), "Subscription", "active=false,*");

    return Wait.waitFor(new Wait.Condition() {
        public boolean isSatisified() throws Exception {
            Set<?> subs = broker.getManagementContext().queryNames(inactiveSubscriptions, null);
            if (subs == null) {
                return false;
            }
            LOG.info("inactive durable subs on " + broker + " : " + subs);
            return expectedCount == subs.size();
        }
    });
}
/**
 * Connects to the JMX service at localhost:1099 and returns its MBean server
 * connection. When the connection attempt fails, a warning is logged and the
 * JUnit assumption below aborts (skips) the running test.
 *
 * NOTE(review): the JMXConnector opened here is never closed.
 *
 * @return a non-null MBean server connection
 * @throws MalformedURLException if the service URL cannot be parsed
 */
private MBeanServerConnection getMBeanServerConnection() throws MalformedURLException {
    final JMXServiceURL serviceUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");
    MBeanServerConnection connection;
    try {
        connection = JMXConnectorFactory.connect(serviceUrl, null).getMBeanServerConnection();
    } catch (Exception ignored) {
        LOG.warn("getMBeanServer ex: " + ignored);
        connection = null;
    }
    // If port 1099 is in use when the Broker starts, starting the jmx
    // connector will fail. So, if we have no connection to query, skip the
    // test.
    assumeNotNull(connection);
    return connection;
}
/**
 * Builds an ObjectName in the "org.apache.activemq" domain from the broker
 * name, the MBean type and a trailing key/pattern fragment.
 *
 * @param brokerName value for the BrokerName key
 * @param type       value for the Type key, e.g. "Queue", "Topic" or "Subscription"
 * @param pattern    remaining key properties, appended verbatim after a comma
 * @return the assembled object name
 * @throws Exception if the resulting string is not a valid object name
 */
private ObjectName getObjectName(String brokerName, String type, String pattern) throws Exception {
    final String canonical = "org.apache.activemq:BrokerName=" + brokerName + ",Type=" + type + "," + pattern;
    return new ObjectName(canonical);
}
}