blob: 2e91d4c9df8d6463ebfbf07bef84ce080bb990e5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.net.URI;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.util.MessageIdList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*/
public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(ThreeBrokerQueueNetworkTest.class);
protected static final int MESSAGE_COUNT = 100;
private static final long MAX_WAIT_MILLIS = 10000;
interface Condition {
boolean isSatisified() throws Exception;
}
/**
* BrokerA -> BrokerB -> BrokerC
*/
public void testABandBCbrokerNetwork() throws Exception {
// Setup broker networks
bridgeBrokers("BrokerA", "BrokerB");
bridgeBrokers("BrokerB", "BrokerC");
startAllBrokers();
waitForBridgeFormation();
// Setup destination
Destination dest = createDestination("TEST.FOO", false);
// Setup consumers
MessageConsumer clientC = createConsumer("BrokerC", dest);
// Send messages
sendMessages("BrokerA", dest, MESSAGE_COUNT);
// Let's try to wait for any messages. Should be none.
Thread.sleep(1000);
// Get message count
MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
assertEquals(0, msgsC.getMessageCount());
}
/**
* BrokerA <- BrokerB -> BrokerC
*/
public void testBAandBCbrokerNetwork() throws Exception {
// Setup broker networks
bridgeBrokers("BrokerB", "BrokerA");
bridgeBrokers("BrokerB", "BrokerC");
startAllBrokers();
waitForBridgeFormation();
// Setup destination
Destination dest = createDestination("TEST.FOO", false);
// Setup consumers
MessageConsumer clientA = createConsumer("BrokerA", dest);
MessageConsumer clientC = createConsumer("BrokerC", dest);
Thread.sleep(2000); //et subscriptions get propagated
// Send messages
sendMessages("BrokerB", dest, MESSAGE_COUNT);
// Let's try to wait for any messages.
Thread.sleep(1000);
// Get message count
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
// Total received should be 100
assertEquals(MESSAGE_COUNT, msgsA.getMessageCount() + msgsC.getMessageCount());
}
/**
* BrokerA <- BrokerB -> BrokerC
*/
public void testBAandBCbrokerNetworkWithSelectorsSendFirst() throws Exception {
// Setup broker networks
bridgeBrokers("BrokerB", "BrokerA", true, 1, false);
bridgeBrokers("BrokerB", "BrokerC", true, 1, false);
startAllBrokers();
waitForBridgeFormation();
// Setup destination
Destination dest = createDestination("TEST.FOO", false);
// Send messages for broker A
HashMap<String, Object> props = new HashMap<String, Object>();
props.put("broker", "BROKER_A");
sendMessages("BrokerB", dest, MESSAGE_COUNT, props);
//Send messages for broker C
props.clear();
props.put("broker", "BROKER_C");
sendMessages("BrokerB", dest, MESSAGE_COUNT, props);
// Setup consumers
MessageConsumer clientA = createConsumer("BrokerA", dest, "broker = 'BROKER_A'");
MessageConsumer clientC = createConsumer("BrokerC", dest, "broker = 'BROKER_C'");
Thread.sleep(2000); //et subscriptions get propagated
// Let's try to wait for any messages.
//Thread.sleep(1000);
// Get message count
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
// Total received should be 100
assertEquals(MESSAGE_COUNT, msgsA.getMessageCount());
assertEquals(MESSAGE_COUNT, msgsC.getMessageCount());
}
/**
* BrokerA <- BrokerB -> BrokerC
*/
public void testBAandBCbrokerNetworkWithSelectorsSubscribeFirst() throws Exception {
// Setup broker networks
bridgeBrokers("BrokerB", "BrokerA", true, 1, false);
bridgeBrokers("BrokerB", "BrokerC", true, 1, false);
startAllBrokers();
waitForBridgeFormation();
// Setup destination
Destination dest = createDestination("TEST.FOO", false);
// Setup consumers
MessageConsumer clientA = createConsumer("BrokerA", dest, "broker = 'BROKER_A'");
MessageConsumer clientC = createConsumer("BrokerC", dest, "broker = 'BROKER_C'");
Thread.sleep(2000); //et subscriptions get propagated
// Send messages for broker A
HashMap<String, Object> props = new HashMap<String, Object>();
props.put("broker", "BROKER_A");
sendMessages("BrokerB", dest, MESSAGE_COUNT, props);
//Send messages for broker C
props.clear();
props.put("broker", "BROKER_C");
sendMessages("BrokerB", dest, MESSAGE_COUNT, props);
// Let's try to wait for any messages.
Thread.sleep(1000);
// Get message count
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
// Total received should be 100
assertEquals(MESSAGE_COUNT, msgsA.getMessageCount());
assertEquals(MESSAGE_COUNT, msgsC.getMessageCount());
}
/**
* BrokerA -> BrokerB <- BrokerC
*/
public void testABandCBbrokerNetwork() throws Exception {
// Setup broker networks
bridgeBrokers("BrokerA", "BrokerB");
bridgeBrokers("BrokerC", "BrokerB");
startAllBrokers();
waitForBridgeFormation();
// Setup destination
Destination dest = createDestination("TEST.FOO", false);
// Setup consumers
MessageConsumer clientB = createConsumer("BrokerB", dest);
// Send messages
sendMessages("BrokerA", dest, MESSAGE_COUNT);
sendMessages("BrokerC", dest, MESSAGE_COUNT);
// Get message count
MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 2);
assertEquals(MESSAGE_COUNT * 2, msgsB.getMessageCount());
}
/**
* BrokerA <-> BrokerB <-> BrokerC
*/
public void testAllConnectedBrokerNetwork() throws Exception {
// Setup broker networks
bridgeBrokers("BrokerA", "BrokerB");
bridgeBrokers("BrokerB", "BrokerA");
bridgeBrokers("BrokerB", "BrokerC");
bridgeBrokers("BrokerC", "BrokerB");
bridgeBrokers("BrokerA", "BrokerC");
bridgeBrokers("BrokerC", "BrokerA");
startAllBrokers();
waitForBridgeFormation();
// Setup destination
Destination dest = createDestination("TEST.FOO", false);
// Setup consumers
MessageConsumer clientA = createConsumer("BrokerA", dest);
MessageConsumer clientB = createConsumer("BrokerB", dest);
MessageConsumer clientC = createConsumer("BrokerC", dest);
// Send messages
sendMessages("BrokerA", dest, MESSAGE_COUNT);
sendMessages("BrokerB", dest, MESSAGE_COUNT);
sendMessages("BrokerC", dest, MESSAGE_COUNT);
// Let's try to wait for any messages.
Thread.sleep(1000);
// Get message count
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
assertEquals(MESSAGE_COUNT * 3, msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount());
}
public void testAllConnectedUsingMulticast() throws Exception {
// Setup broker networks
bridgeAllBrokers();
startAllBrokers();
waitForBridgeFormation();
// Setup destination
Destination dest = createDestination("TEST.FOO", false);
// Setup consumers
MessageConsumer clientA = createConsumer("BrokerA", dest);
MessageConsumer clientB = createConsumer("BrokerB", dest);
MessageConsumer clientC = createConsumer("BrokerC", dest);
// Send messages
sendMessages("BrokerA", dest, MESSAGE_COUNT);
sendMessages("BrokerB", dest, MESSAGE_COUNT);
sendMessages("BrokerC", dest, MESSAGE_COUNT);
// Let's try to wait for any messages.
Thread.sleep(1000);
// Get message count
final MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
waitFor(new Condition() {
public boolean isSatisified() {
return msgsA.getMessageCount() == MESSAGE_COUNT;
}
});
assertEquals(MESSAGE_COUNT * 3, msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount());
}
// on slow machines some more waiting is required on account of slow advisories
private void waitFor(Condition condition) throws Exception {
final long expiry = System.currentTimeMillis() + MAX_WAIT_MILLIS;
while (!condition.isSatisified() && System.currentTimeMillis() < expiry) {
Thread.sleep(1000);
}
if (System.currentTimeMillis() >= expiry) {
LOG.error("expired while waiting for condition " + condition);
}
}
public void testAllConnectedUsingMulticastProducerConsumerOnA() throws Exception {
bridgeAllBrokers("default", 3, false);
startAllBrokers();
waitForBridgeFormation();
// Setup destination
Destination dest = createDestination("TEST.FOO", false);
// Setup consumers
int messageCount = 2000;
CountDownLatch messagesReceived = new CountDownLatch(messageCount);
MessageConsumer clientA = createConsumer("BrokerA", dest, messagesReceived);
// Let's try to wait for advisory percolation.
Thread.sleep(1000);
// Send messages
sendMessages("BrokerA", dest, messageCount);
assertTrue(messagesReceived.await(30, TimeUnit.SECONDS));
// Get message count
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
assertEquals(messageCount, msgsA.getMessageCount());
}
public void testAllConnectedWithSpare() throws Exception {
bridgeAllBrokers("default", 3, false);
startAllBrokers();
waitForBridgeFormation();
// Setup destination
Destination dest = createDestination("TEST.FOO", false);
// Setup consumers
int messageCount = 2000;
CountDownLatch messagesReceived = new CountDownLatch(messageCount);
MessageConsumer clientA = createConsumer("BrokerA", dest, messagesReceived);
// ensure advisory percolation.
Thread.sleep(2000);
// Send messages
sendMessages("BrokerB", dest, messageCount);
assertTrue("messaged received within time limit", messagesReceived.await(30, TimeUnit.SECONDS));
// Get message count
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
assertEquals(messageCount, msgsA.getMessageCount());
}
/*
* This test is disabled - as it fails with a fix for
* http://issues.apache.org/activemq/browse/AMQ-2530 - which highlights that
* For a Conduit bridge - local subscription Ids weren't removed in a ConduitBridge
* The test fails because on closing clientA - clientB correctly receives all the
* messages - ie. half dont get stuck on BrokerA -
*/
public void XtestMigrateConsumerStuckMessages() throws Exception {
boolean suppressQueueDuplicateSubscriptions = false;
bridgeAllBrokers("default", 3, suppressQueueDuplicateSubscriptions);
startAllBrokers();
waitForBridgeFormation();
// Setup destination
Destination dest = createDestination("TEST.FOO", false);
// Setup consumers
LOG.info("Consumer on A");
MessageConsumer clientA = createConsumer("BrokerA", dest);
// ensure advisors have percolated
Thread.sleep(2000);
LOG.info("Consumer on B");
int messageCount = 2000;
// will only get half of the messages
CountDownLatch messagesReceived = new CountDownLatch(messageCount/2);
MessageConsumer clientB = createConsumer("BrokerB", dest, messagesReceived);
// ensure advisors have percolated
Thread.sleep(2000);
LOG.info("Close consumer on A");
clientA.close();
// ensure advisors have percolated
Thread.sleep(2000);
LOG.info("Send to B");
sendMessages("BrokerB", dest, messageCount);
// Let's try to wait for any messages.
assertTrue(messagesReceived.await(30, TimeUnit.SECONDS));
// Get message count
MessageIdList msgs = getConsumerMessages("BrokerB", clientB);
// see will any more arrive
Thread.sleep(500);
assertEquals(messageCount/2, msgs.getMessageCount());
// pick up the stuck messages
messagesReceived = new CountDownLatch(messageCount/2);
clientA = createConsumer("BrokerA", dest, messagesReceived);
// Let's try to wait for any messages.
assertTrue(messagesReceived.await(30, TimeUnit.SECONDS));
msgs = getConsumerMessages("BrokerA", clientA);
assertEquals(messageCount/2, msgs.getMessageCount());
}
// use case: for maintenance, migrate consumers and producers from one
// node in the network to another so node can be replaced/updated
public void testMigrateConsumer() throws Exception {
boolean suppressQueueDuplicateSubscriptions = true;
boolean decreaseNetworkConsumerPriority = true;
bridgeAllBrokers("default", 3, suppressQueueDuplicateSubscriptions, decreaseNetworkConsumerPriority);
startAllBrokers();
waitForBridgeFormation();
// Setup destination
Destination dest = createDestination("TEST.FOO", false);
// Setup consumers
LOG.info("Consumer on A");
MessageConsumer clientA = createConsumer("BrokerA", dest);
// ensure advisors have percolated
Thread.sleep(2000);
LOG.info("Consumer on B");
int messageCount = 2000;
CountDownLatch messagesReceived = new CountDownLatch(messageCount);
MessageConsumer clientB = createConsumer("BrokerB", dest, messagesReceived);
// make the consumer slow so that any network consumer has a chance, even
// if it has a lower priority
MessageIdList msgs = getConsumerMessages("BrokerB", clientB);
msgs.setProcessingDelay(10);
// ensure advisors have percolated
Thread.sleep(2000);
LOG.info("Close consumer on A");
clientA.close();
// ensure advisors have percolated
Thread.sleep(2000);
LOG.info("Send to B");
sendMessages("BrokerB", dest, messageCount);
// Let's try to wait for any messages.
assertTrue("messages are received within limit", messagesReceived.await(60, TimeUnit.SECONDS));
assertEquals(messageCount, msgs.getMessageCount());
}
public void testNoDuplicateQueueSubs() throws Exception {
bridgeAllBrokers("default", 3, true);
startAllBrokers();
waitForBridgeFormation();
// Setup destination
Destination dest = createDestination("TEST.FOO", false);
// Setup consumers
String brokerName = "BrokerA";
MessageConsumer consumer = createConsumer(brokerName, dest);
// wait for advisories
Thread.sleep(2000);
// verify there is one consumer on each broker, no cycles
Collection<BrokerItem> brokerList = brokers.values();
for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
BrokerService broker = i.next().broker;
verifyConsumerCount(broker, 1, dest);
}
consumer.close();
// wait for advisories
Thread.sleep(2000);
// verify there is no more consumers
for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
BrokerService broker = i.next().broker;
verifyConsumerCount(broker, 0, dest);
}
}
public void testNoDuplicateQueueSubsHasLowestPriority() throws Exception {
boolean suppressQueueDuplicateSubscriptions = true;
boolean decreaseNetworkConsumerPriority = true;
bridgeAllBrokers("default", 3, suppressQueueDuplicateSubscriptions, decreaseNetworkConsumerPriority);
// Setup destination
final Destination dest = createDestination("TEST.FOO", false);
// delay the advisory messages so that one can percolate fully (cyclicly) before the other
BrokerItem brokerB = brokers.get("BrokerA");
brokerB.broker.setPlugins(new BrokerPlugin[]{new BrokerPlugin() {
public Broker installPlugin(Broker broker) throws Exception {
return new BrokerFilter(broker) {
final AtomicInteger count = new AtomicInteger();
@Override
public void preProcessDispatch(
MessageDispatch messageDispatch) {
if (messageDispatch.getDestination().getPhysicalName().contains("ActiveMQ.Advisory.Consumer")) {
// lets delay the first advisory
if (count.getAndIncrement() == 0) {
LOG.info("Sleeping on first advisory: " + messageDispatch);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
super.postProcessDispatch(messageDispatch);
}
};
}}
});
startAllBrokers();
waitForBridgeFormation();
// Setup consumers
String brokerName = "BrokerA";
createConsumer(brokerName, dest);
// wait for advisories
Thread.sleep(5000);
// verify there is one consumer on each broker, no cycles
Collection<BrokerItem> brokerList = brokers.values();
for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
BrokerService broker = i.next().broker;
verifyConsumerCount(broker, 1, dest);
if (!brokerName.equals(broker.getBrokerName())) {
verifyConsumePriority(broker, ConsumerInfo.NETWORK_CONSUMER_PRIORITY, dest);
}
}
}
public void testDuplicateQueueSubs() throws Exception {
configureBroker(createBroker("BrokerD"));
bridgeAllBrokers("default", 3, false);
startAllBrokers();
waitForBridgeFormation();
// Setup destination
Destination dest = createDestination("TEST.FOO", false);
// Setup consumers
String brokerName = "BrokerA";
MessageConsumer consumer = createConsumer(brokerName, dest);
// wait for advisories
Thread.sleep(2000);
verifyConsumerCount(brokers.get(brokerName).broker, 1, dest);
// in a cyclic network, other brokers will get second order consumer
// an alternative route to A via each other
Collection<BrokerItem> brokerList = brokers.values();
for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
BrokerService broker = i.next().broker;
if (!brokerName.equals(broker.getBrokerName())) {
verifyConsumerCount(broker, 5, dest);
verifyConsumePriority(broker, ConsumerInfo.NORMAL_PRIORITY, dest);
}
}
consumer.close();
// wait for advisories
Thread.sleep(2000);
for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
BrokerService broker = i.next().broker;
if (!brokerName.equals(broker.getBrokerName())) {
logConsumerCount(broker, 0, dest);
}
}
for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
BrokerService broker = i.next().broker;
verifyConsumerCount(broker, 0, dest);
}
}
private void verifyConsumerCount(BrokerService broker, int count, final Destination dest) throws Exception {
final RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
waitFor(new Condition() {
public boolean isSatisified() throws Exception {
return !regionBroker.getDestinations(ActiveMQDestination.transform(dest)).isEmpty();
}
});
Queue internalQueue = (Queue) regionBroker.getDestinations(ActiveMQDestination.transform(dest)).iterator().next();
LOG.info("Verify: consumer count on " + broker.getBrokerName() + " matches:" + count + ", val:" + internalQueue.getConsumers().size());
assertEquals("consumer count on " + broker.getBrokerName() + " matches for q: " + internalQueue, count, internalQueue.getConsumers().size());
}
private void logConsumerCount(BrokerService broker, int count, final Destination dest) throws Exception {
final RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
waitFor(new Condition() {
public boolean isSatisified() throws Exception {
return !regionBroker.getDestinations(ActiveMQDestination.transform(dest)).isEmpty();
}
});
Queue internalQueue = (Queue) regionBroker.getDestinations(ActiveMQDestination.transform(dest)).iterator().next();
LOG.info("Verify: consumer count on " + broker.getBrokerName() + " matches:" + count + ", val:" + internalQueue.getConsumers().size());
}
private void verifyConsumePriority(BrokerService broker, byte expectedPriority, Destination dest) throws Exception {
RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
Queue internalQueue = (Queue) regionBroker.getDestinations(ActiveMQDestination.transform(dest)).iterator().next();
for (Subscription consumer : internalQueue.getConsumers()) {
assertEquals("consumer on " + broker.getBrokerName() + " matches priority: " + internalQueue, expectedPriority, consumer.getConsumerInfo().getPriority());
}
}
@Override
public void configureBroker(BrokerService brokerService) {
brokerService.setBrokerId(brokerService.getBrokerName());
}
@Override
public void setUp() throws Exception {
super.setAutoFail(true);
super.setUp();
createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false"));
createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false"));
createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC?persistent=false&useJmx=false"));
}
}