QPID-7898: [Qpid JMS AMQP 0-x] Prevent possibilities of NPEs when 0-8 path parses a reply-to address that is not BURL formatted
diff --git a/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index adb79cf..d30f516 100644
--- a/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -781,6 +781,7 @@
     protected void preDeliver(AbstractJMSMessage msg)
     {
         _session.setInRecovery(false);
+        msg.setAMQSession(_session);
 
         switch (_acknowledgeMode)
         {
@@ -793,9 +794,6 @@
                 _session.addUnacknowledgedMessage(msg.getDeliveryTag());
                 break;
             case Session.CLIENT_ACKNOWLEDGE:
-                // we set the session so that when the user calls acknowledge() it can call the method on session
-                // to send out the appropriate frame
-                msg.setAMQSession(_session);
                 _session.addUnacknowledgedMessage(msg.getDeliveryTag());
                 _session.markDirty();
                 break;
diff --git a/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java b/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
index cb61770..8a9aec8 100644
--- a/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
+++ b/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
@@ -34,14 +34,18 @@
 import javax.jms.MessageNotWriteableException;
 import javax.jms.Queue;
 
+import org.apache.qpid.QpidException;
+import org.apache.qpid.client.AMQAnyDestination;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.AMQSession_0_8;
 import org.apache.qpid.client.AMQTopic;
 import org.apache.qpid.client.CustomJMSXProperty;
+import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.messaging.Address;
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.url.BindingURL;
 
@@ -297,15 +301,17 @@
                     else if(replyToEncoding.contains("/"))
                     {
                         String[] parts = replyToEncoding.split("/",2);
-                        dest = new NonBURLReplyToDestination(parts[0], parts[1]);
-
-
+                        dest = new NonBURLReplyToDestination(ExchangeDefaults.DIRECT_EXCHANGE_CLASS, parts[0], parts[1]);
                     }
                     else
                     {
-                        if(getAMQSession().isQueueBound(replyToEncoding, null, null))
+                        if (getAMQSession().isQueueBound(null, replyToEncoding, null))
                         {
-                            dest = new NonBURLReplyToDestination(replyToEncoding, "");
+                            dest = new NonBURLReplyToDestination(ExchangeDefaults.DIRECT_EXCHANGE_CLASS, "", replyToEncoding);
+                        }
+                        else if (isExchangeExist(replyToEncoding))
+                        {
+                            dest = new NonBURLReplyToDestination(ExchangeDefaults.DIRECT_EXCHANGE_CLASS, replyToEncoding, "");
                         }
                         else
                         {
@@ -322,6 +328,32 @@
         }
     }
 
+    private boolean isExchangeExist(final String replyToEncoding)
+    {
+        try
+        {
+            AMQDestination amqDestination = new AMQDestination()
+            {
+                @Override
+                public String getAddressName()
+                {
+                    return replyToEncoding;
+                }
+
+                @Override
+                public boolean isNameRequired()
+                {
+                    return false;
+                }
+            };
+            return getAMQSession().isExchangeExist(amqDestination, false);
+        }
+        catch (Exception e)
+        {
+            return false;
+        }
+    }
+
     public void setJMSReplyTo(Destination destination) throws JMSException
     {
         if (destination == null)
@@ -660,7 +692,7 @@
         public DefaultRouterDestination(final String replyToEncoding)
         {
             super("",
-                  "direct",
+                  ExchangeDefaults.DIRECT_EXCHANGE_CLASS,
                   replyToEncoding,
                   replyToEncoding);
         }
@@ -682,10 +714,12 @@
     {
         private static final long serialVersionUID = 122897705932489259L;
 
-        public NonBURLReplyToDestination(final String exchange, final String routingKey)
+        public NonBURLReplyToDestination(final String exchangeClass,
+                                         final String exchange,
+                                         final String routingKey)
         {
             super(exchange,
-                  null,
+                  exchangeClass,
                   routingKey,
                   routingKey);
         }
diff --git a/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index 44681ac..d2cd007 100644
--- a/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -34,8 +34,6 @@
 {
 
 
-    /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
-
     private AMQMessageDelegate _delegate;
     private boolean _redelivered;
     private boolean _receivedFromServer;
@@ -398,12 +396,6 @@
 
 
 
-    /**
-     * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
-     * acknowledge()
-     *
-     * @param s the AMQ session that delivered this message
-     */
     public void setAMQSession(AMQSession s)
     {
         _delegate.setAMQSession(s);
diff --git a/client/src/test/java/org/apache/qpid/client/message/AbstractJMSMessageTest.java b/client/src/test/java/org/apache/qpid/client/message/AbstractJMSMessageTest.java
index 325e2b7..2bde82b 100644
--- a/client/src/test/java/org/apache/qpid/client/message/AbstractJMSMessageTest.java
+++ b/client/src/test/java/org/apache/qpid/client/message/AbstractJMSMessageTest.java
@@ -21,17 +21,49 @@
 package org.apache.qpid.client.message;
 
 
-import org.apache.qpid.test.utils.QpidTestCase;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
 
 import javax.jms.JMSException;
 
+import org.mockito.ArgumentMatcher;
+
+import org.apache.qpid.QpidException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.test.utils.QpidTestCase;
+
 public class AbstractJMSMessageTest extends QpidTestCase
 {
 
+    private final AMQSession<?,?> _session = mock(AMQSession.class);
+    private final MessageFactoryRegistry _messageFactoryRegistry = MessageFactoryRegistry.newDefaultRegistry(_session);
+
+    public void testIncoming08ReplyTo() throws Exception
+    {
+        when(_session.isQueueBound(isNull(String.class), eq("knownQueue"), isNull(String.class))).thenReturn(true);
+        when(_session.isExchangeExist(isDestinationWithAddress("knownExchange"), anyBoolean())).thenReturn(true);
+
+        doReplyToTest("direct://amq.direct/knownQueue?routingkey='knownQueue'", "direct://amq.direct/knownQueue/knownQueue?routingkey='knownQueue'");
+        doReplyToTest("knownQueue", "direct:///knownQueue/knownQueue?routingkey='knownQueue'");
+        doReplyToTest("knownExchange", "direct://knownExchange//?routingkey=''");
+        doReplyToTest("news-service/sports", "direct://news-service/sports/sports?routingkey='sports'");
+    }
+
     public void testSetNullJMSReplyTo08() throws JMSException
     {
         JMSTextMessage message = new JMSTextMessage(AMQMessageDelegateFactory.FACTORY_0_8);
-        try 
+        try
         {
             message.setJMSReplyTo(null);
         }
@@ -44,7 +76,7 @@
     public void testSetNullJMSReplyTo10() throws JMSException
     {
         JMSTextMessage message = new JMSTextMessage(AMQMessageDelegateFactory.FACTORY_0_10);
-        try 
+        try
         {
             message.setJMSReplyTo(null);
         }
@@ -54,4 +86,37 @@
         }
     }
 
+    private void doReplyToTest(final String headerReplyTo, final String expectedReplyToAddress)
+            throws QpidException, JMSException
+    {
+        final ContentHeaderBody contentHeader = new ContentHeaderBody(new BasicContentHeaderProperties());
+        contentHeader.getProperties().setReplyTo(headerReplyTo);
+
+        final List<ContentBody> contentBodies = new ArrayList<>();
+        final AbstractJMSMessage message = _messageFactoryRegistry.createMessage(0, false,
+                                                                                 "amq.direct",
+                                                                                 "routingKey",
+                                                                                 contentHeader,
+                                                                                 contentBodies,
+                                                                                 null,
+                                                                                 null,
+                                                                                 0);
+        message.setAMQSession(_session);
+
+        assertNotNull(message.getJMSReplyTo());
+        assertEquals(expectedReplyToAddress, message.getJMSReplyTo().toString());
+    }
+
+    private AMQDestination isDestinationWithAddress(final String expectedAddress)
+    {
+        return argThat( new ArgumentMatcher<AMQDestination>()
+        {
+            @Override
+            public boolean matches(Object argument)
+            {
+                AMQDestination actual = (AMQDestination) argument;
+                return actual.getAddressName().equals(expectedAddress);
+            }
+        });
+    }
 }