blob: c968127ef7c190e6be28c595ca25657afbc26b88 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.net.URI;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.ThreadTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NetworkOfTwentyBrokersTest extends JmsMultipleBrokersTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(NetworkOfTwentyBrokersTest.class);
// This will interconnect all brokers using multicast
protected void bridgeAllBrokers() throws Exception {
bridgeAllBrokers("TwentyBrokersTest", 1, false, false);
}
protected void bridgeAllBrokers(String groupName, int ttl, boolean suppressduplicateQueueSubs) throws Exception {
bridgeAllBrokers(groupName, ttl, suppressduplicateQueueSubs, false);
}
protected void bridgeAllBrokers(String groupName, int ttl, boolean suppressduplicateQueueSubs, boolean decreasePriority) throws Exception {
Collection<BrokerItem> brokerList = brokers.values();
for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
BrokerService broker = i.next().broker;
List<TransportConnector> transportConnectors = broker.getTransportConnectors();
if (transportConnectors.isEmpty()) {
broker.addConnector(new URI(AUTO_ASSIGN_TRANSPORT));
transportConnectors = broker.getTransportConnectors();
}
TransportConnector transport = transportConnectors.get(0);
if (transport.getDiscoveryUri() == null) {
transport.setDiscoveryUri(new URI("multicast://default?group=" + groupName));
}
List<NetworkConnector> networkConnectors = broker.getNetworkConnectors();
if (networkConnectors.isEmpty()) {
broker.addNetworkConnector("multicast://default?group=" + groupName);
networkConnectors = broker.getNetworkConnectors();
}
NetworkConnector nc = networkConnectors.get(0);
nc.setNetworkTTL(ttl);
nc.setSuppressDuplicateQueueSubscriptions(suppressduplicateQueueSubs);
nc.setDecreaseNetworkConsumerPriority(decreasePriority);
}
// Multicasting may take longer to setup
maxSetupTime = 8000;
}
protected BrokerService createBroker(String brokerName) throws Exception {
BrokerService broker = new BrokerService();
broker.setPersistent(false);
broker.setUseJmx(false);
broker.setBrokerName(brokerName);
broker.addConnector(new URI(AUTO_ASSIGN_TRANSPORT));
brokers.put(brokerName, new BrokerItem(broker));
return broker;
}
/* AMQ-3077 Bug */
public void testBrokers() throws Exception {
int X = 20;
int i;
LOG.info("Creating X Brokers");
for (i = 0; i < X; i++) {
createBroker("Broker" + i);
}
bridgeAllBrokers();
startAllBrokers();
waitForBridgeFormation(X-1);
LOG.info("Waiting for complete formation");
try {
Thread.sleep(10000);
} catch (Exception e) {
}
verifyPeerBrokerInfos(X-1);
LOG.info("Stopping half the brokers");
for (i = 0; i < X/2; i++) {
destroyBroker("Broker" + i);
}
LOG.info("Waiting for complete stop");
try {
Thread.sleep(10000);
} catch (Exception e) {
}
verifyPeerBrokerInfos((X/2) -1);
LOG.info("Recreating first half");
for (i = 0; i < X/2; i++) {
createBroker("Broker" + i);
}
bridgeAllBrokers();
startAllBrokers();
waitForBridgeFormation(X-1);
LOG.info("Waiting for complete reformation");
try {
Thread.sleep(10000);
} catch (Exception e) {
}
verifyPeerBrokerInfos(X-1);
}
public void testPeerBrokerCountHalfPeer() throws Exception {
createBroker("A");
createBroker("B");
bridgeBrokers("A", "B");
startAllBrokers();
verifyPeerBrokerInfo(brokers.get("A"), 1);
verifyPeerBrokerInfo(brokers.get("B"), 0);
}
public void testPeerBrokerCountHalfPeerTwice() throws Exception {
createBroker("A");
createBroker("B");
bridgeBrokers("A", "B");
bridgeBrokers("A", "B");
startAllBrokers();
verifyPeerBrokerInfo(brokers.get("A"), 1);
verifyPeerBrokerInfo(brokers.get("B"), 0);
}
public void testPeerBrokerCountFullPeer() throws Exception {
createBroker("A");
createBroker("B");
bridgeBrokers("A", "B");
bridgeBrokers("B", "A");
startAllBrokers();
verifyPeerBrokerInfo(brokers.get("A"), 1);
verifyPeerBrokerInfo(brokers.get("B"), 1);
}
public void testPeerBrokerCountFullPeerDuplex() throws Exception {
createBroker("A");
createBroker("B");
NetworkConnector nc = bridgeBrokers("A", "B");
nc.setDuplex(true);
startAllBrokers();
verifyPeerBrokerInfo(brokers.get("A"), 1);
verifyPeerBrokerInfo(brokers.get("B"), 1);
}
private void verifyPeerBrokerInfo(BrokerItem brokerItem, final int max) {
BrokerService broker = brokerItem.broker;
RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
LOG.info("verify infos " + broker.getBrokerName() + ", len: " + regionBroker.getPeerBrokerInfos().length);
for (BrokerInfo info : regionBroker.getPeerBrokerInfos()) {
LOG.info(info.getBrokerName());
}
assertEquals(broker.getBrokerName(), max, regionBroker.getPeerBrokerInfos().length);
}
private void verifyPeerBrokerInfos(final int max) {
Collection<BrokerItem> brokerList = brokers.values();
for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
verifyPeerBrokerInfo(i.next(), max);
}
}
@Override
public void setUp() throws Exception {
super.setAutoFail(true);
super.setUp();
}
@Override
public void tearDown() throws Exception {
super.tearDown();
ThreadTracker.result();
}
}