blob: 1498c065572b5f501e0ccdc7713b10093af7b2f9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.net.URI;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*/
public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSupport {
protected static final int MESSAGE_COUNT = 100; // Best if a factor of 100
protected static final int PREFETCH_COUNT = 1;
protected static final int NETWORK_PREFETCH = 1;
private static final Logger LOG = LoggerFactory.getLogger(TwoBrokerQueueClientsReconnectTest.class);
protected int msgsClient1;
protected int msgsClient2;
protected String broker1;
protected String broker2;
public void testClientAReceivesOnly() throws Exception {
broker1 = "BrokerA";
broker2 = "BrokerB";
doOneClientReceivesOnly();
}
public void testClientBReceivesOnly() throws Exception {
broker1 = "BrokerB";
broker2 = "BrokerA";
doOneClientReceivesOnly();
}
public void doOneClientReceivesOnly() throws Exception {
// allow immediate replay back to origin
applyRateLimitNetworkFilter(0);
// Bridge brokers
bridgeBrokers(broker1, broker2);
bridgeBrokers(broker2, broker1);
// Run brokers
startAllBrokers();
// Create queue
Destination dest = createDestination("TEST.FOO", false);
// Create consumers
MessageConsumer client1 = createConsumer(broker1, dest);
MessageConsumer client2 = createConsumer(broker2, dest);
// Give clients time to register with broker
Thread.sleep(500);
// Always send messages to broker A
sendMessages("BrokerA", dest, MESSAGE_COUNT);
// Close the second client, messages should be sent to the first client
client2.close();
// Let the first client receive all messages
msgsClient1 += receiveAllMessages(client1);
client1.close();
// First client should have received 100 messages
assertEquals("Client for " + broker1 + " should have receive all messages.", MESSAGE_COUNT, msgsClient1);
}
public void testClientAReceivesOnlyAfterReconnect() throws Exception {
broker1 = "BrokerA";
broker2 = "BrokerB";
doOneClientReceivesOnlyAfterReconnect();
}
public void testClientBReceivesOnlyAfterReconnect() throws Exception {
broker1 = "BrokerB";
broker2 = "BrokerA";
doOneClientReceivesOnlyAfterReconnect();
}
public void doOneClientReceivesOnlyAfterReconnect() throws Exception {
// allow immediate replay back to origin
applyRateLimitNetworkFilter(0);
// Bridge brokers
bridgeBrokers(broker1, broker2);
bridgeBrokers(broker2, broker1);
// Run brokers
startAllBrokers();
// Create queue
Destination dest = createDestination("TEST.FOO", false);
// Create first consumer
MessageConsumer client1 = createConsumer(broker1, dest);
MessageConsumer client2 = createConsumer(broker2, dest);
// Give clients time to register with broker
Thread.sleep(500);
// Always send message to broker A
sendMessages("BrokerA", dest, MESSAGE_COUNT);
// Let the first client receive the first 20% of messages
msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20));
// Disconnect the first client
client1.close();
// Create another client for the first broker
client1 = createConsumer(broker1, dest);
Thread.sleep(500);
// Close the second client, messages should be sent to the first client
client2.close();
// Receive the rest of the messages
msgsClient1 += receiveAllMessages(client1);
client1.close();
// The first client should have received 100 messages
assertEquals("Client for " + broker1 + " should have received all messages.", MESSAGE_COUNT, msgsClient1);
}
public void testTwoClientsReceiveClientADisconnects() throws Exception {
broker1 = "BrokerA";
broker2 = "BrokerB";
doTwoClientsReceiveOneClientDisconnects();
}
public void testTwoClientsReceiveClientBDisconnects() throws Exception {
broker1 = "BrokerB";
broker2 = "BrokerA";
doTwoClientsReceiveOneClientDisconnects();
}
public void doTwoClientsReceiveOneClientDisconnects() throws Exception {
// ensure all message do not flow across the network too quickly
applyRateLimitNetworkFilter(0.8 * MESSAGE_COUNT);
// Bridge brokers
bridgeBrokers(broker1, broker2);
bridgeBrokers(broker2, broker1);
// Run brokers
startAllBrokers();
// Create queue
Destination dest = createDestination("TEST.FOO", false);
// Create first client
MessageConsumer client1 = createConsumer(broker1, dest);
MessageConsumer client2 = createConsumer(broker2, dest);
// Give clients time to register with broker
Thread.sleep(500);
// Always send messages to broker A
sendMessages("BrokerA", dest, MESSAGE_COUNT);
LOG.info("Let each client receive 20% of the messages - 40% total");
msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20));
msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20));
// Disconnect the first client
client1.close();
LOG.info("Let the second client receive the rest of the messages");
msgsClient2 += receiveAllMessages(client2);
client2.close();
// First client should have received 20% of the messages
assertEquals("Client for " + broker1 + " should have received 20% of the messages.", (int)(MESSAGE_COUNT * 0.20), msgsClient1);
// Second client should have received 80% of the messages
assertEquals("Client for " + broker2 + " should have received 80% of the messages.", (int)(MESSAGE_COUNT * 0.80), msgsClient2);
}
public void testTwoClientsReceiveClientAReconnects() throws Exception {
broker1 = "BrokerA";
broker2 = "BrokerB";
doTwoClientsReceiveOneClientReconnects();
}
public void testTwoClientsReceiveClientBReconnects() throws Exception {
broker1 = "BrokerB";
broker2 = "BrokerA";
doTwoClientsReceiveOneClientReconnects();
}
public void doTwoClientsReceiveOneClientReconnects() throws Exception {
// ensure all message do not flow across the network too quickly
applyRateLimitNetworkFilter(0.2 * MESSAGE_COUNT);
// Bridge brokers
bridgeBrokers(broker1, broker2);
bridgeBrokers(broker2, broker1);
// Run brokers
startAllBrokers();
// Create queue
Destination dest = createDestination("TEST.FOO", false);
// Create the first client
MessageConsumer client1 = createConsumer(broker1, dest);
MessageConsumer client2 = createConsumer(broker2, dest);
// Give clients time to register with broker
Thread.sleep(500);
// Always send messages to broker A
sendMessages("BrokerA", dest, MESSAGE_COUNT);
// Let each client receive 20% of the messages - 40% total
msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20));
msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20));
LOG.info("msgsClient1=" + msgsClient1);
LOG.info("msgsClient2=" + msgsClient2);
Thread.sleep(1000);
LOG.info("Disconnect the first client");
client1.close();
LOG.info("Let the second client receive 20% more of the total messages");
msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20));
LOG.info("msgsClient2=" + msgsClient2);
// Create another client for broker 1
client1 = createConsumer(broker1, dest);
Thread.sleep(1000);
// Let each client receive 20% of the messages - 40% total
msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20));
client1.close();
LOG.info("new consumer addition, msgsClient1=" + msgsClient1);
Thread.sleep(2000);
msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20));
client2.close();
LOG.info("msgsClient2=" + msgsClient2);
// First client should have received 40 messages
assertEquals("Client for " + broker1 + " should have received 40% of the messages.", (int)(MESSAGE_COUNT * 0.40), msgsClient1);
// Second client should have received 60 messages
assertEquals("Client for " + broker2 + " should have received 60% of the messages.", (int)(MESSAGE_COUNT * 0.60), msgsClient2);
}
private void applyRateLimitNetworkFilter(double rateLimit) {
ConditionalNetworkBridgeFilterFactory filterFactory = new ConditionalNetworkBridgeFilterFactory();
filterFactory.setReplayWhenNoConsumers(true);
filterFactory.setRateLimit((int) rateLimit);
filterFactory.setRateDuration(1000);
Collection<BrokerItem> brokerList = brokers.values();
for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
BrokerService broker = i.next().broker;
broker.getDestinationPolicy().getDefaultEntry().setNetworkBridgeFilterFactory(filterFactory);
}
}
public void testTwoClientsReceiveTwoClientReconnects() throws Exception {
// ensure all message do not flow across the network too quickly
applyRateLimitNetworkFilter(0.5 * MESSAGE_COUNT);
broker1 = "BrokerA";
broker2 = "BrokerB";
// Bridge brokers
bridgeBrokers(broker1, broker2);
bridgeBrokers(broker2, broker1);
// Run brokers
startAllBrokers();
// Create queue
Destination dest = createDestination("TEST.FOO", false);
// Create the first client
MessageConsumer client1 = createConsumer(broker1, dest);
MessageConsumer client2 = createConsumer(broker2, dest);
// Give clients time to register with broker
Thread.sleep(500);
// Always send messages to broker A
sendMessages("BrokerA", dest, MESSAGE_COUNT);
// Let each client receive 20% of the messages - 40% total
msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20));
msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20));
LOG.info("Disconnect both clients");
client1.close();
client2.close();
// Let each client receive 30% more of the total messages - 60% total
LOG.info("Serially create another two clients for each broker and consume in turn");
client1 = createConsumer(broker1, dest);
msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.30));
client1.close();
// the close will allow replay or the replay of the remaining messages
client2 = createConsumer(broker2, dest);
msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.30));
client2.close();
// First client should have received 50% of the messages
assertEquals("Client for " + broker1 + " should have received 50% of the messages.", (int)(MESSAGE_COUNT * 0.50), msgsClient1);
// Second client should have received 50% of the messages
assertEquals("Client for " + broker2 + " should have received 50% of the messages.", (int)(MESSAGE_COUNT * 0.50), msgsClient2);
}
@SuppressWarnings("unchecked")
public void testDuplicateSend() throws Exception {
broker1 = "BrokerA";
broker2 = "BrokerB";
// enable producer audit for the network connector, off by default b/c of interference with composite
// dests and virtual topics
brokers.get(broker2).broker.getTransportConnectors().get(0).setAuditNetworkProducers(true);
bridgeBrokers(broker1, broker2);
final AtomicBoolean first = new AtomicBoolean();
final CountDownLatch gotMessageLatch = new CountDownLatch(1);
BrokerService brokerService = brokers.get(broker2).broker;
brokerService.setPersistent(true);
brokerService.setDeleteAllMessagesOnStartup(true);
brokerService.setPlugins(new BrokerPlugin[]{
new BrokerPluginSupport() {
@Override
public void send(final ProducerBrokerExchange producerExchange,
org.apache.activemq.command.Message messageSend)
throws Exception {
super.send(producerExchange, messageSend);
if (first.compareAndSet(false, true)) {
producerExchange.getConnectionContext().setDontSendReponse(true);
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
try {
LOG.info("Waiting for recepit");
assertTrue("message received on time", gotMessageLatch.await(60, TimeUnit.SECONDS));
LOG.info("Stopping connection post send and receive and multiple producers");
producerExchange.getConnectionContext().getConnection().stop();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
}
});
// Run brokers
startAllBrokers();
waitForBridgeFormation();
// Create queue
Destination dest = createDestination("TEST.FOO", false);
MessageConsumer client2 = createConsumer(broker2, dest);
sendMessages("BrokerA", dest, 1);
assertEquals("Client got message", 1, receiveExactMessages(client2, 1));
client2.close();
gotMessageLatch.countDown();
// message still pending on broker1
assertEquals("messages message still there", 1, brokers.get(broker1).broker.getAdminView().getTotalMessageCount());
client2 = createConsumer(broker2, dest);
LOG.info("Let the second client receive the rest of the messages");
assertEquals("no duplicate message", 0, receiveAllMessages(client2));
assertEquals("no duplicate message", 0, receiveAllMessages(client2));
assertEquals("no messages enqueued", 0, brokers.get(broker2).broker.getAdminView().getTotalMessageCount());
assertTrue("no messages enqueued on origin", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 0 == brokers.get(broker1).broker.getAdminView().getTotalMessageCount();
}
}));
}
@SuppressWarnings("unchecked")
public void testDuplicateSendWithCursorAudit() throws Exception {
broker1 = "BrokerA";
broker2 = "BrokerB";
brokers.get(broker2).broker.getDestinationPolicy().getDefaultEntry().setEnableAudit(true);
bridgeBrokers(broker1, broker2);
final AtomicBoolean first = new AtomicBoolean();
final CountDownLatch gotMessageLatch = new CountDownLatch(1);
BrokerService brokerService = brokers.get(broker2).broker;
brokerService.setPersistent(true);
brokerService.setDeleteAllMessagesOnStartup(true);
brokerService.setPlugins(new BrokerPlugin[]{
new BrokerPluginSupport() {
@Override
public void send(final ProducerBrokerExchange producerExchange,
org.apache.activemq.command.Message messageSend)
throws Exception {
super.send(producerExchange, messageSend);
if (first.compareAndSet(false, true)) {
producerExchange.getConnectionContext().setDontSendReponse(true);
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
try {
LOG.info("Waiting for recepit");
assertTrue("message received on time", gotMessageLatch.await(60, TimeUnit.SECONDS));
LOG.info("Stopping connection post send and receive and multiple producers");
producerExchange.getConnectionContext().getConnection().stop();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
}
});
// Run brokers
startAllBrokers();
waitForBridgeFormation();
// Create queue
Destination dest = createDestination("TEST.FOO", false);
MessageConsumer client2 = createConsumer(broker2, dest);
sendMessages("BrokerA", dest, 1);
assertEquals("Client got message", 1, receiveExactMessages(client2, 1));
client2.close();
gotMessageLatch.countDown();
// message still pending on broker1
assertEquals("messages message still there", 1, brokers.get(broker1).broker.getAdminView().getTotalMessageCount());
client2 = createConsumer(broker2, dest);
LOG.info("Let the second client receive the rest of the messages");
assertEquals("no duplicate message", 0, receiveAllMessages(client2));
assertEquals("no duplicate message", 0, receiveAllMessages(client2));
assertEquals("1 messages enqueued on dlq", 1, brokers.get(broker2).broker.getAdminView().getTotalMessageCount());
assertTrue("no messages enqueued on origin", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 0 == brokers.get(broker1).broker.getAdminView().getTotalMessageCount();
}
}));
}
@SuppressWarnings("unchecked")
public void testDuplicateSendWithNoAuditEnqueueCountStat() throws Exception {
broker1 = "BrokerA";
broker2 = "BrokerB";
NetworkConnector networkConnector = bridgeBrokers(broker1, broker2);
final AtomicBoolean first = new AtomicBoolean();
final CountDownLatch gotMessageLatch = new CountDownLatch(1);
BrokerService brokerService = brokers.get(broker2).broker;
brokerService.setPersistent(true);
brokerService.setDeleteAllMessagesOnStartup(true);
// disable concurrent dispatch otherwise store duplicate suppression will be skipped b/c cursor audit is already
// disabled so verification of stats will fail - ie: duplicate will be dispatched
((KahaDBPersistenceAdapter)brokerService.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(false);
brokerService.setPlugins(new BrokerPlugin[]{
new BrokerPluginSupport() {
@Override
public void send(final ProducerBrokerExchange producerExchange,
org.apache.activemq.command.Message messageSend)
throws Exception {
super.send(producerExchange, messageSend);
if (first.compareAndSet(false, true)) {
producerExchange.getConnectionContext().setDontSendReponse(true);
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
try {
LOG.info("Waiting for recepit");
assertTrue("message received on time", gotMessageLatch.await(60, TimeUnit.SECONDS));
LOG.info("Stopping connection post send and receive and multiple producers");
producerExchange.getConnectionContext().getConnection().stop();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
}
});
// Create queue
ActiveMQDestination dest = createDestination("TEST.FOO", false);
// statically include our destination
networkConnector.addStaticallyIncludedDestination(dest);
// Run brokers
startAllBrokers();
waitForBridgeFormation();
sendMessages("BrokerA", dest, 1);
// wait for broker2 to get the initial forward
Wait.waitFor(new Wait.Condition(){
@Override
public boolean isSatisified() throws Exception {
return brokers.get(broker2).broker.getAdminView().getTotalMessageCount() == 1;
}
});
// message still pending on broker1
assertEquals("messages message still there", 1, brokers.get(broker1).broker.getAdminView().getTotalMessageCount());
// allow the bridge to be shutdown and restarted
gotMessageLatch.countDown();
// verify message is forwarded after restart
assertTrue("no messages enqueued on origin", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 0 == brokers.get(broker1).broker.getAdminView().getTotalMessageCount();
}
}));
assertEquals("one messages pending", 1, brokers.get(broker2).broker.getAdminView().getTotalMessageCount());
assertEquals("one messages enqueued", 1, brokers.get(broker2).broker.getDestination(dest).getDestinationStatistics().getEnqueues().getCount());
}
@SuppressWarnings("unchecked")
public void testDuplicateSendWithNoAuditEnqueueCountStatConcurrentStoreAndDispatch() throws Exception {
broker1 = "BrokerA";
broker2 = "BrokerB";
NetworkConnector networkConnector = bridgeBrokers(broker1, broker2);
final AtomicBoolean first = new AtomicBoolean();
final CountDownLatch gotMessageLatch = new CountDownLatch(1);
BrokerService brokerService = brokers.get(broker2).broker;
brokerService.setPersistent(true);
brokerService.setDeleteAllMessagesOnStartup(true);
brokerService.setPlugins(new BrokerPlugin[]{
new BrokerPluginSupport() {
@Override
public void send(final ProducerBrokerExchange producerExchange,
org.apache.activemq.command.Message messageSend)
throws Exception {
super.send(producerExchange, messageSend);
if (first.compareAndSet(false, true)) {
producerExchange.getConnectionContext().setDontSendReponse(true);
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
try {
LOG.info("Waiting for recepit");
assertTrue("message received on time", gotMessageLatch.await(60, TimeUnit.SECONDS));
LOG.info("Stopping connection post send and receive by local queue over bridge");
producerExchange.getConnectionContext().getConnection().stop();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
}
});
// Create queue
final ActiveMQDestination dest = createDestination("TEST.FOO", false);
// statically include our destination
networkConnector.addStaticallyIncludedDestination(dest);
// Run brokers
startAllBrokers();
waitForBridgeFormation();
sendMessages("BrokerA", dest, 1);
// wait for broker2 to get the initial forward
Wait.waitFor(new Wait.Condition(){
@Override
public boolean isSatisified() throws Exception {
return brokers.get(broker2).broker.getAdminView().getTotalMessageCount() == 1;
}
});
// message still pending on broker1
assertEquals("messages message still there", 1, brokers.get(broker1).broker.getAdminView().getTotalMessageCount());
// allow the bridge to be shutdown and restarted
gotMessageLatch.countDown();
// verify message is forwarded after restart
assertTrue("no messages enqueued on origin", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 0 == brokers.get(broker1).broker.getAdminView().getTotalMessageCount();
}
}));
// duplicate ready to dispatch
assertEquals("one messages pending", 2, brokers.get(broker2).broker.getAdminView().getTotalMessageCount());
assertEquals("one messages enqueued", 2, brokers.get(broker2).broker.getDestination(dest).getDestinationStatistics().getEnqueues().getCount());
assertEquals("one messages", 2, brokers.get(broker2).broker.getDestination(dest).getDestinationStatistics().getMessages().getCount());
// only one message available in the store...
Connection conn = createConnection(broker2);
conn.start();
Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer messageConsumer = sess.createConsumer(dest);
assertEquals("Client got message", 1, receiveExactMessages(messageConsumer, 1));
messageConsumer.close(); // no ack
assertTrue("1 messages enqueued on origin", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 1 == brokers.get(broker2).broker.getDestination(dest).getDestinationStatistics().getMessages().getCount();
}
}));
// restart to validate message not acked due to duplicate processing
// consume again and ack
destroyAllBrokers();
createBroker(new URI("broker:(tcp://localhost:0)/BrokerB?useJmx=true&advisorySupport=false")).start();
assertEquals("Receive after restart and previous receive unacked", 1, receiveExactMessages(createConsumer(broker2, dest), 1));
assertTrue("no messages enqueued", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 0 == brokers.get(broker2).broker.getDestination(dest).getDestinationStatistics().getMessages().getCount();
}
}));
final ActiveMQDestination dlq = createDestination("ActiveMQ.DLQ", false);
assertTrue("one message still on dlq", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 1 == brokers.get(broker2).broker.getDestination(dlq).getDestinationStatistics().getMessages().getCount();
}
}));
}
protected int receiveExactMessages(MessageConsumer consumer, int msgCount) throws Exception {
Message msg;
int i;
for (i = 0; i < msgCount; i++) {
msg = consumer.receive(4000);
if (msg == null) {
LOG.error("Consumer failed to receive exactly " + msgCount + " messages. Actual messages received is: " + i);
break;
}
}
return i;
}
protected int receiveAllMessages(MessageConsumer consumer) throws Exception {
int msgsReceived = 0;
Message msg;
do {
msg = consumer.receive(1000);
if (msg != null) {
msgsReceived++;
}
} while (msg != null);
return msgsReceived;
}
@Override
protected MessageConsumer createConsumer(String brokerName, Destination dest) throws Exception {
Connection conn = createConnection(brokerName);
conn.start();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
return sess.createConsumer(dest);
}
@Override
protected void configureBroker(BrokerService broker) {
PolicyMap policyMap = new PolicyMap();
PolicyEntry defaultEntry = new PolicyEntry();
defaultEntry.setExpireMessagesPeriod(0);
defaultEntry.setEnableAudit(false);
policyMap.setDefaultEntry(defaultEntry);
broker.setDestinationPolicy(policyMap);
}
@Override
protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean conduit, boolean failover) throws Exception {
NetworkConnector nc = super.bridgeBrokers(localBroker,remoteBroker, dynamicOnly, networkTTL, conduit, failover);
nc.setPrefetchSize(NETWORK_PREFETCH);
nc.setDecreaseNetworkConsumerPriority(true);
return nc;
}
@Override
public void setUp() throws Exception {
super.setAutoFail(true);
super.setUp();
createBroker(new URI("broker:(tcp://localhost:0)/BrokerA?persistent=false&useJmx=true"));
createBroker(new URI("broker:(tcp://localhost:0)/BrokerB?persistent=false&useJmx=true"));
// Configure broker connection factory
ActiveMQConnectionFactory factoryA;
ActiveMQConnectionFactory factoryB;
factoryA = (ActiveMQConnectionFactory)getConnectionFactory("BrokerA");
factoryB = (ActiveMQConnectionFactory)getConnectionFactory("BrokerB");
// Set prefetch policy
ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy();
policy.setAll(PREFETCH_COUNT);
factoryA.setPrefetchPolicy(policy);
factoryB.setPrefetchPolicy(policy);
factoryA.setWatchTopicAdvisories(false);
factoryB.setWatchTopicAdvisories(false);
msgsClient1 = 0;
msgsClient2 = 0;
}
}