blob: d04e1f5ba68bc043ce2c329a7260d4c5c6f4a1e6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.lang.Thread.UncaughtExceptionHandler;
import java.net.URI;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.management.ObjectName;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTestSupport implements UncaughtExceptionHandler {
public static final int BROKER_COUNT = 3;
public static final int CONSUMER_COUNT = 5;
public static final int MESSAGE_COUNT = 0;
public static final boolean DUPLEX = false;
public static final boolean CONDUIT = true;
public static final int NETWORK_TTL = 6;
private static final Logger LOG = LoggerFactory.getLogger(VerifyNetworkConsumersDisconnectTest.class);
public static final int TIMEOUT = 30000;
protected Map<String, MessageConsumer> consumerMap;
Map<Thread, Throwable> unhandledExceptions = new HashMap<Thread, Throwable>();
private void assertNoUnhandledExceptions() {
for( Entry<Thread, Throwable> e: unhandledExceptions.entrySet()) {
LOG.error("Thread:" + e.getKey() + " Had unexpected: " + e.getValue());
}
assertTrue("There are no unhandled exceptions, see: log for detail on: " + unhandledExceptions,
unhandledExceptions.isEmpty());
}
public NetworkConnector bridge(String from, String to, boolean conduitNetworkQueueSubscriptions) throws Exception {
NetworkConnector networkConnector = bridgeBrokers(from, to, true, NETWORK_TTL, CONDUIT);
networkConnector.setSuppressDuplicateQueueSubscriptions(true);
networkConnector.setDecreaseNetworkConsumerPriority(true);
networkConnector.setDuplex(DUPLEX);
networkConnector.setConduitNetworkQueueSubscriptions(conduitNetworkQueueSubscriptions);
return networkConnector;
}
public NetworkConnector bridge(String from, String to) throws Exception {
return bridge(from, to, false);
}
/*why conduit proxy proxy consumers gets us in a knot w.r.t removal
DC-7 for CA-9, add DB-15, remove CA-9, add CB-8
CB-8 add DC-7
CB-8 - why not dead?
CB-8 for BA-6, add BD-15, remove BA-6
BD-15 for DA-11, add DC-7
*/
public void testConsumerOnEachBroker() throws Exception {
bridge("Broker0", "Broker1");
if (!DUPLEX) bridge("Broker1", "Broker0");
bridge("Broker1", "Broker2");
if (!DUPLEX) bridge("Broker2", "Broker1");
startAllBrokers();
waitForBridgeFormation(brokers.get("Broker0").broker, 1, 0);
waitForBridgeFormation(brokers.get("Broker2").broker, 1, 0);
waitForBridgeFormation(brokers.get("Broker1").broker, 1, 0);
waitForBridgeFormation(brokers.get("Broker1").broker, 1, 1);
Destination dest = createDestination("TEST.FOO", false);
// Setup consumers
for (int i = 0; i < BROKER_COUNT; i++) {
consumerMap.put("Consumer:" + i + ":0", createConsumer("Broker" + i, dest));
}
assertExactConsumersConnect("Broker0", 3, 1, TIMEOUT);
assertExactConsumersConnect("Broker2", 3, 1, TIMEOUT);
// piggy in the middle
assertExactConsumersConnect("Broker1", 3, 1, TIMEOUT);
assertNoUnhandledExceptions();
LOG.info("Complete the mesh - 0->2");
// shorter route
NetworkConnector nc = bridge("Broker0", "Broker2");
nc.setBrokerName("Broker0");
nc.start();
if (!DUPLEX) {
LOG.info("... complete the mesh - 2->0");
nc = bridge("Broker2", "Broker0");
nc.setBrokerName("Broker2");
nc.start();
}
// wait for consumers to get propagated
for (int i = 0; i < BROKER_COUNT; i++) {
assertExactConsumersConnect("Broker" + i, 3, 1, TIMEOUT);
}
// reverse order close
consumerMap.get("Consumer:" + 2 + ":0").close();
TimeUnit.SECONDS.sleep(1);
consumerMap.get("Consumer:" + 1 + ":0").close();
TimeUnit.SECONDS.sleep(1);
consumerMap.get("Consumer:" + 0 + ":0").close();
LOG.info("Check for no consumers..");
for (int i = 0; i < BROKER_COUNT; i++) {
assertExactConsumersConnect("Broker" + i, 0, 0, TIMEOUT);
}
}
public void testConsumerOnEachBrokerNetworkQueueConduitSubs() throws Exception {
bridge("Broker0", "Broker1", true);
if (!DUPLEX) bridge("Broker1", "Broker0", true);
bridge("Broker1", "Broker2", true);
if (!DUPLEX) bridge("Broker2", "Broker1", true);
startAllBrokers();
waitForBridgeFormation(brokers.get("Broker0").broker, 1, 0);
waitForBridgeFormation(brokers.get("Broker2").broker, 1, 0);
waitForBridgeFormation(brokers.get("Broker1").broker, 1, 0);
waitForBridgeFormation(brokers.get("Broker1").broker, 1, 1);
Destination dest = createDestination("TEST.FOO", false);
// Setup consumers
for (int i = 0; i < BROKER_COUNT; i++) {
consumerMap.put("Consumer:" + i + ":0", createConsumer("Broker" + i, dest));
}
//Conduit network queue conduit subs is true so should only be 2 subs
assertExactConsumersConnect("Broker0", 2, 1, TIMEOUT);
assertExactConsumersConnect("Broker2", 2, 1, TIMEOUT);
// still should be 3 subs for the middle broker, 1 for each direction
assertExactConsumersConnect("Broker1", 3, 1, TIMEOUT);
assertNoUnhandledExceptions();
LOG.info("Complete the mesh - 0->2");
// shorter route
NetworkConnector nc = bridge("Broker0", "Broker2");
nc.setBrokerName("Broker0");
nc.start();
if (!DUPLEX) {
LOG.info("... complete the mesh - 2->0");
nc = bridge("Broker2", "Broker0");
nc.setBrokerName("Broker2");
nc.start();
}
// reverse order close
consumerMap.get("Consumer:" + 2 + ":0").close();
TimeUnit.SECONDS.sleep(1);
consumerMap.get("Consumer:" + 1 + ":0").close();
TimeUnit.SECONDS.sleep(1);
consumerMap.get("Consumer:" + 0 + ":0").close();
LOG.info("Check for no consumers..");
for (int i = 0; i < BROKER_COUNT; i++) {
assertExactConsumersConnect("Broker" + i, 0, 0, TIMEOUT);
}
}
public void testXConsumerOnEachBroker() throws Exception {
bridge("Broker0", "Broker1");
if (!DUPLEX) bridge("Broker1", "Broker0");
bridge("Broker1", "Broker2");
if (!DUPLEX) bridge("Broker2", "Broker1");
startAllBrokers();
waitForBridgeFormation(brokers.get("Broker0").broker, 1, 0);
waitForBridgeFormation(brokers.get("Broker2").broker, 1, 0);
waitForBridgeFormation(brokers.get("Broker1").broker, 1, 0);
waitForBridgeFormation(brokers.get("Broker1").broker, 1, 1);
Destination dest = createDestination("TEST.FOO", false);
// Setup consumers
for (int i = 0; i < BROKER_COUNT; i++) {
for (int j=0; j< CONSUMER_COUNT; j++)
consumerMap.put("Consumer:" + i + ":" + j, createConsumer("Broker" + i, dest));
}
for (int i = 0; i < BROKER_COUNT; i++) {
assertExactConsumersConnect("Broker" + i, CONSUMER_COUNT + (BROKER_COUNT -1), 1, TIMEOUT);
}
assertNoUnhandledExceptions();
LOG.info("Complete the mesh - 0->2");
// shorter route
NetworkConnector nc = bridge("Broker0", "Broker2");
nc.setBrokerName("Broker0");
nc.start();
waitForBridgeFormation(brokers.get("Broker0").broker, 1, 1);
if (!DUPLEX) {
LOG.info("... complete the mesh - 2->0");
nc = bridge("Broker2", "Broker0");
nc.setBrokerName("Broker2");
nc.start();
}
waitForBridgeFormation(brokers.get("Broker2").broker, 1, 1);
for (int i = 0; i < BROKER_COUNT; i++) {
assertExactConsumersConnect("Broker" + i, CONSUMER_COUNT + (BROKER_COUNT -1), 1, TIMEOUT);
}
// reverse order close
for (int i=0; i<CONSUMER_COUNT; i++) {
consumerMap.get("Consumer:" + 2 + ":" + i).close();
TimeUnit.SECONDS.sleep(1);
consumerMap.get("Consumer:" + 1 + ":" + i).close();
TimeUnit.SECONDS.sleep(1);
consumerMap.get("Consumer:" + 0 + ":" + i).close();
}
LOG.info("Check for no consumers..");
for (int i = 0; i < BROKER_COUNT; i++) {
assertExactConsumersConnect("Broker" + i, 0, 0, TIMEOUT);
}
}
protected void assertExactConsumersConnect(final String brokerName, final int count, final int numChecks, long timeout) throws Exception {
final ManagementContext context = brokers.get(brokerName).broker.getManagementContext();
final AtomicInteger stability = new AtomicInteger(0);
assertTrue("Expected consumers count: " + count + " on: " + brokerName, Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
try {
QueueViewMBean queueViewMBean = (QueueViewMBean) context.newProxyInstance(brokers.get(brokerName).broker.getAdminView().getQueues()[0], QueueViewMBean.class, false);
long currentCount = queueViewMBean.getConsumerCount();
LOG.info("On " + brokerName + " current consumer count for " + queueViewMBean + ", " + currentCount);
LinkedList<String> consumerIds = new LinkedList<String>();
for (ObjectName objectName : queueViewMBean.getSubscriptions()) {
consumerIds.add(objectName.getKeyProperty("consumerId"));
}
LOG.info("Sub IDs: " + consumerIds);
if (currentCount == count) {
stability.incrementAndGet();
} else {
stability.set(0);
}
return stability.get() > numChecks;
} catch (Exception e) {
LOG.warn(": ", e);
return false;
}
}
}, timeout));
}
@Override
public void setUp() throws Exception {
super.setAutoFail(true);
super.setUp();
unhandledExceptions.clear();
Thread.setDefaultUncaughtExceptionHandler(this);
// Setup n brokers
for (int i = 0; i < BROKER_COUNT; i++) {
createBroker(new URI("broker:(tcp://localhost:6161" + i + ")/Broker" + i + "?persistent=false&useJmx=true&brokerId=Broker" + i));
}
consumerMap = new LinkedHashMap<String, MessageConsumer>();
}
@Override
protected void configureBroker(BrokerService brokerService) {
PolicyEntry policyEntry = new PolicyEntry();
policyEntry.setExpireMessagesPeriod(0);
PolicyMap policyMap = new PolicyMap();
policyMap.setDefaultEntry(policyEntry);
brokerService.setDestinationPolicy(policyMap);
}
@Override
public void uncaughtException(Thread t, Throwable e) {
synchronized(unhandledExceptions) {
unhandledExceptions.put(t, e);
}
}
}