blob: da2f367c48c95af51fd49741a285ef582ddb4524 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@RunWith(Parameterized.class)
public class MessageGroupReconnectDistributionTest {
public static final Logger LOG = LoggerFactory.getLogger(MessageGroupReconnectDistributionTest.class);
final Random random = new Random();
protected Connection connection;
protected Session session;
protected MessageProducer producer;
protected ActiveMQQueue destination = new ActiveMQQueue("GroupQ");
protected TransportConnector connector;
ActiveMQConnectionFactory connFactory;
BrokerService broker;
int numMessages = 10000;
int groupSize = 10;
int batchSize = 20;
@Parameterized.Parameter(0)
public int numConsumers = 4;
@Parameterized.Parameter(1)
public boolean consumerPriority = true;
@Parameterized.Parameters(name="numConsumers={0},consumerPriority={1}")
public static Iterable<Object[]> combinations() {
return Arrays.asList(new Object[][]{{4, true}, {10, true}});
}
@Before
public void setUp() throws Exception {
broker = createBroker();
broker.start();
connFactory = new ActiveMQConnectionFactory(connector.getConnectUri() + "?jms.prefetchPolicy.all=200");
connFactory.setWatchTopicAdvisories(false);
connection = connFactory.createConnection();
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
producer = session.createProducer(destination);
connection.start();
}
protected BrokerService createBroker() throws Exception {
BrokerService service = new BrokerService();
service.setAdvisorySupport(false);
service.setPersistent(false);
service.setUseJmx(true);
PolicyMap policyMap = new PolicyMap();
PolicyEntry policy = new PolicyEntry();
policy.setUseConsumerPriority(consumerPriority);
policy.setMessageGroupMapFactoryType("cached?cacheSize=" + (numConsumers - 1));
policyMap.setDefaultEntry(policy);
service.setDestinationPolicy(policyMap);
connector = service.addConnector("tcp://localhost:0");
return service;
}
@After
public void tearDown() throws Exception {
producer.close();
session.close();
connection.close();
broker.stop();
}
@Test(timeout = 5 * 60 * 1000)
public void testReconnect() throws Exception {
final AtomicLong totalConsumed = new AtomicLong(0);
ExecutorService executorService = Executors.newFixedThreadPool(numConsumers);
final ArrayList<AtomicLong> consumedCounters = new ArrayList<AtomicLong>(numConsumers);
final ArrayList<AtomicLong> batchCounters = new ArrayList<AtomicLong>(numConsumers);
for (int i = 0; i < numConsumers; i++) {
consumedCounters.add(new AtomicLong(0l));
batchCounters.add(new AtomicLong(0l));
final int id = i;
executorService.submit(new Runnable() {
int getBatchSize() {
return (id + 1) * batchSize;
}
@Override
public void run() {
try {
Session connectionSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
int batchSize = getBatchSize();
MessageConsumer messageConsumer = connectionSession.createConsumer(destWithPrefetch(destination));
Message message;
AtomicLong consumed = consumedCounters.get(id);
AtomicLong batches = batchCounters.get(id);
LOG.info("Consumer: " + id + ", batchSize:" + batchSize + ", totalConsumed:" + totalConsumed.get() + ", consumed:" + consumed.get());
while (totalConsumed.get() < numMessages) {
message = messageConsumer.receive(10000);
if (message == null) {
LOG.info("Consumer: " + id + ", batchSize:" + batchSize + ", null message (totalConsumed:" + totalConsumed.get() + ") consumed:" + consumed.get());
messageConsumer.close();
if (totalConsumed.get() == numMessages) {
break;
} else {
batchSize = getBatchSize();
messageConsumer = connectionSession.createConsumer(destWithPrefetch(destination));
batches.incrementAndGet();
continue;
}
}
consumed.incrementAndGet();
totalConsumed.incrementAndGet();
if (consumed.get() > 0 && consumed.intValue() % batchSize == 0) {
messageConsumer.close();
batchSize = getBatchSize();
messageConsumer = connectionSession.createConsumer(destWithPrefetch(destination));
batches.incrementAndGet();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
TimeUnit.MILLISECONDS.sleep(200);
}
TimeUnit.SECONDS.sleep(1);
produceMessages(numMessages);
executorService.shutdown();
assertTrue("threads done on time", executorService.awaitTermination(10, TimeUnit.MINUTES));
assertEquals("All consumed", numMessages, totalConsumed.intValue());
LOG.info("Distribution: " + consumedCounters);
LOG.info("Batches: " + batchCounters);
double max = consumedCounters.get(0).longValue() * 1.5;
double min = consumedCounters.get(0).longValue() * 0.5;
for (AtomicLong l : consumedCounters) {
assertTrue("Even +/- 50% distribution on consumed:" + consumedCounters + ", outlier:" + l.get(),
l.longValue() < max && l.longValue() > min);
}
}
private Destination destWithPrefetch(ActiveMQQueue destination) throws Exception {
return destination;
}
private void produceMessages(int numMessages) throws JMSException {
int groupID=0;
for (int i = 0; i < numMessages; i++) {
if (i>0 && i%groupSize==0) {
groupID++;
}
TextMessage msga = session.createTextMessage("hello " + i);
msga.setStringProperty("JMSXGroupID", "Group-"+groupID);
producer.send(msga);
}
}
}