blob: 162e3419be8b468ad270b1dcda892dbde95083a2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.policy;
import junit.framework.Test;
import org.apache.activemq.JmsMultipleClientsTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.MessageIdList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport implements ExceptionListener {
private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumerTest.class);
AbortSlowConsumerStrategy underTest;
public boolean abortConnection = false;
public long checkPeriod = 2 * 1000;
public long maxSlowDuration = 5 * 1000;
private List<Throwable> exceptions = new ArrayList<Throwable>();
@Override
protected void setUp() throws Exception {
exceptions.clear();
topic = true;
underTest = new AbortSlowConsumerStrategy();
super.setUp();
createDestination();
}
@Override
protected BrokerService createBroker() throws Exception {
BrokerService broker = super.createBroker();
PolicyEntry policy = new PolicyEntry();
underTest.setAbortConnection(abortConnection);
underTest.setCheckPeriod(checkPeriod);
underTest.setMaxSlowDuration(maxSlowDuration);
policy.setSlowConsumerStrategy(underTest);
policy.setQueuePrefetch(10);
policy.setTopicPrefetch(10);
PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy);
broker.setDestinationPolicy(pMap);
return broker;
}
public void testRegularConsumerIsNotAborted() throws Exception {
startConsumers(destination);
for (Connection c : connections) {
c.setExceptionListener(this);
}
startProducers(destination, 100);
allMessagesList.waitForMessagesToArrive(10);
allMessagesList.assertAtLeastMessagesReceived(10);
}
public void initCombosForTestLittleSlowConsumerIsNotAborted() {
addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
}
public void testLittleSlowConsumerIsNotAborted() throws Exception {
startConsumers(destination);
Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
consumertoAbort.getValue().setProcessingDelay(500);
for (Connection c : connections) {
c.setExceptionListener(this);
}
startProducers(destination, 12);
allMessagesList.waitForMessagesToArrive(10);
allMessagesList.assertAtLeastMessagesReceived(10);
}
public void initCombosForTestSlowConsumerIsAborted() {
addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
}
public void testSlowConsumerIsAborted() throws Exception {
startConsumers(destination);
Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
consumertoAbort.getValue().setProcessingDelay(8 * 1000);
for (Connection c : connections) {
c.setExceptionListener(this);
}
startProducers(destination, 100);
consumertoAbort.getValue().assertMessagesReceived(1);
TimeUnit.SECONDS.sleep(5);
consumertoAbort.getValue().assertAtMostMessagesReceived(1);
}
public void testSlowConsumerIsAbortedViaJmx() throws Exception {
underTest.setMaxSlowDuration(60*1000); // so jmx does the abort
startConsumers(destination);
Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
consumertoAbort.getValue().setProcessingDelay(8 * 1000);
for (Connection c : connections) {
c.setExceptionListener(this);
}
startProducers(destination, 100);
consumertoAbort.getValue().assertMessagesReceived(1);
ActiveMQDestination amqDest = (ActiveMQDestination)destination;
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:Type=" +
(amqDest.isTopic() ? "Topic" : "Queue") +",Destination="
+ amqDest.getPhysicalName() + ",BrokerName=localhost");
QueueViewMBean queue = (QueueViewMBean) broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
ObjectName slowConsumerPolicyMBeanName = queue.getSlowConsumerStrategy();
assertNotNull(slowConsumerPolicyMBeanName);
AbortSlowConsumerStrategyViewMBean abortPolicy = (AbortSlowConsumerStrategyViewMBean)
broker.getManagementContext().newProxyInstance(slowConsumerPolicyMBeanName, AbortSlowConsumerStrategyViewMBean.class, true);
TimeUnit.SECONDS.sleep(3);
TabularData slowOnes = abortPolicy.getSlowConsumers();
assertEquals("one slow consumers", 1, slowOnes.size());
LOG.info("slow ones:" + slowOnes);
CompositeData slowOne = (CompositeData) slowOnes.values().iterator().next();
LOG.info("Slow one: " + slowOne);
assertTrue("we have an object name", slowOne.get("subscription") instanceof ObjectName);
abortPolicy.abortConsumer((ObjectName)slowOne.get("subscription"));
consumertoAbort.getValue().assertAtMostMessagesReceived(1);
slowOnes = abortPolicy.getSlowConsumers();
assertEquals("no slow consumers left", 0, slowOnes.size());
}
public void testOnlyOneSlowConsumerIsAborted() throws Exception {
consumerCount = 10;
startConsumers(destination);
Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
consumertoAbort.getValue().setProcessingDelay(8 * 1000);
for (Connection c : connections) {
c.setExceptionListener(this);
}
startProducers(destination, 100);
allMessagesList.waitForMessagesToArrive(99);
allMessagesList.assertAtLeastMessagesReceived(99);
consumertoAbort.getValue().assertMessagesReceived(1);
TimeUnit.SECONDS.sleep(5);
consumertoAbort.getValue().assertAtMostMessagesReceived(1);
}
public void testAbortAlreadyClosingConsumers() throws Exception {
consumerCount = 1;
startConsumers(destination);
for (MessageIdList list : consumers.values()) {
list.setProcessingDelay(6 * 1000);
}
for (Connection c : connections) {
c.setExceptionListener(this);
}
startProducers(destination, 100);
allMessagesList.waitForMessagesToArrive(consumerCount);
for (MessageConsumer consumer : consumers.keySet()) {
LOG.info("closing consumer: " + consumer);
/// will block waiting for on message till 6secs expire
consumer.close();
}
}
public void initCombosForTestAbortAlreadyClosedConsumers() {
addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
}
public void testAbortAlreadyClosedConsumers() throws Exception {
Connection conn = createConnectionFactory().createConnection();
conn.setExceptionListener(this);
connections.add(conn);
Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
final MessageConsumer consumer = sess.createConsumer(destination);
conn.start();
startProducers(destination, 20);
TimeUnit.SECONDS.sleep(1);
LOG.info("closing consumer: " + consumer);
consumer.close();
TimeUnit.SECONDS.sleep(5);
assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty());
}
public void initCombosForTestAbortAlreadyClosedConnection() {
addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
}
public void testAbortAlreadyClosedConnection() throws Exception {
Connection conn = createConnectionFactory().createConnection();
conn.setExceptionListener(this);
Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
sess.createConsumer(destination);
conn.start();
startProducers(destination, 20);
TimeUnit.SECONDS.sleep(1);
LOG.info("closing connection: " + conn);
conn.close();
TimeUnit.SECONDS.sleep(5);
assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty());
}
public void testAbortConsumerOnDeadConnection() throws Exception {
// socket proxy on pause, close could hang??
}
public void onException(JMSException exception) {
exceptions.add(exception);
exception.printStackTrace();
}
public static Test suite() {
return suite(AbortSlowConsumerTest.class);
}
}