blob: 7cb58ff1ed2df9092bca9604cd903d4041543507 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.NamingException;
import java.util.HashMap;
import java.util.Map;
public class PriorityQueueTest extends QpidBrokerTestCase
{
private static final int TIMEOUT = 1500;
protected final String QUEUE = "PriorityQueue";
private static final int MSG_COUNT = 50;
private Connection producerConnection;
private MessageProducer producer;
private Session producerSession;
private Queue queue;
private Connection consumerConnection;
private Session consumerSession;
private MessageConsumer consumer;
protected void setUp() throws Exception
{
super.setUp();
producerConnection = getConnection();
producerSession = producerConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);
producerConnection.start();
consumerConnection = getConnection();
consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
protected void tearDown() throws Exception
{
producerConnection.close();
consumerConnection.close();
super.tearDown();
}
public void testPriority() throws JMSException, NamingException, AMQException
{
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("x-qpid-priorities",10);
((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
producer = producerSession.createProducer(queue);
for (int msg = 0; msg < MSG_COUNT; msg++)
{
producer.setPriority(msg % 10);
producer.send(nextMessage(msg, false, producerSession, producer));
}
producerSession.commit();
producer.close();
producerSession.close();
producerConnection.close();
consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
Message received;
int receivedCount = 0;
Message previous = null;
int messageCount = 0;
while((received = consumer.receive(1000))!=null)
{
messageCount++;
if(previous != null)
{
assertTrue("Messages arrived in unexpected order " + messageCount + " " + previous.getIntProperty("msg") + " " + received.getIntProperty("msg") + " " + previous.getJMSPriority() + " " + received.getJMSPriority(), (previous.getJMSPriority() > received.getJMSPriority()) || ((previous.getJMSPriority() == received.getJMSPriority()) && previous.getIntProperty("msg") < received.getIntProperty("msg")) );
}
previous = received;
receivedCount++;
}
assertEquals("Incorrect number of message received", 50, receivedCount);
}
public void testOddOrdering() throws AMQException, JMSException
{
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("x-qpid-priorities",3);
((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
queue = producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
producer = producerSession.createProducer(queue);
// In order ABC
producer.setPriority(9);
producer.send(nextMessage(1, false, producerSession, producer));
producer.setPriority(4);
producer.send(nextMessage(2, false, producerSession, producer));
producer.setPriority(1);
producer.send(nextMessage(3, false, producerSession, producer));
// Out of order BAC
producer.setPriority(4);
producer.send(nextMessage(4, false, producerSession, producer));
producer.setPriority(9);
producer.send(nextMessage(5, false, producerSession, producer));
producer.setPriority(1);
producer.send(nextMessage(6, false, producerSession, producer));
// Out of order BCA
producer.setPriority(4);
producer.send(nextMessage(7, false, producerSession, producer));
producer.setPriority(1);
producer.send(nextMessage(8, false, producerSession, producer));
producer.setPriority(9);
producer.send(nextMessage(9, false, producerSession, producer));
// Reverse order CBA
producer.setPriority(1);
producer.send(nextMessage(10, false, producerSession, producer));
producer.setPriority(4);
producer.send(nextMessage(11, false, producerSession, producer));
producer.setPriority(9);
producer.send(nextMessage(12, false, producerSession, producer));
producerSession.commit();
consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
Message msg = consumer.receive(TIMEOUT);
assertEquals(1, msg.getIntProperty("msg"));
msg = consumer.receive(TIMEOUT);
assertEquals(5, msg.getIntProperty("msg"));
msg = consumer.receive(TIMEOUT);
assertEquals(9, msg.getIntProperty("msg"));
msg = consumer.receive(TIMEOUT);
assertEquals(12, msg.getIntProperty("msg"));
msg = consumer.receive(TIMEOUT);
assertEquals(2, msg.getIntProperty("msg"));
msg = consumer.receive(TIMEOUT);
assertEquals(4, msg.getIntProperty("msg"));
msg = consumer.receive(TIMEOUT);
assertEquals(7, msg.getIntProperty("msg"));
msg = consumer.receive(TIMEOUT);
assertEquals(11, msg.getIntProperty("msg"));
msg = consumer.receive(TIMEOUT);
assertEquals(3, msg.getIntProperty("msg"));
msg = consumer.receive(TIMEOUT);
assertEquals(6, msg.getIntProperty("msg"));
msg = consumer.receive(TIMEOUT);
assertEquals(8, msg.getIntProperty("msg"));
msg = consumer.receive(TIMEOUT);
assertEquals(10, msg.getIntProperty("msg"));
}
private Message nextMessage(int msg, boolean first, Session producerSession, MessageProducer producer) throws JMSException
{
Message send = producerSession.createTextMessage("Message: " + msg);
send.setIntProperty("msg", msg);
return send;
}
}