blob: 97d825177ce8d0f72c592f385e4451ef5e4512ea [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*
*/
package org.apache.qpid.test.client;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.test.utils.FailoverBaseCase;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.NamingException;
import java.util.Enumeration;
import java.util.Random;
public class QueueBrowserAutoAckTest extends FailoverBaseCase
{
/** Connection used by the browsing client; obtained and started in {@link #setUp()}, closed in {@link #tearDown()}. */
protected Connection _clientConnection;
/** Session created on {@link #_clientConnection} by {@link #setupSession()}. */
protected Session _clientSession;
/** The queue under test, created in {@link #setUp()} from {@code getTestQueueName()}. */
protected Queue _queue;
/** Name of the int property that the message-sending helper stamps on every message with its sequence number. */
protected static final String MESSAGE_ID_PROPERTY = "MessageIDProperty";
/** Value of the {@code profile.clustered} system property; when false the failover tests also publish via "connection2". */
protected boolean CLUSTERED = Boolean.getBoolean("profile.clustered");
/**
 * Builds the test fixture: starts the client connection, creates the client
 * session and the test queue, and verifies that the queue starts out empty.
 *
 * @throws Exception if the base class set-up or any JMS operation fails
 */
public void setUp() throws Exception
{
    super.setUp();

    // Client side: connection first, then the session (delegated so the session setup lives in one place).
    _clientConnection = getConnection();
    _clientConnection.start();
    setupSession();

    // A consumer is opened and closed straight away on the new queue
    // (presumably so that the queue actually exists on the broker).
    _queue = _clientSession.createQueue(getTestQueueName());
    MessageConsumer queueCreator = _clientSession.createConsumer(_queue);
    queueCreator.close();

    // Every test expects to begin with an empty queue.
    checkQueueDepth(0);
}
/**
 * Creates the client session used by the tests: non-transacted with
 * {@link Session#AUTO_ACKNOWLEDGE}.
 *
 * @throws Exception if the session cannot be created
 */
protected void setupSession() throws Exception
{
    final boolean transacted = false;
    final int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
    _clientSession = _clientConnection.createSession(transacted, acknowledgeMode);
}
/**
 * Closes the client connection (if one was created) and then runs the base
 * class tear-down.
 *
 * @throws Exception if closing the connection or the base class tear-down fails
 */
public void tearDown() throws Exception
{
    try
    {
        if (_clientConnection != null)
        {
            _clientConnection.close();
        }
    }
    finally
    {
        // Always run the base class clean-up, even when closing the client
        // connection throws; otherwise its resources would be left behind.
        super.tearDown();
    }
}
/**
 * Publishes {@code num} messages to the test queue over a fresh connection
 * obtained from {@code getConnection()}.
 *
 * @param num the number of messages to send
 * @throws JMSException if sending fails
 */
protected void sendMessages(int num) throws JMSException
{
    Connection sender = null;

    try
    {
        sender = getConnection();
    }
    catch (Exception e)
    {
        // Any failure to obtain the connection is reported as a test failure.
        fail("Unable to lookup connection in JNDI.");
    }

    sendMessages(sender, num);
}
/**
 * Publishes {@code num} messages to the test queue over a connection created
 * from the named connection factory, authenticating as guest/guest.
 *
 * @param connection the name passed to {@code getConnectionFactory}
 * @param num        the number of messages to send
 * @throws JMSException if sending fails
 */
protected void sendMessages(String connection, int num) throws JMSException
{
    Connection sender = null;

    try
    {
        sender = getConnectionFactory(connection).createConnection("guest", "guest");
    }
    catch (Exception e)
    {
        // Dump the cause before failing so the reason is visible in the test output.
        e.printStackTrace();
        fail("Unable to lookup connection in JNDI.");
    }

    sendMessages(sender, num);
}
/**
 * Sends {@code messageSendCount} text messages to the test queue in a single
 * transaction and then closes the supplied connection.
 *
 * Each message body is {@code "Message <n>"} and carries its sequence number
 * in the {@link #MESSAGE_ID_PROPERTY} int property.
 *
 * @param producerConnection the connection to publish on; it is always closed
 *                           before this method returns, even on failure
 * @param messageSendCount   the number of messages to send
 * @throws JMSException if any JMS operation fails
 */
protected void sendMessages(Connection producerConnection, int messageSendCount) throws JMSException
{
    try
    {
        producerConnection.start();

        // Transacted session: nothing is visible on the queue until commit().
        Session producerSession = producerConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);

        //Ensure _queue is created
        producerSession.createConsumer(_queue).close();

        MessageProducer producer = producerSession.createProducer(_queue);

        for (int messageId = 0; messageId < messageSendCount; messageId++)
        {
            TextMessage textMsg = producerSession.createTextMessage("Message " + messageId);
            textMsg.setIntProperty(MESSAGE_ID_PROPERTY, messageId);
            producer.send(textMsg);
        }

        producerSession.commit();
    }
    finally
    {
        // Close in finally so a failed send/commit does not leak the connection.
        producerConnection.close();
    }
}
/**
 * Asserts that the test queue holds exactly {@code expectedDepth} messages.
 *
 * Two independent checks are made: the depth reported by
 * {@code AMQSession.getQueueDepth}, and the number of messages a
 * {@link QueueBrowser} enumerates.
 *
 * @param expectedDepth The expected Queue depth
 * @throws JMSException on error
 */
protected void checkQueueDepth(int expectedDepth) throws JMSException
{
    // create QueueBrowser
    _logger.info("Creating Queue Browser");

    QueueBrowser queueBrowser = _clientSession.createBrowser(_queue);

    // check for messages
    if (_logger.isDebugEnabled())
    {
        _logger.debug("Checking for " + expectedDepth + " messages with QueueBrowser");
    }

    //Check what the session believes the queue count to be.
    long queueDepth = 0;

    try
    {
        queueDepth = ((AMQSession) _clientSession).getQueueDepth((AMQDestination) _queue);
    }
    catch (AMQException e)
    {
        // Do not swallow: with the default depth of 0 a failed query would
        // otherwise look like a successful "queue is empty" check.
        fail("Caught exception getting queue depth: " + e.getMessage());
    }

    assertEquals("Session reports Queue expectedDepth not as expected", expectedDepth, queueDepth);

    // Browse the queue to get a second opinion
    int msgCount = 0;
    Enumeration msgs = queueBrowser.getEnumeration();

    while (msgs.hasMoreElements())
    {
        msgs.nextElement();
        msgCount++;
    }

    if (_logger.isDebugEnabled())
    {
        _logger.debug("Found " + msgCount + " messages total in browser");
    }

    // check to see if all messages found
    assertEquals("Browser did not find all messages", expectedDepth, msgCount);

    //Close browser
    queueBrowser.close();
}
/**
 * Browses at most {@code messageCount} messages from the test queue and then
 * closes the browser, failing the test if the close throws.
 *
 * @param messageCount the maximum number of messages to read before closing
 * @throws JMSException if creating the browser or its enumeration fails
 */
protected void closeBrowserBeforeAfterGetNext(int messageCount) throws JMSException
{
    QueueBrowser browser = _clientSession.createBrowser(_queue);
    Enumeration pending = browser.getEnumeration();

    // Stop as soon as the browser runs dry or the requested number has been read.
    for (int read = 0; pending.hasMoreElements() && read < messageCount; read++)
    {
        pending.nextElement();
    }

    try
    {
        browser.close();
    }
    catch (JMSException e)
    {
        fail("Close should happen without error:" + e.getMessage());
    }
}
/**
 * Verifies that repeated calls to getEnumeration() on one QueueBrowser each
 * yield the full set of messages.
 *
 * @param sentMessages            The number of messages every enumeration must return
 * @param browserEnumerationCount The number of times to call getEnumeration()
 * @throws JMSException if creating the browser or an enumeration fails
 */
protected void checkMultipleGetEnum(int sentMessages, int browserEnumerationCount) throws JMSException
{
    QueueBrowser browser = _clientSession.createBrowser(_queue);

    for (int pass = 0; pass < browserEnumerationCount; pass++)
    {
        _logger.info("Checking getEnumeration:" + pass);

        // Drain this enumeration completely, counting as we go.
        Enumeration browsed = browser.getEnumeration();
        int seen = 0;
        for (; browsed.hasMoreElements(); seen++)
        {
            browsed.nextElement();
        }

        // Each pass must see every message that was sent.
        assertEquals(sentMessages, seen);
    }

    try
    {
        browser.close();
    }
    catch (JMSException e)
    {
        fail("Close should happen without error:" + e.getMessage());
    }
}
/**
 * Convenience overload that runs the overlapping-enumeration check without a
 * message selector.
 *
 * @param expectedMessages        the number of messages each enumeration must return
 * @param browserEnumerationCount the number of enumerations to interleave
 * @throws JMSException on error
 */
protected void checkOverlappingMultipleGetEnum(int expectedMessages, int browserEnumerationCount) throws JMSException
{
    final String noSelector = null;
    checkOverlappingMultipleGetEnum(expectedMessages, browserEnumerationCount, noSelector);
}
/**
 * Opens several enumerations on a single QueueBrowser, reads from them in an
 * interleaved fashion and asserts that each one yields
 * {@code expectedMessages} messages.
 *
 * At most {@code expectedMessages} reads are attempted per enumeration, so
 * this check detects enumerations that return too few messages, not ones
 * that would return more.
 *
 * @param expectedMessages        the number of messages each enumeration must return
 * @param browserEnumerationCount the number of enumerations to open and interleave
 * @param selector                currently NOT applied: the browser is always
 *                                created without a selector (the selector-based
 *                                creation was disabled in the original code);
 *                                kept so existing callers keep compiling
 * @throws JMSException if creating the browser or an enumeration fails
 */
protected void checkOverlappingMultipleGetEnum(int expectedMessages, int browserEnumerationCount, String selector) throws JMSException
{
    // NOTE(review): the selector is deliberately ignored here, as it was before;
    // re-enable createBrowser(_queue, selector) only once callers pass a valid JMS selector.
    QueueBrowser queueBrowser = _clientSession.createBrowser(_queue);

    Enumeration[] msgs = new Enumeration[browserEnumerationCount];
    int[] msgCount = new int[browserEnumerationCount];

    //create Enums
    for (int count = 0; count < browserEnumerationCount; count++)
    {
        msgs[count] = queueBrowser.getEnumeration();
    }

    //interleave reads
    for (int cnt = 0; cnt < expectedMessages; cnt++)
    {
        for (int count = 0; count < browserEnumerationCount; count++)
        {
            if (msgs[count].hasMoreElements())
            {
                msgs[count].nextElement();
                msgCount[count]++;
            }
        }
    }

    //validate all browsers get right message count.
    for (int count = 0; count < browserEnumerationCount; count++)
    {
        // expected first, actual second, so a failure reports the values the right way round
        assertEquals("Enumeration " + count + " returned an unexpected number of messages",
                     expectedMessages, msgCount[count]);
    }

    try
    {
        queueBrowser.close();
    }
    catch (JMSException e)
    {
        fail("Close should happen without error:" + e.getMessage());
    }
}
/**
 * Consumes {@code messages} messages from the test queue over a new
 * connection, failing if any receive times out, and then asserts that the
 * queue is empty.
 *
 * @param messages the number of messages expected to still be on the queue
 * @throws JMSException on error
 */
protected void validate(int messages) throws JMSException
{
    // A separate connection is used for validating the queue content.
    Connection validationConnection = null;
    try
    {
        validationConnection = getConnection();
    }
    catch (Exception e)
    {
        fail("Unable to make validation connection");
    }

    Session validationSession = validationConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    validationConnection.start();
    MessageConsumer drain = validationSession.createConsumer(_queue);

    _logger.info("Verify messages are still on the queue");

    int received = 0;
    while (received < messages)
    {
        Message next = (TextMessage) drain.receive(RECEIVE_TIMEOUT);
        if (next == null)
        {
            fail("Message " + received + " not retrieved from queue");
        }
        received++;
    }

    // Done with the validation connection.
    validationConnection.close();

    _logger.info("All messages recevied from queue");

    // Nothing may remain once the expected messages have been consumed.
    checkQueueDepth(0);
}
/**
 * Runs the overlapping-enumeration check with {@code clients} enumerations,
 * each expected to return {@code totalMessages / clients} messages (integer
 * division), passing a selector string built from
 * {@link #MESSAGE_ID_PROPERTY} and {@code clients}.
 *
 * @param totalMessages the total number of messages on the queue
 * @param clients       the number of enumerations; must be non-zero
 * @throws JMSException on error
 */
protected void checkQueueDepthWithSelectors(int totalMessages, int clients) throws JMSException
{
    final String selector = MESSAGE_ID_PROPERTY + " % " + clients;
    final int expectedPerClient = totalMessages / clients;

    checkOverlappingMultipleGetEnum(expectedPerClient, clients, selector);
}
/**
 * Checks that an empty queue can be browsed (see QPID-785): the depth check
 * must report zero messages.
 *
 * @throws Exception on error
 */
public void testBrowsingEmptyQueue() throws Exception
{
    final int noMessages = 0;
    checkQueueDepth(noMessages);
}
/**
 * Test that messages remain on the queue after being browsed.
 *
 * Sends messages to the queue, browses them via the depth check, and then
 * receives them all to verify they were still there.
 *
 * @throws Exception on error
 */
public void testQueueBrowserMsgsRemainOnQueue() throws Exception
{
    final int messageCount = 10;

    sendMessages(messageCount);
    checkQueueDepth(messageCount);
    validate(messageCount);
}
/**
 * Checks that closing a browser after reading only part of the queue works
 * and leaves every message on the queue.
 *
 * @throws NamingException declared by the signature
 * @throws JMSException on error
 */
public void testClosingBrowserMidReceiving() throws NamingException, JMSException
{
    final int messageCount = 100;
    final int readBeforeClose = 10;

    sendMessages(messageCount);
    checkQueueDepth(messageCount);

    // Read a handful of messages, then close the browser part-way through.
    closeBrowserBeforeAfterGetNext(readBeforeClose);

    validate(messageCount);
}
/**
 * Checks that several successive getEnumeration() calls on one QueueBrowser
 * each return the required number of messages, and that the messages remain
 * on the queue afterwards.
 *
 * @throws NamingException declared by the signature
 * @throws JMSException on error
 */
public void testMultipleGetEnum() throws NamingException, JMSException
{
    final int messageCount = 10;
    final int enumerations = 5;

    sendMessages(messageCount);
    checkQueueDepth(messageCount);
    checkMultipleGetEnum(messageCount, enumerations);
    validate(messageCount);
}
/**
 * Checks that several enumerations of one QueueBrowser can be read in an
 * interleaved fashion, and that the messages remain on the queue afterwards.
 *
 * @throws NamingException declared by the signature
 * @throws JMSException on error
 */
public void testMultipleOverlappingGetEnum() throws NamingException, JMSException
{
    final int messageCount = 25;
    final int enumerations = 5;

    sendMessages(messageCount);
    checkQueueDepth(messageCount);
    checkOverlappingMultipleGetEnum(messageCount, enumerations);
    validate(messageCount);
}
/**
 * Runs the selector-based depth check for every client count from 2 to 10
 * inclusive against a queue of 40 messages, then verifies the messages are
 * still on the queue.
 *
 * @throws JMSException on error
 */
public void testBrowsingWithSelector() throws JMSException
{
    final int messageCount = 40;
    final int fewestClients = 2;
    final int mostClients = 10;

    sendMessages(messageCount);
    checkQueueDepth(messageCount);

    int clientCount = fewestClients;
    while (clientCount <= mostClients)
    {
        checkQueueDepthWithSelectors(messageCount, clientCount);
        clientCount++;
    }

    validate(messageCount);
}
/**
 * Verifies that a QueueBrowser does not consume messages from a broker when
 * the connection fails over part-way through browsing.
 *
 * The broker is failed after a randomly chosen number of messages (between 1
 * and the number sent) has been browsed. The browser must still deliver at
 * least as many messages as were sent, and all of them must be consumable
 * afterwards.
 *
 * @throws JMSException on error
 */
public void testFailoverWithQueueBrowser() throws JMSException
{
    final int messages = 5;

    sendMessages("connection1", messages);
    if (!CLUSTERED)
    {
        // Non-clustered profile: publish the same number of messages via "connection2" as well.
        sendMessages("connection2", messages);
    }

    checkQueueDepth(messages);

    _logger.info("Creating Queue Browser");
    QueueBrowser browser = _clientSession.createBrowser(_queue);

    long depthFromSession = 0;
    try
    {
        depthFromSession = ((AMQSession) _clientSession).getQueueDepth((AMQDestination) _queue);
    }
    catch (AMQException e)
    {
        fail("Caught exception getting queue depth: " + e.getMessage());
    }

    assertEquals("Session reports Queue depth not as expected", messages, depthFromSession);

    // Pick the message after which the broker is killed: 1..messages inclusive.
    final int failPoint = new Random().nextInt(messages) + 1;

    int browsed = 0;
    Enumeration browsing = browser.getEnumeration();
    while (browsing.hasMoreElements())
    {
        browsing.nextElement();
        browsed++;

        if (browsed == failPoint)
        {
            failBroker(getFailingPort());
        }
    }

    assertTrue("We should get atleast " + messages + " msgs.", browsed >= messages);

    if (_logger.isDebugEnabled())
    {
        _logger.debug("QBAAT Found " + browsed + " messages total in browser");
    }

    browser.close();

    _logger.info("Closed Queue Browser, validating messages on broker.");

    // Every message must still be on the broker.
    validate(messages);
}
/**
 * Fails the broker before any browser is created, then checks that the
 * expected messages can still be browsed and consumed.
 *
 * @throws JMSException on error
 */
public void testFailoverAsQueueBrowserCreated() throws JMSException
{
    // Known issue noted by the original author: the IoServiceListenerSupport seems to get
    // stuck with a managedSession that isn't closing when requested, so it hangs waiting
    // for the session.
    final int messageCount = 50;

    sendMessages("connection1", messageCount);
    if (!CLUSTERED)
    {
        // Non-clustered profile: publish the same number of messages via "connection2" as well.
        sendMessages("connection2", messageCount);
    }

    failBroker(getFailingPort());

    checkQueueDepth(messageCount);

    // All messages must still be available after the failover.
    validate(messageCount);
}
}