NO-JIRA: add integration test for the localMessagePriority option
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index 5d1572b..cc1c8c1 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -29,6 +29,8 @@
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -74,6 +76,7 @@
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.DescribedType;
 import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedByte;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.hamcrest.Matcher;
 import org.hamcrest.Matchers;
@@ -2349,4 +2352,89 @@
             testPeer.waitForAllHandlersToComplete(3000);
         }
     }
+
+    @Repeat(repetitions = 1)
+    @Test(timeout=20000)
+    public void testLocalPriorityOrdering() throws Exception {
+        final int messageCount = 10;
+        assertTrue("Max 10 message priorities", messageCount <= 10);
+
+        final AtomicReference<Throwable> asyncError = new AtomicReference<Throwable>(null);
+        final CountDownLatch arrived = new CountDownLatch(messageCount);
+        final CountDownLatch delivered = new CountDownLatch(messageCount);
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.localMessagePriority=true");
+            connection.start();
+
+            testPeer.expectBegin();
+
+            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue destination = session.createQueue(getTestName());
+            connection.start();
+
+            List<String> expectedPayloads = new ArrayList<>();
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlow(false, Matchers.greaterThan(UnsignedInteger.valueOf(messageCount)));
+            // Send messages with increasing priority as soon as the flow arrives
+            for (int i = 0; i < messageCount; i++) {
+                HeaderDescribedType header = new HeaderDescribedType();
+                header.setPriority(UnsignedByte.valueOf((byte) i));
+                testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(header, null, null, null, new AmqpValueDescribedType(String.valueOf(i)), i);
+
+                // Insert at start, creating a reverse-ordered list, reflecting the priority reordering.
+                expectedPayloads.add(0, String.valueOf(i));
+            }
+
+            // Expect dispositions to come back in the reverse order, as the client reorders deliver due to priority
+            for (int i = messageCount -1; i >= 0; i--) {
+                testPeer.expectDisposition(true, new AcceptedMatcher(), i, i);
+            }
+
+            List<String> receivedPayloads = new ArrayList<>();
+
+            MessageConsumer consumer = session.createConsumer(destination);
+
+            // Await the messages being prefetched before we set the listener to make the
+            // test viable, ongoing prefetching would make the reordering non-deterministic.
+            ((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onInboundMessage(JmsInboundMessageDispatch envelope) {
+                    arrived.countDown();
+                }
+            });
+
+            boolean awaitPrefetch = arrived.await(15, TimeUnit.SECONDS);
+            assertTrue("Messages not prefetched within given timeout, outstanding: " + arrived.getCount(), awaitPrefetch);
+
+            consumer.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    try {
+                        String payload = message.getBody(String.class);
+                        receivedPayloads.add(payload);
+                        LOG.debug("Got message with payload '{}'", payload);
+                    } catch (Throwable t) {
+                        asyncError.set(t);
+                    }
+                    delivered.countDown();
+                }
+            });
+
+            boolean awaitDelivered = delivered.await(15, TimeUnit.SECONDS);
+            assertTrue("Messages not delivered within given timeout, outstanding: " + delivered.getCount(), awaitDelivered);
+
+            Throwable ex = asyncError.get();
+            assertNull("Unexpected exception during delivery", ex);
+
+            testPeer.waitForAllHandlersToComplete(2000);
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+
+            assertEquals("Message payloads not as expected", expectedPayloads, receivedPayloads);
+        }
+    }
 }