NO-JIRA: add integration test for the localMessagePriority option
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index 5d1572b..cc1c8c1 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -29,6 +29,8 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -74,6 +76,7 @@
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedByte;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
@@ -2349,4 +2352,89 @@
testPeer.waitForAllHandlersToComplete(3000);
}
}
+
+ @Repeat(repetitions = 1)
+ @Test(timeout=20000)
+ public void testLocalPriorityOrdering() throws Exception {
+ final int messageCount = 10;
+ assertTrue("Max 10 message priorities", messageCount <= 10);
+
+ final AtomicReference<Throwable> asyncError = new AtomicReference<Throwable>(null);
+ final CountDownLatch arrived = new CountDownLatch(messageCount);
+ final CountDownLatch delivered = new CountDownLatch(messageCount);
+
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer, "?jms.localMessagePriority=true");
+ connection.start();
+
+ testPeer.expectBegin();
+
+ final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue destination = session.createQueue(getTestName());
+ connection.start();
+
+ List<String> expectedPayloads = new ArrayList<>();
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlow(false, Matchers.greaterThan(UnsignedInteger.valueOf(messageCount)));
+ // Send messages with increasing priority as soon as the flow arrives
+ for (int i = 0; i < messageCount; i++) {
+ HeaderDescribedType header = new HeaderDescribedType();
+ header.setPriority(UnsignedByte.valueOf((byte) i));
+ testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(header, null, null, null, new AmqpValueDescribedType(String.valueOf(i)), i);
+
+ // Insert at start, creating a reverse-ordered list, reflecting the priority reordering.
+ expectedPayloads.add(0, String.valueOf(i));
+ }
+
+ // Expect dispositions to come back in the reverse order, as the client reorders deliver due to priority
+ for (int i = messageCount -1; i >= 0; i--) {
+ testPeer.expectDisposition(true, new AcceptedMatcher(), i, i);
+ }
+
+ List<String> receivedPayloads = new ArrayList<>();
+
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ // Await the messages being prefetched before we set the listener to make the
+ // test viable, ongoing prefetching would make the reordering non-deterministic.
+ ((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() {
+ @Override
+ public void onInboundMessage(JmsInboundMessageDispatch envelope) {
+ arrived.countDown();
+ }
+ });
+
+ boolean awaitPrefetch = arrived.await(15, TimeUnit.SECONDS);
+ assertTrue("Messages not prefetched within given timeout, outstanding: " + arrived.getCount(), awaitPrefetch);
+
+ consumer.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ try {
+ String payload = message.getBody(String.class);
+ receivedPayloads.add(payload);
+ LOG.debug("Got message with payload '{}'", payload);
+ } catch (Throwable t) {
+ asyncError.set(t);
+ }
+ delivered.countDown();
+ }
+ });
+
+ boolean awaitDelivered = delivered.await(15, TimeUnit.SECONDS);
+ assertTrue("Messages not delivered within given timeout, outstanding: " + delivered.getCount(), awaitDelivered);
+
+ Throwable ex = asyncError.get();
+ assertNull("Unexpected exception during delivery", ex);
+
+ testPeer.waitForAllHandlersToComplete(2000);
+
+ testPeer.expectClose();
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(2000);
+
+ assertEquals("Message payloads not as expected", expectedPayloads, receivedPayloads);
+ }
+ }
}