https://issues.apache.org/jira/browse/AMQ-6359

Allow a receiver link to enable consumer options on the subscription
such as exclusive and retroactive using options encoded on the address
(cherry picked from commit a35e8dc8a28768ddf7f14c29cb41fdc6e2e8a605)

Conflicts:
	activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
index 20a8b9f..bb1436c 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
@@ -46,6 +46,7 @@
 import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
 import org.apache.activemq.transport.amqp.AmqpProtocolException;
 import org.apache.activemq.transport.amqp.ResponseHandler;
+import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.qpid.proton.amqp.DescribedType;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Target;
@@ -313,6 +314,22 @@
 
             int senderCredit = protonSender.getRemoteCredit();
 
+            // Allows the options on the destination to configure the consumerInfo
+            if (destination.getOptions() != null) {
+                Map<String, Object> options = IntrospectionSupport.extractProperties(
+                    new HashMap<String, Object>(destination.getOptions()), "consumer.");
+                IntrospectionSupport.setProperties(consumerInfo, options);
+                if (options.size() > 0) {
+                    String msg = "There are " + options.size()
+                        + " consumer options that couldn't be set on the consumer."
+                        + " Check the options are spelled correctly."
+                        + " Unknown parameters=[" + options + "]."
+                        + " This consumer cannot be started.";
+                    LOG.warn(msg);
+                    throw new AmqpProtocolException(AmqpError.INVALID_FIELD.toString(), msg);
+                }
+            }
+
             consumerInfo.setSelector(selector);
             consumerInfo.setNoRangeAcks(true);
             consumerInfo.setDestination(destination);
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
index e4b8d7a..3583656 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
@@ -55,8 +55,10 @@
 import org.apache.activemq.broker.jmx.BrokerViewMBean;
 import org.apache.activemq.broker.jmx.ConnectorViewMBean;
 import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
 import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
 import org.apache.activemq.util.Wait;
+import org.apache.qpid.jms.JmsConnectionFactory;
 import org.junit.Test;
 import org.objectweb.jtests.jms.framework.TestConfig;
 import org.slf4j.Logger;
@@ -1177,6 +1179,85 @@
         }
     }
 
+    @Test(timeout = 60000)
+    public void testZeroPrefetchWithTwoConsumers() throws Exception {
+        JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI("jms.prefetchPolicy.all=0"));
+        connection = cf.createConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getDestinationName());
+
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Msg1"));
+        producer.send(session.createTextMessage("Msg2"));
+
+        // now lets receive it
+        MessageConsumer consumer1 = session.createConsumer(queue);
+        MessageConsumer consumer2 = session.createConsumer(queue);
+        TextMessage answer = (TextMessage)consumer1.receive(5000);
+        assertNotNull(answer);
+        assertEquals("Should have received a message!", answer.getText(), "Msg1");
+        answer = (TextMessage)consumer2.receive(5000);
+        assertNotNull(answer);
+        assertEquals("Should have received a message!", answer.getText(), "Msg2");
+
+        answer = (TextMessage)consumer2.receiveNoWait();
+        assertNull("Should have not received a message!", answer);
+    }
+
+    @Test(timeout=30000)
+    public void testRetroactiveConsumerSupported() throws Exception {
+        ActiveMQAdmin.enableJMSFrameTracing();
+
+        connection = createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getDestinationName() + "?consumer.retroactive=true");
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        QueueViewMBean queueView = getProxyToQueue(getDestinationName());
+        assertNotNull(queueView);
+        assertEquals(1, queueView.getSubscriptions().length);
+
+        SubscriptionViewMBean subscriber = getProxyToQueueSubscriber(getDestinationName());
+        assertTrue(subscriber.isRetroactive());
+
+        consumer.close();
+    }
+
+    @Test(timeout=30000)
+    public void testExclusiveConsumerSupported() throws Exception {
+        ActiveMQAdmin.enableJMSFrameTracing();
+
+        connection = createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getDestinationName() + "?consumer.exclusive=true");
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        QueueViewMBean queueView = getProxyToQueue(getDestinationName());
+        assertNotNull(queueView);
+        assertEquals(1, queueView.getSubscriptions().length);
+
+        SubscriptionViewMBean subscriber = getProxyToQueueSubscriber(getDestinationName());
+        assertTrue(subscriber.isExclusive());
+
+        consumer.close();
+    }
+
+    @Test(timeout=30000)
+    public void testUnpplicableDestinationOption() throws Exception {
+        ActiveMQAdmin.enableJMSFrameTracing();
+
+        connection = createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getDestinationName() + "?consumer.unknoen=true");
+        try {
+            session.createConsumer(queue);
+            fail("Should have failed to create consumer");
+        } catch (JMSException jmsEx) {
+        }
+    }
+
     protected void receiveMessages(MessageConsumer consumer) throws Exception {
         for (int i = 0; i < 10; i++) {
             Message message = consumer.receive(1000);