blob: 7ec0876019a8e5fd205406e302a8ff42f53ce37d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.test.*;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.activemq.usage.SystemUsage;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
public class AMQFailoverIssue extends org.apache.activemq.test.TestSupport {
private static final String URL1 = "tcp://localhost:61616";
private static final String QUEUE1_NAME = "test.queue.1";
private static final int MAX_CONSUMERS = 10;
private static final int MAX_PRODUCERS = 5;
private static final int NUM_MESSAGE_TO_SEND = 10000;
private static final int TOTAL_MESSAGES = MAX_PRODUCERS * NUM_MESSAGE_TO_SEND;
private static final boolean USE_FAILOVER = true;
private AtomicInteger messageCount = new AtomicInteger();
private CountDownLatch doneLatch;
@Override
public void setUp() throws Exception {
}
@Override
public void tearDown() throws Exception {
}
// This should fail with incubator-activemq-fuse-4.1.0.5
public void testFailoverIssue() throws Exception {
BrokerService brokerService1 = null;
ActiveMQConnectionFactory acf;
PooledConnectionFactory pcf;
DefaultMessageListenerContainer container1 = null;
try {
brokerService1 = createBrokerService("broker1", URL1, null);
brokerService1.start();
acf = createConnectionFactory(URL1, USE_FAILOVER);
pcf = new PooledConnectionFactory(acf);
// Only listen on the first queue.. let the 2nd queue fill up.
doneLatch = new CountDownLatch(TOTAL_MESSAGES);
container1 = createDefaultMessageListenerContainer(acf, new TestMessageListener1(0), QUEUE1_NAME);
container1.afterPropertiesSet();
Thread.sleep(5000);
final ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < MAX_PRODUCERS; i++) {
executor.submit(new PooledProducerTask(pcf, QUEUE1_NAME));
}
// Wait for all message to arrive.
assertTrue(doneLatch.await(45, TimeUnit.SECONDS));
executor.shutdown();
// Thread.sleep(30000);
Assert.assertEquals(TOTAL_MESSAGES, messageCount.get());
} finally {
container1.stop();
container1.destroy();
container1 = null;
brokerService1.stop();
brokerService1 = null;
}
}
private BrokerService createBrokerService(final String brokerName, final String uri1, final String uri2) throws Exception {
final BrokerService brokerService = new BrokerService();
brokerService.setBrokerName(brokerName);
brokerService.setPersistent(false);
brokerService.setUseJmx(true);
final SystemUsage memoryManager = new SystemUsage();
memoryManager.getMemoryUsage().setLimit(5000000);
brokerService.setSystemUsage(memoryManager);
final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
final PolicyEntry entry = new PolicyEntry();
entry.setQueue(">");
// entry.setQueue(QUEUE1_NAME);
entry.setMemoryLimit(1);
policyEntries.add(entry);
final PolicyMap policyMap = new PolicyMap();
policyMap.setPolicyEntries(policyEntries);
brokerService.setDestinationPolicy(policyMap);
final TransportConnector tConnector = new TransportConnector();
tConnector.setUri(new URI(uri1));
tConnector.setName(brokerName + ".transportConnector");
brokerService.addConnector(tConnector);
if (uri2 != null) {
final NetworkConnector nc = new DiscoveryNetworkConnector(new URI("static:" + uri2));
nc.setBridgeTempDestinations(true);
nc.setBrokerName(brokerName);
nc.setPrefetchSize(1);
brokerService.addNetworkConnector(nc);
}
return brokerService;
}
public DefaultMessageListenerContainer createDefaultMessageListenerContainer(final ConnectionFactory acf, final MessageListener listener, final String queue) {
final DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(acf);
container.setDestinationName(queue);
container.setMessageListener(listener);
container.setSessionTransacted(false);
container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
container.setConcurrentConsumers(MAX_CONSUMERS);
return container;
}
public ActiveMQConnectionFactory createConnectionFactory(final String url, final boolean useFailover) {
final String failoverUrl = "failover:(" + url + ")";
final ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(useFailover ? failoverUrl : url);
acf.setCopyMessageOnSend(false);
acf.setUseAsyncSend(false);
acf.setDispatchAsync(true);
acf.setUseCompression(false);
acf.setOptimizeAcknowledge(false);
acf.setOptimizedMessageDispatch(true);
acf.setUseAsyncSend(false);
return acf;
}
private class TestMessageListener1 implements MessageListener {
private final long waitTime;
public TestMessageListener1(long waitTime) {
this.waitTime = waitTime;
}
public void onMessage(Message msg) {
try {
messageCount.incrementAndGet();
doneLatch.countDown();
Thread.sleep(waitTime);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
private static class PooledProducerTask implements Runnable {
private final String queueName;
private final PooledConnectionFactory pcf;
public PooledProducerTask(final PooledConnectionFactory pcf, final String queueName) {
this.pcf = pcf;
this.queueName = queueName;
}
public void run() {
try {
final JmsTemplate jmsTemplate = new JmsTemplate(pcf);
jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
jmsTemplate.setExplicitQosEnabled(true);
jmsTemplate.setMessageIdEnabled(false);
jmsTemplate.setMessageTimestampEnabled(false);
jmsTemplate.afterPropertiesSet();
final byte[] bytes = new byte[2048];
final Random r = new Random();
r.nextBytes(bytes);
Thread.sleep(2000);
final AtomicInteger count = new AtomicInteger();
for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
jmsTemplate.send(queueName, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
final BytesMessage message = session.createBytesMessage();
message.writeBytes(bytes);
message.setIntProperty("count", count.incrementAndGet());
message.setStringProperty("producer", "pooled");
return message;
}
});
}
} catch (final Throwable e) {
e.printStackTrace();
}
}
}
}