https://issues.apache.org/jira/browse/AMQ-6359
Allow a receiver link to enable consumer options on the subscription
such as exclusive and retroactive using options encoded on the address
(cherry picked from commit a35e8dc8a28768ddf7f14c29cb41fdc6e2e8a605)
Conflicts:
activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
index 20a8b9f..bb1436c 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
@@ -46,6 +46,7 @@
import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
import org.apache.activemq.transport.amqp.AmqpProtocolException;
import org.apache.activemq.transport.amqp.ResponseHandler;
+import org.apache.activemq.util.IntrospectionSupport;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Target;
@@ -313,6 +314,22 @@
int senderCredit = protonSender.getRemoteCredit();
+ // Allows the options on the destination to configure the consumerInfo
+ if (destination.getOptions() != null) {
+ Map<String, Object> options = IntrospectionSupport.extractProperties(
+ new HashMap<String, Object>(destination.getOptions()), "consumer.");
+ IntrospectionSupport.setProperties(consumerInfo, options);
+ if (options.size() > 0) {
+ String msg = "There are " + options.size()
+ + " consumer options that couldn't be set on the consumer."
+ + " Check the options are spelled correctly."
+ + " Unknown parameters=[" + options + "]."
+ + " This consumer cannot be started.";
+ LOG.warn(msg);
+ throw new AmqpProtocolException(AmqpError.INVALID_FIELD.toString(), msg);
+ }
+ }
+
consumerInfo.setSelector(selector);
consumerInfo.setNoRangeAcks(true);
consumerInfo.setDestination(destination);
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
index e4b8d7a..3583656 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
@@ -55,8 +55,10 @@
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.ConnectorViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
import org.apache.activemq.util.Wait;
+import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Test;
import org.objectweb.jtests.jms.framework.TestConfig;
import org.slf4j.Logger;
@@ -1177,6 +1179,85 @@
}
}
+ @Test(timeout = 60000)
+ public void testZeroPrefetchWithTwoConsumers() throws Exception {
+ JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI("jms.prefetchPolicy.all=0"));
+ connection = cf.createConnection();
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getDestinationName());
+
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createTextMessage("Msg1"));
+ producer.send(session.createTextMessage("Msg2"));
+
+ // now lets receive it
+ MessageConsumer consumer1 = session.createConsumer(queue);
+ MessageConsumer consumer2 = session.createConsumer(queue);
+ TextMessage answer = (TextMessage)consumer1.receive(5000);
+ assertNotNull(answer);
+ assertEquals("Should have received a message!", answer.getText(), "Msg1");
+ answer = (TextMessage)consumer2.receive(5000);
+ assertNotNull(answer);
+ assertEquals("Should have received a message!", answer.getText(), "Msg2");
+
+ answer = (TextMessage)consumer2.receiveNoWait();
+ assertNull("Should have not received a message!", answer);
+ }
+
+ @Test(timeout=30000)
+ public void testRetroactiveConsumerSupported() throws Exception {
+ ActiveMQAdmin.enableJMSFrameTracing();
+
+ connection = createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getDestinationName() + "?consumer.retroactive=true");
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ QueueViewMBean queueView = getProxyToQueue(getDestinationName());
+ assertNotNull(queueView);
+ assertEquals(1, queueView.getSubscriptions().length);
+
+ SubscriptionViewMBean subscriber = getProxyToQueueSubscriber(getDestinationName());
+ assertTrue(subscriber.isRetroactive());
+
+ consumer.close();
+ }
+
+ @Test(timeout=30000)
+ public void testExclusiveConsumerSupported() throws Exception {
+ ActiveMQAdmin.enableJMSFrameTracing();
+
+ connection = createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getDestinationName() + "?consumer.exclusive=true");
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ QueueViewMBean queueView = getProxyToQueue(getDestinationName());
+ assertNotNull(queueView);
+ assertEquals(1, queueView.getSubscriptions().length);
+
+ SubscriptionViewMBean subscriber = getProxyToQueueSubscriber(getDestinationName());
+ assertTrue(subscriber.isExclusive());
+
+ consumer.close();
+ }
+
+ @Test(timeout=30000)
+ public void testUnpplicableDestinationOption() throws Exception {
+ ActiveMQAdmin.enableJMSFrameTracing();
+
+ connection = createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getDestinationName() + "?consumer.unknoen=true");
+ try {
+ session.createConsumer(queue);
+ fail("Should have failed to create consumer");
+ } catch (JMSException jmsEx) {
+ }
+ }
+
protected void receiveMessages(MessageConsumer consumer) throws Exception {
for (int i = 0; i < 10; i++) {
Message message = consumer.receive(1000);