Merge pull request #904 from amazon-mq/remove-dead-code

[NO JIRA] Removing unused variable `concurrentStoreAndDispatchTransactions` in `KahaDBStore`
diff --git a/Jenkinsfile b/Jenkinsfile
index d4c09be..67d1cd0 100644
--- a/Jenkinsfile
+++ b/Jenkinsfile
@@ -87,6 +87,13 @@
             }
         }
 
+        // Runs the apache-rat:check Maven goal as its own stage, ahead of the
+        // 'Tests' stage declared right after it.
+        // NOTE(review): presumably a fail-fast license-header audit - confirm the RAT config.
+        stage('Verify') {
+            steps {
+                echo 'Running apache-rat:check'
+                sh 'mvn apache-rat:check'
+            }
+        }
+
         stage('Tests') {
             steps {
                 echo 'Running tests'
diff --git a/activemq-broker/pom.xml b/activemq-broker/pom.xml
index 25750c3..2b421a2 100644
--- a/activemq-broker/pom.xml
+++ b/activemq-broker/pom.xml
@@ -106,9 +106,9 @@
             <link>http://junit.sourceforge.net/javadoc/</link>
           </links>
           <stylesheetfile>${basedir}/../etc/css/stylesheet.css</stylesheetfile>
-          <linksource>true</linksource>
           <maxmemory>256m</maxmemory>
           <source>${source-version}</source>
+          <noindex>true</noindex>
           <groups>
             <group>
               <title>JMS Client</title>
diff --git a/activemq-broker/src/test/java/org/apache/activemq/ActiveMQJms2Test.java b/activemq-broker/src/test/java/org/apache/activemq/ActiveMQJms2Test.java
deleted file mode 100644
index 51e2614..0000000
--- a/activemq-broker/src/test/java/org/apache/activemq/ActiveMQJms2Test.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class ActiveMQJms2Test {
-
-    protected static ActiveMQConnectionFactory activemqConnectionFactory = null;
-
-    protected Connection connection = null;
-    protected Session session = null;
-    protected MessageProducer messageProducer = null;
-
-    @BeforeClass
-    public static void setUpClass() {
-        activemqConnectionFactory = new ActiveMQConnectionFactory("vm://localhost?marshal=false&broker.persistent=false"); 	
-    }
-
-    @AfterClass
-    public static void tearDownClass() {
-        activemqConnectionFactory = null;
-    }
-
-    @Before
-    public void setUp() throws JMSException {
-        connection = activemqConnectionFactory.createConnection();
-        connection.start();
-
-        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-        messageProducer = session.createProducer(session.createQueue("AMQ.JMS2.TEST"));
-    }
-
-    @After
-    public void tearDown() {
-        if(messageProducer != null) {
-            try { messageProducer.close(); } catch (Exception e) { } finally { messageProducer = null; }
-        }
-
-        if(session != null) {
-            try { session.close(); } catch (Exception e) { } finally { session = null; }
-        }
-
-        if(connection != null) {
-            try { connection.close(); } catch (Exception e) { } finally { connection = null; }
-        }
-    }
-
-    @Test(expected = UnsupportedOperationException.class)
-    public void testConnectionFactoryCreateContext() {
-        activemqConnectionFactory.createContext();
-    }
-
-    @Test(expected = UnsupportedOperationException.class)
-    public void testConnectionFactoryCreateContextSession() {
-        activemqConnectionFactory.createContext(Session.AUTO_ACKNOWLEDGE);
-    }
-
-    @Test(expected = UnsupportedOperationException.class)
-    public void testConnectionFactoryCreateContextUserPass() {
-        activemqConnectionFactory.createContext("admin", "admin");
-    }
-
-    @Test(expected = UnsupportedOperationException.class)
-    public void testConnectionFactoryCreateContextUserPassSession() {
-        activemqConnectionFactory.createContext("admin", "admin", Session.AUTO_ACKNOWLEDGE);
-    }
-
-    @Test(expected = UnsupportedOperationException.class)
-    public void testConnectionSharedConnectionConsumer() throws JMSException {
-        connection.createSharedConnectionConsumer(null, null, null, null, 10);
-    }
-
-    @Test(expected = UnsupportedOperationException.class)
-    public void testConnectionSharedDurableConnectionConsumer() throws JMSException {
-        connection.createSharedDurableConnectionConsumer(null, null, null, null, 10);
-    }
-
-    @Test(expected = UnsupportedOperationException.class)
-    public void testSessionAckMode() throws JMSException {
-        connection.createSession(Session.AUTO_ACKNOWLEDGE);
-    }
-
-    @Test(expected = UnsupportedOperationException.class)
-    public void testSessionDurableConsumer() throws JMSException {
-        session.createDurableConsumer(null, null);
-    }
-
-    @Test(expected = UnsupportedOperationException.class)
-    public void testSessionDurableConsumerSelectorNoLocal() throws JMSException {
-        session.createDurableConsumer(null, null, null, true);
-    }
-
-    @Test(expected = UnsupportedOperationException.class)
-    public void testSessionSharedConsumer() throws JMSException {
-        session.createSharedConsumer(null, null);
-    }
-
-    @Test(expected = UnsupportedOperationException.class)
-    public void testSessionSharedConsumerSelector() throws JMSException {
-        session.createSharedConsumer(null, null, null);
-    }
-
-    @Test(expected = UnsupportedOperationException.class)
-    public void testSessionSharedDurableConsumer() throws JMSException {
-        session.createSharedDurableConsumer(null, null);
-    }
-
-    @Test(expected = UnsupportedOperationException.class)
-    public void testSessionSharedDurableConsumerSelector() throws JMSException {
-        session.createSharedDurableConsumer(null, null, null);
-    }
-
-    @Test(expected = UnsupportedOperationException.class)
-    public void testProducerDeliveryDelayGet() throws JMSException {
-        messageProducer.getDeliveryDelay();
-    }
-
-    @Test(expected = UnsupportedOperationException.class)
-    public void testProducerDeliveryDelaySet() throws JMSException {
-        messageProducer.setDeliveryDelay(1000l);
-    }
-
-    @Test(expected = UnsupportedOperationException.class)
-    public void testProducerSendMessageCompletionListener() throws JMSException {
-         messageProducer.send(session.createQueue("AMQ.TEST"), null);
-    }
-
-    @Test(expected = UnsupportedOperationException.class)
-    public void testProducerSendMessageQoSParamsCompletionListener() throws JMSException {
-         messageProducer.send(null, 1, 4, 0l, null);
-    }
-
-    @Test(expected = UnsupportedOperationException.class)
-    public void testProducerSendDestinationMessageCompletionListener() throws JMSException {
-         messageProducer.send(session.createQueue("AMQ.TEST"), null, null);
-    }
-
-    @Test(expected = UnsupportedOperationException.class)
-    public void testProducerSendDestinationMessageQosParamsCompletionListener() throws JMSException {
-         messageProducer.send(session.createQueue("AMQ.TEST"), null, 1, 4, 0l, null);
-    }
-
-}
diff --git a/activemq-client/pom.xml b/activemq-client/pom.xml
index dc10320..96f9880 100644
--- a/activemq-client/pom.xml
+++ b/activemq-client/pom.xml
@@ -103,9 +103,9 @@
             <link>http://junit.sourceforge.net/javadoc/</link>
           </links>
           <stylesheetfile>${basedir}/../etc/css/stylesheet.css</stylesheetfile>
-          <linksource>true</linksource>
           <maxmemory>256m</maxmemory>
           <source>${source-version}</source>
+          <noindex>true</noindex>
           <groups>
             <group>
               <title>JMS Client</title>
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
index 461dbe1..2324448 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
@@ -292,7 +292,11 @@
      */
     @Override
     public JMSContext createContext() {
-        throw new UnsupportedOperationException("createContext() is not supported");
+        final ActiveMQConnection connection;
+        try {
+            connection = createActiveMQConnection();
+        } catch (JMSException cause) {
+            // This method declares no checked exceptions, so rethrow unchecked.
+            throw JMSExceptionSupport.convertToJMSRuntimeException(cause);
+        }
+        // Single-argument constructor: the context uses AUTO_ACKNOWLEDGE.
+        return new ActiveMQContext(connection);
     }
 
     /**
@@ -300,7 +304,11 @@
      */
     @Override
     public JMSContext createContext(String userName, String password) {
-        throw new UnsupportedOperationException("createContext() is not supported");
+        final ActiveMQConnection connection;
+        try {
+            connection = createActiveMQConnection(userName, password);
+        } catch (JMSException cause) {
+            // This method declares no checked exceptions, so rethrow unchecked.
+            throw JMSExceptionSupport.convertToJMSRuntimeException(cause);
+        }
+        // Single-argument constructor: the context uses AUTO_ACKNOWLEDGE.
+        return new ActiveMQContext(connection);
     }
 
     /**
@@ -308,7 +316,11 @@
      */
     @Override
     public JMSContext createContext(String userName, String password, int sessionMode) {
-        throw new UnsupportedOperationException("createContext() is not supported");
+        final ActiveMQConnection connection;
+        try {
+            connection = createActiveMQConnection(userName, password);
+        } catch (JMSException cause) {
+            // This method declares no checked exceptions, so rethrow unchecked.
+            throw JMSExceptionSupport.convertToJMSRuntimeException(cause);
+        }
+        // The requested session mode is handed to the context as-is.
+        return new ActiveMQContext(connection, sessionMode);
     }
 
     /**
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConsumer.java
new file mode 100644
index 0000000..409506d
--- /dev/null
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConsumer.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.JMSConsumer;
+import javax.jms.JMSException;
+import javax.jms.JMSRuntimeException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+
+import org.apache.activemq.util.JMSExceptionSupport;
+
+/**
+ * {@link JMSConsumer} implementation that forwards every supported call to a
+ * wrapped {@link MessageConsumer} and rethrows any checked
+ * {@link JMSException} via
+ * {@link JMSExceptionSupport#convertToJMSRuntimeException}.
+ * The {@code receiveBody} family is not implemented.
+ */
+public class ActiveMQConsumer implements JMSConsumer {
+
+    /** Context this consumer was created from; stored but not read by this class. */
+    private final ActiveMQContext context;
+
+    /** Consumer that performs the actual work. */
+    private final MessageConsumer delegate;
+
+    /**
+     * @param activemqContext the context creating this consumer
+     * @param activemqMessageConsumer the consumer every call is forwarded to
+     */
+    ActiveMQConsumer(ActiveMQContext activemqContext, MessageConsumer activemqMessageConsumer) {
+        this.context = activemqContext;
+        this.delegate = activemqMessageConsumer;
+    }
+
+    /** Returns the wrapped consumer's message selector. */
+    @Override
+    public String getMessageSelector() {
+        try {
+            return delegate.getMessageSelector();
+        } catch (JMSException cause) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(cause);
+        }
+    }
+
+    /** Returns the listener currently registered on the wrapped consumer. */
+    @Override
+    public MessageListener getMessageListener() throws JMSRuntimeException {
+        try {
+            return delegate.getMessageListener();
+        } catch (JMSException cause) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(cause);
+        }
+    }
+
+    /** Registers {@code listener} on the wrapped consumer. */
+    @Override
+    public void setMessageListener(MessageListener listener) throws JMSRuntimeException {
+        try {
+            delegate.setMessageListener(listener);
+        } catch (JMSException cause) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(cause);
+        }
+    }
+
+    /** Forwards to {@link MessageConsumer#receive()}. */
+    @Override
+    public Message receive() {
+        try {
+            return delegate.receive();
+        } catch (JMSException cause) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(cause);
+        }
+    }
+
+    /** Forwards to {@link MessageConsumer#receive(long)} with the given timeout. */
+    @Override
+    public Message receive(long timeout) {
+        try {
+            return delegate.receive(timeout);
+        } catch (JMSException cause) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(cause);
+        }
+    }
+
+    /** Forwards to {@link MessageConsumer#receiveNoWait()}. */
+    @Override
+    public Message receiveNoWait() {
+        try {
+            return delegate.receiveNoWait();
+        } catch (JMSException cause) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(cause);
+        }
+    }
+
+    /** Closes the wrapped consumer. */
+    @Override
+    public void close() {
+        try {
+            delegate.close();
+        } catch (JMSException cause) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(cause);
+        }
+    }
+
+    /** Not implemented; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public <T> T receiveBody(Class<T> c) {
+        throw new UnsupportedOperationException("receiveBody(Class<T>) is not supported");
+    }
+
+    /** Not implemented; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public <T> T receiveBody(Class<T> c, long timeout) {
+        throw new UnsupportedOperationException("receiveBody(Class<T>, long) is not supported");
+    }
+
+    /** Not implemented; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public <T> T receiveBodyNoWait(Class<T> c) {
+        throw new UnsupportedOperationException("receiveBodyNoWait(Class<T>) is not supported");
+    }
+
+}
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQContext.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQContext.java
new file mode 100644
index 0000000..4966d09
--- /dev/null
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQContext.java
@@ -0,0 +1,560 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.BytesMessage;
+import javax.jms.ConnectionMetaData;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateRuntimeException;
+import javax.jms.JMSConsumer;
+import javax.jms.JMSContext;
+import javax.jms.JMSException;
+import javax.jms.JMSProducer;
+import javax.jms.JMSRuntimeException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.util.JMSExceptionSupport;
+
+/**
+ * In terms of the JMS 1.1 API a JMSContext should be thought of as 
+ * representing both a Connection and a Session. Although the simplified 
+ * API removes the need for applications to use those objects, the concepts 
+ * of connection and session remain important. A connection represents a 
+ * physical link to the JMS server and a session represents a 
+ * single-threaded context for sending and receiving messages. 
+ *
+ *
+ * @see javax.jms.JMSContext
+ */
+
+public class ActiveMQContext implements JMSContext {
+
+    private static final boolean DEFAULT_AUTO_START = true;
+
+    private final ActiveMQConnection activemqConnection;
+
+    // Number of open contexts sharing activemqConnection. Every context created
+    // through createContext(int) shares this same counter instance; the
+    // connection is closed when the count drops to zero.
+    private final AtomicLong connectionCounter;
+
+    // Created on first use by checkContextState().
+    private ActiveMQSession activemqSession = null;
+
+    // Configuration
+    private boolean autoStart = DEFAULT_AUTO_START;
+    private final int sessionMode;
+
+    // State
+    // Written under the close() monitor but read without it in
+    // checkContextState(), hence volatile.
+    private volatile boolean closeInvoked = false;
+    private final AtomicBoolean startInvoked = new AtomicBoolean(false);
+    private ActiveMQMessageProducer activemqMessageProducer = null;
+
+    /**
+     * Creates a context in {@code AUTO_ACKNOWLEDGE} mode that is the first
+     * user of {@code activemqConnection}.
+     */
+    ActiveMQContext(final ActiveMQConnection activemqConnection) {
+        this(activemqConnection, AUTO_ACKNOWLEDGE, new AtomicLong(1L));
+    }
+
+    /**
+     * Creates a context with the given session mode that is the first user of
+     * {@code activemqConnection}.
+     */
+    ActiveMQContext(final ActiveMQConnection activemqConnection, final int sessionMode) {
+        this(activemqConnection, sessionMode, new AtomicLong(1L));
+    }
+
+    /**
+     * Creates a context that shares both the connection and the usage counter
+     * of an existing context. The caller must already have incremented
+     * {@code connectionCounter} on behalf of the new context.
+     */
+    private ActiveMQContext(final ActiveMQConnection activemqConnection, final int sessionMode, final AtomicLong connectionCounter) {
+        this.activemqConnection = activemqConnection;
+        this.sessionMode = sessionMode;
+        this.connectionCounter = connectionCounter;
+    }
+
+    /**
+     * Creates another context on the same connection.
+     *
+     * @param sessionMode session mode of the new context
+     * @return a new context sharing this context's connection
+     * @throws JMSRuntimeException if every context on the connection has
+     *         already been closed
+     */
+    @Override
+    public JMSContext createContext(int sessionMode) {
+        // Check and increment in one atomic step so a concurrent close() cannot
+        // drop the count to zero (closing the connection) between the two.
+        long openContexts;
+        do {
+            openContexts = connectionCounter.get();
+            if (openContexts == 0L) {
+                throw new JMSRuntimeException("Context already closed");
+            }
+        } while (!connectionCounter.compareAndSet(openContexts, openContexts + 1L));
+
+        return new ActiveMQContext(activemqConnection, sessionMode, connectionCounter);
+    }
+
+    /**
+     * Returns a new {@link JMSProducer} wrapper. All producers created from
+     * this context share one lazily created {@link ActiveMQMessageProducer}.
+     */
+    @Override
+    public JMSProducer createProducer() {
+        return new ActiveMQProducer(this, getCreatedActiveMQMessageProducer());
+    }
+
+    /** Returns the client ID of the underlying connection. */
+    @Override
+    public String getClientID() {
+        try {
+            return this.activemqConnection.getClientID();
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    /** Sets the client ID on the underlying connection. */
+    @Override
+    public void setClientID(String clientID) {
+        try {
+            this.activemqConnection.setClientID(clientID);
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    /** Returns the metadata of the underlying connection. */
+    @Override
+    public ConnectionMetaData getMetaData() {
+        checkContextState();
+        try {
+            return this.activemqConnection.getMetaData();
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    /** Returns the exception listener of the underlying connection. */
+    @Override
+    public ExceptionListener getExceptionListener() {
+        checkContextState();
+        try {
+            return this.activemqConnection.getExceptionListener();
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    /** Sets the exception listener on the underlying connection. */
+    @Override
+    public void setExceptionListener(ExceptionListener listener) {
+        checkContextState();
+        try {
+            this.activemqConnection.setExceptionListener(listener);
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    /**
+     * Starts the underlying connection unless this context has already
+     * started it (and not stopped it since).
+     */
+    @Override
+    public void start() {
+        checkContextState();
+        if (startInvoked.compareAndSet(false, true)) {
+            try {
+                this.activemqConnection.start();
+            } catch (JMSException e) {
+                // Undo the flag so a later start() (e.g. auto-start) retries
+                // instead of silently doing nothing.
+                startInvoked.set(false);
+                throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+            }
+        }
+    }
+
+    /** Stops the underlying connection if this context previously started it. */
+    @Override
+    public void stop() {
+        checkContextState();
+        if (startInvoked.compareAndSet(true, false)) {
+            try {
+                this.activemqConnection.stop();
+            } catch (JMSException e) {
+                // The stop did not complete; keep the flag so stop() can be retried.
+                startInvoked.set(true);
+                throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Controls whether creating a consumer or browser starts the connection.
+     */
+    @Override
+    public void setAutoStart(boolean autoStart) {
+        this.autoStart = autoStart;
+    }
+
+    /** Returns the auto-start setting; defaults to {@code true}. */
+    @Override
+    public boolean getAutoStart() {
+        return this.autoStart;
+    }
+
+    /**
+     * Closes this context's producer and session, and closes the shared
+     * connection once the last context using it has been closed. Calling
+     * {@code close()} again is a no-op.
+     *
+     * @throws JMSRuntimeException the first failure encountered; the
+     *         remaining resources are still closed before it is thrown
+     */
+    @Override
+    public synchronized void close() {
+        if (closeInvoked) {
+            // A second decrement would release the connection while sibling
+            // contexts may still be using it.
+            return;
+        }
+        // Mark closed up front so a context that never created a session
+        // reports "Context is closed" instead of lazily opening one.
+        closeInvoked = true;
+
+        JMSRuntimeException firstException = null;
+
+        if (this.activemqMessageProducer != null) {
+            try {
+                this.activemqMessageProducer.close();
+            } catch (JMSException e) {
+                firstException = JMSExceptionSupport.convertToJMSRuntimeException(e);
+            }
+        }
+
+        if (this.activemqSession != null) {
+            try {
+                this.activemqSession.close();
+            } catch (JMSException e) {
+                if (firstException == null) {
+                    firstException = JMSExceptionSupport.convertToJMSRuntimeException(e);
+                }
+            }
+        }
+
+        if (connectionCounter.decrementAndGet() == 0 && this.activemqConnection != null) {
+            try {
+                this.activemqConnection.close();
+            } catch (JMSException e) {
+                if (firstException == null) {
+                    firstException = JMSExceptionSupport.convertToJMSRuntimeException(e);
+                }
+            }
+        }
+
+        if (firstException != null) {
+            throw firstException;
+        }
+    }
+
+    /** Creates a {@link BytesMessage} using this context's session. */
+    @Override
+    public BytesMessage createBytesMessage() {
+        checkContextState();
+        try {
+            return activemqSession.createBytesMessage();
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    /** Creates a {@link MapMessage} using this context's session. */
+    @Override
+    public MapMessage createMapMessage() {
+        checkContextState();
+        try {
+            return activemqSession.createMapMessage();
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    /** Creates a {@link Message} using this context's session. */
+    @Override
+    public Message createMessage() {
+        checkContextState();
+        try {
+            return activemqSession.createMessage();
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    /** Creates an empty {@link ObjectMessage} using this context's session. */
+    @Override
+    public ObjectMessage createObjectMessage() {
+        checkContextState();
+        try {
+            return activemqSession.createObjectMessage();
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    /** Creates an {@link ObjectMessage} carrying {@code object}. */
+    @Override
+    public ObjectMessage createObjectMessage(Serializable object) {
+        checkContextState();
+        try {
+            return activemqSession.createObjectMessage(object);
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    /** Creates a {@link StreamMessage} using this context's session. */
+    @Override
+    public StreamMessage createStreamMessage() {
+        checkContextState();
+        try {
+            return activemqSession.createStreamMessage();
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    /** Creates an empty {@link TextMessage} using this context's session. */
+    @Override
+    public TextMessage createTextMessage() {
+        checkContextState();
+        try {
+            return activemqSession.createTextMessage();
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    /** Creates a {@link TextMessage} carrying {@code text}. */
+    @Override
+    public TextMessage createTextMessage(String text) {
+        checkContextState();
+        try {
+            return activemqSession.createTextMessage(text);
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    /** Returns whether this context's session is transacted. */
+    @Override
+    public boolean getTransacted() {
+        checkContextState();
+        try {
+            return activemqSession.getTransacted();
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    /** Returns the session mode this context was constructed with. */
+    @Override
+    public int getSessionMode() {
+        return this.sessionMode;
+    }
+
+    /** Commits this context's session. */
+    @Override
+    public void commit() {
+        checkContextState();
+        try {
+            activemqSession.commit();
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    /** Rolls back this context's session. */
+    @Override
+    public void rollback() {
+        checkContextState();
+        try {
+            activemqSession.rollback();
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    /** Calls {@code recover()} on this context's session. */
+    @Override
+    public void recover() {
+        checkContextState();
+        try {
+            activemqSession.recover();
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    /**
+     * Creates a consumer on {@code destination}, starting the connection
+     * first when auto-start is enabled.
+     */
+    @Override
+    public JMSConsumer createConsumer(Destination destination) {
+        checkContextState();
+        try {
+            if (getAutoStart()) {
+                start();
+            }
+            return new ActiveMQConsumer(this, activemqSession.createConsumer(destination));
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    /**
+     * Creates a consumer on {@code destination} with a message selector,
+     * starting the connection first when auto-start is enabled.
+     */
+    @Override
+    public JMSConsumer createConsumer(Destination destination, String messageSelector) {
+        checkContextState();
+        try {
+            if (getAutoStart()) {
+                start();
+            }
+            return new ActiveMQConsumer(this, activemqSession.createConsumer(destination, messageSelector));
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    /**
+     * Creates a consumer on {@code destination} with a message selector and
+     * {@code noLocal} flag, starting the connection first when auto-start is
+     * enabled.
+     */
+    @Override
+    public JMSConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) {
+        checkContextState();
+        try {
+            if (getAutoStart()) {
+                start();
+            }
+            return new ActiveMQConsumer(this, activemqSession.createConsumer(destination, messageSelector, noLocal));
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    /** Creates a {@link Queue} identity through this context's session. */
+    @Override
+    public Queue createQueue(String queueName) {
+        checkContextState();
+        try {
+            return activemqSession.createQueue(queueName);
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    /** Creates a {@link Topic} identity through this context's session. */
+    @Override
+    public Topic createTopic(String topicName) {
+        checkContextState();
+        try {
+            return activemqSession.createTopic(topicName);
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    /**
+     * Creates a durable consumer on {@code topic}, starting the connection
+     * first when auto-start is enabled.
+     */
+    @Override
+    public JMSConsumer createDurableConsumer(Topic topic, String name) {
+        checkContextState();
+        try {
+            if (getAutoStart()) {
+                start();
+            }
+            return new ActiveMQConsumer(this, activemqSession.createDurableConsumer(topic, name));
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    /**
+     * Creates a durable consumer on {@code topic} with a message selector and
+     * {@code noLocal} flag, starting the connection first when auto-start is
+     * enabled.
+     */
+    @Override
+    public JMSConsumer createDurableConsumer(Topic topic, String name, String messageSelector, boolean noLocal) {
+        checkContextState();
+        try {
+            if (getAutoStart()) {
+                start();
+            }
+            return new ActiveMQConsumer(this, activemqSession.createDurableConsumer(topic, name, messageSelector, noLocal));
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    /** Not implemented; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public JMSConsumer createSharedDurableConsumer(Topic topic, String name) {
+        throw new UnsupportedOperationException("createSharedDurableConsumer(topic, name) is not supported");
+    }
+
+    /** Not implemented; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public JMSConsumer createSharedDurableConsumer(Topic topic, String name, String messageSelector) {
+        // The message previously named createDurableConsumer, which is a
+        // different (and supported) method.
+        throw new UnsupportedOperationException("createSharedDurableConsumer(topic, name, messageSelector) is not supported");
+    }
+
+    /** Not implemented; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public JMSConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName) {
+        throw new UnsupportedOperationException("createSharedConsumer(topic, sharedSubscriptionName) is not supported");
+    }
+
+    /** Not implemented; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public JMSConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName, String messageSelector) {
+        throw new UnsupportedOperationException("createSharedConsumer(topic, sharedSubscriptionName, messageSelector) is not supported");
+    }
+
+    /**
+     * Creates a browser for {@code queue}, starting the connection first when
+     * auto-start is enabled.
+     */
+    @Override
+    public QueueBrowser createBrowser(Queue queue) {
+        checkContextState();
+        try {
+            if (getAutoStart()) {
+                start();
+            }
+            return activemqSession.createBrowser(queue);
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    /**
+     * Creates a browser for {@code queue} with a message selector, starting
+     * the connection first when auto-start is enabled.
+     */
+    @Override
+    public QueueBrowser createBrowser(Queue queue, String messageSelector) {
+        checkContextState();
+        try {
+            if (getAutoStart()) {
+                start();
+            }
+            return activemqSession.createBrowser(queue, messageSelector);
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    /** Creates a {@link TemporaryQueue} through this context's session. */
+    @Override
+    public TemporaryQueue createTemporaryQueue() {
+        checkContextState();
+        try {
+            return activemqSession.createTemporaryQueue();
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    /** Creates a {@link TemporaryTopic} through this context's session. */
+    @Override
+    public TemporaryTopic createTemporaryTopic() {
+        checkContextState();
+        try {
+            return activemqSession.createTemporaryTopic();
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    /** Calls {@code unsubscribe(name)} on this context's session. */
+    @Override
+    public void unsubscribe(String name) {
+        checkContextState();
+        try {
+            activemqSession.unsubscribe(name);
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    /** Calls {@code acknowledge()} on this context's session. */
+    @Override
+    public void acknowledge() {
+        checkContextState();
+        try {
+            activemqSession.acknowledge();
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    /**
+     * Verifies that a connection is available and creates the session on
+     * first use.
+     *
+     * @throws JMSRuntimeException if there is no connection or the session
+     *         cannot be created
+     * @throws IllegalStateRuntimeException if no session exists and this
+     *         context has been closed
+     */
+    private void checkContextState() {
+        if (activemqConnection == null) {
+            throw new JMSRuntimeException("Connection not available");
+        }
+
+        if (activemqSession == null) {
+            if (closeInvoked) {
+                throw new IllegalStateRuntimeException("Context is closed");
+            }
+            try {
+                Session jmsSession = activemqConnection.createSession(SESSION_TRANSACTED == sessionMode, sessionMode);
+                activemqSession = (ActiveMQSession) jmsSession;
+            } catch (JMSException e) {
+                throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Returns this context's message producer, creating it on first use with
+     * no fixed destination and the connection's send timeout.
+     */
+    private ActiveMQMessageProducer getCreatedActiveMQMessageProducer() {
+        checkContextState();
+
+        if (this.activemqMessageProducer == null) {
+            try {
+                this.activemqMessageProducer = new ActiveMQMessageProducer(activemqSession, activemqSession.getNextProducerId(), null, activemqConnection.getSendTimeout());
+            } catch (JMSException e) {
+                throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+            }
+        }
+        return this.activemqMessageProducer;
+    }
+
+}
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
index 9fd2439..9e9ebd4 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
@@ -223,7 +223,7 @@
     public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
         this.send(destination, message, deliveryMode, priority, timeToLive, (AsyncCallback)null);
     }
-    
+
     /**
      *
      * @param message the message to send
@@ -240,7 +240,6 @@
     @Override
     public void send(Message message, CompletionListener completionListener) throws JMSException {
         throw new UnsupportedOperationException("send(Message, CompletionListener) is not supported");
-
     }
 
     @Override
@@ -287,6 +286,11 @@
     }
 
     public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException {
+        this.send(destination, message, deliveryMode, priority, timeToLive, getDisableMessageID(), getDisableMessageTimestamp(), onComplete);
+    }
+
+    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, boolean disableMessageID, boolean disableMessageTimestamp, AsyncCallback onComplete) throws JMSException {
+
         checkClosed();
         if (destination == null) {
             if (info.getDestination() == null) {
@@ -322,7 +326,7 @@
             }
         }
 
-        this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete);
+        this.session.send(this, dest, message, deliveryMode, priority, timeToLive, disableMessageID, disableMessageTimestamp, producerWindow, sendTimeout, onComplete);
 
         stats.onMessage();
     }
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java
index b7dd6cf..accc7e5 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java
@@ -16,11 +16,15 @@
  */
 package org.apache.activemq;
 
+import java.util.Set;
+
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.IllegalStateException;
+import javax.jms.IllegalStateRuntimeException;
 import javax.jms.JMSException;
 import javax.jms.Message;
+import javax.jms.MessageFormatRuntimeException;
 import javax.jms.MessageProducer;
 
 /**
@@ -82,13 +86,13 @@
      * <P>
      * Message IDs are enabled by default.
      *
-     * @param value indicates if message IDs are disabled
+     * @param disableMessageID indicates if message IDs are disabled
      * @throws javax.jms.JMSException if the JMS provider fails to close the producer due to
      *                      some internal error.
      */
-    public void setDisableMessageID(boolean value) throws JMSException {
+    public void setDisableMessageID(boolean disableMessageID) throws JMSException {
         checkClosed();
-        this.disableMessageID = value;
+        this.disableMessageID = disableMessageID;
     }
 
     /**
@@ -118,13 +122,13 @@
      * <P>
      * Message timestamps are enabled by default.
      *
-     * @param value indicates if message timestamps are disabled
+     * @param disableMessageTimestamp indicates if message timestamps are disabled
      * @throws javax.jms.JMSException if the JMS provider fails to close the producer due to
      *                      some internal error.
      */
-    public void setDisableMessageTimestamp(boolean value) throws JMSException {
+    public void setDisableMessageTimestamp(boolean disableMessageTimestamp) throws JMSException {
         checkClosed();
-        this.disableMessageTimestamp = value;
+        this.disableMessageTimestamp = disableMessageTimestamp;
     }
 
     /**
@@ -345,4 +349,46 @@
     public void setSendTimeout(int sendTimeout) {
         this.sendTimeout = sendTimeout;
     }
+
+    // See JMS 2.0 spec sections 3.5.1 and 3.8.1.1
+    public static final Set<String> JMS_PROPERTY_NAMES_DISALLOWED = Set.of("JMSDeliveryMode", "JMSPriority", "JMSMessageID", "JMSTimestamp", "JMSCorrelationID", "JMSType", "NULL", "TRUE", "FALSE", "NOT", "AND", "OR", "BETWEEN", "LIKE", "IN", "IS", "ESCAPE");
+
+    public static void validateValidPropertyName(String propertyName) throws IllegalStateRuntimeException {
+        // Rejects null/empty names, names on the disallowed list, and names that are not Java identifiers.
+        if (propertyName == null || propertyName.isEmpty()) {
+            throw new IllegalArgumentException("Invalid JMS property name must not be null or empty");
+        }
+
+        if (JMS_PROPERTY_NAMES_DISALLOWED.contains(propertyName)) {
+            throw new IllegalArgumentException("Invalid JMS property: " + propertyName + " name is in disallowed list");
+        }
+
+        final char[] chars = propertyName.toCharArray();
+        if (!Character.isJavaIdentifierStart(chars[0])) {
+            throw new IllegalArgumentException("Invalid JMS property: " + propertyName + " name starts with invalid character: " + chars[0]);
+        }
+        for (int index = 1; index < chars.length; index++) {
+            if (Character.isJavaIdentifierPart(chars[index])) {
+                continue;
+            }
+            throw new IllegalArgumentException("Invalid JMS property: " + propertyName + " name contains invalid character: " + chars[index]);
+        }
+    }
+
+    public static void validateValidPropertyValue(String propertyName, Object propertyValue) throws IllegalStateRuntimeException {
+        // Accepted values: null, String, Character, Boolean and the boxed numeric types.
+        final boolean permitted = propertyValue == null
+            || propertyValue instanceof String || propertyValue instanceof Character
+            || propertyValue instanceof Boolean || propertyValue instanceof Byte
+            || propertyValue instanceof Short || propertyValue instanceof Integer
+            || propertyValue instanceof Long || propertyValue instanceof Float
+            || propertyValue instanceof Double;
+        if (permitted) {
+            return;
+        }
+
+        // propertyValue is necessarily non-null at this point, so getClass() cannot fail.
+        throw new MessageFormatRuntimeException("Invalid JMS property: " + propertyName + " value class: " + propertyValue.getClass().getName() + " is not permitted by specification");
+    }
+
 }
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java
index 2d6094e..f456b5f 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq;
 
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
 import java.net.MalformedURLException;
 import java.util.Enumeration;
 
@@ -179,6 +181,7 @@
         toMessage.setJMSReplyTo(transformDestination(fromMessage.getJMSReplyTo()));
         toMessage.setJMSDestination(transformDestination(fromMessage.getJMSDestination()));
         toMessage.setJMSDeliveryMode(fromMessage.getJMSDeliveryMode());
+        toMessage.setJMSDeliveryTime(getFromMessageDeliveryTime(fromMessage)); // TODO: AMQ-8500 DeliveryTime support ref: ActiveMQSession#send
         toMessage.setJMSRedelivered(fromMessage.getJMSRedelivered());
         toMessage.setJMSType(fromMessage.getJMSType());
         toMessage.setJMSExpiration(fromMessage.getJMSExpiration());
@@ -193,4 +196,23 @@
             toMessage.setObjectProperty(name, obj);
         }
     }
+
+    private static long getFromMessageDeliveryTime(Message fromMessage) throws JMSException {
+        // Use getJMSDeliveryTime() only when the message's class exposes a concrete
+        // (non-abstract) implementation of it; otherwise return the JMSTimestamp instead.
+        boolean hasDeliveryTimeGetter;
+        try {
+            final Method getter = fromMessage.getClass().getMethod("getJMSDeliveryTime");
+            hasDeliveryTimeGetter = !Modifier.isAbstract(getter.getModifiers());
+        } catch (NoSuchMethodException e) {
+            // We fallback to JMSTimestamp for jms v1.x
+            hasDeliveryTimeGetter = false;
+        }
+
+        if (!hasDeliveryTimeGetter) {
+            return fromMessage.getJMSTimestamp();
+        }
+
+        return fromMessage.getJMSDeliveryTime();
+    }
 }
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java
new file mode 100644
index 0000000..7a6c621
--- /dev/null
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java
@@ -0,0 +1,545 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import javax.jms.BytesMessage;
+import javax.jms.CompletionListener;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.IllegalStateRuntimeException;
+import javax.jms.JMSException;
+import javax.jms.JMSProducer;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageFormatRuntimeException;
+import javax.jms.ObjectMessage;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.activemq.util.TypeConversionSupport;
+
+public class ActiveMQProducer implements JMSProducer {
+
+    private final ActiveMQContext activemqContext;
+    private final ActiveMQMessageProducer activemqMessageProducer;
+
+    // QoS override of defaults on a per-JMSProducer instance basis
+    private String correlationId = null;
+    private byte[] correlationIdBytes = null;
+    private Integer deliveryMode = null;
+    private Boolean disableMessageID = false;
+    private Boolean disableMessageTimestamp = false;
+    private Integer priority = null;
+    private Destination replyTo = null;
+    private Long timeToLive = null;
+    private String type = null;
+
+    // Properties applied to all messages on a per-JMS producer instance basis
+    private Map<String, Object> messageProperties = null;
+
+    ActiveMQProducer(ActiveMQContext activemqContext, ActiveMQMessageProducer activemqMessageProducer) {
+        this.activemqContext = activemqContext;
+        this.activemqMessageProducer = activemqMessageProducer;
+    }
+
+    @Override
+    public JMSProducer send(Destination destination, Message message) {
+        try {
+            // Stamp the producer-level header overrides onto the message; unset (null) ones are left alone.
+            if (correlationId != null) {
+                message.setJMSCorrelationID(correlationId);
+            }
+            if (correlationIdBytes != null) {
+                message.setJMSCorrelationIDAsBytes(correlationIdBytes);
+            }
+            if (replyTo != null) {
+                message.setJMSReplyTo(replyTo);
+            }
+            if (type != null) {
+                message.setJMSType(type);
+            }
+
+            // Copy every producer-level property onto the outgoing message.
+            if (messageProperties != null) {
+                for (Map.Entry<String, Object> entry : messageProperties.entrySet()) {
+                    message.setObjectProperty(entry.getKey(), entry.getValue());
+                }
+            }
+
+            // QoS values come from this JMSProducer's getters (its override, else the wrapped producer's value).
+            activemqMessageProducer.send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive(), getDisableMessageID(), getDisableMessageTimestamp(), null);
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+        return this;
+    }
+
+    @Override
+    public JMSProducer send(Destination destination, String body) {
+        TextMessage textMessage = activemqContext.createTextMessage(body);
+        send(destination, textMessage);
+        return this;
+    }
+
+    @Override
+    public JMSProducer send(Destination destination, Map<String, Object> body) {
+        MapMessage mapMessage = activemqContext.createMapMessage();
+        // Copy each entry with the MapMessage setter matching the value's runtime type; a null body sends an empty map.
+        if (body != null) {
+            try {
+                for (Map.Entry<String, Object> mapEntry : body.entrySet()) {
+                    final String key = mapEntry.getKey();
+                    final Object value = mapEntry.getValue();
+                    // instanceof is null-safe: a null value falls through to setObject(key, null) instead of an NPE
+                    if (value instanceof String) {
+                        mapMessage.setString(key, (String) value);
+                    } else if (value instanceof Integer) {
+                        mapMessage.setInt(key, (Integer) value);
+                    } else if (value instanceof Long) {
+                        mapMessage.setLong(key, (Long) value);
+                    } else if (value instanceof Double) {
+                        mapMessage.setDouble(key, (Double) value);
+                    } else if (value instanceof Boolean) {
+                        mapMessage.setBoolean(key, (Boolean) value);
+                    } else if (value instanceof Character) {
+                        mapMessage.setChar(key, (Character) value);
+                    } else if (value instanceof Short) {
+                        mapMessage.setShort(key, (Short) value);
+                    } else if (value instanceof Float) {
+                        mapMessage.setFloat(key, (Float) value);
+                    } else if (value instanceof Byte) {
+                        mapMessage.setByte(key, (Byte) value);
+                    } else if (value instanceof byte[]) {
+                        byte[] array = (byte[]) value;
+                        mapMessage.setBytes(key, array, 0, array.length);
+                    } else {
+                        mapMessage.setObject(key, value);
+                    }
+                }
+            } catch (JMSException e) {
+                throw new MessageFormatRuntimeException(e.getMessage(), e.getErrorCode(), e); // keep error code and cause
+            }
+        }
+        send(destination, mapMessage);
+        return this;
+    }
+
+    @Override
+    public JMSProducer send(Destination destination, byte[] body) {
+        BytesMessage bytesMessage = activemqContext.createBytesMessage();
+
+        try {
+            if(body != null) {
+                bytesMessage.writeBytes(body);
+            }
+            send(destination, bytesMessage);
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+        return this;
+    }
+
+    @Override
+    public JMSProducer send(Destination destination, Serializable body) {
+        ObjectMessage objectMessage = activemqContext.createObjectMessage(body);
+        send(destination, objectMessage);
+        return this;
+    }
+
+    @Override
+    public JMSProducer setDisableMessageID(boolean disableMessageID) {
+        this.disableMessageID = disableMessageID;
+        return this;
+    }
+
+    @Override
+    public boolean getDisableMessageID() {
+        if(this.disableMessageID != null) {
+            return this.disableMessageID;
+        }
+
+        try {
+            return this.activemqMessageProducer.getDisableMessageID();
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    @Override
+    public JMSProducer setDisableMessageTimestamp(boolean disableMessageTimestamp) {
+        this.disableMessageTimestamp = disableMessageTimestamp;
+        return this;
+    }
+
+    @Override
+    public boolean getDisableMessageTimestamp() {
+        if(this.disableMessageTimestamp != null) {
+            return this.disableMessageTimestamp;
+        }
+
+        try {
+            return this.activemqMessageProducer.getDisableMessageTimestamp();
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    @Override
+    public JMSProducer setDeliveryMode(int deliveryMode) {
+        if (deliveryMode != DeliveryMode.PERSISTENT && deliveryMode != DeliveryMode.NON_PERSISTENT) {
+            throw new IllegalStateRuntimeException("unknown delivery mode: " + deliveryMode);
+        }
+        this.deliveryMode = deliveryMode;
+        return this;
+    }
+
+    @Override
+    public int getDeliveryMode() {
+        if(deliveryMode != null) {
+            return deliveryMode;
+        }
+
+        try {
+            return this.activemqMessageProducer.getDeliveryMode();
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    @Override
+    public JMSProducer setPriority(int priority) {
+        if (priority < 0 || priority > 9) {
+            throw new IllegalStateRuntimeException("default priority must be a value between 0 and 9");
+        }
+        this.priority = priority;
+        return this;
+    }
+
+    @Override
+    public int getPriority() {
+        if(priority != null) {
+            return priority;
+        }
+
+        try {
+            return this.activemqMessageProducer.getPriority();
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    @Override
+    public JMSProducer setTimeToLive(long timeToLive) {
+        this.timeToLive = timeToLive;
+        return this;
+    }
+
+    @Override
+    public long getTimeToLive() {
+        if(timeToLive != null) {
+            return timeToLive;
+        }
+
+        try {
+            return this.activemqMessageProducer.getTimeToLive();
+        } catch (JMSException e) {
+            throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+        }
+    }
+
+    @Override
+    public JMSProducer setDeliveryDelay(long deliveryDelay) {
+        throw new UnsupportedOperationException("setDeliveryDelay(long) is not supported");
+    }
+
+    @Override
+    public long getDeliveryDelay() {
+        throw new UnsupportedOperationException("getDeliveryDelay() is not supported");
+    }
+
+    @Override
+    public JMSProducer setAsync(CompletionListener completionListener) {
+        throw new UnsupportedOperationException("setAsync(CompletionListener) is not supported");
+    }
+
+    @Override
+    public CompletionListener getAsync() {
+        throw new UnsupportedOperationException("getAsync() is not supported");
+    }
+
+    @Override
+    public JMSProducer setProperty(String name, boolean value) {
+        checkProperty(name, value);
+        getCreatedMessageProperties().put(name, value);
+        return this;
+    }
+
+    @Override
+    public JMSProducer setProperty(String name, byte value) {
+        checkProperty(name, value);
+        getCreatedMessageProperties().put(name, value);
+        return this;
+    }
+
+    @Override
+    public JMSProducer setProperty(String name, short value) {
+        checkProperty(name, value);
+        getCreatedMessageProperties().put(name, value);
+        return this;
+    }
+
+    @Override
+    public JMSProducer setProperty(String name, int value) {
+        checkProperty(name, value);
+        getCreatedMessageProperties().put(name, value);
+        return this;
+    }
+
+    @Override
+    public JMSProducer setProperty(String name, long value) {
+        checkProperty(name, value);
+        getCreatedMessageProperties().put(name, value);
+        return this;
+    }
+
+    @Override
+    public JMSProducer setProperty(String name, float value) {
+        checkProperty(name, value);
+        getCreatedMessageProperties().put(name, value);
+        return this;
+    }
+
+    @Override
+    public JMSProducer setProperty(String name, double value) {
+        checkProperty(name, value);
+        getCreatedMessageProperties().put(name, value);
+        return this;
+    }
+
+    @Override
+    public JMSProducer setProperty(String name, String value) {
+        checkProperty(name, value);
+        getCreatedMessageProperties().put(name, value);
+        return this;
+    }
+
+    @Override
+    public JMSProducer setProperty(String name, Object value) {
+        checkProperty(name, value);
+        getCreatedMessageProperties().put(name, value);
+        return this;
+    }
+
+    @Override
+    public JMSProducer clearProperties() {
+        getCreatedMessageProperties().clear();
+        return this;
+    }
+
+    @Override
+    public boolean propertyExists(String name) {
+        if(name == null || name.isEmpty()) {
+            return false;
+        }
+        return getCreatedMessageProperties().containsKey(name);
+    }
+
+    @Override
+    public boolean getBooleanProperty(String name) {
+        Object value = getCreatedMessageProperties().get(name);
+        if (value == null) {
+            throw new NullPointerException("property " + name + " was null");
+        }
+        Boolean rc = (Boolean)TypeConversionSupport.convert(value, Boolean.class);
+        if (rc == null) {
+            throw new MessageFormatRuntimeException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a boolean");
+        }
+        return rc.booleanValue();
+    }
+
+    @Override
+    public byte getByteProperty(String name) {
+        Object value = getCreatedMessageProperties().get(name);
+        if (value == null) {
+            throw new NullPointerException("property " + name + " was null");
+        }
+        Byte rc = (Byte)TypeConversionSupport.convert(value, Byte.class);
+        if (rc == null) {
+            throw new MessageFormatRuntimeException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a byte");
+        }
+        return rc.byteValue();
+    }
+
+    @Override
+    public short getShortProperty(String name) {
+        Object value = getCreatedMessageProperties().get(name);
+        if (value == null) {
+            throw new NullPointerException("property " + name + " was null");
+        }
+        Short rc = (Short)TypeConversionSupport.convert(value, Short.class);
+        if (rc == null) {
+            throw new MessageFormatRuntimeException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a short");
+        }
+        return rc.shortValue();
+    }
+
+    @Override
+    public int getIntProperty(String name) {
+        Object value = getCreatedMessageProperties().get(name);
+        if (value == null) {
+            throw new NullPointerException("property " + name + " was null");
+        }
+        Integer rc = (Integer)TypeConversionSupport.convert(value, Integer.class);
+        if (rc == null) {
+            throw new MessageFormatRuntimeException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as an integer");
+        }
+        return rc.intValue();
+    }
+
+    @Override
+    public long getLongProperty(String name) {
+        Object value = getCreatedMessageProperties().get(name);
+        if (value == null) {
+            throw new NullPointerException("property " + name + " was null");
+        }
+        Long rc = (Long)TypeConversionSupport.convert(value, Long.class);
+        if (rc == null) {
+            throw new MessageFormatRuntimeException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a long");
+        }
+        return rc.longValue();
+    }
+
+    @Override
+    public float getFloatProperty(String name) {
+        Object value = getCreatedMessageProperties().get(name);
+        if (value == null) {
+            throw new NullPointerException("property " + name + " was null");
+        }
+        Float rc = (Float)TypeConversionSupport.convert(value, Float.class);
+        if (rc == null) {
+            throw new MessageFormatRuntimeException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a float");
+        }
+        return rc.floatValue();
+    }
+
+    @Override
+    public double getDoubleProperty(String name) {
+        Object value = getCreatedMessageProperties().get(name);
+        if (value == null) {
+            throw new NullPointerException("property " + name + " was null");
+        }
+        Double rc = (Double)TypeConversionSupport.convert(value, Double.class);
+        if (rc == null) {
+            throw new MessageFormatRuntimeException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a double");
+        }
+        return rc.doubleValue();
+    }
+
+    @Override
+    public String getStringProperty(String name) {
+        Object value = getCreatedMessageProperties().get(name);
+        if (value == null) {
+            throw new NullPointerException("property " + name + " was null");
+        }
+        String rc = (String)TypeConversionSupport.convert(value, String.class);
+        if (rc == null) {
+            throw new MessageFormatRuntimeException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a string");
+        }
+        return rc;
+    }
+
+    @Override
+    public Object getObjectProperty(String name) {
+        if (name == null) {
+            throw new NullPointerException("Property name cannot be null");
+        }
+
+        // TODO: Update PropertyExpression to handle converting message headers to properties for JMSProducer.
+        //PropertyExpression expression = new PropertyExpression(name);
+        //return expression.evaluate(this);
+        return getCreatedMessageProperties().get(name);
+    }
+
+    @Override
+    public Set<String> getPropertyNames() {
+        return java.util.Collections.unmodifiableSet(getCreatedMessageProperties().keySet()); // read-only live view; callers must not mutate producer state through it
+    }
+
+    @Override
+    public JMSProducer setJMSCorrelationIDAsBytes(byte[] correlationID) {
+        // Store a defensive copy so later changes to the caller's array are not observed;
+        // passing null clears a previously configured value, as setJMSCorrelationID(null) does.
+        this.correlationIdBytes = (correlationID == null) ? null : Arrays.copyOf(correlationID, correlationID.length);
+        return this;
+    }
+
+    @Override
+    public byte[] getJMSCorrelationIDAsBytes() {
+        return this.correlationIdBytes;
+    }
+
+    @Override
+    public JMSProducer setJMSCorrelationID(String correlationID) {
+        this.correlationId = correlationID;
+        return this;
+    }
+
+    @Override
+    public String getJMSCorrelationID() {
+        return this.correlationId;
+    }
+
+    @Override
+    public JMSProducer setJMSType(String type) {
+        this.type = type;
+        return this;
+    }
+
+    @Override
+    public String getJMSType() {
+        return this.type;
+    }
+
+    @Override
+    public JMSProducer setJMSReplyTo(Destination replyTo) {
+        this.replyTo = replyTo;
+        return this;
+    }
+
+    @Override
+    public Destination getJMSReplyTo() {
+        return replyTo;
+    }
+
+    private void checkProperty(String name, Object value) {
+        ActiveMQMessageProducerSupport.validateValidPropertyName(name);
+        ActiveMQMessageProducerSupport.validateValidPropertyValue(name, value);
+    }
+
+    private Map<String, Object> getCreatedMessageProperties() {
+        if(messageProperties == null) {
+            messageProperties = new HashMap<>();
+        }
+        return messageProperties;
+    }
+}
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
index 2ba1fe1..66f7347 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
@@ -19,6 +19,8 @@
 import java.io.File;
 import java.io.InputStream;
 import java.io.Serializable;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
 import java.net.URL;
 import java.util.Collections;
 import java.util.Iterator;
@@ -1394,12 +1396,14 @@
 
     @Override
     public MessageConsumer createDurableConsumer(Topic topic, String name) throws JMSException {
-        throw new UnsupportedOperationException("createDurableConsumer(Topic, name) is not supported");
+        checkClosed();
+        return createDurableSubscriber(topic, name, null, false);
     }
 
     @Override
     public MessageConsumer createDurableConsumer(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
-        throw new UnsupportedOperationException("createDurableConsumer(Topic, name, messageSelector, noLocal) is not supported");
+        checkClosed();
+        return createDurableSubscriber(topic, name, messageSelector, noLocal);
     }
 
     @Override
@@ -1939,6 +1943,26 @@
      */
     protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
                         MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
+        send(producer, destination, message, deliveryMode, priority, timeToLive, producer.getDisableMessageID(), producer.getDisableMessageTimestamp(), producerWindow, sendTimeout, onComplete); // forward the producer's own messageID / timestamp flags, in that order
+    }
+
+    /**
+     * Sends the message for dispatch by the broker.
+     *
+     * @param producer - message producer.
+     * @param destination - message destination.
+     * @param message - message to be sent.
+     * @param deliveryMode - JMS message delivery mode.
+     * @param priority - message priority.
+     * @param timeToLive - message expiration.
+     * @param disableMessageID - optionally, disable messageID.
+     * @param disableMessageTimestamp - optionally, disable timestamp.
+     * @param producerWindow
+     * @param onComplete
+     * @throws JMSException
+     */
+    protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
+                        boolean disableMessageID, boolean disableMessageTimestamp, MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
 
         checkClosed();
         if (destination.isTemporary() && connection.isDeleted(destination)) {
@@ -1956,12 +1980,22 @@
             //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
             message.setJMSDeliveryMode(deliveryMode);
             long expiration = 0L;
-            if (!producer.getDisableMessageTimestamp()) {
-                long timeStamp = System.currentTimeMillis();
+            long timeStamp = System.currentTimeMillis();
+            if (timeToLive > 0) {
+                expiration = timeToLive + timeStamp;
+            }
+
+            // TODO: AMQ-8500 - update this when openwire supports JMSDeliveryTime
+            // ref: ActiveMQMessageTransformation#copyProperties
+            if(!(message instanceof ActiveMQMessage)) {
+                setForeignMessageDeliveryTime(message, timeStamp);
+            } else {
+                message.setJMSDeliveryTime(timeStamp);
+            }
+            if (!disableMessageTimestamp && !producer.getDisableMessageTimestamp()) {
                 message.setJMSTimestamp(timeStamp);
-                if (timeToLive > 0) {
-                    expiration = timeToLive + timeStamp;
-                }
+            } else {
+                message.setJMSTimestamp(0l);
             }
             message.setJMSExpiration(expiration);
             message.setJMSPriority(priority);
@@ -2287,4 +2321,22 @@
     protected ThreadPoolExecutor getConnectionExecutor() {
         return this.connectionExecutor;
     }
+
+    private static void setForeignMessageDeliveryTime(final Message foreignMessage, final long deliveryTime) throws JMSException {
+        // Detect a JMS 2.0 message by looking for a concrete (non-abstract) setJMSDeliveryTime(long)
+        Method deliveryTimeMethod = null;
+        try {
+            Class<?> clazz = foreignMessage.getClass();
+            Method method = clazz.getMethod("setJMSDeliveryTime", new Class[] { long.class });
+            if (!Modifier.isAbstract(method.getModifiers())) {
+                deliveryTimeMethod = method;
+            }
+        } catch (NoSuchMethodException e) {
+            // Not a JMS 2.0 message: no setJMSDeliveryTime method, so the delivery time is left unset
+        }
+
+        if (deliveryTimeMethod != null) {
+            foreignMessage.setJMSDeliveryTime(deliveryTime);
+        }
+    }
 }
diff --git a/activemq-client/src/main/java/org/apache/activemq/util/JMSExceptionSupport.java b/activemq-client/src/main/java/org/apache/activemq/util/JMSExceptionSupport.java
index a73f01f..41711ca 100644
--- a/activemq-client/src/main/java/org/apache/activemq/util/JMSExceptionSupport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/util/JMSExceptionSupport.java
@@ -16,10 +16,21 @@
  */
 package org.apache.activemq.util;
 
+import javax.jms.IllegalStateRuntimeException;
+import javax.jms.InvalidClientIDRuntimeException;
+import javax.jms.InvalidDestinationRuntimeException;
+import javax.jms.InvalidSelectorRuntimeException;
 import javax.jms.JMSException;
+import javax.jms.JMSRuntimeException;
 import javax.jms.JMSSecurityException;
+import javax.jms.JMSSecurityRuntimeException;
 import javax.jms.MessageEOFException;
 import javax.jms.MessageFormatException;
+import javax.jms.MessageFormatRuntimeException;
+import javax.jms.MessageNotWriteableRuntimeException;
+import javax.jms.ResourceAllocationRuntimeException;
+import javax.jms.TransactionInProgressRuntimeException;
+import javax.jms.TransactionRolledBackRuntimeException;
 
 import org.apache.activemq.MaxFrameSizeExceededException;
 
@@ -105,4 +116,38 @@
         exception.initCause(cause);
         return exception;
     }
+
+    public static JMSRuntimeException convertToJMSRuntimeException(JMSException e) {
+        if (e instanceof javax.jms.IllegalStateException) {
+            return new IllegalStateRuntimeException(e.getMessage(), e.getErrorCode(), e);
+        }
+        if (e instanceof javax.jms.InvalidClientIDException) {
+            return new InvalidClientIDRuntimeException(e.getMessage(), e.getErrorCode(), e);
+        }
+        if (e instanceof javax.jms.InvalidDestinationException) {
+            return new InvalidDestinationRuntimeException(e.getMessage(), e.getErrorCode(), e);
+        }
+        if (e instanceof javax.jms.InvalidSelectorException) {
+            return new InvalidSelectorRuntimeException(e.getMessage(), e.getErrorCode(), e);
+        }
+        if (e instanceof javax.jms.JMSSecurityException) {
+            return new JMSSecurityRuntimeException(e.getMessage(), e.getErrorCode(), e);
+        }
+        if (e instanceof javax.jms.MessageFormatException) {
+            return new MessageFormatRuntimeException(e.getMessage(), e.getErrorCode(), e);
+        }
+        if (e instanceof javax.jms.MessageNotWriteableException) {
+            return new MessageNotWriteableRuntimeException(e.getMessage(), e.getErrorCode(), e);
+        }
+        if (e instanceof javax.jms.ResourceAllocationException) {
+            return new ResourceAllocationRuntimeException(e.getMessage(), e.getErrorCode(), e);
+        }
+        if (e instanceof javax.jms.TransactionInProgressException) {
+            return new TransactionInProgressRuntimeException(e.getMessage(), e.getErrorCode(), e);
+        }
+        if (e instanceof javax.jms.TransactionRolledBackException) {
+            return new TransactionRolledBackRuntimeException(e.getMessage(), e.getErrorCode(), e);
+        }
+        return new JMSRuntimeException(e.getMessage(), e.getErrorCode(), e);
+    }
 }
diff --git a/activemq-unit-tests/pom.xml b/activemq-unit-tests/pom.xml
index 0770874..7a8ec71 100644
--- a/activemq-unit-tests/pom.xml
+++ b/activemq-unit-tests/pom.xml
@@ -323,9 +323,9 @@
             <link>http://junit.sourceforge.net/javadoc/</link>
           </links>
           <stylesheetfile>${basedir}/../etc/css/stylesheet.css</stylesheetfile>
-          <linksource>true</linksource>
           <maxmemory>256m</maxmemory>
           <source>${source-version}</source>
+          <noindex>true</noindex>
           <groups>
             <group>
               <title>JMS Client</title>
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2AckModesTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2AckModesTest.java
new file mode 100644
index 0000000..25e45ec
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2AckModesTest.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms2;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSConsumer;
+import javax.jms.JMSContext;
+import javax.jms.Message;
+
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.util.Wait;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
+public class ActiveMQJMS2AckModesTest extends ActiveMQJMS2TestBase {
+
+    private final String destinationName;
+    private final String destinationType;
+    private final int ackMode;
+    private final String messagePayload;
+
+    public ActiveMQJMS2AckModesTest(String destinationType, int ackMode) {
+        this.destinationName = "AMQ.JMS2.ACKMODE." + Integer.toString(ackMode) + destinationType.toUpperCase();
+        this.destinationType = destinationType;
+        this.ackMode = ackMode;
+        this.messagePayload = "Test message destType: " + destinationType + " ackMode: " + Integer.toString(ackMode);
+    }
+
+    @Parameterized.Parameters(name="destinationType={0}, ackMode={1}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                {"queue", ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE },
+                {"queue", ActiveMQSession.AUTO_ACKNOWLEDGE },
+                {"queue", ActiveMQSession.CLIENT_ACKNOWLEDGE },
+                {"queue", ActiveMQSession.DUPS_OK_ACKNOWLEDGE },
+                {"queue", ActiveMQSession.SESSION_TRANSACTED },
+                {"topic", ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE },
+                {"topic", ActiveMQSession.AUTO_ACKNOWLEDGE },
+                {"topic", ActiveMQSession.CLIENT_ACKNOWLEDGE },
+                {"topic", ActiveMQSession.DUPS_OK_ACKNOWLEDGE },
+                {"topic", ActiveMQSession.SESSION_TRANSACTED },
+                {"temp-queue", ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE },
+                {"temp-queue", ActiveMQSession.AUTO_ACKNOWLEDGE },
+                {"temp-queue", ActiveMQSession.CLIENT_ACKNOWLEDGE },
+                {"temp-queue", ActiveMQSession.DUPS_OK_ACKNOWLEDGE },
+                {"temp-queue", ActiveMQSession.SESSION_TRANSACTED },
+                {"temp-topic", ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE },
+                {"temp-topic", ActiveMQSession.AUTO_ACKNOWLEDGE },
+                {"temp-topic", ActiveMQSession.CLIENT_ACKNOWLEDGE },
+                {"temp-topic", ActiveMQSession.DUPS_OK_ACKNOWLEDGE },
+                {"temp-topic", ActiveMQSession.SESSION_TRANSACTED }
+        });
+    }
+
+    @Test
+    public void testAcknowledgementMode() {
+
+        try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, ackMode)) {
+            assertNotNull(jmsContext);
+            Destination destination = ActiveMQJMS2TestSupport.generateDestination(jmsContext, destinationType, destinationName);
+            assertNotNull(destination);
+            JMSConsumer jmsConsumer = jmsContext.createConsumer(destination);
+            assertNotNull(jmsConsumer);
+            jmsContext.start();
+
+            Message message = ActiveMQJMS2TestSupport.generateMessage(jmsContext, "text", messagePayload);
+
+            List<String> sentMessageIds = new LinkedList<>();
+            for(int deliveryMode : Arrays.asList(DeliveryMode.NON_PERSISTENT, DeliveryMode.PERSISTENT)) {
+                MessageData sendMessageData = new MessageData();
+                sendMessageData.setMessage(message).setDeliveryMode(deliveryMode);
+                sentMessageIds.add(ActiveMQJMS2TestSupport.sendMessage(jmsContext, destination, sendMessageData));
+            }
+
+            // For a transacted session, commit once all messages have been sent
+            switch(ackMode) {
+            case ActiveMQSession.SESSION_TRANSACTED: jmsContext.commit(); break;
+            default: break;
+            }
+
+            Message recvMessage = null;
+            Message lastRecvMessage = null;
+            List<Message> recvMessages = new LinkedList<>();
+
+            recvMessage = jmsConsumer.receive(2000l);
+
+            while (recvMessage != null) {
+                recvMessages.add(recvMessage);
+                lastRecvMessage = recvMessage;
+                switch(ackMode) {
+                case ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE: lastRecvMessage.acknowledge(); break;
+                default: break;
+                }
+
+                recvMessage = jmsConsumer.receive(500l);
+            }
+
+            assertEquals(Integer.valueOf(2), Integer.valueOf(recvMessages.size()));
+
+            switch(ackMode) {
+            case ActiveMQSession.CLIENT_ACKNOWLEDGE: lastRecvMessage.acknowledge(); break;
+            case ActiveMQSession.SESSION_TRANSACTED: jmsContext.commit(); break;
+            default: break;
+            }
+            jmsConsumer.close();
+
+            int foundCount = 0;
+            for(int validDeliveryMode : Arrays.asList(DeliveryMode.NON_PERSISTENT, DeliveryMode.PERSISTENT)) {
+                for(javax.jms.Message tmpMessage : recvMessages) {
+                    if(tmpMessage.getJMSDeliveryMode() == validDeliveryMode) {
+                        MessageData messageData = new MessageData();
+                        messageData.setMessageType("text").setMessagePayload(messagePayload).setDeliveryMode(validDeliveryMode);
+                        ActiveMQJMS2TestSupport.validateMessageData(tmpMessage, messageData);
+                        foundCount++;
+                    }
+                }
+            }
+            assertEquals(Integer.valueOf(2), Integer.valueOf(foundCount));
+
+            DestinationViewMBean destinationViewMBean = getDestinationViewMBean(destinationType, (ActiveMQDestination)destination);
+            assertTrue("QueueSize = 0 expected", Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return destinationViewMBean.getQueueSize() == 0l;
+                }
+            }, 5000l, 10l));
+        } catch (Exception e) {
+            fail(e.getMessage());
+        }
+    }
+
+
+}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ContextTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ContextTest.java
new file mode 100644
index 0000000..4891603
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ContextTest.java
@@ -0,0 +1,360 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms2;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Enumeration;
+
+import javax.jms.CompletionListener;
+import javax.jms.Destination;
+import javax.jms.JMSConsumer;
+import javax.jms.JMSContext;
+import javax.jms.JMSException;
+import javax.jms.JMSProducer;
+import javax.jms.JMSRuntimeException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.ActiveMQContext;
+import org.junit.Test;
+
+public class ActiveMQJMS2ContextTest extends ActiveMQJMS2TestBase {
+
+    @Test
+    public void testConnectionFactoryCreateContext() {
+        try(JMSContext jmsContext = activemqConnectionFactory.createContext()) {
+            assertNotNull(jmsContext);
+            jmsContext.start();
+            assertTrue(ActiveMQContext.class.isAssignableFrom(jmsContext.getClass()));
+            Destination destination = jmsContext.createQueue(methodNameDestinationName);
+            sendMessage(jmsContext, destination, "Test-" + methodNameDestinationName);
+            recvMessage(jmsContext, destination, "Test-" + methodNameDestinationName);
+        } catch (JMSException e) {
+            fail(e.getMessage());
+        }
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testConnectionFactoryCreateContextSession() {
+        activemqConnectionFactory.createContext(Session.AUTO_ACKNOWLEDGE);
+    }
+
+    @Test
+    public void testConnectionFactoryCreateContextUserPass() {
+        try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS)) {
+            assertNotNull(jmsContext);
+            jmsContext.start();
+            assertTrue(ActiveMQContext.class.isAssignableFrom(jmsContext.getClass()));
+            Destination destination = jmsContext.createQueue(methodNameDestinationName);
+            sendMessage(jmsContext, destination, "Test-" + methodNameDestinationName);
+            recvMessage(jmsContext, destination, "Test-" + methodNameDestinationName);
+        } catch (JMSException e) {
+            fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testConnectionFactoryCreateContextUserPassSession() {
+        try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) {
+            assertNotNull(jmsContext);
+            assertTrue(ActiveMQContext.class.isAssignableFrom(jmsContext.getClass()));
+        }
+    }
+
+    @Test
+    public void testConnectionFactoryCreateContexMultiContext() {
+        JMSContext secondJMSContext = null;
+        JMSContext thirdJMSContext = null;
+
+        try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS)) {
+            assertNotNull(jmsContext);
+            jmsContext.start();
+            assertTrue(ActiveMQContext.class.isAssignableFrom(jmsContext.getClass()));
+
+            Destination testDestination = jmsContext.createQueue(methodNameDestinationName);
+            sendMessage(jmsContext, testDestination, "Test-" + methodNameDestinationName);
+            recvMessage(jmsContext, testDestination, "Test-" + methodNameDestinationName);
+
+            secondJMSContext = jmsContext.createContext(Session.AUTO_ACKNOWLEDGE);
+        } catch (JMSException e) {
+            fail(e.getMessage());
+        }
+
+        // The first context is closed at this point; the second context is still used below
+        String secondTestDestinationName = methodNameDestinationName + ".SECOND";
+        Destination secondTestDestination = secondJMSContext.createQueue(secondTestDestinationName);
+
+        try {
+            sendMessage(secondJMSContext, secondTestDestination, "Test-" + methodNameDestinationName);
+            recvMessage(secondJMSContext, secondTestDestination, "Test-" + methodNameDestinationName);
+        } catch (JMSException e) {
+            fail(e.getMessage());
+        } finally {
+            if(secondJMSContext != null) {
+                try { secondJMSContext.close(); } catch (JMSRuntimeException e) { fail(e.getMessage()); }
+            }
+        }
+
+        // Attempt to obtain a third context after all contexts have been closed
+        boolean caught = false;
+        try {
+            thirdJMSContext = secondJMSContext.createContext(Session.AUTO_ACKNOWLEDGE);
+            fail("JMSRuntimeException expected");
+        } catch (JMSRuntimeException e) {
+            assertNull(thirdJMSContext);
+            caught = true;
+            assertEquals("Context already closed", e.getMessage());
+        }
+        assertTrue(caught);
+    }
+
+    @Test
+    public void testConnectionFactoryCreateContextBrowse() {
+        try(JMSContext jmsContext = activemqConnectionFactory.createContext()) {
+            assertNotNull(jmsContext);
+            jmsContext.start();
+            assertTrue(ActiveMQContext.class.isAssignableFrom(jmsContext.getClass()));
+            Destination destination = jmsContext.createQueue(methodNameDestinationName);
+            sendMessage(jmsContext, destination, "Test-" + methodNameDestinationName);
+            browseMessage(jmsContext, destination, "Test-" + methodNameDestinationName, true);
+        } catch (JMSException e) {
+            fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testConnectionFactoryCreateContextBrowseAutoStart() {
+        try(JMSContext jmsContext = activemqConnectionFactory.createContext()) {
+            assertNotNull(jmsContext);
+            assertTrue(ActiveMQContext.class.isAssignableFrom(jmsContext.getClass()));
+            Destination destination = jmsContext.createQueue(methodNameDestinationName);
+            sendMessage(jmsContext, destination, "Test-" + methodNameDestinationName);
+            browseMessage(jmsContext, destination, "Test-" + methodNameDestinationName, true);
+        } catch (JMSException e) {
+            fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testConnectionFactoryCreateContextBrowseAutoStartFalse() {
+        try(JMSContext jmsContext = activemqConnectionFactory.createContext()) {
+            assertNotNull(jmsContext);
+            jmsContext.setAutoStart(false);
+            assertTrue(ActiveMQContext.class.isAssignableFrom(jmsContext.getClass()));
+            Destination destination = jmsContext.createQueue(methodNameDestinationName);
+            sendMessage(jmsContext, destination, "Test-" + methodNameDestinationName);
+            browseMessage(jmsContext, destination, "Test-" + methodNameDestinationName, false);
+        } catch (JMSException e) {
+            fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testConnectionFactoryCreateContextBrowseAutoStartFalseStartDelayed() {
+        try(JMSContext jmsContext = activemqConnectionFactory.createContext()) {
+            assertNotNull(jmsContext);
+            jmsContext.setAutoStart(false);
+            assertTrue(ActiveMQContext.class.isAssignableFrom(jmsContext.getClass()));
+            Destination destination = jmsContext.createQueue(methodNameDestinationName);
+            sendMessage(jmsContext, destination, "Test-" + methodNameDestinationName);
+            jmsContext.start();
+            browseMessage(jmsContext, destination, "Test-" + methodNameDestinationName, true);
+        } catch (JMSException e) {
+            fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testDisableMessageID() {
+        try(JMSContext jmsContext = activemqConnectionFactory.createContext()) {
+            assertNotNull(jmsContext);
+            jmsContext.setAutoStart(false);
+            assertTrue(ActiveMQContext.class.isAssignableFrom(jmsContext.getClass()));
+            jmsContext.start();
+            JMSProducer jmsProducer = jmsContext.createProducer();
+            jmsProducer.setDisableMessageID(true);
+            Destination destination = jmsContext.createQueue(methodNameDestinationName);
+            Message sendMessage = jmsContext.createTextMessage("Test-" + methodNameDestinationName);
+            jmsProducer.send(destination, sendMessage);
+            JMSConsumer jmsConsumer = jmsContext.createConsumer(destination);
+            Message recvMessage = jmsConsumer.receive(5000l);
+            assertNotNull(recvMessage);
+            // Verify the message IDs match, since ActiveMQ does not honor disableMessageID
+            assertEquals(sendMessage.getJMSMessageID(), recvMessage.getJMSMessageID());
+        } catch (JMSException e) {
+            fail(e.getMessage());
+        }
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testConnectionSharedConnectionConsumer() throws JMSException {
+        connection.createSharedConnectionConsumer(null, null, null, null, 10);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testConnectionSharedDurableConnectionConsumer() throws JMSException {
+        connection.createSharedDurableConnectionConsumer(null, null, null, null, 10);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testSessionAckMode() throws JMSException {
+        connection.createSession(Session.AUTO_ACKNOWLEDGE);
+    }
+
+    @Test
+    public void testSessionDurableConsumer() throws JMSException {
+        try(JMSContext jmsContext = activemqConnectionFactory.createContext()) {
+            assertNotNull(jmsContext);
+            jmsContext.setAutoStart(false);
+            jmsContext.setClientID(testName.getMethodName());
+            jmsContext.start();
+            assertTrue(ActiveMQContext.class.isAssignableFrom(jmsContext.getClass()));
+            Topic topic = jmsContext.createTopic(methodNameDestinationName);
+
+            JMSConsumer jmsConsumerRegisterDurableConsumer = jmsContext.createDurableConsumer(topic, testName.getMethodName());
+            jmsConsumerRegisterDurableConsumer.close();
+            sendMessage(jmsContext, topic, "Test-" + methodNameDestinationName);
+            recvMessageDurable(jmsContext, topic, testName.getMethodName(), null, false, "Test-" + methodNameDestinationName);
+        } catch (JMSException e) {
+            fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testSessionDurableConsumerSelectorNoLocal() throws JMSException {
+        try(JMSContext jmsContext = activemqConnectionFactory.createContext()) {
+            assertNotNull(jmsContext);
+            jmsContext.setAutoStart(false);
+            jmsContext.setClientID(testName.getMethodName());
+            jmsContext.start();
+            assertTrue(ActiveMQContext.class.isAssignableFrom(jmsContext.getClass()));
+            Topic topic = jmsContext.createTopic(methodNameDestinationName);
+
+            JMSConsumer jmsConsumerRegisterDurableConsumer = jmsContext.createDurableConsumer(topic, testName.getMethodName(), "TRUE", false);
+            jmsConsumerRegisterDurableConsumer.close();
+            sendMessage(jmsContext, topic, "Test-" + methodNameDestinationName);
+            recvMessageDurable(jmsContext, topic, testName.getMethodName(), "TRUE", false, "Test-" + methodNameDestinationName);
+        } catch (JMSException e) {
+            fail(e.getMessage());
+        }
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testSessionSharedConsumer() throws JMSException {
+        session.createSharedConsumer(null, null);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testSessionSharedConsumerSelector() throws JMSException {
+        session.createSharedConsumer(null, null, null);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testSessionSharedDurableConsumer() throws JMSException {
+        session.createSharedDurableConsumer(null, null);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testSessionSharedDurableConsumerSelector() throws JMSException {
+        session.createSharedDurableConsumer(null, null, null);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testProducerDeliveryDelayGet() throws JMSException {
+        messageProducer.getDeliveryDelay();
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testProducerDeliveryDelaySet() throws JMSException {
+        messageProducer.setDeliveryDelay(1000l);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testProducerSendMessageCompletionListener() throws JMSException {
+         messageProducer.send(session.createQueue(methodNameDestinationName), null, (CompletionListener)null);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testProducerSendMessageQoSParamsCompletionListener() throws JMSException {
+         messageProducer.send(null, 1, 4, 0l, null);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testProducerSendDestinationMessageCompletionListener() throws JMSException {
+         messageProducer.send(session.createQueue(methodNameDestinationName), null, null);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testProducerSendDestinationMessageQosParamsCompletionListener() throws JMSException {
+         messageProducer.send(session.createQueue(methodNameDestinationName), null, 1, 4, 0l, null);
+    }
+
+    protected static void sendMessage(JMSContext jmsContext, Destination testDestination, String textBody) {
+        assertNotNull(jmsContext);
+        JMSProducer jmsProducer = jmsContext.createProducer();
+        jmsProducer.send(testDestination, textBody);
+    }
+
+    protected static void browseMessage(JMSContext jmsContext, Destination testDestination, String expectedTextBody, boolean expectFound) throws JMSException {
+        assertNotNull(jmsContext);
+        assertTrue(Queue.class.isAssignableFrom(testDestination.getClass()));
+        Queue testQueue = Queue.class.cast(testDestination);
+        try(QueueBrowser queueBrowser = jmsContext.createBrowser(testQueue)) {
+            Enumeration<?> messageEnumeration = queueBrowser.getEnumeration();
+            assertNotNull(messageEnumeration);
+
+            boolean found = false; 
+            while(!found && messageEnumeration.hasMoreElements()) {
+                javax.jms.Message message = (javax.jms.Message)messageEnumeration.nextElement();
+                assertNotNull(message);
+                assertTrue(TextMessage.class.isAssignableFrom(message.getClass()));
+                assertEquals(expectedTextBody, TextMessage.class.cast(message).getText());
+                found = true;
+            }
+            assertEquals(expectFound, found);
+        }
+    }
+
+    protected static void recvMessage(JMSContext jmsContext, Destination testDestination, String expectedTextBody) throws JMSException {
+        assertNotNull(jmsContext);
+        try(JMSConsumer jmsConsumer = jmsContext.createConsumer(testDestination)) {
+            javax.jms.Message message = jmsConsumer.receive(1000l);
+            assertNotNull(message);
+            assertTrue(TextMessage.class.isAssignableFrom(message.getClass()));
+            assertEquals(expectedTextBody, TextMessage.class.cast(message).getText());
+        }
+    }
+
+    protected static void recvMessageDurable(JMSContext jmsContext, Topic testTopic, String subscriptionName, String selector, boolean noLocal, String expectedTextBody) throws JMSException {
+        assertNotNull(jmsContext);
+        try(JMSConsumer jmsConsumer = jmsContext.createDurableConsumer(testTopic, subscriptionName, selector, noLocal)) {
+            javax.jms.Message message = jmsConsumer.receive(1000l);
+            assertNotNull(message);
+            assertTrue(TextMessage.class.isAssignableFrom(message.getClass()));
+            assertEquals(expectedTextBody, TextMessage.class.cast(message).getText());
+        }
+    }
+}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2MessageListenerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2MessageListenerTest.java
new file mode 100644
index 0000000..917a26b
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2MessageListenerTest.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms2;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSConsumer;
+import javax.jms.JMSContext;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.util.Wait;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
+public class ActiveMQJMS2MessageListenerTest extends ActiveMQJMS2TestBase {
+
+    private final String destinationName; // "AMQ.JMS2.ACKMODE." + ackMode + upper-cased destinationType
+    private final String destinationType; // first test parameter, e.g. "queue"
+    private final int ackMode; // second test parameter: the session mode passed to createContext
+    private final String messagePayload; // text body sent by the test; embeds destinationType and ackMode
+
+    public ActiveMQJMS2MessageListenerTest(String destinationType, int ackMode) {
+        this.destinationType = destinationType;
+        this.ackMode = ackMode;
+        this.destinationName = "AMQ.JMS2.ACKMODE." + ackMode + destinationType.toUpperCase();
+        this.messagePayload = "Test message destType: " + destinationType + " ackMode: " + ackMode;
+    }
+
+    // Parameter rows: one ["queue", ackMode] pair per acknowledgement mode exercised by this class
+    @Parameterized.Parameters(name="destinationType={0}, ackMode={1}")
+    public static Collection<Object[]> data() {
+        final List<Object[]> rows = new LinkedList<>();
+        for(int mode : new int[] {ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE, Session.AUTO_ACKNOWLEDGE,
+                Session.CLIENT_ACKNOWLEDGE, Session.DUPS_OK_ACKNOWLEDGE, Session.SESSION_TRANSACTED}) {
+            rows.add(new Object[] {"queue", mode});
+        }
+        return rows;
+    }
+
+    /**
+     * Sends one NON_PERSISTENT and one PERSISTENT text message to the test queue and verifies that an
+     * asynchronous listener receives both under the parameterized acknowledgement mode. Enqueue and
+     * dequeue counts are read from the queue's QueueViewMBean; for SESSION_TRANSACTED the test
+     * commits once after sending and once after receiving.
+     */
+    @Test
+    public void testMessageListener() {
+
+        try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, ackMode)) {
+            assertNotNull(jmsContext);
+            Destination destination = ActiveMQJMS2TestSupport.generateDestination(jmsContext, destinationType, destinationName);
+            assertNotNull(destination);
+            QueueViewMBean localQueueViewMBean = getQueueViewMBean((ActiveMQDestination)destination);
+            JMSConsumer jmsConsumer = jmsContext.createConsumer(destination);
+
+            AtomicInteger receivedMessageCount = new AtomicInteger();
+            AtomicInteger exceptionCount = new AtomicInteger();
+            CountDownLatch countDownLatch = new CountDownLatch(2);
+
+            jmsConsumer.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    receivedMessageCount.incrementAndGet();
+                    try {
+                        // Only the client-driven modes need an explicit acknowledge from the listener
+                        if(ackMode == Session.CLIENT_ACKNOWLEDGE || ackMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE) {
+                            message.acknowledge();
+                        }
+                    } catch (JMSException e) {
+                        exceptionCount.incrementAndGet();
+                    } finally {
+                        // Count down last so both counters and the ack are settled when the test thread wakes
+                        countDownLatch.countDown();
+                    }
+                }
+            });
+            jmsContext.start();
+
+            Message message = ActiveMQJMS2TestSupport.generateMessage(jmsContext, "text", messagePayload);
+
+            // The same message object is sent once per delivery mode
+            for(int deliveryMode : Arrays.asList(DeliveryMode.NON_PERSISTENT, DeliveryMode.PERSISTENT)) {
+                MessageData sendMessageData = new MessageData();
+                sendMessageData.setMessage(message).setDeliveryMode(deliveryMode);
+                ActiveMQJMS2TestSupport.sendMessage(jmsContext, destination, sendMessageData);
+            }
+
+            // In a transacted session the sends are not counted as enqueued until the commit
+            if(ackMode == Session.SESSION_TRANSACTED) {
+                assertEquals(Long.valueOf(0), Long.valueOf(localQueueViewMBean.getEnqueueCount()));
+                jmsContext.commit();
+            }
+            assertEquals(Long.valueOf(2), Long.valueOf(localQueueViewMBean.getEnqueueCount()));
+
+            assertTrue("Expected the listener to be invoked twice", countDownLatch.await(5, TimeUnit.SECONDS));
+
+            assertEquals(Integer.valueOf(2), Integer.valueOf(receivedMessageCount.get()));
+            assertEquals(Integer.valueOf(0), Integer.valueOf(exceptionCount.get()));
+
+            // In a transacted session the received messages are not counted as dequeued until the commit
+            if(ackMode == Session.SESSION_TRANSACTED) {
+                assertEquals(Long.valueOf(0), Long.valueOf(localQueueViewMBean.getDequeueCount()));
+                jmsContext.commit();
+            }
+            jmsConsumer.close();
+
+            assertTrue("DequeueCount = 2 and QueueSize = 0 expected", Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return localQueueViewMBean.getDequeueCount() == 2L && localQueueViewMBean.getQueueSize() == 0L;
+                }
+            }, 5000L, 100L));
+
+        } catch (Exception e) {
+            // Keep the original exception as the cause so the failure report shows its stack trace
+            throw new AssertionError("Unexpected exception: " + e, e);
+        }
+    }
+}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2MessageTypesTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2MessageTypesTest.java
new file mode 100644
index 0000000..465c067
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2MessageTypesTest.java
@@ -0,0 +1,597 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms2;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.IllegalStateRuntimeException;
+import javax.jms.JMSConsumer;
+import javax.jms.JMSContext;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.activemq.util.Wait;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
+public class ActiveMQJMS2MessageTypesTest extends ActiveMQJMS2TestBase {
+
+    private final Set<Integer> VALID_PRIORITIES = Set.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); // every priority the tests send; Set.of has no defined iteration order
+    private final String clientID; // destinationType + "-" + messageType; set as the JMS client id by the durable-subscriber tests
+    private final String destinationType; // first test parameter: "queue", "topic", "temp-queue" or "temp-topic"
+    private final String messagePayload; // fixed payload handed to generateMessage for every message type
+    private final String messageType; // second test parameter: "bytes", "map", "object", "stream" or "text"
+
+    public ActiveMQJMS2MessageTypesTest(String destinationType, String messageType) {
+        this.destinationType = destinationType;
+        this.messageType = messageType;
+        this.messagePayload = "Test message payload";
+        this.clientID = String.join("-", destinationType, messageType);
+    }
+
+    /**
+     * Builds the parameter matrix for this class: each destination type ("queue", "topic",
+     * "temp-queue", "temp-topic") combined with each message type ("bytes", "map", "object",
+     * "stream", "text"), 20 rows in total.
+     *
+     * @return one [destinationType, messageType] row per parameterized run, grouped by
+     *         destination type
+     */
+    @Parameterized.Parameters(name="destinationType={0}, messageType={1}")
+    public static Collection<Object[]> data() {
+        final List<String> destTypes = Arrays.asList(
+                "queue", "topic", "temp-queue", "temp-topic");
+        final List<String> msgTypes = Arrays.asList(
+                "bytes", "map", "object", "stream", "text");
+
+        // Destination type is the outer loop, so rows come out in the same order as a hand-written table
+        final List<Object[]> rows = new LinkedList<>();
+        for(String destType : destTypes) {
+            for(String msgType : msgTypes) {
+                rows.add(new Object[] {destType, msgType});
+            }
+        }
+
+        return rows;
+    }
+
+    /**
+     * Sends the same message as NON_PERSISTENT and as PERSISTENT and verifies a listener receives exactly one of each.
+     */
+    @Test
+    public void testMessageDeliveryMode() {
+
+        try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) {
+            assertNotNull(jmsContext);
+            Destination destination = ActiveMQJMS2TestSupport.generateDestination(jmsContext, destinationType, methodNameDestinationName);
+            assertNotNull(destination);
+            JMSConsumer jmsConsumer = jmsContext.createConsumer(destination);
+            assertNotNull(jmsConsumer);
+
+            Message message = ActiveMQJMS2TestSupport.generateMessage(jmsContext, messageType, messagePayload);
+
+            for(int deliveryMode : Arrays.asList(DeliveryMode.NON_PERSISTENT, DeliveryMode.PERSISTENT)) {
+                MessageData sendMessageData = new MessageData();
+                sendMessageData.setMessage(message).setDeliveryMode(deliveryMode);
+                ActiveMQJMS2TestSupport.sendMessage(jmsContext, destination, sendMessageData);
+            }
+
+            List<Message> recvMessages = new CopyOnWriteArrayList<>();
+            AtomicBoolean receivedExpected = new AtomicBoolean(false);
+            AtomicInteger exceptionCount = new AtomicInteger();
+
+            jmsConsumer.setMessageListener(new MessageListener() {
+                private boolean recvNonPersistent = false;
+                private boolean recvPersistent = false;
+
+                @Override
+                public void onMessage(Message message) {
+                    recvMessages.add(message);
+                    try {
+                        switch(message.getJMSDeliveryMode()) {
+                        case DeliveryMode.NON_PERSISTENT: recvNonPersistent = true; break;
+                        case DeliveryMode.PERSISTENT: recvPersistent = true; break;
+                        default: break;
+                        }
+                        receivedExpected.set(recvNonPersistent && recvPersistent); // stays true once both modes were seen
+                    } catch (JMSException e) {
+                        exceptionCount.incrementAndGet();
+                    }
+                }
+            });
+            jmsContext.start();
+
+            assertTrue("Expected to receive both DeliveryModes", Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return receivedExpected.get();
+                }
+            }, 5000L, 100L));
+            jmsConsumer.close();
+            jmsContext.stop();
+
+            assertEquals(Integer.valueOf(2), Integer.valueOf(recvMessages.size()));
+            assertEquals("Listener exceptions", 0, exceptionCount.get()); // the counter was previously never checked
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception: " + e, e); // keep the cause and its stack trace
+        }
+    }
+
+    /**
+     * Verifies that sending with the unsupported delivery mode 99 is rejected with an IllegalStateRuntimeException.
+     */
+    @Test
+    public void testMessageDeliveryModeInvalid() {
+
+        try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) {
+            assertNotNull(jmsContext);
+            Destination destination = ActiveMQJMS2TestSupport.generateDestination(jmsContext, destinationType, methodNameDestinationName);
+            assertNotNull(destination);
+            jmsContext.start();
+
+            Message message = ActiveMQJMS2TestSupport.generateMessage(jmsContext, messageType, messagePayload);
+            MessageData sendMessageData = new MessageData();
+            sendMessageData.setMessage(message).setDeliveryMode(99);
+            try {
+                ActiveMQJMS2TestSupport.sendMessage(jmsContext, destination, sendMessageData);
+                fail("IllegalStateRuntimeException expected"); // reached only when the send did not throw
+            } catch (IllegalStateRuntimeException e) {
+                assertEquals("unknown delivery mode: 99", e.getMessage());
+            }
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception: " + e, e); // keep the cause and its stack trace
+        }
+    }
+
+    /**
+     * Sends one message for every priority in VALID_PRIORITIES and verifies that a listener receives
+     * ten messages and that each one validates against the priority it carries.
+     */
+    @Test
+    public void testMessagePriority() {
+
+        try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) {
+            assertNotNull(jmsContext);
+            Destination destination = ActiveMQJMS2TestSupport.generateDestination(jmsContext, destinationType, methodNameDestinationName);
+            assertNotNull(destination);
+            JMSConsumer jmsConsumer = jmsContext.createConsumer(destination);
+            assertNotNull(jmsConsumer);
+
+            Message message = ActiveMQJMS2TestSupport.generateMessage(jmsContext, messageType, messagePayload);
+            for(int priority : VALID_PRIORITIES) {
+                MessageData sendMessageData = new MessageData();
+                sendMessageData.setMessage(message).setPriority(priority);
+                ActiveMQJMS2TestSupport.sendMessage(jmsContext, destination, sendMessageData);
+            }
+
+            final List<Message> recvMessages = new CopyOnWriteArrayList<>();
+            final CountDownLatch countDownLatch = new CountDownLatch(10);
+
+            jmsConsumer.setMessageListener(received -> {
+                recvMessages.add(received);
+                countDownLatch.countDown();
+            });
+            jmsContext.start();
+
+            assertTrue("Expected to receive 10 messages", countDownLatch.await(5, TimeUnit.SECONDS));
+            jmsConsumer.close();
+            jmsContext.stop();
+
+            // Check that exactly 10 messages have been received
+            assertEquals(Integer.valueOf(10), Integer.valueOf(recvMessages.size()));
+
+            int validatedCount = 0;
+            for(int validatedPriority : VALID_PRIORITIES) {
+                for(javax.jms.Message tmpMessage : recvMessages) {
+                    if(tmpMessage.getJMSPriority() == validatedPriority) {
+                        MessageData messageData = new MessageData();
+                        messageData.setMessageType(messageType).setMessagePayload(messagePayload).setPriority(validatedPriority);
+                        ActiveMQJMS2TestSupport.validateMessageData(tmpMessage, messageData);
+                        validatedCount++;
+                    }
+                }
+            }
+            assertEquals(Integer.valueOf(10), Integer.valueOf(validatedCount));
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception: " + e, e); // keep the cause and its stack trace
+        }
+    }
+
+    /**
+     * Verifies that sending with priority -1 (below the 0..9 range) is rejected with an IllegalStateRuntimeException.
+     */
+    @Test
+    public void testMessagePriorityInvalidLower() {
+
+        try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) {
+            assertNotNull(jmsContext);
+            Destination destination = ActiveMQJMS2TestSupport.generateDestination(jmsContext, destinationType, methodNameDestinationName);
+            assertNotNull(destination);
+
+            Message message = ActiveMQJMS2TestSupport.generateMessage(jmsContext, messageType, messagePayload);
+            MessageData sendMessageData = new MessageData();
+            sendMessageData.setMessage(message).setPriority(-1);
+            try {
+                ActiveMQJMS2TestSupport.sendMessage(jmsContext, destination, sendMessageData);
+                fail("IllegalStateRuntimeException expected"); // reached only when the send did not throw
+            } catch (IllegalStateRuntimeException e) {
+                assertEquals("default priority must be a value between 0 and 9", e.getMessage());
+            }
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception: " + e, e); // keep the cause and its stack trace
+        }
+    }
+
+    /**
+     * Verifies that sending with priority 10 (above the 0..9 range) is rejected with an IllegalStateRuntimeException.
+     */
+    @Test
+    public void testMessagePriorityInvalidHigher() {
+
+        try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) {
+            assertNotNull(jmsContext);
+            Destination destination = ActiveMQJMS2TestSupport.generateDestination(jmsContext, destinationType, methodNameDestinationName);
+            assertNotNull(destination);
+
+            Message message = ActiveMQJMS2TestSupport.generateMessage(jmsContext, messageType, messagePayload);
+            MessageData sendMessageData = new MessageData();
+            sendMessageData.setMessage(message).setPriority(10);
+            try {
+                ActiveMQJMS2TestSupport.sendMessage(jmsContext, destination, sendMessageData);
+                fail("IllegalStateRuntimeException expected"); // reached only when the send did not throw
+            } catch (IllegalStateRuntimeException e) {
+                assertEquals("default priority must be a value between 0 and 9", e.getMessage());
+            }
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception: " + e, e); // keep the cause and its stack trace
+        }
+    }
+
+    /**
+     * Sends one message with a 900000 ms time-to-live and verifies that the single received message
+     * validates against the expiration and timestamp read back from the sent message object.
+     * The listener is attached after the send, before the context is started.
+     */
+    @Test
+    public void testMessageTimestampTimeToLive() {
+
+        long timeToLive = 900000L;
+
+        try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) {
+            assertNotNull(jmsContext);
+            Destination destination = ActiveMQJMS2TestSupport.generateDestination(jmsContext, destinationType, methodNameDestinationName);
+            assertNotNull(destination);
+            JMSConsumer jmsConsumer = jmsContext.createConsumer(destination);
+            assertNotNull(jmsConsumer);
+
+            Message message = ActiveMQJMS2TestSupport.generateMessage(jmsContext, messageType, messagePayload);
+
+            MessageData sendMessageData = new MessageData();
+            sendMessageData.setMessage(message).setTimeToLive(timeToLive);
+            ActiveMQJMS2TestSupport.sendMessage(jmsContext, destination, sendMessageData);
+
+            // Read the headers back from the sent message object
+            long messageExpiration = message.getJMSExpiration();
+            long messageTimestamp = message.getJMSTimestamp();
+            assertNotEquals(Long.MIN_VALUE, messageExpiration);
+            assertNotEquals(Long.MIN_VALUE, messageTimestamp);
+
+            final List<Message> recvMessages = new CopyOnWriteArrayList<>();
+            final CountDownLatch countDownLatch = new CountDownLatch(1);
+
+            jmsConsumer.setMessageListener(received -> {
+                recvMessages.add(received);
+                countDownLatch.countDown();
+            });
+            jmsContext.start();
+
+            assertTrue("Expected to receive 1 message", countDownLatch.await(5L, TimeUnit.SECONDS));
+            jmsConsumer.close();
+            jmsContext.stop();
+
+            // Check that exactly 1 message has been received
+            assertEquals(Integer.valueOf(1), Integer.valueOf(recvMessages.size()));
+
+            int validatedCount = 0;
+            for(javax.jms.Message tmpMessage : recvMessages) {
+                MessageData recvMessageData = new MessageData();
+                recvMessageData.setMessageType(messageType).setMessagePayload(messagePayload).setExpiration(messageExpiration).setTimestamp(messageTimestamp);
+                ActiveMQJMS2TestSupport.validateMessageData(tmpMessage, recvMessageData);
+                validatedCount++;
+            }
+            assertEquals(Integer.valueOf(1), Integer.valueOf(validatedCount));
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception: " + e, e); // keep the cause and its stack trace
+        }
+    }
+
+    /**
+     * Sends one message with the message timestamp disabled and verifies that the sent message reports
+     * a timestamp of 0 and that the received message validates against the same header values.
+     */
+    @Test
+    public void testMessageDisableTimestamp() {
+
+        long timeToLive = 900000L;
+
+        try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) {
+            assertNotNull(jmsContext);
+            Destination destination = ActiveMQJMS2TestSupport.generateDestination(jmsContext, destinationType, methodNameDestinationName);
+            assertNotNull(destination);
+            JMSConsumer jmsConsumer = jmsContext.createConsumer(destination);
+            assertNotNull(jmsConsumer);
+
+            Message message = ActiveMQJMS2TestSupport.generateMessage(jmsContext, messageType, messagePayload);
+
+            MessageData sendMessageData = new MessageData();
+            sendMessageData.setMessage(message).setTimeToLive(timeToLive).setDisableMessageTimestamp(true);
+            ActiveMQJMS2TestSupport.sendMessage(jmsContext, destination, sendMessageData);
+
+            // Read the headers back from the sent message object
+            long messageExpiration = message.getJMSExpiration();
+            long messageTimestamp = message.getJMSTimestamp();
+            assertEquals(0L, messageTimestamp);
+            assertNotEquals(Long.MIN_VALUE, messageExpiration);
+
+            final List<Message> recvMessages = new CopyOnWriteArrayList<>();
+            final CountDownLatch countDownLatch = new CountDownLatch(1);
+
+            jmsConsumer.setMessageListener(received -> {
+                recvMessages.add(received);
+                countDownLatch.countDown();
+            });
+            jmsContext.start();
+
+            assertTrue("Expected to receive 1 message", countDownLatch.await(5L, TimeUnit.SECONDS));
+            jmsConsumer.close();
+            jmsContext.stop();
+
+            for(javax.jms.Message tmpMessage : recvMessages) {
+                MessageData recvMessageData = new MessageData();
+                recvMessageData.setMessagePayload(messagePayload).setMessageType(messageType).setExpiration(messageExpiration).setTimestamp(messageTimestamp).setDisableMessageTimestamp(true);
+                ActiveMQJMS2TestSupport.validateMessageData(tmpMessage, recvMessageData);
+            }
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception: " + e, e); // keep the cause and its stack trace
+        }
+    }
+
+    /**
+     * Sends one message carrying a correlation id, a reply-to destination and a JMS type and verifies
+     * that the received message validates against those values.
+     */
+    @Test
+    public void testMessageNonQOSHeaders() {
+
+        String jmsCorrelationID = UUID.randomUUID().toString();
+
+        try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) {
+            assertNotNull(jmsContext);
+            Destination destination = ActiveMQJMS2TestSupport.generateDestination(jmsContext, destinationType, methodNameDestinationName);
+            assertNotNull(destination);
+            JMSConsumer jmsConsumer = jmsContext.createConsumer(destination);
+            assertNotNull(jmsConsumer);
+
+            Message message = ActiveMQJMS2TestSupport.generateMessage(jmsContext, messageType, messagePayload);
+            Destination jmsReplyTo = ActiveMQJMS2TestSupport.generateDestination(jmsContext, destinationType, methodNameDestinationName + ".REPLYTO");
+            String jmsType = message.getClass().getName();
+
+            MessageData sendMessageData = new MessageData();
+            sendMessageData.setMessage(message).setCorrelationID(jmsCorrelationID).setReplyTo(jmsReplyTo).setJMSType(jmsType);
+            ActiveMQJMS2TestSupport.sendMessage(jmsContext, destination, sendMessageData);
+
+            final List<Message> recvMessages = new CopyOnWriteArrayList<>();
+            final CountDownLatch countDownLatch = new CountDownLatch(1);
+
+            jmsConsumer.setMessageListener(received -> {
+                recvMessages.add(received);
+                countDownLatch.countDown();
+            });
+            jmsContext.start();
+
+            assertTrue("Expected to receive 1 message", countDownLatch.await(5L, TimeUnit.SECONDS));
+            jmsConsumer.close();
+            jmsContext.stop();
+
+            for(javax.jms.Message tmpMessage : recvMessages) {
+                MessageData recvMessageData = new MessageData();
+                recvMessageData.setMessagePayload(messagePayload).setMessageType(messageType).setCorrelationID(jmsCorrelationID).setReplyTo(jmsReplyTo).setJMSType(jmsType);
+                ActiveMQJMS2TestSupport.validateMessageData(tmpMessage, recvMessageData);
+            }
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception: " + e, e); // keep the cause and its stack trace
+        }
+    }
+
+    /**
+     * Sends one message and verifies that the received message validates against the id returned by the send.
+     * NOTE(review): despite the method name, nothing in this method disables message ids; confirm the intent.
+     */
+    @Test
+    public void testMessageDisableMessageID() {
+        try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) {
+            assertNotNull(jmsContext);
+            Destination destination = ActiveMQJMS2TestSupport.generateDestination(jmsContext, destinationType, methodNameDestinationName);
+            assertNotNull(destination);
+            JMSConsumer jmsConsumer = jmsContext.createConsumer(destination);
+            assertNotNull(jmsConsumer);
+
+            Message message = ActiveMQJMS2TestSupport.generateMessage(jmsContext, messageType, messagePayload);
+            String jmsMessageID = ActiveMQJMS2TestSupport.sendMessage(jmsContext, destination, message);
+
+            final List<Message> recvMessages = new CopyOnWriteArrayList<>();
+            final CountDownLatch countDownLatch = new CountDownLatch(1);
+
+            jmsConsumer.setMessageListener(received -> {
+                recvMessages.add(received);
+                countDownLatch.countDown();
+            });
+            jmsContext.start();
+
+            assertTrue("Expected to receive 1 message", countDownLatch.await(5L, TimeUnit.SECONDS));
+            jmsConsumer.close();
+            jmsContext.stop();
+
+            for(javax.jms.Message tmpMessage : recvMessages) {
+                MessageData messageData = new MessageData();
+                messageData.setMessageType(messageType).setMessagePayload(messagePayload).setMessageID(jmsMessageID);
+                ActiveMQJMS2TestSupport.validateMessageData(tmpMessage, messageData);
+            }
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception: " + e, e); // keep the cause and its stack trace
+        }
+    }
+
+    /**
+     * Registers a durable subscription, closes it, sends one message and verifies a new consumer on it receives the message.
+     */
+    @Test
+    public void testTopicDurableSubscriber() {
+        // Skip if this is not a topic test run
+        if(!"topic".equals(destinationType)) {
+            return;
+        }
+
+        try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) {
+            assertNotNull(jmsContext);
+            jmsContext.setClientID(clientID);
+
+            Destination destination = ActiveMQJMS2TestSupport.generateDestination(jmsContext, destinationType, methodNameDestinationName);
+            assertNotNull(destination);
+            assertTrue(destination instanceof Topic);
+            Topic topic = (Topic) destination;
+
+            JMSConsumer jmsConsumerRegister = jmsContext.createDurableConsumer(topic, methodNameDestinationName);
+            assertNotNull(jmsConsumerRegister);
+            jmsContext.start();
+            jmsConsumerRegister.close();
+
+            Message message = ActiveMQJMS2TestSupport.generateMessage(jmsContext, messageType, messagePayload);
+            String jmsMessageID = ActiveMQJMS2TestSupport.sendMessage(jmsContext, destination, message);
+
+            JMSConsumer jmsConsumerDurable = jmsContext.createDurableConsumer(topic, methodNameDestinationName);
+            assertNotNull(jmsConsumerDurable);
+
+            final List<Message> recvMessages = new CopyOnWriteArrayList<>();
+            final CountDownLatch countDownLatch = new CountDownLatch(1);
+            jmsConsumerDurable.setMessageListener(received -> {
+                recvMessages.add(received);
+                countDownLatch.countDown();
+            });
+
+            assertTrue("Expected to receive 1 message", countDownLatch.await(5L, TimeUnit.SECONDS));
+            jmsConsumerDurable.close();
+            jmsContext.stop();
+
+            for(javax.jms.Message tmpMessage : recvMessages) {
+                MessageData messageData = new MessageData();
+                messageData.setMessageType(messageType).setMessagePayload(messagePayload).setMessageID(jmsMessageID);
+                ActiveMQJMS2TestSupport.validateMessageData(tmpMessage, messageData);
+            }
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception: " + e, e); // keep the cause and its stack trace
+        }
+    }
+
+    /**
+     * Registers a durable subscription with the selector "JMSPriority=7", closes it, sends one message per priority
+     * 0..9 and validates what a new consumer on that subscription receives against the id of the priority-7 send.
+     */
+    @Test
+    public void testTopicDurableSubscriberSelector() {
+        // Skip if this is not a topic test run
+        if(!"topic".equals(destinationType)) {
+            return;
+        }
+
+        try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) {
+            assertNotNull(jmsContext);
+            jmsContext.setClientID(clientID);
+
+            Destination destination = ActiveMQJMS2TestSupport.generateDestination(jmsContext, destinationType, methodNameDestinationName);
+            assertNotNull(destination);
+            assertTrue(destination instanceof Topic);
+            Topic topic = (Topic) destination;
+
+            JMSConsumer jmsConsumerRegister = jmsContext.createDurableConsumer(topic, methodNameDestinationName, "JMSPriority=7", false);
+            assertNotNull(jmsConsumerRegister);
+            assertEquals("JMSPriority=7", jmsConsumerRegister.getMessageSelector());
+            jmsContext.start();
+
+            jmsConsumerRegister.close();
+
+            Message message = ActiveMQJMS2TestSupport.generateMessage(jmsContext, messageType, messagePayload);
+
+            // Remember the id of the only send whose priority equals the selector value
+            String matchMessageId = null;
+            for(int priority=0; priority<10; priority++) {
+                MessageData sendMessageData = new MessageData();
+                sendMessageData.setMessage(message).setPriority(priority);
+                String sentMessageId = ActiveMQJMS2TestSupport.sendMessage(jmsContext, destination, sendMessageData);
+                if(priority == 7) {
+                    matchMessageId = sentMessageId;
+                }
+            }
+
+            JMSConsumer jmsConsumerDurable = jmsContext.createDurableConsumer(topic, methodNameDestinationName, "JMSPriority=7", false);
+            assertNotNull(jmsConsumerDurable);
+            assertEquals("JMSPriority=7", jmsConsumerDurable.getMessageSelector());
+
+            final List<Message> recvMessages = new CopyOnWriteArrayList<>();
+            final CountDownLatch countDownLatch = new CountDownLatch(1);
+            jmsConsumerDurable.setMessageListener(received -> {
+                recvMessages.add(received);
+                countDownLatch.countDown();
+            });
+
+            assertTrue("Expected to receive 1 message", countDownLatch.await(5L, TimeUnit.SECONDS));
+            jmsConsumerDurable.close();
+            jmsContext.stop();
+
+            for(javax.jms.Message tmpMessage : recvMessages) {
+                MessageData recvMessageData = new MessageData();
+                recvMessageData.setMessageType(messageType).setMessagePayload(messagePayload).setMessageID(matchMessageId);
+                ActiveMQJMS2TestSupport.validateMessageData(tmpMessage, recvMessageData);
+            }
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception: " + e, e); // keep the cause and its stack trace
+        }
+    }
+}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ProducerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ProducerTest.java
new file mode 100644
index 0000000..84bc8c4
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ProducerTest.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms2;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.time.ZonedDateTime;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.jms.Destination;
+import javax.jms.JMSConsumer;
+import javax.jms.JMSContext;
+import javax.jms.JMSException;
+import javax.jms.JMSProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQMessageProducerSupport;
+import org.junit.Test;
+
+public class ActiveMQJMS2ProducerTest extends ActiveMQJMS2TestBase {
+
+    private static final String PROPERTY_NAME_VALID="ValidName";
+    private static final String PROPERTY_VALUE_VALID="valid value";
+
+    private static final Set<String> PROPERTY_NAMES_INVALID = Set.of("contains+plus+sign", "123startswithnumber", "contains blank space", "has-dash", "with&amp");
+    private static final Set<Object> PROPERTY_VALUES_INVALID = Set.of(new HashSet<String>(), new HashMap<Integer, String>(), ZonedDateTime.now());
+
+    @Test
+    public void testInvalidPropertyNameNullEmpty() {
+        try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) {
+            assertNotNull(jmsContext);
+            JMSProducer jmsProducer = jmsContext.createProducer();
+            jmsContext.start();
+
+            boolean caught = false;
+            try {
+                jmsProducer.setProperty(null, PROPERTY_VALUE_VALID);
+            } catch (Exception e) {
+                assertEquals("Invalid JMS property name must not be null or empty", e.getMessage());
+                caught = true;
+            }
+            assertTrue(caught);
+            caught = false;
+
+            try {
+                jmsProducer.setProperty("", PROPERTY_VALUE_VALID);
+            } catch (Exception e) {
+                assertEquals("Invalid JMS property name must not be null or empty", e.getMessage());
+                caught = true;
+            }
+            assertTrue(caught);
+            caught = false;
+        } catch (Exception e) {
+            fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testInvalidPropertyNameKeyword() {
+        try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) {
+            assertNotNull(jmsContext);
+            JMSProducer jmsProducer = jmsContext.createProducer();
+            jmsContext.start();
+
+            for(String propertyName : ActiveMQMessageProducerSupport.JMS_PROPERTY_NAMES_DISALLOWED) {
+                boolean caught = false;
+                try {
+                    jmsProducer.setProperty(propertyName, PROPERTY_VALUE_VALID);
+                } catch (Exception e) {
+                    assertEquals("Invalid JMS property: " + propertyName + " name is in disallowed list", e.getMessage());
+                    caught = true;
+                }
+                assertTrue("Expected exception for propertyName: " + propertyName, caught);
+            }
+        } catch (Exception e) {
+            fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testInvalidPropertyNameInvalidJavaIdentifier() {
+        try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) {
+            assertNotNull(jmsContext);
+            JMSProducer jmsProducer = jmsContext.createProducer();
+            jmsContext.start();
+
+            for(String propertyName : PROPERTY_NAMES_INVALID) {
+                boolean caught = false;
+                try {
+                    jmsProducer.setProperty(propertyName, PROPERTY_VALUE_VALID);
+                } catch (Exception e) {
+                    assertTrue(e.getMessage().startsWith("Invalid JMS property: " + propertyName + " name"));
+                    assertTrue(e.getMessage().contains("invalid character"));
+                    caught = true;
+                }
+                assertTrue("Expected exception for propertyName: " + propertyName, caught);
+            }
+        } catch (Exception e) {
+            fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testInvalidPropertyNameInvalidValueClassType() {
+        try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) {
+            assertNotNull(jmsContext);
+            JMSProducer jmsProducer = jmsContext.createProducer();
+            jmsContext.start();
+
+            for(Object propertyValue : PROPERTY_VALUES_INVALID) {
+                boolean caught = false;
+                try {
+                    jmsProducer.setProperty(PROPERTY_NAME_VALID, propertyValue);
+                } catch (Exception e) {
+                    assertEquals("Invalid JMS property: " + PROPERTY_NAME_VALID + " value class: " + propertyValue.getClass().getName() + " is not permitted by specification", e.getMessage());
+                    caught = true;
+                }
+                assertTrue("Expected exception for propertyValue: " + propertyValue, caught);
+            }
+        } catch (Exception e) {
+            fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testDisableTimestamp() {
+        try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) {
+            assertNotNull(jmsContext);
+            Destination destination = jmsContext.createQueue(methodNameDestinationName);
+            JMSProducer jmsProducer1 = jmsContext.createProducer();
+            jmsProducer1.setDisableMessageTimestamp(true);
+            jmsContext.start();
+
+            String textBody = "Test-" + methodNameDestinationName;
+            jmsProducer1.send(destination, textBody);
+            verifyTimestamp(jmsContext, destination, textBody, true);
+
+            // Re-enable timestamp
+            jmsProducer1.setDisableMessageTimestamp(false);
+            jmsProducer1.send(destination, textBody);
+            verifyTimestamp(jmsContext, destination, textBody, false);
+
+            // Create second producer from same Context
+            JMSProducer jmsProducer2 = jmsContext.createProducer();
+            jmsProducer2.setDisableMessageTimestamp(true);
+            jmsProducer2.send(destination, textBody);
+            verifyTimestamp(jmsContext, destination, textBody, true);
+
+            // Re-confirm jmsProducer1 setting remains
+            jmsProducer1.send(destination, textBody);
+            verifyTimestamp(jmsContext, destination, textBody, false);
+        } catch (Exception e) {
+            fail(e.getMessage());
+        }
+    }
+
+    protected static void verifyTimestamp(JMSContext jmsContext, Destination destination, String expectedTextBody, boolean expectTimestampZero) throws JMSException {
+        try(JMSConsumer consumer = jmsContext.createConsumer(destination)) {
+            javax.jms.Message received = consumer.receive(1000L);
+            assertNotNull(received);
+            assertTrue(received instanceof TextMessage);
+            assertEquals(expectedTextBody, ((TextMessage) received).getText());
+            // expectTimestampZero selects whether JMSTimestamp must be exactly 0 or anything but 0
+            if(expectTimestampZero) {
+                assertEquals(0L, received.getJMSTimestamp());
+            } else {
+                assertNotEquals(0L, received.getJMSTimestamp());
+            }
+        }
+    }
+}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2TestBase.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2TestBase.java
new file mode 100644
index 0000000..d15af01
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2TestBase.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms2;
+
+import java.lang.management.ManagementFactory;
+import java.util.LinkedList;
+import java.util.List;
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.JMX;
+import javax.management.MBeanServer;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.jmx.TopicViewMBean;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+
+public abstract class ActiveMQJMS2TestBase {
+
+    public static final String DEFAULT_JMX_DOMAIN_NAME = "org.apache.activemq";
+    public static final String DEFAULT_JMX_BROKER_NAME = "localhost";
+
+    public static final String DEFAULT_JMS_USER = "admin";
+    public static final String DEFAULT_JMS_PASS = "admin";
+
+    protected static ActiveMQConnectionFactory activemqConnectionFactory = null;
+
+    @Rule public TestName testName = new TestName();
+
+    // Control session
+    protected Connection connection = null;
+    protected Session session = null;
+    protected MessageProducer messageProducer = null;
+
+    protected String methodNameDestinationName = null;
+    protected MBeanServer mbeanServer = null;
+
+    @BeforeClass
+    public static void setUpClass() {
+        activemqConnectionFactory = new ActiveMQConnectionFactory("vm://localhost?marshal=false&broker.persistent=false");
+        List<String> newTrustedPackages = new LinkedList<>();
+        newTrustedPackages.addAll(activemqConnectionFactory.getTrustedPackages());
+        newTrustedPackages.add(ActiveMQJMS2TestBase.class.getPackageName());
+        activemqConnectionFactory.setTrustedPackages(newTrustedPackages);
+    }
+
+    @AfterClass
+    public static void tearDownClass() {
+        activemqConnectionFactory = null;
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        connection = activemqConnectionFactory.createConnection();
+        connection.start();
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        methodNameDestinationName = "AMQ.JMS2." + cleanParameterizedMethodName(testName.getMethodName().toUpperCase());
+        messageProducer = session.createProducer(session.createQueue(methodNameDestinationName));
+        mbeanServer = ManagementFactory.getPlatformMBeanServer();
+    }
+
+    @After
+    public void tearDown() {
+        if(messageProducer != null) {
+            try { messageProducer.close(); } catch (Exception e) { } finally { messageProducer = null; }
+        }
+
+        if(session != null) {
+            try { session.close(); } catch (Exception e) { } finally { session = null; }
+        }
+
+        if(connection != null) {
+            try { connection.close(); } catch (Exception e) { } finally { connection = null; }
+        }
+
+        methodNameDestinationName = null;
+    }
+
+    protected DestinationViewMBean getDestinationViewMBean(String destinationType, ActiveMQDestination destination) throws Exception {
+        switch(destinationType) {
+        case "queue": return getQueueViewMBean(destination);
+        case "topic": return getTopicViewMBean(destination);
+        case "temp-queue": return getTempQueueViewMBean(destination);
+        case "temp-topic": return getTempTopicViewMBean(destination);
+        default: throw new IllegalStateException("Unsupported destinationType: " + destinationType);
+        }
+    }
+
+    protected QueueViewMBean getQueueViewMBean(ActiveMQDestination destination) throws Exception {
+        return JMX.newMBeanProxy(mbeanServer, BrokerMBeanSupport.createDestinationName(BrokerMBeanSupport.createBrokerObjectName(DEFAULT_JMX_DOMAIN_NAME, DEFAULT_JMX_BROKER_NAME).toString(), destination), QueueViewMBean.class);
+    }
+
+    protected TopicViewMBean getTopicViewMBean(ActiveMQDestination destination) throws Exception {
+        return JMX.newMBeanProxy(mbeanServer, BrokerMBeanSupport.createDestinationName(BrokerMBeanSupport.createBrokerObjectName(DEFAULT_JMX_DOMAIN_NAME, DEFAULT_JMX_BROKER_NAME).toString(), destination), TopicViewMBean.class);
+    }
+
+    protected TopicViewMBean getTempQueueViewMBean(ActiveMQDestination destination) throws Exception {
+        return JMX.newMBeanProxy(mbeanServer, BrokerMBeanSupport.createDestinationName(BrokerMBeanSupport.createBrokerObjectName(DEFAULT_JMX_DOMAIN_NAME, DEFAULT_JMX_BROKER_NAME).toString(), destination), TopicViewMBean.class);
+    }
+
+    protected TopicViewMBean getTempTopicViewMBean(ActiveMQDestination destination) throws Exception {
+        return JMX.newMBeanProxy(mbeanServer, BrokerMBeanSupport.createDestinationName(BrokerMBeanSupport.createBrokerObjectName(DEFAULT_JMX_DOMAIN_NAME, DEFAULT_JMX_BROKER_NAME).toString(), destination), TopicViewMBean.class);
+    }
+
+    private static String cleanParameterizedMethodName(String methodName) {
+        // Turns TESTMESSAGETIMESTAMPTIMETOLIVE[DESTINATIONTYPE=QUEUE, MESSAGETYPE=BYTES] into TESTMESSAGETIMESTAMPTIMETOLIVE.QUEUE.BYTES:
+        // every NAME=VALUE parameter (or bare token without '=') adds ".VALUE"; a name without a well-formed [..] suffix is returned unchanged.
+        int open = (methodName == null) ? -1 : methodName.indexOf('[');
+        int close = (open < 0) ? -1 : methodName.indexOf(']', open);
+        if(close < 0) {
+            return methodName;
+        }
+        StringBuilder cleaned = new StringBuilder(methodName.substring(0, open));
+        for(String parameter : methodName.substring(open + 1, close).split(",")) {
+            cleaned.append('.').append(parameter.substring(parameter.indexOf('=') + 1).trim());
+        }
+        return cleaned.toString();
+    }
+}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2TestObjectMessagePayload.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2TestObjectMessagePayload.java
new file mode 100644
index 0000000..dc3bd86
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2TestObjectMessagePayload.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms2;
+
+import java.io.Serializable;
+
+public class ActiveMQJMS2TestObjectMessagePayload implements Serializable {
+    // Immutable, serializable holder for a single text payload.
+    private static final long serialVersionUID = 1L;
+
+    private final String payload;
+
+    public ActiveMQJMS2TestObjectMessagePayload(String payload) {
+        this.payload = payload;
+    }
+    // Returns the text given to the constructor (may be null; no validation is performed).
+    public String getPayload() {
+        return payload;
+    }
+}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2TestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2TestSupport.java
new file mode 100644
index 0000000..6008f97
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2TestSupport.java
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms2;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.JMSContext;
+import javax.jms.JMSException;
+import javax.jms.JMSProducer;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.QueueBrowser;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+public class ActiveMQJMS2TestSupport {
+
+    protected static final Set<String> PROPERTY_NAMES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList("JMS2_BOOLEAN_MIN", "JMS2_BOOLEAN_MAX", "JMS2_BYTE_MIN", "JMS2_BYTE_MAX",
+                                                        "JMS2_DOUBLE_MIN", "JMS2_DOUBLE_MAX", "JMS2_INT_MIN", "JMS2_INT_MAX", "JMS2_FLOAT_MIN", "JMS2_FLOAT_MAX", "JMS2_LONG_MIN",
+                                                        "JMS2_LONG_MAX", "JMS2_SHORT_MIN", "JMS2_SHORT_MAX", "JMS2_STRING_VAL")));
+
+    private ActiveMQJMS2TestSupport() {}
+
+    protected static Destination generateDestination(JMSContext jmsContext, String destinationType, String destinationName) throws JMSException {
+        Destination destination = null;
+        switch(destinationType) {
+        case "queue":
+            destination = jmsContext.createQueue(destinationName);
+            break;
+        case "topic":
+            destination = jmsContext.createTopic(destinationName);
+            break;
+        case "temp-queue":
+            destination = jmsContext.createTemporaryQueue();
+            break;
+        case "temp-topic":
+            destination = jmsContext.createTemporaryTopic();
+            break;
+        default:
+            fail("Unsupported destinationType:" + destinationType);
+        }
+       assertNotNull(destination);
+       return destination;
+    }
+
+    protected static Message generateMessage(JMSContext jmsContext, String messageType, String payload) throws JMSException {
+        assertNotNull(messageType);
+        Message tmpMessage = null;
+        switch(messageType) {
+        case "bytes":
+            BytesMessage bytesMessage = jmsContext.createBytesMessage();
+            bytesMessage.writeBytes(payload.getBytes(StandardCharsets.UTF_8));
+            tmpMessage = bytesMessage;
+            break;
+        case "map":
+            MapMessage mapMessage = jmsContext.createMapMessage();
+            mapMessage.setString("payload", payload);
+            tmpMessage = mapMessage;
+            break;
+        case "object":
+            tmpMessage = jmsContext.createObjectMessage(new ActiveMQJMS2TestObjectMessagePayload(payload));
+            break;
+        case "stream":
+            StreamMessage streamMessage = jmsContext.createStreamMessage();
+            streamMessage.writeString(payload);
+            tmpMessage = streamMessage;
+            break;
+        case "text":
+            tmpMessage = jmsContext.createTextMessage(payload);
+            break;
+        default:
+            fail("Unsupported messageType:" + messageType);
+        }
+        return tmpMessage;
+    }
+
+    protected static void populateJMSHeaders(javax.jms.Message message, String correlationID, Destination replyTo, String jmsType) throws JMSException {
+        assertNotNull(message);
+        message.setJMSCorrelationID(correlationID); // apply the supplied values to the message headers
+        message.setJMSReplyTo(replyTo);
+        message.setJMSType(jmsType);
+    }
+
+    protected static String sendMessage(JMSContext jmsContext, Destination destination, Message message) throws JMSException {
+        MessageData messageData = new MessageData();
+        messageData.setMessage(message);
+        return sendMessage(jmsContext, destination, messageData);
+    }
+
+    protected static String sendMessage(JMSContext jmsContext, Destination destination, MessageData messageData) throws JMSException {
+        assertNotNull(jmsContext);
+        assertNotNull(messageData);
+        assertNotNull(messageData.getMessage());
+
+        JMSProducer jmsProducer = jmsContext.createProducer();
+
+        if(messageData.getDeliveryDelay() != null) {
+            jmsProducer.setDeliveryDelay(messageData.getDeliveryDelay());
+        }
+        if(messageData.getDeliveryMode() != null) {
+            jmsProducer.setDeliveryMode(messageData.getDeliveryMode());
+        } 
+        if(messageData.getDisableMessageID() != null) {
+            jmsProducer.setDisableMessageID(messageData.getDisableMessageID());
+        }
+        if(messageData.getDisableMessageTimestamp() != null) {
+            jmsProducer.setDisableMessageTimestamp(messageData.getDisableMessageTimestamp());
+        }
+        if(messageData.getCorrelationID() != null) {
+            jmsProducer.setJMSCorrelationID(messageData.getCorrelationID());
+        }
+        if(messageData.getReplyTo() != null) {
+            jmsProducer.setJMSReplyTo(messageData.getReplyTo());
+        }
+        if(messageData.getJmsType() != null) {
+            jmsProducer.setJMSType(messageData.getJmsType());
+        }
+        if(messageData.getPriority() != null) {
+            jmsProducer.setPriority(messageData.getPriority());
+        }
+        if(messageData.getTimeToLive() != null) {
+            jmsProducer.setTimeToLive(messageData.getTimeToLive());
+        }
+        populateJMSProperties(jmsProducer);
+        validateJMSProperties(jmsProducer);
+        jmsProducer.send(destination, messageData.getMessage());
+
+        return messageData.getMessage().getJMSMessageID();
+    }
+
+    protected static void browseMessage(JMSContext jmsContext, String testDestinationName, String expectedTextBody, boolean expectFound) throws JMSException {
+        assertNotNull(jmsContext);
+        try(QueueBrowser queueBrowser = jmsContext.createBrowser(jmsContext.createQueue(testDestinationName))) {
+            Enumeration<?> messageEnumeration = queueBrowser.getEnumeration();
+            assertNotNull(messageEnumeration);
+            // Scan the browsable messages until one carries the expected text, then compare the outcome with expectFound
+            boolean found = false;
+            while(!found && messageEnumeration.hasMoreElements()) {
+                javax.jms.Message message = (Message)messageEnumeration.nextElement();
+                assertNotNull(message);
+                assertTrue(message instanceof TextMessage);
+                String textBody = ((TextMessage)message).getText();
+                found = (expectedTextBody == null) ? textBody == null : expectedTextBody.equals(textBody);
+            }
+            assertEquals(expectFound, found);
+        }
+    }
+
+    protected static void validateMessageData(javax.jms.Message message, MessageData messageData) throws JMSException {
+        assertNotNull(message);
+        assertNotNull(messageData.getMessageType());
+        assertNotNull(messageData.getMessagePayload());
+        validateJMSHeaders(message, messageData);
+        validateJMSProperties(message);
+
+        switch(messageData.getMessageType()) {
+        case "bytes":
+            assertTrue(message instanceof BytesMessage);
+            BytesMessage bytesMessage = BytesMessage.class.cast(message);
+            byte[] payload = new byte[(int)bytesMessage.getBodyLength()];
+            bytesMessage.readBytes(payload);
+            assertEquals(messageData.getMessagePayload(), new String(payload, StandardCharsets.UTF_8));
+            break;
+        case "map":
+            assertTrue(message instanceof MapMessage);
+            MapMessage mapMessage = MapMessage.class.cast(message);
+            String mapPayload = mapMessage.getString("payload");
+            assertEquals(messageData.getMessagePayload(), mapPayload);
+            break;
+        case "object":
+            assertTrue(message instanceof ObjectMessage);
+            ObjectMessage objectMessage = ObjectMessage.class.cast(message);
+            Object tmpObject = objectMessage.getObject();
+            assertNotNull(tmpObject);
+            assertTrue(tmpObject instanceof ActiveMQJMS2TestObjectMessagePayload);
+            assertEquals(messageData.getMessagePayload(), ActiveMQJMS2TestObjectMessagePayload.class.cast(tmpObject).getPayload());
+            break;
+        case "stream":
+            assertTrue(message instanceof StreamMessage);
+            StreamMessage streamMessage = StreamMessage.class.cast(message);
+            assertEquals(messageData.getMessagePayload(), streamMessage.readString());
+            break;
+        case "text":
+            assertTrue(message instanceof TextMessage);
+            assertEquals(messageData.getMessagePayload(), TextMessage.class.cast(message).getText());
+            break;
+        default:
+            fail("Unsupported messageType:" + messageData.getMessageType());
+        }
+    }
+
+    private static void validateJMSHeaders(javax.jms.Message message, MessageData messageData) throws JMSException {
+        assertNotNull(message);
+        assertEquals(messageData.getCorrelationID(), message.getJMSCorrelationID());
+        if(messageData.getDeliveryMode() != null) {
+            assertEquals(messageData.getDeliveryMode(), Integer.valueOf(message.getJMSDeliveryMode()));
+        }
+        if(messageData.getDeliveryTime() != null) {
+            assertEquals(messageData.getDeliveryTime(), Long.valueOf(message.getJMSDeliveryTime()));
+        }
+        if(messageData.getExpiration() != null) {
+            assertEquals(messageData.getExpiration(), Long.valueOf(message.getJMSExpiration()));
+        }
+        if(messageData.getMessageID() != null) {
+            assertEquals(messageData.getMessageID(), message.getJMSMessageID());
+        }
+        if(messageData.getPriority() != null) {
+            assertEquals(messageData.getPriority(), Integer.valueOf(message.getJMSPriority()));
+        }
+        assertEquals(messageData.getReplyTo(), message.getJMSReplyTo());
+        if(messageData.getTimestamp() != null) {
+            assertEquals(messageData.getTimestamp(), Long.valueOf(message.getJMSTimestamp()));
+        }
+        if(Boolean.TRUE.equals(messageData.getDisableMessageTimestamp())) {
+            assertEquals(Long.valueOf(0l), Long.valueOf(message.getJMSTimestamp()));
+        }
+        assertEquals(messageData.getJmsType(), message.getJMSType());
+    }
+
+    private static void populateJMSProperties(JMSProducer jmsProducer) throws JMSException {
+        jmsProducer.setProperty("JMS2_BOOLEAN_MIN", false);
+        jmsProducer.setProperty("JMS2_BOOLEAN_MAX", true);
+        jmsProducer.setProperty("JMS2_BYTE_MIN", Byte.MIN_VALUE);
+        jmsProducer.setProperty("JMS2_BYTE_MAX", Byte.MAX_VALUE);
+        jmsProducer.setProperty("JMS2_DOUBLE_MIN", Double.MIN_VALUE);
+        jmsProducer.setProperty("JMS2_DOUBLE_MAX", Double.MAX_VALUE);
+        jmsProducer.setProperty("JMS2_INT_MIN", Integer.MIN_VALUE);
+        jmsProducer.setProperty("JMS2_INT_MAX", Integer.MAX_VALUE);
+        jmsProducer.setProperty("JMS2_FLOAT_MIN", Float.MIN_VALUE);
+        jmsProducer.setProperty("JMS2_FLOAT_MAX", Float.MAX_VALUE);
+        jmsProducer.setProperty("JMS2_LONG_MIN", Long.MIN_VALUE);
+        jmsProducer.setProperty("JMS2_LONG_MAX", Long.MAX_VALUE);
+        jmsProducer.setProperty("JMS2_SHORT_MIN", Short.MIN_VALUE);
+        jmsProducer.setProperty("JMS2_SHORT_MAX", Short.MAX_VALUE);
+        jmsProducer.setProperty("JMS2_STRING_VAL", "Hello World");
+    }
+
+    private static void validateJMSProperties(JMSProducer jmsProducer) throws JMSException {
+        assertNotNull(jmsProducer);
+        assertNotNull(jmsProducer.getPropertyNames());
+        assertEquals(Integer.valueOf(PROPERTY_NAMES.size()), Integer.valueOf(jmsProducer.getPropertyNames().size()));
+        for(String propertyName : PROPERTY_NAMES) {
+            assertTrue(jmsProducer.propertyExists(propertyName));
+        }
+        assertEquals(Boolean.FALSE, Boolean.valueOf(jmsProducer.getBooleanProperty("JMS2_BOOLEAN_MIN")));
+        assertEquals(Boolean.TRUE, Boolean.valueOf(jmsProducer.getBooleanProperty("JMS2_BOOLEAN_MAX")));
+        assertEquals(Byte.valueOf(Byte.MIN_VALUE), Byte.valueOf(jmsProducer.getByteProperty("JMS2_BYTE_MIN")));
+        assertEquals(Byte.valueOf(Byte.MAX_VALUE), Byte.valueOf(jmsProducer.getByteProperty("JMS2_BYTE_MAX")));
+        assertEquals(Double.valueOf(Double.MIN_VALUE), Double.valueOf(jmsProducer.getDoubleProperty("JMS2_DOUBLE_MIN")));
+        assertEquals(Double.valueOf(Double.MAX_VALUE), Double.valueOf(jmsProducer.getDoubleProperty("JMS2_DOUBLE_MAX")));
+        assertEquals(Integer.valueOf(Integer.MIN_VALUE), Integer.valueOf(jmsProducer.getIntProperty("JMS2_INT_MIN")));
+        assertEquals(Integer.valueOf(Integer.MAX_VALUE), Integer.valueOf(jmsProducer.getIntProperty("JMS2_INT_MAX")));
+        assertEquals(Float.valueOf(Float.MIN_VALUE), Float.valueOf(jmsProducer.getFloatProperty("JMS2_FLOAT_MIN")));
+        assertEquals(Float.valueOf(Float.MAX_VALUE), Float.valueOf(jmsProducer.getFloatProperty("JMS2_FLOAT_MAX")));
+        assertEquals(Long.valueOf(Long.MIN_VALUE), Long.valueOf(jmsProducer.getLongProperty("JMS2_LONG_MIN")));
+        assertEquals(Long.valueOf(Long.MAX_VALUE), Long.valueOf(jmsProducer.getLongProperty("JMS2_LONG_MAX")));
+        assertEquals(Short.valueOf(Short.MIN_VALUE), Short.valueOf(jmsProducer.getShortProperty("JMS2_SHORT_MIN")));
+        assertEquals(Short.valueOf(Short.MAX_VALUE), Short.valueOf(jmsProducer.getShortProperty("JMS2_SHORT_MAX")));
+        assertEquals("Hello World", jmsProducer.getStringProperty("JMS2_STRING_VAL"));
+    }
+
+    private static void validateJMSProperties(javax.jms.Message message) throws JMSException {
+        assertNotNull(message);
+        assertEquals(Boolean.FALSE, Boolean.valueOf(message.getBooleanProperty("JMS2_BOOLEAN_MIN")));
+        assertEquals(Boolean.TRUE, Boolean.valueOf(message.getBooleanProperty("JMS2_BOOLEAN_MAX")));
+        assertEquals(Byte.valueOf(Byte.MIN_VALUE), Byte.valueOf(message.getByteProperty("JMS2_BYTE_MIN")));
+        assertEquals(Byte.valueOf(Byte.MAX_VALUE), Byte.valueOf(message.getByteProperty("JMS2_BYTE_MAX")));
+        assertEquals(Double.valueOf(Double.MIN_VALUE), Double.valueOf(message.getDoubleProperty("JMS2_DOUBLE_MIN")));
+        assertEquals(Double.valueOf(Double.MAX_VALUE), Double.valueOf(message.getDoubleProperty("JMS2_DOUBLE_MAX")));
+        assertEquals(Integer.valueOf(Integer.MIN_VALUE), Integer.valueOf(message.getIntProperty("JMS2_INT_MIN")));
+        assertEquals(Integer.valueOf(Integer.MAX_VALUE), Integer.valueOf(message.getIntProperty("JMS2_INT_MAX")));
+        assertEquals(Float.valueOf(Float.MIN_VALUE), Float.valueOf(message.getFloatProperty("JMS2_FLOAT_MIN")));
+        assertEquals(Float.valueOf(Float.MAX_VALUE), Float.valueOf(message.getFloatProperty("JMS2_FLOAT_MAX")));
+        assertEquals(Long.valueOf(Long.MIN_VALUE), Long.valueOf(message.getLongProperty("JMS2_LONG_MIN")));
+        assertEquals(Long.valueOf(Long.MAX_VALUE), Long.valueOf(message.getLongProperty("JMS2_LONG_MAX")));
+        assertEquals(Short.valueOf(Short.MIN_VALUE), Short.valueOf(message.getShortProperty("JMS2_SHORT_MIN")));
+        assertEquals(Short.valueOf(Short.MAX_VALUE), Short.valueOf(message.getShortProperty("JMS2_SHORT_MAX")));
+        assertEquals("Hello World", message.getStringProperty("JMS2_STRING_VAL"));
+    }
+}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/MessageData.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/MessageData.java
new file mode 100644
index 0000000..4e84bef
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/MessageData.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms2;
+
+import javax.jms.Destination;
+
+/**
+ * Mutable holder that pairs a {@link javax.jms.Message} with a set of optional
+ * attributes. The attributes fall into three groups:
+ * <ul>
+ *   <li>the message reference plus a textual message type and payload;</li>
+ *   <li>delivery settings: delay, mode, time-to-live, and the flags for
+ *       disabling message IDs and timestamps;</li>
+ *   <li>header-style values: correlation ID, reply-to destination, JMS type,
+ *       priority, expiration, timestamp, delivery time, and message ID.</li>
+ * </ul>
+ *
+ * <p>Every attribute is {@code null} until its setter has been called. Values
+ * are stored and returned exactly as given: nothing is validated, copied, or
+ * applied to the held message by this class.
+ *
+ * <p>Setters return {@code this}, so calls can be chained:
+ * <pre>{@code
+ * MessageData data = new MessageData()
+ *         .setCorrelationID("abc")
+ *         .setPriority(4);
+ * }</pre>
+ *
+ * <p>NOTE(review): the grouping above is inferred from the attribute names; how
+ * each value is consumed depends on callers that are not part of this class.
+ */
+public class MessageData {
+
+    // Message reference plus its type and payload strings.
+    private javax.jms.Message message;
+    private String messageType;
+    private String messagePayload;
+
+    // Delivery settings.
+    private Long deliveryDelay;
+    private Integer deliveryMode;
+    private Boolean disableMessageID;
+    private Boolean disableMessageTimestamp;
+    private Long timeToLive;
+
+    // Header-style values.
+    private String correlationID;
+    private Destination replyTo;
+    private String jmsType;
+    private Integer priority;
+    private Long expiration;
+    private Long timestamp;
+    private Long deliveryTime;
+    private String messageID;
+
+    /** Package-private: instances can only be created from within this package. */
+    MessageData() {}
+
+    // --- message reference, type and payload ---
+    public MessageData setMessage(javax.jms.Message message) { this.message = message; return this; }
+    public javax.jms.Message getMessage() { return message; }
+
+    public MessageData setMessageType(String messageType) { this.messageType = messageType; return this; }
+    public String getMessageType() { return messageType; }
+
+    public MessageData setMessagePayload(String messagePayload) { this.messagePayload = messagePayload; return this; }
+    public String getMessagePayload() { return messagePayload; }
+
+    // --- delivery settings ---
+    public MessageData setDeliveryDelay(Long deliveryDelay) { this.deliveryDelay = deliveryDelay; return this; }
+    public Long getDeliveryDelay() { return deliveryDelay; }
+
+    public MessageData setDeliveryMode(Integer deliveryMode) { this.deliveryMode = deliveryMode; return this; }
+    public Integer getDeliveryMode() { return deliveryMode; }
+
+    public MessageData setDisableMessageID(Boolean disableMessageID) { this.disableMessageID = disableMessageID; return this; }
+    public Boolean getDisableMessageID() { return disableMessageID; }
+
+    public MessageData setDisableMessageTimestamp(Boolean disableMessageTimestamp) { this.disableMessageTimestamp = disableMessageTimestamp; return this; }
+    public Boolean getDisableMessageTimestamp() { return disableMessageTimestamp; }
+
+    public MessageData setTimeToLive(Long timeToLive) { this.timeToLive = timeToLive; return this; }
+    public Long getTimeToLive() { return timeToLive; }
+
+    // --- header-style values ---
+    public MessageData setCorrelationID(String correlationID) { this.correlationID = correlationID; return this; }
+    public String getCorrelationID() { return correlationID; }
+
+    public MessageData setReplyTo(Destination replyTo) { this.replyTo = replyTo; return this; }
+    public Destination getReplyTo() { return replyTo; }
+
+    // Naming quirk kept for callers: the setter is setJMSType, the getter getJmsType.
+    public MessageData setJMSType(String jmsType) { this.jmsType = jmsType; return this; }
+    public String getJmsType() { return jmsType; }
+
+    public MessageData setPriority(Integer priority) { this.priority = priority; return this; }
+    public Integer getPriority() { return priority; }
+
+    public MessageData setExpiration(Long expiration) { this.expiration = expiration; return this; }
+    public Long getExpiration() { return expiration; }
+
+    public MessageData setTimestamp(Long timestamp) { this.timestamp = timestamp; return this; }
+    public Long getTimestamp() { return timestamp; }
+
+    public MessageData setDeliveryTime(Long deliveryTime) { this.deliveryTime = deliveryTime; return this; }
+    public Long getDeliveryTime() { return deliveryTime; }
+
+    public MessageData setMessageID(String messageID) { this.messageID = messageID; return this; }
+    public String getMessageID() { return messageID; }
+}
diff --git a/pom.xml b/pom.xml
index c552711..f01d967 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1402,6 +1402,7 @@
             <exclude>**/kahadb/**/*.data</exclude>
             <exclude>**/resources/*.xsd</exclude>
             <exclude>**/src/test/resources/keystore</exclude>
+            <exclude>**/*.ts</exclude>
             <!-- web, web-console, web-demo files -->
             <exclude>**/webapp/mqtt/mqttws31.js</exclude>
             <exclude>**/webapp/js/dojo.js</exclude>
@@ -1461,9 +1462,9 @@
             <link>http://logging.apache.org/log4j/docs/api/</link>
           </links>
           <stylesheetfile>${basedir}/../etc/css/stylesheet.css</stylesheetfile>
-          <linksource>true</linksource>
           <maxmemory>2048m</maxmemory>
           <source>${source-version}</source>
+          <noindex>true</noindex>
           <additionalJOption>-J-Xmx2048m</additionalJOption>
           <!-- necessary for now under the javadocs can be fixed because jdk8 is much stricter -->
           <additionalJOption>${javadoc.options}</additionalJOption>
@@ -1592,6 +1593,9 @@
           <plugin>
             <groupId>org.apache.maven.plugins</groupId>
             <artifactId>maven-javadoc-plugin</artifactId>
+            <configuration>
+              <noindex>true</noindex>
+            </configuration>
             <executions>
               <execution>
                 <id>attach-javadocs</id>