QPID-7898: [Qpid JMS AMQP 0-x] Prevent possibilities of NPEs when 0-8 path parses a reply-to address that is not BURL formatted
diff --git a/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index adb79cf..d30f516 100644
--- a/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -781,6 +781,7 @@
protected void preDeliver(AbstractJMSMessage msg)
{
_session.setInRecovery(false);
+ msg.setAMQSession(_session);
switch (_acknowledgeMode)
{
@@ -793,9 +794,6 @@
_session.addUnacknowledgedMessage(msg.getDeliveryTag());
break;
case Session.CLIENT_ACKNOWLEDGE:
- // we set the session so that when the user calls acknowledge() it can call the method on session
- // to send out the appropriate frame
- msg.setAMQSession(_session);
_session.addUnacknowledgedMessage(msg.getDeliveryTag());
_session.markDirty();
break;
diff --git a/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java b/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
index cb61770..8a9aec8 100644
--- a/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
+++ b/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
@@ -34,14 +34,18 @@
import javax.jms.MessageNotWriteableException;
import javax.jms.Queue;
+import org.apache.qpid.QpidException;
+import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQSession_0_8;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.client.CustomJMSXProperty;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.messaging.Address;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
@@ -297,15 +301,17 @@
else if(replyToEncoding.contains("/"))
{
String[] parts = replyToEncoding.split("/",2);
- dest = new NonBURLReplyToDestination(parts[0], parts[1]);
-
-
+ dest = new NonBURLReplyToDestination(ExchangeDefaults.DIRECT_EXCHANGE_CLASS, parts[0], parts[1]);
}
else
{
- if(getAMQSession().isQueueBound(replyToEncoding, null, null))
+ if (getAMQSession().isQueueBound(null, replyToEncoding, null))
{
- dest = new NonBURLReplyToDestination(replyToEncoding, "");
+ dest = new NonBURLReplyToDestination(ExchangeDefaults.DIRECT_EXCHANGE_CLASS, "", replyToEncoding);
+ }
+ else if (isExchangeExist(replyToEncoding))
+ {
+ dest = new NonBURLReplyToDestination(ExchangeDefaults.DIRECT_EXCHANGE_CLASS, replyToEncoding, "");
}
else
{
@@ -322,6 +328,32 @@
}
}
+ private boolean isExchangeExist(final String replyToEncoding)
+ {
+ try
+ {
+ AMQDestination amqDestination = new AMQDestination()
+ {
+ @Override
+ public String getAddressName()
+ {
+ return replyToEncoding;
+ }
+
+ @Override
+ public boolean isNameRequired()
+ {
+ return false;
+ }
+ };
+ return getAMQSession().isExchangeExist(amqDestination, false);
+ }
+ catch (Exception e)
+ {
+ return false;
+ }
+ }
+
public void setJMSReplyTo(Destination destination) throws JMSException
{
if (destination == null)
@@ -660,7 +692,7 @@
public DefaultRouterDestination(final String replyToEncoding)
{
super("",
- "direct",
+ ExchangeDefaults.DIRECT_EXCHANGE_CLASS,
replyToEncoding,
replyToEncoding);
}
@@ -682,10 +714,12 @@
{
private static final long serialVersionUID = 122897705932489259L;
- public NonBURLReplyToDestination(final String exchange, final String routingKey)
+ public NonBURLReplyToDestination(final String exchangeClass,
+ final String exchange,
+ final String routingKey)
{
super(exchange,
- null,
+ exchangeClass,
routingKey,
routingKey);
}
diff --git a/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index 44681ac..d2cd007 100644
--- a/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -34,8 +34,6 @@
{
- /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
-
private AMQMessageDelegate _delegate;
private boolean _redelivered;
private boolean _receivedFromServer;
@@ -398,12 +396,6 @@
- /**
- * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
- * acknowledge()
- *
- * @param s the AMQ session that delivered this message
- */
public void setAMQSession(AMQSession s)
{
_delegate.setAMQSession(s);
diff --git a/client/src/test/java/org/apache/qpid/client/message/AbstractJMSMessageTest.java b/client/src/test/java/org/apache/qpid/client/message/AbstractJMSMessageTest.java
index 325e2b7..2bde82b 100644
--- a/client/src/test/java/org/apache/qpid/client/message/AbstractJMSMessageTest.java
+++ b/client/src/test/java/org/apache/qpid/client/message/AbstractJMSMessageTest.java
@@ -21,17 +21,49 @@
package org.apache.qpid.client.message;
-import org.apache.qpid.test.utils.QpidTestCase;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
import javax.jms.JMSException;
+import org.mockito.ArgumentMatcher;
+
+import org.apache.qpid.QpidException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.test.utils.QpidTestCase;
+
public class AbstractJMSMessageTest extends QpidTestCase
{
+ private final AMQSession<?,?> _session = mock(AMQSession.class);
+ private final MessageFactoryRegistry _messageFactoryRegistry = MessageFactoryRegistry.newDefaultRegistry(_session);
+
+ public void testIncoming08ReplyTo() throws Exception
+ {
+ when(_session.isQueueBound(isNull(String.class), eq("knownQueue"), isNull(String.class))).thenReturn(true);
+ when(_session.isExchangeExist(isDestinationWithAddress("knownExchange"), anyBoolean())).thenReturn(true);
+
+ doReplyToTest("direct://amq.direct/knownQueue?routingkey='knownQueue'", "direct://amq.direct/knownQueue/knownQueue?routingkey='knownQueue'");
+ doReplyToTest("knownQueue", "direct:///knownQueue/knownQueue?routingkey='knownQueue'");
+ doReplyToTest("knownExchange", "direct://knownExchange//?routingkey=''");
+ doReplyToTest("news-service/sports", "direct://news-service/sports/sports?routingkey='sports'");
+ }
+
public void testSetNullJMSReplyTo08() throws JMSException
{
JMSTextMessage message = new JMSTextMessage(AMQMessageDelegateFactory.FACTORY_0_8);
- try
+ try
{
message.setJMSReplyTo(null);
}
@@ -44,7 +76,7 @@
public void testSetNullJMSReplyTo10() throws JMSException
{
JMSTextMessage message = new JMSTextMessage(AMQMessageDelegateFactory.FACTORY_0_10);
- try
+ try
{
message.setJMSReplyTo(null);
}
@@ -54,4 +86,37 @@
}
}
+ private void doReplyToTest(final String headerReplyTo, final String expectedReplyToAddress)
+ throws QpidException, JMSException
+ {
+ final ContentHeaderBody contentHeader = new ContentHeaderBody(new BasicContentHeaderProperties());
+ contentHeader.getProperties().setReplyTo(headerReplyTo);
+
+ final List<ContentBody> contentBodies = new ArrayList<>();
+ final AbstractJMSMessage message = _messageFactoryRegistry.createMessage(0, false,
+ "amq.direct",
+ "routingKey",
+ contentHeader,
+ contentBodies,
+ null,
+ null,
+ 0);
+ message.setAMQSession(_session);
+
+ assertNotNull(message.getJMSReplyTo());
+ assertEquals(expectedReplyToAddress, message.getJMSReplyTo().toString());
+ }
+
+ private AMQDestination isDestinationWithAddress(final String expectedAddress)
+ {
+ return argThat( new ArgumentMatcher<AMQDestination>()
+ {
+ @Override
+ public boolean matches(Object argument)
+ {
+ AMQDestination actual = (AMQDestination) argument;
+ return actual.getAddressName().equals(expectedAddress);
+ }
+ });
+ }
}