blob: a724e6c66e3e20336e77cc42fd253ca87a9c8443 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.AMQException;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.server.logging.AbstractTestLogging;
import org.apache.qpid.test.utils.JMXTestUtils;
import org.apache.qpid.framing.AMQShortString;
import javax.jms.*;
import javax.naming.NamingException;
import java.util.HashMap;
import java.util.Map;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.io.IOException;
public class ProducerFlowControlTest extends AbstractTestLogging
{
private static final int TIMEOUT = 10000;
private static final Logger _logger = Logger.getLogger(ProducerFlowControlTest.class);
private Connection producerConnection;
private MessageProducer producer;
private Session producerSession;
private Queue queue;
private Connection consumerConnection;
private Session consumerSession;
private MessageConsumer consumer;
private final AtomicInteger _sentMessages = new AtomicInteger();
private JMXTestUtils _jmxUtils;
private boolean _jmxUtilConnected;
private static final String USER = "admin";
public void setUp() throws Exception
{
_jmxUtils = new JMXTestUtils(this, USER , USER);
_jmxUtils.setUp();
_jmxUtilConnected=false;
super.setUp();
_monitor.markDiscardPoint();
producerConnection = getConnection();
producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producerConnection.start();
consumerConnection = getConnection();
consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
public void tearDown() throws Exception
{
if(_jmxUtilConnected)
{
try
{
_jmxUtils.close();
}
catch (IOException e)
{
e.printStackTrace();
}
}
producerConnection.close();
consumerConnection.close();
super.tearDown();
}
public void testCapacityExceededCausesBlock()
throws JMSException, NamingException, AMQException, InterruptedException
{
String queueName = getTestQueueName();
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("x-qpid-capacity",1000);
arguments.put("x-qpid-flow-resume-capacity",800);
((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments);
queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
producer = producerSession.createProducer(queue);
_sentMessages.set(0);
// try to send 5 messages (should block after 4)
sendMessagesAsync(producer, producerSession, 5, 50L);
Thread.sleep(5000);
assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get());
consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
consumer.receive();
Thread.sleep(1000);
assertEquals("Message incorrectly sent after one message received", 4, _sentMessages.get());
consumer.receive();
Thread.sleep(1000);
assertEquals("Message not sent after two messages received", 5, _sentMessages.get());
}
public void testBrokerLogMessages()
throws JMSException, NamingException, AMQException, InterruptedException, IOException
{
String queueName = getTestQueueName();
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("x-qpid-capacity",1000);
arguments.put("x-qpid-flow-resume-capacity",800);
((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments);
queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
producer = producerSession.createProducer(queue);
_sentMessages.set(0);
// try to send 5 messages (should block after 4)
sendMessagesAsync(producer, producerSession, 5, 50L);
Thread.sleep(5000);
List<String> results = waitAndFindMatches("QUE-1003");
assertEquals("Did not find correct number of QUE-1003 queue overfull messages", 1, results.size());
consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
while(consumer.receive(1000) != null);
results = waitAndFindMatches("QUE-1004");
assertEquals("Did not find correct number of UNDERFULL queue underfull messages", 1, results.size());
}
public void testClientLogMessages()
throws JMSException, NamingException, AMQException, InterruptedException, IOException
{
String queueName = getTestQueueName();
setTestClientSystemProperty("qpid.flow_control_wait_failure","3000");
setTestClientSystemProperty("qpid.flow_control_wait_notify_period","1000");
Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("x-qpid-capacity",1000);
arguments.put("x-qpid-flow-resume-capacity",800);
((AMQSession) session).createQueue(new AMQShortString(queueName), true, false, false, arguments);
queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
((AMQSession) session).declareAndBind((AMQDestination)queue);
producer = session.createProducer(queue);
_sentMessages.set(0);
// try to send 5 messages (should block after 4)
MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 50L);
Thread.sleep(TIMEOUT);
List<String> results = waitAndFindMatches("Message send delayed by", TIMEOUT);
assertTrue("No delay messages logged by client",results.size()!=0);
results = findMatches("Message send failed due to timeout waiting on broker enforced flow control");
assertEquals("Incorrect number of send failure messages logged by client",1,results.size());
}
public void testFlowControlOnCapacityResumeEqual()
throws JMSException, NamingException, AMQException, InterruptedException
{
String queueName = getTestQueueName();
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("x-qpid-capacity",1000);
arguments.put("x-qpid-flow-resume-capacity",1000);
((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments);
queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
producer = producerSession.createProducer(queue);
_sentMessages.set(0);
// try to send 5 messages (should block after 4)
sendMessagesAsync(producer, producerSession, 5, 50L);
Thread.sleep(5000);
assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get());
consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
consumer.receive();
Thread.sleep(1000);
assertEquals("Message incorrectly sent after one message received", 5, _sentMessages.get());
}
public void testFlowControlSoak()
throws Exception, NamingException, AMQException, InterruptedException
{
String queueName = getTestQueueName();
_sentMessages.set(0);
final int numProducers = 10;
final int numMessages = 100;
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("x-qpid-capacity",6000);
arguments.put("x-qpid-flow-resume-capacity",3000);
((AMQSession) consumerSession).createQueue(new AMQShortString(queueName), false, false, false, arguments);
queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='false'");
((AMQSession) consumerSession).declareAndBind((AMQDestination)queue);
consumerConnection.start();
Connection[] producers = new Connection[numProducers];
for(int i = 0 ; i < numProducers; i ++)
{
producers[i] = getConnection();
producers[i].start();
Session session = producers[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer myproducer = session.createProducer(queue);
MessageSender sender = sendMessagesAsync(myproducer, session, numMessages, 50L);
}
consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
for(int j = 0; j < numProducers * numMessages; j++)
{
Message msg = consumer.receive(5000);
Thread.sleep(50L);
assertNotNull("Message not received("+j+"), sent: "+_sentMessages.get(), msg);
}
Message msg = consumer.receive(500);
assertNull("extra message received", msg);
for(int i = 0; i < numProducers; i++)
{
producers[i].close();
}
}
public void testSendTimeout()
throws JMSException, NamingException, AMQException, InterruptedException
{
String queueName = getTestQueueName();
setTestClientSystemProperty("qpid.flow_control_wait_failure","3000");
Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("x-qpid-capacity",1000);
arguments.put("x-qpid-flow-resume-capacity",800);
((AMQSession) session).createQueue(new AMQShortString(queueName), true, false, false, arguments);
queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
((AMQSession) session).declareAndBind((AMQDestination)queue);
producer = session.createProducer(queue);
_sentMessages.set(0);
// try to send 5 messages (should block after 4)
MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 50L);
Thread.sleep(10000);
Exception e = sender.getException();
assertNotNull("No timeout exception on sending", e);
}
public void testFlowControlAttributeModificationViaJMX()
throws JMSException, NamingException, AMQException, InterruptedException, Exception
{
_jmxUtils.open();
_jmxUtilConnected = true;
String queueName = getTestQueueName();
//create queue
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("x-qpid-capacity",0);
arguments.put("x-qpid-flow-resume-capacity",0);
((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments);
queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
producer = producerSession.createProducer(queue);
Thread.sleep(1000);
//Create a JMX MBean proxy for the queue
ManagedQueue queueMBean = _jmxUtils.getManagedObject(ManagedQueue.class, _jmxUtils.getQueueObjectName("test", queueName));
assertNotNull(queueMBean);
//check current attribute values are 0 as expected
assertTrue("Capacity was not the expected value", queueMBean.getCapacity() == 0L);
assertTrue("FlowResumeCapacity was not the expected value", queueMBean.getFlowResumeCapacity() == 0L);
//set new values that will cause flow control to be active, and the queue to become overfull after 1 message is sent
queueMBean.setCapacity(250L);
queueMBean.setFlowResumeCapacity(250L);
assertTrue("Capacity was not the expected value", queueMBean.getCapacity() == 250L);
assertTrue("FlowResumeCapacity was not the expected value", queueMBean.getFlowResumeCapacity() == 250L);
assertFalse("Queue should not be overfull", queueMBean.isFlowOverfull());
// try to send 2 messages (should block after 1)
_sentMessages.set(0);
sendMessagesAsync(producer, producerSession, 2, 50L);
Thread.sleep(2000);
//check only 1 message was sent, and queue is overfull
assertEquals("Incorrect number of message sent before blocking", 1, _sentMessages.get());
assertTrue("Queue should be overfull", queueMBean.isFlowOverfull());
//raise the attribute values, causing the queue to become underfull and allow the second message to be sent.
queueMBean.setCapacity(300L);
queueMBean.setFlowResumeCapacity(300L);
Thread.sleep(2000);
//check second message was sent, and caused the queue to become overfull again
assertEquals("Second message was not sent after lifting FlowResumeCapacity", 2, _sentMessages.get());
assertTrue("Queue should be overfull", queueMBean.isFlowOverfull());
//raise capacity above queue depth, check queue remains overfull as FlowResumeCapacity still exceeded
queueMBean.setCapacity(700L);
assertTrue("Queue should be overfull", queueMBean.isFlowOverfull());
//receive a message, check queue becomes underfull
consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
consumer.receive();
//perform a synchronous op on the connection
((AMQSession) consumerSession).sync();
assertFalse("Queue should not be overfull", queueMBean.isFlowOverfull());
consumer.receive();
}
private MessageSender sendMessagesAsync(final MessageProducer producer,
final Session producerSession,
final int numMessages,
long sleepPeriod)
{
MessageSender sender = new MessageSender(producer, producerSession, numMessages,sleepPeriod);
new Thread(sender).start();
return sender;
}
private void sendMessages(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod)
throws JMSException
{
for (int msg = 0; msg < numMessages; msg++)
{
producer.send(nextMessage(msg, producerSession));
_sentMessages.incrementAndGet();
try
{
((AMQSession)producerSession).sync();
}
catch (AMQException e)
{
e.printStackTrace();
throw new RuntimeException(e);
}
}
}
private static final byte[] BYTE_300 = new byte[300];
private Message nextMessage(int msg, Session producerSession) throws JMSException
{
BytesMessage send = producerSession.createBytesMessage();
send.writeBytes(BYTE_300);
send.setIntProperty("msg", msg);
return send;
}
private class MessageSender implements Runnable
{
private final MessageProducer _producer;
private final Session _producerSession;
private final int _numMessages;
private JMSException _exception;
private long _sleepPeriod;
public MessageSender(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod)
{
_producer = producer;
_producerSession = producerSession;
_numMessages = numMessages;
_sleepPeriod = sleepPeriod;
}
public void run()
{
try
{
sendMessages(_producer, _producerSession, _numMessages, _sleepPeriod);
}
catch (JMSException e)
{
_exception = e;
}
}
public JMSException getException()
{
return _exception;
}
}
}