blob: 0abcfbbf594d7a3d67ad952f1bc7c1125218a4fc [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bugs;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerRegistry;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.Test;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.connection.SingleConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
public class LoadBalanceTest {
private static final Logger LOG = LoggerFactory.getLogger(LoadBalanceTest.class);
private static final String TESTING_QUEUE = "testingqueue";
private static int networkBridgePrefetch = 1000;
@Test
public void does_load_balance_between_consumers() throws Exception {
BrokerService brokerService1 = null;
BrokerService brokerService2 = null;
final int total = 100;
final AtomicInteger broker1Count = new AtomicInteger(0);
final AtomicInteger broker2Count = new AtomicInteger(0);
final CountDownLatch startProducer = new CountDownLatch(1);
try {
{
brokerService1 = new BrokerService();
brokerService1.setBrokerName("one");
brokerService1.setUseJmx(false);
brokerService1
.setPersistenceAdapter(new MemoryPersistenceAdapter());
brokerService1.addConnector("nio://0.0.0.0:61616");
final NetworkConnector network1 = brokerService1
.addNetworkConnector("static:(tcp://localhost:51515)");
network1.setName("network1");
network1.setDynamicOnly(true);
network1.setNetworkTTL(3);
network1.setPrefetchSize(networkBridgePrefetch);
network1.setConduitSubscriptions(false);
network1.setDecreaseNetworkConsumerPriority(false);
network1.setDispatchAsync(false);
brokerService1.start();
}
{
brokerService2 = new BrokerService();
brokerService2.setBrokerName("two");
brokerService2.setUseJmx(false);
brokerService2
.setPersistenceAdapter(new MemoryPersistenceAdapter());
brokerService2.addConnector("nio://0.0.0.0:51515");
final NetworkConnector network2 = brokerService2
.addNetworkConnector("static:(tcp://localhost:61616)");
network2.setName("network1");
network2.setDynamicOnly(true);
network2.setNetworkTTL(3);
network2.setPrefetchSize(networkBridgePrefetch);
network2.setConduitSubscriptions(false);
network2.setDecreaseNetworkConsumerPriority(false);
network2.setDispatchAsync(false);
brokerService2.start();
}
final ExecutorService pool = Executors.newSingleThreadExecutor();
final ActiveMQConnectionFactory connectionFactory1 = new ActiveMQConnectionFactory(
"vm://one");
final SingleConnectionFactory singleConnectionFactory1 = new SingleConnectionFactory(
connectionFactory1);
singleConnectionFactory1.setReconnectOnException(true);
final DefaultMessageListenerContainer container1 = new DefaultMessageListenerContainer();
container1.setConnectionFactory(singleConnectionFactory1);
container1.setMaxConcurrentConsumers(1);
container1.setDestination(new ActiveMQQueue("testingqueue"));
container1.setMessageListener(new MessageListener() {
public void onMessage(final Message message) {
broker1Count.incrementAndGet();
}
});
container1.afterPropertiesSet();
container1.start();
pool.submit(new Callable<Object>() {
public Object call() throws Exception {
try {
final ActiveMQConnectionFactory connectionFactory2 = new ActiveMQConnectionFactory(
"vm://two");
final SingleConnectionFactory singleConnectionFactory2 = new SingleConnectionFactory(
connectionFactory2);
singleConnectionFactory2.setReconnectOnException(true);
final DefaultMessageListenerContainer container2 = new DefaultMessageListenerContainer();
container2
.setConnectionFactory(singleConnectionFactory2);
container2.setMaxConcurrentConsumers(1);
container2.setDestination(new ActiveMQQueue(
"testingqueue"));
container2.setMessageListener(new MessageListener() {
public void onMessage(final Message message) {
broker2Count.incrementAndGet();
}
});
container2.afterPropertiesSet();
container2.start();
assertTrue("wait for start signal", startProducer.await(20, TimeUnit.SECONDS));
final CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(
singleConnectionFactory2);
final JmsTemplate template = new JmsTemplate(
cachingConnectionFactory);
final ActiveMQQueue queue = new ActiveMQQueue(
"testingqueue");
for (int i = 0; i < total; i++) {
template.send(queue, new MessageCreator() {
public Message createMessage(
final Session session)
throws JMSException {
final TextMessage message = session
.createTextMessage();
message.setText("Hello World!");
return message;
}
});
}
// give spring time to scale back again
while (container2.getActiveConsumerCount() > 1) {
System.out.println("active consumer count: "
+ container2.getActiveConsumerCount());
System.out.println("concurrent consumer count: "
+ container2.getConcurrentConsumers());
Thread.sleep(1000);
}
cachingConnectionFactory.destroy();
container2.destroy();
} catch (final Throwable t) {
t.printStackTrace();
}
return null;
}
});
waitForBridgeFormation(10000);
startProducer.countDown();
pool.shutdown();
pool.awaitTermination(10, TimeUnit.SECONDS);
LOG.info("broker1Count " + broker1Count.get() + ", broker2Count " + broker2Count.get());
int count = 0;
// give it 10 seconds
while (count++ < 10
&& broker1Count.get() + broker2Count.get() != total) {
LOG.info("broker1Count " + broker1Count.get() + ", broker2Count " + broker2Count.get());
Thread.sleep(1000);
}
container1.destroy();
} finally {
try {
if (brokerService1 != null) {
brokerService1.stop();
}
} catch (final Throwable t) {
t.printStackTrace();
}
try {
if (brokerService2 != null) {
brokerService2.stop();
}
} catch (final Throwable t) {
t.printStackTrace();
}
}
if (broker1Count.get() < 25 || broker2Count.get() < 25) {
fail("Each broker should have gotten at least 25 messages but instead broker1 got "
+ broker1Count.get()
+ " and broker2 got "
+ broker2Count.get());
}
}
@Test
public void does_xml_multicast_load_balance_between_consumers() throws Exception {
final int total = 100;
final AtomicInteger broker1Count = new AtomicInteger(0);
final AtomicInteger broker2Count = new AtomicInteger(0);
final ExecutorService pool = Executors.newSingleThreadExecutor();
final CountDownLatch startProducer = new CountDownLatch(1);
final String xmlConfig = getClass().getPackage().getName().replace('.','/') + "/loadbalancetest.xml";
System.setProperty("lbt.networkBridgePrefetch", String.valueOf(networkBridgePrefetch));
System.setProperty("lbt.brokerName", "one");
final ActiveMQConnectionFactory connectionFactory1 = new ActiveMQConnectionFactory(
"vm://one?brokerConfig=xbean:" + xmlConfig);
final SingleConnectionFactory singleConnectionFactory1 = new SingleConnectionFactory(
connectionFactory1);
singleConnectionFactory1.setReconnectOnException(true);
final DefaultMessageListenerContainer container1 = new DefaultMessageListenerContainer();
container1.setConnectionFactory(singleConnectionFactory1);
container1.setMaxConcurrentConsumers(1);
container1.setDestination(new ActiveMQQueue(TESTING_QUEUE));
container1.setMessageListener(new MessageListener() {
public void onMessage(final Message message) {
broker1Count.incrementAndGet();
}
});
container1.afterPropertiesSet();
container1.start();
pool.submit(new Callable<Object>() {
public Object call() throws Exception {
System.setProperty("lbt.brokerName", "two");
final ActiveMQConnectionFactory connectionFactory2 = new ActiveMQConnectionFactory(
"vm://two?brokerConfig=xbean:" + xmlConfig);
final SingleConnectionFactory singleConnectionFactory2 = new SingleConnectionFactory(
connectionFactory2);
singleConnectionFactory2.setReconnectOnException(true);
final DefaultMessageListenerContainer container2 = new DefaultMessageListenerContainer();
container2.setConnectionFactory(singleConnectionFactory2);
container2.setMaxConcurrentConsumers(1);
container2.setDestination(new ActiveMQQueue(TESTING_QUEUE));
container2.setMessageListener(new MessageListener() {
public void onMessage(final Message message) {
broker2Count.incrementAndGet();
}
});
container2.afterPropertiesSet();
container2.start();
assertTrue("wait for start signal", startProducer.await(20, TimeUnit.SECONDS));
final CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(
singleConnectionFactory2);
final JmsTemplate template = new JmsTemplate(
cachingConnectionFactory);
final ActiveMQQueue queue = new ActiveMQQueue(TESTING_QUEUE);
for (int i = 0; i < total; i++) {
template.send(queue, new MessageCreator() {
public Message createMessage(final Session session)
throws JMSException {
final TextMessage message = session
.createTextMessage();
message.setText("Hello World!");
return message;
}
});
}
return null;
}
});
// give network a chance to build, needs advisories
waitForBridgeFormation(10000);
startProducer.countDown();
pool.shutdown();
pool.awaitTermination(10, TimeUnit.SECONDS);
LOG.info("broker1Count " + broker1Count.get() + ", broker2Count " + broker2Count.get());
int count = 0;
// give it 10 seconds
while (count++ < 10 && broker1Count.get() + broker2Count.get() != total) {
LOG.info("broker1Count " + broker1Count.get() + ", broker2Count " + broker2Count.get());
Thread.sleep(1000);
}
if (broker1Count.get() < 25 || broker2Count.get() < 25) {
fail("Each broker should have gotten at least 25 messages but instead broker1 got "
+ broker1Count.get()
+ " and broker2 got "
+ broker2Count.get());
}
}
// need to ensure broker bridge is alive before starting the consumer
// peeking at the internals will give us this info
private void waitForBridgeFormation(long delay) throws Exception {
long done = System.currentTimeMillis() + delay;
while (done > System.currentTimeMillis()) {
BrokerService broker = BrokerRegistry.getInstance().lookup("two");
if (broker != null && !broker.getNetworkConnectors().isEmpty()) {
if (!broker.getNetworkConnectors().get(0).activeBridges().isEmpty()) {
return;
}
}
Thread.sleep(1000);
}
}
}