[AMQ-8322] JMS2 Implementation - JMSContext, JMSProducer, JMSConsumer first pass (#729)
diff --git a/activemq-broker/src/test/java/org/apache/activemq/ActiveMQJms2Test.java b/activemq-broker/src/test/java/org/apache/activemq/ActiveMQJms2Test.java
deleted file mode 100644
index 51e2614..0000000
--- a/activemq-broker/src/test/java/org/apache/activemq/ActiveMQJms2Test.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class ActiveMQJms2Test {
-
- protected static ActiveMQConnectionFactory activemqConnectionFactory = null;
-
- protected Connection connection = null;
- protected Session session = null;
- protected MessageProducer messageProducer = null;
-
- @BeforeClass
- public static void setUpClass() {
- activemqConnectionFactory = new ActiveMQConnectionFactory("vm://localhost?marshal=false&broker.persistent=false");
- }
-
- @AfterClass
- public static void tearDownClass() {
- activemqConnectionFactory = null;
- }
-
- @Before
- public void setUp() throws JMSException {
- connection = activemqConnectionFactory.createConnection();
- connection.start();
-
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- messageProducer = session.createProducer(session.createQueue("AMQ.JMS2.TEST"));
- }
-
- @After
- public void tearDown() {
- if(messageProducer != null) {
- try { messageProducer.close(); } catch (Exception e) { } finally { messageProducer = null; }
- }
-
- if(session != null) {
- try { session.close(); } catch (Exception e) { } finally { session = null; }
- }
-
- if(connection != null) {
- try { connection.close(); } catch (Exception e) { } finally { connection = null; }
- }
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testConnectionFactoryCreateContext() {
- activemqConnectionFactory.createContext();
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testConnectionFactoryCreateContextSession() {
- activemqConnectionFactory.createContext(Session.AUTO_ACKNOWLEDGE);
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testConnectionFactoryCreateContextUserPass() {
- activemqConnectionFactory.createContext("admin", "admin");
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testConnectionFactoryCreateContextUserPassSession() {
- activemqConnectionFactory.createContext("admin", "admin", Session.AUTO_ACKNOWLEDGE);
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testConnectionSharedConnectionConsumer() throws JMSException {
- connection.createSharedConnectionConsumer(null, null, null, null, 10);
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testConnectionSharedDurableConnectionConsumer() throws JMSException {
- connection.createSharedDurableConnectionConsumer(null, null, null, null, 10);
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testSessionAckMode() throws JMSException {
- connection.createSession(Session.AUTO_ACKNOWLEDGE);
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testSessionDurableConsumer() throws JMSException {
- session.createDurableConsumer(null, null);
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testSessionDurableConsumerSelectorNoLocal() throws JMSException {
- session.createDurableConsumer(null, null, null, true);
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testSessionSharedConsumer() throws JMSException {
- session.createSharedConsumer(null, null);
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testSessionSharedConsumerSelector() throws JMSException {
- session.createSharedConsumer(null, null, null);
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testSessionSharedDurableConsumer() throws JMSException {
- session.createSharedDurableConsumer(null, null);
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testSessionSharedDurableConsumerSelector() throws JMSException {
- session.createSharedDurableConsumer(null, null, null);
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testProducerDeliveryDelayGet() throws JMSException {
- messageProducer.getDeliveryDelay();
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testProducerDeliveryDelaySet() throws JMSException {
- messageProducer.setDeliveryDelay(1000l);
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testProducerSendMessageCompletionListener() throws JMSException {
- messageProducer.send(session.createQueue("AMQ.TEST"), null);
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testProducerSendMessageQoSParamsCompletionListener() throws JMSException {
- messageProducer.send(null, 1, 4, 0l, null);
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testProducerSendDestinationMessageCompletionListener() throws JMSException {
- messageProducer.send(session.createQueue("AMQ.TEST"), null, null);
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testProducerSendDestinationMessageQosParamsCompletionListener() throws JMSException {
- messageProducer.send(session.createQueue("AMQ.TEST"), null, 1, 4, 0l, null);
- }
-
-}
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
index 461dbe1..2324448 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
@@ -292,7 +292,11 @@
*/
@Override
public JMSContext createContext() {
- throw new UnsupportedOperationException("createContext() is not supported");
+ try {
+ return new ActiveMQContext(createActiveMQConnection());
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
}
/**
@@ -300,7 +304,11 @@
*/
@Override
public JMSContext createContext(String userName, String password) {
- throw new UnsupportedOperationException("createContext() is not supported");
+ try {
+ return new ActiveMQContext(createActiveMQConnection(userName, password));
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
}
/**
@@ -308,7 +316,11 @@
*/
@Override
public JMSContext createContext(String userName, String password, int sessionMode) {
- throw new UnsupportedOperationException("createContext() is not supported");
+ try {
+ return new ActiveMQContext(createActiveMQConnection(userName, password), sessionMode);
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
}
/**
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConsumer.java
new file mode 100644
index 0000000..409506d
--- /dev/null
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConsumer.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.JMSConsumer;
+import javax.jms.JMSException;
+import javax.jms.JMSRuntimeException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+
+import org.apache.activemq.util.JMSExceptionSupport;
+
+public class ActiveMQConsumer implements JMSConsumer {
+
+ private final ActiveMQContext activemqContext;
+ private final MessageConsumer activemqMessageConsumer;
+
+ ActiveMQConsumer(ActiveMQContext activemqContext, MessageConsumer activemqMessageConsumer) {
+ this.activemqContext = activemqContext;
+ this.activemqMessageConsumer = activemqMessageConsumer;
+ }
+
+ @Override
+ public String getMessageSelector() {
+ try {
+ return activemqMessageConsumer.getMessageSelector();
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public MessageListener getMessageListener() throws JMSRuntimeException {
+ try {
+ return activemqMessageConsumer.getMessageListener();
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void setMessageListener(MessageListener listener) throws JMSRuntimeException {
+ try {
+ activemqMessageConsumer.setMessageListener(listener);
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public Message receive() {
+ try {
+ return activemqMessageConsumer.receive();
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public Message receive(long timeout) {
+ try {
+ return activemqMessageConsumer.receive(timeout);
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public Message receiveNoWait() {
+ try {
+ return activemqMessageConsumer.receiveNoWait();
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ activemqMessageConsumer.close();
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public <T> T receiveBody(Class<T> c) {
+ throw new UnsupportedOperationException("receiveBody(Class<T>) is not supported");
+ }
+
+ @Override
+ public <T> T receiveBody(Class<T> c, long timeout) {
+ throw new UnsupportedOperationException("receiveBody(Class<T>, long) is not supported");
+ }
+
+ @Override
+ public <T> T receiveBodyNoWait(Class<T> c) {
+ throw new UnsupportedOperationException("receiveBodyNoWait(Class<T>) is not supported");
+ }
+
+}
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQContext.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQContext.java
new file mode 100644
index 0000000..4966d09
--- /dev/null
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQContext.java
@@ -0,0 +1,560 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.BytesMessage;
+import javax.jms.ConnectionMetaData;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateRuntimeException;
+import javax.jms.JMSConsumer;
+import javax.jms.JMSContext;
+import javax.jms.JMSException;
+import javax.jms.JMSProducer;
+import javax.jms.JMSRuntimeException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.util.JMSExceptionSupport;
+
+/**
+ * In terms of the JMS 1.1 API a JMSContext should be thought of as
+ * representing both a Connection and a Session. Although the simplified
+ * API removes the need for applications to use those objects, the concepts
+ * of connection and session remain important. A connection represents a
+ * physical link to the JMS server and a session represents a
+ * single-threaded context for sending and receiving messages.
+ *
+ *
+ * @see javax.jms.JMSContext
+ */
+
+public class ActiveMQContext implements JMSContext {
+
+ private static final boolean DEFAULT_AUTO_START = true;
+
+ private final ActiveMQConnection activemqConnection;
+ private final AtomicLong connectionCounter;
+ private ActiveMQSession activemqSession = null;
+
+ // Configuration
+ private boolean autoStart = DEFAULT_AUTO_START;
+ private final int sessionMode;
+
+ // State
+ private boolean closeInvoked = false;
+ private final AtomicBoolean startInvoked = new AtomicBoolean(false);
+ private ActiveMQMessageProducer activemqMessageProducer = null;
+
+ ActiveMQContext(final ActiveMQConnection activemqConnection) {
+ this.activemqConnection = activemqConnection;
+ this.sessionMode = AUTO_ACKNOWLEDGE;
+ this.connectionCounter = new AtomicLong(1l);
+ }
+
+ ActiveMQContext(final ActiveMQConnection activemqConnection, final int sessionMode) {
+ this.activemqConnection = activemqConnection;
+ this.sessionMode = sessionMode;
+ this.connectionCounter = new AtomicLong(1l);
+ }
+
+ private ActiveMQContext(final ActiveMQConnection activemqConnection, final int sessionMode, final AtomicLong connectionCounter) {
+ this.activemqConnection = activemqConnection;
+ this.sessionMode = sessionMode;
+ this.connectionCounter = connectionCounter;
+ }
+
+ @Override
+ public JMSContext createContext(int sessionMode) {
+ if(connectionCounter.get() == 0l) {
+ throw new JMSRuntimeException("Context already closed");
+ }
+
+ connectionCounter.incrementAndGet();
+ return new ActiveMQContext(activemqConnection, sessionMode, connectionCounter);
+ }
+
+ @Override
+ public JMSProducer createProducer() {
+ return new ActiveMQProducer(this, getCreatedActiveMQMessageProducer());
+ }
+
+ @Override
+ public String getClientID() {
+ try {
+ return this.activemqConnection.getClientID();
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void setClientID(String clientID) {
+ try {
+ this.activemqConnection.setClientID(clientID);
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public ConnectionMetaData getMetaData() {
+ checkContextState();
+ try {
+ return this.activemqConnection.getMetaData();
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public ExceptionListener getExceptionListener() {
+ checkContextState();
+ try {
+ return this.activemqConnection.getExceptionListener();
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void setExceptionListener(ExceptionListener listener) {
+ checkContextState();
+ try {
+ this.activemqConnection.setExceptionListener(listener);
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void start() {
+ checkContextState();
+ try {
+ if(startInvoked.compareAndSet(false, true)) {
+ this.activemqConnection.start();
+ }
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void stop() {
+ checkContextState();
+ try {
+ if(startInvoked.compareAndSet(true, false)) {
+ this.activemqConnection.stop();
+ }
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void setAutoStart(boolean autoStart) {
+ this.autoStart = autoStart;
+ }
+
+ @Override
+ public boolean getAutoStart() {
+ return this.autoStart;
+ }
+
+ @Override
+ public synchronized void close() {
+ JMSRuntimeException firstException = null;
+
+ if(this.activemqMessageProducer != null) {
+ try {
+ this.activemqMessageProducer.close();
+ } catch (JMSException e) {
+ if(firstException == null) {
+ firstException = JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+ }
+
+ if(this.activemqSession != null) {
+ try {
+ this.activemqSession.close();
+ } catch (JMSException e) {
+ if(firstException == null) {
+ firstException = JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+ }
+
+ if(connectionCounter.decrementAndGet() == 0) {
+ if(this.activemqConnection != null) {
+ try {
+ closeInvoked = true;
+ this.activemqConnection.close();
+ } catch (JMSException e) {
+ if(firstException == null) {
+ firstException = JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+ }
+ }
+
+ if(firstException != null) {
+ throw firstException;
+ }
+ }
+
+ @Override
+ public BytesMessage createBytesMessage() {
+ checkContextState();
+ try {
+ return activemqSession.createBytesMessage();
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public MapMessage createMapMessage() {
+ checkContextState();
+ try {
+ return activemqSession.createMapMessage();
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public Message createMessage() {
+ checkContextState();
+ try {
+ return activemqSession.createMessage();
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public ObjectMessage createObjectMessage() {
+ checkContextState();
+ try {
+ return activemqSession.createObjectMessage();
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public ObjectMessage createObjectMessage(Serializable object) {
+ checkContextState();
+ try {
+ return activemqSession.createObjectMessage(object);
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public StreamMessage createStreamMessage() {
+ checkContextState();
+ try {
+ return activemqSession.createStreamMessage();
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public TextMessage createTextMessage() {
+ checkContextState();
+ try {
+ return activemqSession.createTextMessage();
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public TextMessage createTextMessage(String text) {
+ checkContextState();
+ try {
+ return activemqSession.createTextMessage(text);
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public boolean getTransacted() {
+ checkContextState();
+ try {
+ return activemqSession.getTransacted();
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public int getSessionMode() {
+ return this.sessionMode;
+ }
+
+ @Override
+ public void commit() {
+ checkContextState();
+ try {
+ activemqSession.commit();
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void rollback() {
+ checkContextState();
+ try {
+ activemqSession.rollback();
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void recover() {
+ checkContextState();
+ try {
+ activemqSession.recover();
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public JMSConsumer createConsumer(Destination destination) {
+ checkContextState();
+ try {
+ if(getAutoStart()) {
+ start();
+ }
+ return new ActiveMQConsumer(this, activemqSession.createConsumer(destination));
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public JMSConsumer createConsumer(Destination destination, String messageSelector) {
+ checkContextState();
+ try {
+ if(getAutoStart()) {
+ start();
+ }
+ return new ActiveMQConsumer(this, activemqSession.createConsumer(destination, messageSelector));
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public JMSConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) {
+ checkContextState();
+ try {
+ if(getAutoStart()) {
+ start();
+ }
+ return new ActiveMQConsumer(this, activemqSession.createConsumer(destination, messageSelector, noLocal));
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public Queue createQueue(String queueName) {
+ checkContextState();
+ try {
+ return activemqSession.createQueue(queueName);
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public Topic createTopic(String topicName) {
+ checkContextState();
+ try {
+ return activemqSession.createTopic(topicName);
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public JMSConsumer createDurableConsumer(Topic topic, String name) {
+ checkContextState();
+ try {
+ if(getAutoStart()) {
+ start();
+ }
+ return new ActiveMQConsumer(this, activemqSession.createDurableConsumer(topic, name));
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public JMSConsumer createDurableConsumer(Topic topic, String name, String messageSelector, boolean noLocal) {
+ checkContextState();
+ try {
+ if(getAutoStart()) {
+ start();
+ }
+ return new ActiveMQConsumer(this, activemqSession.createDurableConsumer(topic, name, messageSelector, noLocal));
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public JMSConsumer createSharedDurableConsumer(Topic topic, String name) {
+ throw new UnsupportedOperationException("createSharedDurableConsumer(topic, name) is not supported");
+ }
+
+ @Override
+ public JMSConsumer createSharedDurableConsumer(Topic topic, String name, String messageSelector) {
+ throw new UnsupportedOperationException("createDurableConsumer(topic, name, messageSelector) is not supported");
+ }
+
+ @Override
+ public JMSConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName) {
+ throw new UnsupportedOperationException("createSharedConsumer(topic, sharedSubscriptionName) is not supported");
+ }
+
+ @Override
+ public JMSConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName, String messageSelector) {
+ throw new UnsupportedOperationException("createSharedConsumer(topic, sharedSubscriptionName, messageSelector) is not supported");
+ }
+
+ @Override
+ public QueueBrowser createBrowser(Queue queue) {
+ checkContextState();
+ try {
+ if(getAutoStart()) {
+ start();
+ }
+ return activemqSession.createBrowser(queue);
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public QueueBrowser createBrowser(Queue queue, String messageSelector) {
+ checkContextState();
+ try {
+ if(getAutoStart()) {
+ start();
+ }
+ return activemqSession.createBrowser(queue, messageSelector);
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public TemporaryQueue createTemporaryQueue() {
+ checkContextState();
+ try {
+ return activemqSession.createTemporaryQueue();
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public TemporaryTopic createTemporaryTopic() {
+ checkContextState();
+ try {
+ return activemqSession.createTemporaryTopic();
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void unsubscribe(String name) {
+ checkContextState();
+ try {
+ activemqSession.unsubscribe(name);
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void acknowledge() {
+ checkContextState();
+ try {
+ activemqSession.acknowledge();
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ private void checkContextState() {
+ if (activemqConnection == null) {
+ throw new JMSRuntimeException("Connection not available");
+ }
+
+ if (activemqSession == null) {
+ if (closeInvoked) {
+ throw new IllegalStateRuntimeException("Context is closed");
+ }
+ try {
+ Session jmsSession = activemqConnection.createSession(SESSION_TRANSACTED == sessionMode, sessionMode);
+ activemqSession = ActiveMQSession.class.cast(jmsSession);
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+ }
+
+ private ActiveMQMessageProducer getCreatedActiveMQMessageProducer() {
+ checkContextState();
+
+ if (this.activemqMessageProducer == null) {
+ try {
+ this.activemqMessageProducer = new ActiveMQMessageProducer(activemqSession, activemqSession.getNextProducerId(), null, activemqConnection.getSendTimeout());
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+ return this.activemqMessageProducer;
+ }
+
+}
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
index 9fd2439..9e9ebd4 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
@@ -223,7 +223,7 @@
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
this.send(destination, message, deliveryMode, priority, timeToLive, (AsyncCallback)null);
}
-
+
/**
*
* @param message the message to send
@@ -240,7 +240,6 @@
@Override
public void send(Message message, CompletionListener completionListener) throws JMSException {
throw new UnsupportedOperationException("send(Message, CompletionListener) is not supported");
-
}
@Override
@@ -287,6 +286,11 @@
}
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException {
+ this.send(destination, message, deliveryMode, priority, timeToLive, getDisableMessageID(), getDisableMessageTimestamp(), onComplete);
+ }
+
+ public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, boolean disableMessageID, boolean disableMessageTimestamp, AsyncCallback onComplete) throws JMSException {
+
checkClosed();
if (destination == null) {
if (info.getDestination() == null) {
@@ -322,7 +326,7 @@
}
}
- this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete);
+ this.session.send(this, dest, message, deliveryMode, priority, timeToLive, disableMessageID, disableMessageTimestamp, producerWindow, sendTimeout, onComplete);
stats.onMessage();
}
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java
index b7dd6cf..accc7e5 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java
@@ -16,11 +16,15 @@
*/
package org.apache.activemq;
+import java.util.Set;
+
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
+import javax.jms.IllegalStateRuntimeException;
import javax.jms.JMSException;
import javax.jms.Message;
+import javax.jms.MessageFormatRuntimeException;
import javax.jms.MessageProducer;
/**
@@ -82,13 +86,13 @@
* <P>
* Message IDs are enabled by default.
*
- * @param value indicates if message IDs are disabled
+ * @param disableMessageID indicates if message IDs are disabled
* @throws javax.jms.JMSException if the JMS provider fails to close the producer due to
* some internal error.
*/
- public void setDisableMessageID(boolean value) throws JMSException {
+ public void setDisableMessageID(boolean disableMessageID) throws JMSException {
checkClosed();
- this.disableMessageID = value;
+ this.disableMessageID = disableMessageID;
}
/**
@@ -118,13 +122,13 @@
* <P>
* Message timestamps are enabled by default.
*
- * @param value indicates if message timestamps are disabled
+ * @param disableMessageTimestamp indicates if message timestamps are disabled
* @throws javax.jms.JMSException if the JMS provider fails to close the producer due to
* some internal error.
*/
- public void setDisableMessageTimestamp(boolean value) throws JMSException {
+ public void setDisableMessageTimestamp(boolean disableMessageTimestamp) throws JMSException {
checkClosed();
- this.disableMessageTimestamp = value;
+ this.disableMessageTimestamp = disableMessageTimestamp;
}
/**
@@ -345,4 +349,46 @@
public void setSendTimeout(int sendTimeout) {
this.sendTimeout = sendTimeout;
}
+
+ // See JMS 2.0 spec sections 3.5.1 and 3.8.1.1
+ public static final Set<String> JMS_PROPERTY_NAMES_DISALLOWED = Set.of("JMSDeliveryMode", "JMSPriority", "JMSMessageID", "JMSTimestamp", "JMSCorrelationID", "JMSType", "NULL", "TRUE", "FALSE", "NOT", "AND", "OR", "BETWEEN", "LIKE", "IN", "IS", "ESCAPE");
+
+ public static void validateValidPropertyName(String propertyName) throws IllegalStateRuntimeException {
+ if(propertyName == null || propertyName.length() == 0) {
+ throw new IllegalArgumentException("Invalid JMS property name must not be null or empty");
+ }
+
+ if(JMS_PROPERTY_NAMES_DISALLOWED.contains(propertyName)) {
+ throw new IllegalArgumentException("Invalid JMS property: " + propertyName + " name is in disallowed list");
+ }
+
+ char first = propertyName.charAt(0);
+ if(!(Character.isJavaIdentifierStart(first))) {
+ throw new IllegalArgumentException("Invalid JMS property: " + propertyName + " name starts with invalid character: " + first);
+ }
+
+ for (int i=1; i < propertyName.length(); i++) {
+ char c = propertyName.charAt(i);
+ if (!(Character.isJavaIdentifierPart(c))) {
+ throw new IllegalArgumentException("Invalid JMS property: " + propertyName + " name contains invalid character: " + c);
+ }
+ }
+ }
+
+ public static void validateValidPropertyValue(String propertyName, Object propertyValue) throws IllegalStateRuntimeException {
+ boolean invalid = true;
+
+ if(propertyValue == null || propertyValue instanceof String ||
+ propertyValue instanceof Integer || propertyValue instanceof Short ||
+ propertyValue instanceof Float || propertyValue instanceof Long ||
+ propertyValue instanceof Boolean || propertyValue instanceof Byte ||
+ propertyValue instanceof Character || propertyValue instanceof Double) {
+ invalid = false;
+ }
+
+ if(invalid) {
+ throw new MessageFormatRuntimeException("Invalid JMS property: " + propertyName + " value class: " + propertyValue.getClass().getName() + " is not permitted by specification");
+ }
+ }
+
}
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java
index 2d6094e..f456b5f 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java
@@ -16,6 +16,8 @@
*/
package org.apache.activemq;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
import java.net.MalformedURLException;
import java.util.Enumeration;
@@ -179,6 +181,7 @@
toMessage.setJMSReplyTo(transformDestination(fromMessage.getJMSReplyTo()));
toMessage.setJMSDestination(transformDestination(fromMessage.getJMSDestination()));
toMessage.setJMSDeliveryMode(fromMessage.getJMSDeliveryMode());
+ toMessage.setJMSDeliveryTime(getFromMessageDeliveryTime(fromMessage)); // TODO: AMQ-8500 DeliveryTime support ref: ActiveMQSession#send
toMessage.setJMSRedelivered(fromMessage.getJMSRedelivered());
toMessage.setJMSType(fromMessage.getJMSType());
toMessage.setJMSExpiration(fromMessage.getJMSExpiration());
@@ -193,4 +196,23 @@
toMessage.setObjectProperty(name, obj);
}
}
+
+ private static long getFromMessageDeliveryTime(Message fromMessage) throws JMSException {
+ Method deliveryTimeGetMethod = null;
+ try {
+ Class<?> clazz = fromMessage.getClass();
+ Method method = clazz.getMethod("getJMSDeliveryTime");
+ if (!Modifier.isAbstract(method.getModifiers())) {
+ deliveryTimeGetMethod = method;
+ }
+ } catch (NoSuchMethodException e) {
+ // We fallback to JMSTimestamp for jms v1.x
+ }
+
+ if (deliveryTimeGetMethod != null) {
+ return fromMessage.getJMSDeliveryTime();
+ }
+
+ return fromMessage.getJMSTimestamp();
+ }
}
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java
new file mode 100644
index 0000000..7a6c621
--- /dev/null
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java
@@ -0,0 +1,545 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import javax.jms.BytesMessage;
+import javax.jms.CompletionListener;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.IllegalStateRuntimeException;
+import javax.jms.JMSException;
+import javax.jms.JMSProducer;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageFormatRuntimeException;
+import javax.jms.ObjectMessage;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.activemq.util.TypeConversionSupport;
+
+public class ActiveMQProducer implements JMSProducer {
+
+ private final ActiveMQContext activemqContext;
+ private final ActiveMQMessageProducer activemqMessageProducer;
+
+ // QoS override of defaults on a per-JMSProducer instance basis
+ private String correlationId = null;
+ private byte[] correlationIdBytes = null;
+ private Integer deliveryMode = null;
+ private Boolean disableMessageID = false;
+ private Boolean disableMessageTimestamp = false;
+ private Integer priority = null;
+ private Destination replyTo = null;
+ private Long timeToLive = null;
+ private String type = null;
+
+ // Properties applied to all messages on a per-JMS producer instance basis
+ private Map<String, Object> messageProperties = null;
+
+ ActiveMQProducer(ActiveMQContext activemqContext, ActiveMQMessageProducer activemqMessageProducer) {
+ this.activemqContext = activemqContext;
+ this.activemqMessageProducer = activemqMessageProducer;
+ }
+
+ @Override
+ public JMSProducer send(Destination destination, Message message) {
+ try {
+ if(this.correlationId != null) {
+ message.setJMSCorrelationID(this.correlationId);
+ }
+
+ if(this.correlationIdBytes != null) {
+ message.setJMSCorrelationIDAsBytes(this.correlationIdBytes);
+ }
+
+ if(this.replyTo != null) {
+ message.setJMSReplyTo(this.replyTo);
+ }
+
+ if(this.type != null) {
+ message.setJMSType(this.type);
+ }
+
+ if(messageProperties != null && !messageProperties.isEmpty()) {
+ for(Map.Entry<String, Object> propertyEntry : messageProperties.entrySet()) {
+ message.setObjectProperty(propertyEntry.getKey(), propertyEntry.getValue());
+ }
+ }
+
+ activemqMessageProducer.send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive(), getDisableMessageID(), getDisableMessageTimestamp(), null);
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ return this;
+ }
+
+ @Override
+ public JMSProducer send(Destination destination, String body) {
+ TextMessage textMessage = activemqContext.createTextMessage(body);
+ send(destination, textMessage);
+ return this;
+ }
+
+ @Override
+ public JMSProducer send(Destination destination, Map<String, Object> body) {
+ MapMessage mapMessage = activemqContext.createMapMessage();
+
+ if (body != null) {
+ try {
+ for (Map.Entry<String, Object> mapEntry : body.entrySet()) {
+ final String key = mapEntry.getKey();
+ final Object value = mapEntry.getValue();
+ final Class<?> valueObject = value.getClass();
+ if (String.class.isAssignableFrom(valueObject)) {
+ mapMessage.setString(key, String.class.cast(value));
+ } else if (Integer.class.isAssignableFrom(valueObject)) {
+ mapMessage.setInt(key, Integer.class.cast(value));
+ } else if (Long.class.isAssignableFrom(valueObject)) {
+ mapMessage.setLong(key, Long.class.cast(value));
+ } else if (Double.class.isAssignableFrom(valueObject)) {
+ mapMessage.setDouble(key, Double.class.cast(value));
+ } else if (Boolean.class.isAssignableFrom(valueObject)) {
+ mapMessage.setBoolean(key, Boolean.class.cast(value));
+ } else if (Character.class.isAssignableFrom(valueObject)) {
+ mapMessage.setChar(key, Character.class.cast(value));
+ } else if (Short.class.isAssignableFrom(valueObject)) {
+ mapMessage.setShort(key, Short.class.cast(value));
+ } else if (Float.class.isAssignableFrom(valueObject)) {
+ mapMessage.setFloat(key, Float.class.cast(value));
+ } else if (Byte.class.isAssignableFrom(valueObject)) {
+ mapMessage.setByte(key, Byte.class.cast(value));
+ } else if (byte[].class.isAssignableFrom(valueObject)) {
+ byte[] array = byte[].class.cast(value);
+ mapMessage.setBytes(key, array, 0, array.length);
+ } else {
+ mapMessage.setObject(key, value);
+ }
+ }
+ } catch (JMSException e) {
+ throw new MessageFormatRuntimeException(e.getMessage());
+ }
+ }
+ send(destination, mapMessage);
+ return this;
+ }
+
+ @Override
+ public JMSProducer send(Destination destination, byte[] body) {
+ BytesMessage bytesMessage = activemqContext.createBytesMessage();
+
+ try {
+ if(body != null) {
+ bytesMessage.writeBytes(body);
+ }
+ send(destination, bytesMessage);
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ return this;
+ }
+
+ @Override
+ public JMSProducer send(Destination destination, Serializable body) {
+ ObjectMessage objectMessage = activemqContext.createObjectMessage(body);
+ send(destination, objectMessage);
+ return this;
+ }
+
+ @Override
+ public JMSProducer setDisableMessageID(boolean disableMessageID) {
+ this.disableMessageID = disableMessageID;
+ return this;
+ }
+
+ @Override
+ public boolean getDisableMessageID() {
+ if(this.disableMessageID != null) {
+ return this.disableMessageID;
+ }
+
+ try {
+ return this.activemqMessageProducer.getDisableMessageID();
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public JMSProducer setDisableMessageTimestamp(boolean disableMessageTimestamp) {
+ this.disableMessageTimestamp = disableMessageTimestamp;
+ return this;
+ }
+
+ @Override
+ public boolean getDisableMessageTimestamp() {
+ if(this.disableMessageTimestamp != null) {
+ return this.disableMessageTimestamp;
+ }
+
+ try {
+ return this.activemqMessageProducer.getDisableMessageTimestamp();
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public JMSProducer setDeliveryMode(int deliveryMode) {
+ if (deliveryMode != DeliveryMode.PERSISTENT && deliveryMode != DeliveryMode.NON_PERSISTENT) {
+ throw new IllegalStateRuntimeException("unknown delivery mode: " + deliveryMode);
+ }
+ this.deliveryMode = deliveryMode;
+ return this;
+ }
+
+ @Override
+ public int getDeliveryMode() {
+ if(deliveryMode != null) {
+ return deliveryMode;
+ }
+
+ try {
+ return this.activemqMessageProducer.getDeliveryMode();
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public JMSProducer setPriority(int priority) {
+ if (priority < 0 || priority > 9) {
+ throw new IllegalStateRuntimeException("default priority must be a value between 0 and 9");
+ }
+ this.priority = priority;
+ return this;
+ }
+
+ @Override
+ public int getPriority() {
+ if(priority != null) {
+ return priority;
+ }
+
+ try {
+ return this.activemqMessageProducer.getPriority();
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public JMSProducer setTimeToLive(long timeToLive) {
+ this.timeToLive = timeToLive;
+ return this;
+ }
+
+ @Override
+ public long getTimeToLive() {
+ if(timeToLive != null) {
+ return timeToLive;
+ }
+
+ try {
+ return this.activemqMessageProducer.getTimeToLive();
+ } catch (JMSException e) {
+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);
+ }
+ }
+
+ @Override
+ public JMSProducer setDeliveryDelay(long deliveryDelay) {
+ throw new UnsupportedOperationException("setDeliveryDelay(long) is not supported");
+ }
+
+ @Override
+ public long getDeliveryDelay() {
+ throw new UnsupportedOperationException("getDeliveryDelay() is not supported");
+ }
+
+ @Override
+ public JMSProducer setAsync(CompletionListener completionListener) {
+ throw new UnsupportedOperationException("setAsync(CompletionListener) is not supported");
+ }
+
+ @Override
+ public CompletionListener getAsync() {
+ throw new UnsupportedOperationException("getAsync() is not supported");
+ }
+
+ @Override
+ public JMSProducer setProperty(String name, boolean value) {
+ checkProperty(name, value);
+ getCreatedMessageProperties().put(name, value);
+ return this;
+ }
+
+ @Override
+ public JMSProducer setProperty(String name, byte value) {
+ checkProperty(name, value);
+ getCreatedMessageProperties().put(name, value);
+ return this;
+ }
+
+ @Override
+ public JMSProducer setProperty(String name, short value) {
+ checkProperty(name, value);
+ getCreatedMessageProperties().put(name, value);
+ return this;
+ }
+
+ @Override
+ public JMSProducer setProperty(String name, int value) {
+ checkProperty(name, value);
+ getCreatedMessageProperties().put(name, value);
+ return this;
+ }
+
+ @Override
+ public JMSProducer setProperty(String name, long value) {
+ checkProperty(name, value);
+ getCreatedMessageProperties().put(name, value);
+ return this;
+ }
+
+ @Override
+ public JMSProducer setProperty(String name, float value) {
+ checkProperty(name, value);
+ getCreatedMessageProperties().put(name, value);
+ return this;
+ }
+
+ @Override
+ public JMSProducer setProperty(String name, double value) {
+ checkProperty(name, value);
+ getCreatedMessageProperties().put(name, value);
+ return this;
+ }
+
+ @Override
+ public JMSProducer setProperty(String name, String value) {
+ checkProperty(name, value);
+ getCreatedMessageProperties().put(name, value);
+ return this;
+ }
+
+ @Override
+ public JMSProducer setProperty(String name, Object value) {
+ checkProperty(name, value);
+ getCreatedMessageProperties().put(name, value);
+ return this;
+ }
+
+ @Override
+ public JMSProducer clearProperties() {
+ getCreatedMessageProperties().clear();
+ return this;
+ }
+
+ @Override
+ public boolean propertyExists(String name) {
+ if(name == null || name.isEmpty()) {
+ return false;
+ }
+ return getCreatedMessageProperties().containsKey(name);
+ }
+
+ @Override
+ public boolean getBooleanProperty(String name) {
+ Object value = getCreatedMessageProperties().get(name);
+ if (value == null) {
+ throw new NullPointerException("property " + name + " was null");
+ }
+ Boolean rc = (Boolean)TypeConversionSupport.convert(value, Boolean.class);
+ if (rc == null) {
+ throw new MessageFormatRuntimeException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a boolean");
+ }
+ return rc.booleanValue();
+ }
+
+ @Override
+ public byte getByteProperty(String name) {
+ Object value = getCreatedMessageProperties().get(name);
+ if (value == null) {
+ throw new NullPointerException("property " + name + " was null");
+ }
+ Byte rc = (Byte)TypeConversionSupport.convert(value, Byte.class);
+ if (rc == null) {
+ throw new MessageFormatRuntimeException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a byte");
+ }
+ return rc.byteValue();
+ }
+
+ @Override
+ public short getShortProperty(String name) {
+ Object value = getCreatedMessageProperties().get(name);
+ if (value == null) {
+ throw new NullPointerException("property " + name + " was null");
+ }
+ Short rc = (Short)TypeConversionSupport.convert(value, Short.class);
+ if (rc == null) {
+ throw new MessageFormatRuntimeException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a short");
+ }
+ return rc.shortValue();
+ }
+
+ @Override
+ public int getIntProperty(String name) {
+ Object value = getCreatedMessageProperties().get(name);
+ if (value == null) {
+ throw new NullPointerException("property " + name + " was null");
+ }
+ Integer rc = (Integer)TypeConversionSupport.convert(value, Integer.class);
+ if (rc == null) {
+ throw new MessageFormatRuntimeException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as an integer");
+ }
+ return rc.intValue();
+ }
+
+ @Override
+ public long getLongProperty(String name) {
+ Object value = getCreatedMessageProperties().get(name);
+ if (value == null) {
+ throw new NullPointerException("property " + name + " was null");
+ }
+ Long rc = (Long)TypeConversionSupport.convert(value, Long.class);
+ if (rc == null) {
+ throw new MessageFormatRuntimeException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a long");
+ }
+ return rc.longValue();
+ }
+
+ @Override
+ public float getFloatProperty(String name) {
+ Object value = getCreatedMessageProperties().get(name);
+ if (value == null) {
+ throw new NullPointerException("property " + name + " was null");
+ }
+ Float rc = (Float)TypeConversionSupport.convert(value, Float.class);
+ if (rc == null) {
+ throw new MessageFormatRuntimeException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a float");
+ }
+ return rc.floatValue();
+ }
+
+ @Override
+ public double getDoubleProperty(String name) {
+ Object value = getCreatedMessageProperties().get(name);
+ if (value == null) {
+ throw new NullPointerException("property " + name + " was null");
+ }
+ Double rc = (Double)TypeConversionSupport.convert(value, Double.class);
+ if (rc == null) {
+ throw new MessageFormatRuntimeException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a double");
+ }
+ return rc.doubleValue();
+ }
+
+ @Override
+ public String getStringProperty(String name) {
+ Object value = getCreatedMessageProperties().get(name);
+ if (value == null) {
+ throw new NullPointerException("property " + name + " was null");
+ }
+ String rc = (String)TypeConversionSupport.convert(value, String.class);
+ if (rc == null) {
+ throw new MessageFormatRuntimeException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a string");
+ }
+ return rc;
+ }
+
+ @Override
+ public Object getObjectProperty(String name) {
+ if (name == null) {
+ throw new NullPointerException("Property name cannot be null");
+ }
+
+ // TODO: Update PropertyExpression to handle converting message headers to properties for JMSProducer.
+ //PropertyExpression expression = new PropertyExpression(name);
+ //return expression.evaluate(this);
+ return getCreatedMessageProperties().get(name);
+ }
+
+ @Override
+ public Set<String> getPropertyNames() {
+ return getCreatedMessageProperties().keySet();
+ }
+
+ @Override
+ public JMSProducer setJMSCorrelationIDAsBytes(byte[] correlationID) {
+ if(correlationID != null) {
+ this.correlationIdBytes = Arrays.copyOf(correlationID, correlationID.length);
+ }
+ return this;
+ }
+
+ @Override
+ public byte[] getJMSCorrelationIDAsBytes() {
+ return this.correlationIdBytes;
+ }
+
+ @Override
+ public JMSProducer setJMSCorrelationID(String correlationID) {
+ this.correlationId = correlationID;
+ return this;
+ }
+
+ @Override
+ public String getJMSCorrelationID() {
+ return this.correlationId;
+ }
+
+ @Override
+ public JMSProducer setJMSType(String type) {
+ this.type = type;
+ return this;
+ }
+
+ @Override
+ public String getJMSType() {
+ return this.type;
+ }
+
+ @Override
+ public JMSProducer setJMSReplyTo(Destination replyTo) {
+ this.replyTo = replyTo;
+ return this;
+ }
+
+ @Override
+ public Destination getJMSReplyTo() {
+ return replyTo;
+ }
+
+ private void checkProperty(String name, Object value) {
+ ActiveMQMessageProducerSupport.validateValidPropertyName(name);
+ ActiveMQMessageProducerSupport.validateValidPropertyValue(name, value);
+ }
+
+ private Map<String, Object> getCreatedMessageProperties() {
+ if(messageProperties == null) {
+ messageProperties = new HashMap<>();
+ }
+ return messageProperties;
+ }
+}
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
index 2ba1fe1..66f7347 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
@@ -19,6 +19,8 @@
import java.io.File;
import java.io.InputStream;
import java.io.Serializable;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
import java.net.URL;
import java.util.Collections;
import java.util.Iterator;
@@ -1394,12 +1396,14 @@
@Override
public MessageConsumer createDurableConsumer(Topic topic, String name) throws JMSException {
- throw new UnsupportedOperationException("createDurableConsumer(Topic, name) is not supported");
+ checkClosed();
+ return createDurableSubscriber(topic, name, null, false);
}
@Override
public MessageConsumer createDurableConsumer(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
- throw new UnsupportedOperationException("createDurableConsumer(Topic, name, messageSelector, noLocal) is not supported");
+ checkClosed();
+ return createDurableSubscriber(topic, name, messageSelector, noLocal);
}
@Override
@@ -1939,6 +1943,26 @@
*/
protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
+ send(producer, destination, message, deliveryMode, priority, timeToLive, producer.getDisableMessageID(), producer.getDisableMessageID(), producerWindow, sendTimeout, onComplete);
+ }
+
+ /**
+ * Sends the message for dispatch by the broker.
+ *
+ * @param producer - message producer.
+ * @param destination - message destination.
+ * @param message - message to be sent.
+ * @param deliveryMode - JMS message delivery mode.
+ * @param priority - message priority.
+ * @param timeToLive - message expiration.
+ * @param disableTimestamp - disable timestamp.
+ * @param disableMessageID - optionally, disable messageID.
+ * @param producerWindow
+ * @param onComplete
+ * @throws JMSException
+ */
+ protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
+ boolean disableMessageID, boolean disableMessageTimestamp, MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
checkClosed();
if (destination.isTemporary() && connection.isDeleted(destination)) {
@@ -1956,12 +1980,22 @@
//Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
message.setJMSDeliveryMode(deliveryMode);
long expiration = 0L;
- if (!producer.getDisableMessageTimestamp()) {
- long timeStamp = System.currentTimeMillis();
+ long timeStamp = System.currentTimeMillis();
+ if (timeToLive > 0) {
+ expiration = timeToLive + timeStamp;
+ }
+
+ // TODO: AMQ-8500 - update this when openwire supports JMSDeliveryTime
+ // ref: ActiveMQMessageTransformation#copyProperties
+ if(!(message instanceof ActiveMQMessage)) {
+ setForeignMessageDeliveryTime(message, timeStamp);
+ } else {
+ message.setJMSDeliveryTime(timeStamp);
+ }
+ if (!disableMessageTimestamp && !producer.getDisableMessageTimestamp()) {
message.setJMSTimestamp(timeStamp);
- if (timeToLive > 0) {
- expiration = timeToLive + timeStamp;
- }
+ } else {
+ message.setJMSTimestamp(0l);
}
message.setJMSExpiration(expiration);
message.setJMSPriority(priority);
@@ -2287,4 +2321,22 @@
protected ThreadPoolExecutor getConnectionExecutor() {
return this.connectionExecutor;
}
+
+ private static void setForeignMessageDeliveryTime(final Message foreignMessage, final long deliveryTime) throws JMSException {
+ // Check for JMS v2 message via presence of setJMSDeliveryTime
+ Method deliveryTimeMethod = null;
+ try {
+ Class<?> clazz = foreignMessage.getClass();
+ Method method = clazz.getMethod("setJMSDeliveryTime", new Class[] { long.class });
+ if (!Modifier.isAbstract(method.getModifiers())) {
+ deliveryTimeMethod = method;
+ }
+ } catch (NoSuchMethodException e) {
+ // non-JMS v2 message
+ }
+
+ if (deliveryTimeMethod != null) {
+ foreignMessage.setJMSDeliveryTime(deliveryTime);
+ }
+ }
}
diff --git a/activemq-client/src/main/java/org/apache/activemq/util/JMSExceptionSupport.java b/activemq-client/src/main/java/org/apache/activemq/util/JMSExceptionSupport.java
index a73f01f..41711ca 100644
--- a/activemq-client/src/main/java/org/apache/activemq/util/JMSExceptionSupport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/util/JMSExceptionSupport.java
@@ -16,10 +16,21 @@
*/
package org.apache.activemq.util;
+import javax.jms.IllegalStateRuntimeException;
+import javax.jms.InvalidClientIDRuntimeException;
+import javax.jms.InvalidDestinationRuntimeException;
+import javax.jms.InvalidSelectorRuntimeException;
import javax.jms.JMSException;
+import javax.jms.JMSRuntimeException;
import javax.jms.JMSSecurityException;
+import javax.jms.JMSSecurityRuntimeException;
import javax.jms.MessageEOFException;
import javax.jms.MessageFormatException;
+import javax.jms.MessageFormatRuntimeException;
+import javax.jms.MessageNotWriteableRuntimeException;
+import javax.jms.ResourceAllocationRuntimeException;
+import javax.jms.TransactionInProgressRuntimeException;
+import javax.jms.TransactionRolledBackRuntimeException;
import org.apache.activemq.MaxFrameSizeExceededException;
@@ -105,4 +116,38 @@
exception.initCause(cause);
return exception;
}
+
+ public static JMSRuntimeException convertToJMSRuntimeException(JMSException e) {
+ if (e instanceof javax.jms.IllegalStateException) {
+ return new IllegalStateRuntimeException(e.getMessage(), e.getErrorCode(), e);
+ }
+ if (e instanceof javax.jms.InvalidClientIDException) {
+ return new InvalidClientIDRuntimeException(e.getMessage(), e.getErrorCode(), e);
+ }
+ if (e instanceof javax.jms.InvalidDestinationException) {
+ return new InvalidDestinationRuntimeException(e.getMessage(), e.getErrorCode(), e);
+ }
+ if (e instanceof javax.jms.InvalidSelectorException) {
+ return new InvalidSelectorRuntimeException(e.getMessage(), e.getErrorCode(), e);
+ }
+ if (e instanceof javax.jms.JMSSecurityException) {
+ return new JMSSecurityRuntimeException(e.getMessage(), e.getErrorCode(), e);
+ }
+ if (e instanceof javax.jms.MessageFormatException) {
+ return new MessageFormatRuntimeException(e.getMessage(), e.getErrorCode(), e);
+ }
+ if (e instanceof javax.jms.MessageNotWriteableException) {
+ return new MessageNotWriteableRuntimeException(e.getMessage(), e.getErrorCode(), e);
+ }
+ if (e instanceof javax.jms.ResourceAllocationException) {
+ return new ResourceAllocationRuntimeException(e.getMessage(), e.getErrorCode(), e);
+ }
+ if (e instanceof javax.jms.TransactionInProgressException) {
+ return new TransactionInProgressRuntimeException(e.getMessage(), e.getErrorCode(), e);
+ }
+ if (e instanceof javax.jms.TransactionRolledBackException) {
+ return new TransactionRolledBackRuntimeException(e.getMessage(), e.getErrorCode(), e);
+ }
+ return new JMSRuntimeException(e.getMessage(), e.getErrorCode(), e);
+ }
}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2AckModesTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2AckModesTest.java
new file mode 100644
index 0000000..25e45ec
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2AckModesTest.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms2;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSConsumer;
+import javax.jms.JMSContext;
+import javax.jms.Message;
+
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.util.Wait;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
+public class ActiveMQJMS2AckModesTest extends ActiveMQJMS2TestBase {
+
+ private final String destinationName;
+ private final String destinationType;
+ private final int ackMode;
+ private final String messagePayload;
+
+ public ActiveMQJMS2AckModesTest(String destinationType, int ackMode) {
+ this.destinationName = "AMQ.JMS2.ACKMODE." + Integer.toString(ackMode) + destinationType.toUpperCase();
+ this.destinationType = destinationType;
+ this.ackMode = ackMode;
+ this.messagePayload = "Test message destType: " + destinationType + " ackMode: " + Integer.toString(ackMode);
+ }
+
+ @Parameterized.Parameters(name="destinationType={0}, ackMode={1}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ {"queue", ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE },
+ {"queue", ActiveMQSession.AUTO_ACKNOWLEDGE },
+ {"queue", ActiveMQSession.CLIENT_ACKNOWLEDGE },
+ {"queue", ActiveMQSession.DUPS_OK_ACKNOWLEDGE },
+ {"queue", ActiveMQSession.SESSION_TRANSACTED },
+ {"topic", ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE },
+ {"topic", ActiveMQSession.AUTO_ACKNOWLEDGE },
+ {"topic", ActiveMQSession.CLIENT_ACKNOWLEDGE },
+ {"topic", ActiveMQSession.DUPS_OK_ACKNOWLEDGE },
+ {"topic", ActiveMQSession.SESSION_TRANSACTED },
+ {"temp-queue", ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE },
+ {"temp-queue", ActiveMQSession.AUTO_ACKNOWLEDGE },
+ {"temp-queue", ActiveMQSession.CLIENT_ACKNOWLEDGE },
+ {"temp-queue", ActiveMQSession.DUPS_OK_ACKNOWLEDGE },
+ {"temp-queue", ActiveMQSession.SESSION_TRANSACTED },
+ {"temp-topic", ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE },
+ {"temp-topic", ActiveMQSession.AUTO_ACKNOWLEDGE },
+ {"temp-topic", ActiveMQSession.CLIENT_ACKNOWLEDGE },
+ {"temp-topic", ActiveMQSession.DUPS_OK_ACKNOWLEDGE },
+ {"temp-topic", ActiveMQSession.SESSION_TRANSACTED }
+ });
+ }
+
+ @Test
+ public void testAcknowledgementMode() {
+
+ try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, ackMode)) {
+ assertNotNull(jmsContext);
+ Destination destination = ActiveMQJMS2TestSupport.generateDestination(jmsContext, destinationType, destinationName);
+ assertNotNull(destination);
+ JMSConsumer jmsConsumer = jmsContext.createConsumer(destination);
+ assertNotNull(jmsConsumer);
+ jmsContext.start();
+
+ Message message = ActiveMQJMS2TestSupport.generateMessage(jmsContext, "text", messagePayload);
+
+ List<String> sentMessageIds = new LinkedList<>();
+ for(int deliveryMode : Arrays.asList(DeliveryMode.NON_PERSISTENT, DeliveryMode.PERSISTENT)) {
+ MessageData sendMessageData = new MessageData();
+ sendMessageData.setMessage(message).setDeliveryMode(deliveryMode);
+ sentMessageIds.add(ActiveMQJMS2TestSupport.sendMessage(jmsContext, destination, sendMessageData));
+ }
+
+ // For session transacted ack after all messages are sent
+ switch(ackMode) {
+ case ActiveMQSession.SESSION_TRANSACTED: jmsContext.commit(); break;
+ default: break;
+ }
+
+ Message recvMessage = null;
+ Message lastRecvMessage = null;
+ List<Message> recvMessages = new LinkedList<>();
+
+ recvMessage = jmsConsumer.receive(2000l);
+
+ while (recvMessage != null) {
+ recvMessages.add(recvMessage);
+ lastRecvMessage = recvMessage;
+ switch(ackMode) {
+ case ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE: lastRecvMessage.acknowledge(); break;
+ default: break;
+ }
+
+ recvMessage = jmsConsumer.receive(500l);
+ }
+
+ assertEquals(Integer.valueOf(2), Integer.valueOf(recvMessages.size()));
+
+ switch(ackMode) {
+ case ActiveMQSession.CLIENT_ACKNOWLEDGE: lastRecvMessage.acknowledge(); break;
+ case ActiveMQSession.SESSION_TRANSACTED: jmsContext.commit(); break;
+ default: break;
+ }
+ jmsConsumer.close();
+
+ int foundCount = 0;
+ for(int validDeliveryMode : Arrays.asList(DeliveryMode.NON_PERSISTENT, DeliveryMode.PERSISTENT)) {
+ for(javax.jms.Message tmpMessage : recvMessages) {
+ if(tmpMessage.getJMSDeliveryMode() == validDeliveryMode) {
+ MessageData messageData = new MessageData();
+ messageData.setMessageType("text").setMessagePayload(messagePayload).setDeliveryMode(validDeliveryMode);
+ ActiveMQJMS2TestSupport.validateMessageData(tmpMessage, messageData);
+ foundCount++;
+ }
+ }
+ }
+ assertEquals(Integer.valueOf(2), Integer.valueOf(foundCount));
+
+ DestinationViewMBean destinationViewMBean = getDestinationViewMBean(destinationType, (ActiveMQDestination)destination);
+ assertTrue("QueueSize = 0 expected", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return destinationViewMBean.getQueueSize() == 0l;
+ }
+ }, 5000l, 10l));
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+
+
+}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ContextTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ContextTest.java
new file mode 100644
index 0000000..4891603
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ContextTest.java
@@ -0,0 +1,360 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms2;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Enumeration;
+
+import javax.jms.CompletionListener;
+import javax.jms.Destination;
+import javax.jms.JMSConsumer;
+import javax.jms.JMSContext;
+import javax.jms.JMSException;
+import javax.jms.JMSProducer;
+import javax.jms.JMSRuntimeException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.ActiveMQContext;
+import org.junit.Test;
+
+public class ActiveMQJMS2ContextTest extends ActiveMQJMS2TestBase {
+
+ @Test
+ public void testConnectionFactoryCreateContext() {
+ try(JMSContext jmsContext = activemqConnectionFactory.createContext()) {
+ assertNotNull(jmsContext);
+ jmsContext.start();
+ assertTrue(ActiveMQContext.class.isAssignableFrom(jmsContext.getClass()));
+ Destination destination = jmsContext.createQueue(methodNameDestinationName);
+ sendMessage(jmsContext, destination, "Test-" + methodNameDestinationName);
+ recvMessage(jmsContext, destination, "Test-" + methodNameDestinationName);
+ } catch (JMSException e) {
+ fail(e.getMessage());
+ }
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testConnectionFactoryCreateContextSession() {
+ activemqConnectionFactory.createContext(Session.AUTO_ACKNOWLEDGE);
+ }
+
+ @Test
+ public void testConnectionFactoryCreateContextUserPass() {
+ try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS)) {
+ assertNotNull(jmsContext);
+ jmsContext.start();
+ assertTrue(ActiveMQContext.class.isAssignableFrom(jmsContext.getClass()));
+ Destination destination = jmsContext.createQueue(methodNameDestinationName);
+ sendMessage(jmsContext, destination, "Test-" + methodNameDestinationName);
+ recvMessage(jmsContext, destination, "Test-" + methodNameDestinationName);
+ } catch (JMSException e) {
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testConnectionFactoryCreateContextUserPassSession() {
+ try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) {
+ assertNotNull(jmsContext);
+ assertTrue(ActiveMQContext.class.isAssignableFrom(jmsContext.getClass()));
+ }
+ }
+
+ @Test
+ public void testConnectionFactoryCreateContexMultiContext() {
+ JMSContext secondJMSContext = null;
+ JMSContext thirdJMSContext = null;
+
+ try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS)) {
+ assertNotNull(jmsContext);
+ jmsContext.start();
+ assertTrue(ActiveMQContext.class.isAssignableFrom(jmsContext.getClass()));
+
+ Destination testDestination = jmsContext.createQueue(methodNameDestinationName);
+ sendMessage(jmsContext, testDestination, "Test-" + methodNameDestinationName);
+ recvMessage(jmsContext, testDestination, "Test-" + methodNameDestinationName);
+
+ secondJMSContext = jmsContext.createContext(Session.AUTO_ACKNOWLEDGE);
+ } catch (JMSException e) {
+ fail(e.getMessage());
+ }
+
+ // First context closed
+ String secondTestDestinationName = methodNameDestinationName + ".SECOND";
+ Destination secondTestDestination = secondJMSContext.createQueue(secondTestDestinationName);
+
+ try {
+ sendMessage(secondJMSContext, secondTestDestination, "Test-" + methodNameDestinationName);
+ recvMessage(secondJMSContext, secondTestDestination, "Test-" + methodNameDestinationName);
+ } catch (JMSException e) {
+ fail(e.getMessage());
+ } finally {
+ if(secondJMSContext != null) {
+ try { secondJMSContext.close(); } catch (JMSRuntimeException e) { fail(e.getMessage()); }
+ }
+ }
+
+ // Attempt to obtain a third context after all contexts have been closed
+ boolean caught = false;
+ try {
+ thirdJMSContext = secondJMSContext.createContext(Session.AUTO_ACKNOWLEDGE);
+ fail("JMSRuntimeException expected");
+ } catch (JMSRuntimeException e) {
+ assertNull(thirdJMSContext);
+ caught = true;
+ assertEquals("Context already closed", e.getMessage());
+ }
+ assertTrue(caught);
+ }
+
+ @Test
+ public void testConnectionFactoryCreateContextBrowse() {
+ try(JMSContext jmsContext = activemqConnectionFactory.createContext()) {
+ assertNotNull(jmsContext);
+ jmsContext.start();
+ assertTrue(ActiveMQContext.class.isAssignableFrom(jmsContext.getClass()));
+ Destination destination = jmsContext.createQueue(methodNameDestinationName);
+ sendMessage(jmsContext, destination, "Test-" + methodNameDestinationName);
+ browseMessage(jmsContext, destination, "Test-" + methodNameDestinationName, true);
+ } catch (JMSException e) {
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testConnectionFactoryCreateContextBrowseAutoStart() {
+ try(JMSContext jmsContext = activemqConnectionFactory.createContext()) {
+ assertNotNull(jmsContext);
+ assertTrue(ActiveMQContext.class.isAssignableFrom(jmsContext.getClass()));
+ Destination destination = jmsContext.createQueue(methodNameDestinationName);
+ sendMessage(jmsContext, destination, "Test-" + methodNameDestinationName);
+ browseMessage(jmsContext, destination, "Test-" + methodNameDestinationName, true);
+ } catch (JMSException e) {
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testConnectionFactoryCreateContextBrowseAutoStartFalse() {
+ try(JMSContext jmsContext = activemqConnectionFactory.createContext()) {
+ assertNotNull(jmsContext);
+ jmsContext.setAutoStart(false);
+ assertTrue(ActiveMQContext.class.isAssignableFrom(jmsContext.getClass()));
+ Destination destination = jmsContext.createQueue(methodNameDestinationName);
+ sendMessage(jmsContext, destination, "Test-" + methodNameDestinationName);
+ browseMessage(jmsContext, destination, "Test-" + methodNameDestinationName, false);
+ } catch (JMSException e) {
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testConnectionFactoryCreateContextBrowseAutoStartFalseStartDelayed() {
+ try(JMSContext jmsContext = activemqConnectionFactory.createContext()) {
+ assertNotNull(jmsContext);
+ jmsContext.setAutoStart(false);
+ assertTrue(ActiveMQContext.class.isAssignableFrom(jmsContext.getClass()));
+ Destination destination = jmsContext.createQueue(methodNameDestinationName);
+ sendMessage(jmsContext, destination, "Test-" + methodNameDestinationName);
+ jmsContext.start();
+ browseMessage(jmsContext, destination, "Test-" + methodNameDestinationName, true);
+ } catch (JMSException e) {
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testDisableMessageID() {
+ try(JMSContext jmsContext = activemqConnectionFactory.createContext()) {
+ assertNotNull(jmsContext);
+ jmsContext.setAutoStart(false);
+ assertTrue(ActiveMQContext.class.isAssignableFrom(jmsContext.getClass()));
+ jmsContext.start();
+ JMSProducer jmsProducer = jmsContext.createProducer();
+ jmsProducer.setDisableMessageID(true);
+ Destination destination = jmsContext.createQueue(methodNameDestinationName);
+ Message sendMessage = jmsContext.createTextMessage("Test-" + methodNameDestinationName);
+ jmsProducer.send(destination, sendMessage);
+ JMSConsumer jmsConsumer = jmsContext.createConsumer(destination);
+ Message recvMessage = jmsConsumer.receive(5000l);
+ assertNotNull(recvMessage);
+ // Verify messageID since ActiveMQ does not disableMessageID
+ assertEquals(sendMessage.getJMSMessageID(), recvMessage.getJMSMessageID());
+ } catch (JMSException e) {
+ fail(e.getMessage());
+ }
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testConnectionSharedConnectionConsumer() throws JMSException {
+ connection.createSharedConnectionConsumer(null, null, null, null, 10);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testConnectionSharedDurableConnectionConsumer() throws JMSException {
+ connection.createSharedDurableConnectionConsumer(null, null, null, null, 10);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testSessionAckMode() throws JMSException {
+ connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ }
+
+ @Test
+ public void testSessionDurableConsumer() throws JMSException {
+ try(JMSContext jmsContext = activemqConnectionFactory.createContext()) {
+ assertNotNull(jmsContext);
+ jmsContext.setAutoStart(false);
+ jmsContext.setClientID(testName.getMethodName());
+ jmsContext.start();
+ assertTrue(ActiveMQContext.class.isAssignableFrom(jmsContext.getClass()));
+ Topic topic = jmsContext.createTopic(methodNameDestinationName);
+
+ JMSConsumer jmsConsumerRegisterDurableConsumer = jmsContext.createDurableConsumer(topic, testName.getMethodName());
+ jmsConsumerRegisterDurableConsumer.close();
+ sendMessage(jmsContext, topic, "Test-" + methodNameDestinationName);
+ recvMessageDurable(jmsContext, topic, testName.getMethodName(), null, false, "Test-" + methodNameDestinationName);
+ } catch (JMSException e) {
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSessionDurableConsumerSelectorNoLocal() throws JMSException {
+ try(JMSContext jmsContext = activemqConnectionFactory.createContext()) {
+ assertNotNull(jmsContext);
+ jmsContext.setAutoStart(false);
+ jmsContext.setClientID(testName.getMethodName());
+ jmsContext.start();
+ assertTrue(ActiveMQContext.class.isAssignableFrom(jmsContext.getClass()));
+ Topic topic = jmsContext.createTopic(methodNameDestinationName);
+
+ JMSConsumer jmsConsumerRegisterDurableConsumer = jmsContext.createDurableConsumer(topic, testName.getMethodName(), "TRUE", false);
+ jmsConsumerRegisterDurableConsumer.close();
+ sendMessage(jmsContext, topic, "Test-" + methodNameDestinationName);
+ recvMessageDurable(jmsContext, topic, testName.getMethodName(), "TRUE", false, "Test-" + methodNameDestinationName);
+ } catch (JMSException e) {
+ fail(e.getMessage());
+ }
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testSessionSharedConsumer() throws JMSException {
+ session.createSharedConsumer(null, null);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testSessionSharedConsumerSelector() throws JMSException {
+ session.createSharedConsumer(null, null, null);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testSessionSharedDurableConsumer() throws JMSException {
+ session.createSharedDurableConsumer(null, null);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testSessionSharedDurableConsumerSelector() throws JMSException {
+ session.createSharedDurableConsumer(null, null, null);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testProducerDeliveryDelayGet() throws JMSException {
+ messageProducer.getDeliveryDelay();
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testProducerDeliveryDelaySet() throws JMSException {
+ messageProducer.setDeliveryDelay(1000l);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testProducerSendMessageCompletionListener() throws JMSException {
+ messageProducer.send(session.createQueue(methodNameDestinationName), null, (CompletionListener)null);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testProducerSendMessageQoSParamsCompletionListener() throws JMSException {
+ messageProducer.send(null, 1, 4, 0l, null);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testProducerSendDestinationMessageCompletionListener() throws JMSException {
+ messageProducer.send(session.createQueue(methodNameDestinationName), null, null);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testProducerSendDestinationMessageQosParamsCompletionListener() throws JMSException {
+ messageProducer.send(session.createQueue(methodNameDestinationName), null, 1, 4, 0l, null);
+ }
+
+ protected static void sendMessage(JMSContext jmsContext, Destination testDestination, String textBody) {
+ assertNotNull(jmsContext);
+ JMSProducer jmsProducer = jmsContext.createProducer();
+ jmsProducer.send(testDestination, textBody);
+ }
+
+ protected static void browseMessage(JMSContext jmsContext, Destination testDestination, String expectedTextBody, boolean expectFound) throws JMSException {
+ assertNotNull(jmsContext);
+ assertTrue(Queue.class.isAssignableFrom(testDestination.getClass()));
+ Queue testQueue = Queue.class.cast(testDestination);
+ try(QueueBrowser queueBrowser = jmsContext.createBrowser(testQueue)) {
+ Enumeration<?> messageEnumeration = queueBrowser.getEnumeration();
+ assertNotNull(messageEnumeration);
+
+ boolean found = false;
+ while(!found && messageEnumeration.hasMoreElements()) {
+ javax.jms.Message message = (javax.jms.Message)messageEnumeration.nextElement();
+ assertNotNull(message);
+ assertTrue(TextMessage.class.isAssignableFrom(message.getClass()));
+ assertEquals(expectedTextBody, TextMessage.class.cast(message).getText());
+ found = true;
+ }
+ assertEquals(expectFound, found);
+ }
+ }
+
+ protected static void recvMessage(JMSContext jmsContext, Destination testDestination, String expectedTextBody) throws JMSException {
+ assertNotNull(jmsContext);
+ try(JMSConsumer jmsConsumer = jmsContext.createConsumer(testDestination)) {
+ javax.jms.Message message = jmsConsumer.receive(1000l);
+ assertNotNull(message);
+ assertTrue(TextMessage.class.isAssignableFrom(message.getClass()));
+ assertEquals(expectedTextBody, TextMessage.class.cast(message).getText());
+ }
+ }
+
+ protected static void recvMessageDurable(JMSContext jmsContext, Topic testTopic, String subscriptionName, String selector, boolean noLocal, String expectedTextBody) throws JMSException {
+ assertNotNull(jmsContext);
+ try(JMSConsumer jmsConsumer = jmsContext.createDurableConsumer(testTopic, subscriptionName, selector, noLocal)) {
+ javax.jms.Message message = jmsConsumer.receive(1000l);
+ assertNotNull(message);
+ assertTrue(TextMessage.class.isAssignableFrom(message.getClass()));
+ assertEquals(expectedTextBody, TextMessage.class.cast(message).getText());
+ }
+ }
+}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2MessageListenerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2MessageListenerTest.java
new file mode 100644
index 0000000..917a26b
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2MessageListenerTest.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms2;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSConsumer;
+import javax.jms.JMSContext;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.util.Wait;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
+public class ActiveMQJMS2MessageListenerTest extends ActiveMQJMS2TestBase {
+
+ private final String destinationName;
+ private final String destinationType;
+ private final int ackMode;
+ private final String messagePayload;
+
+ public ActiveMQJMS2MessageListenerTest(String destinationType, int ackMode) {
+ this.destinationName = "AMQ.JMS2.ACKMODE." + Integer.toString(ackMode) + destinationType.toUpperCase();
+ this.destinationType = destinationType;
+ this.ackMode = ackMode;
+ this.messagePayload = "Test message destType: " + destinationType + " ackMode: " + Integer.toString(ackMode);
+ }
+
+ @Parameterized.Parameters(name="destinationType={0}, ackMode={1}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ {"queue", ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE },
+ {"queue", Session.AUTO_ACKNOWLEDGE },
+ {"queue", Session.CLIENT_ACKNOWLEDGE },
+ {"queue", Session.DUPS_OK_ACKNOWLEDGE },
+ {"queue", Session.SESSION_TRANSACTED }
+ });
+ }
+
+ @Test
+ public void testMessageListener() {
+
+ try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, ackMode)) {
+ assertNotNull(jmsContext);
+ Destination destination = ActiveMQJMS2TestSupport.generateDestination(jmsContext, destinationType, destinationName);
+ assertNotNull(destination);
+ QueueViewMBean localQueueViewMBean = getQueueViewMBean((ActiveMQDestination)destination);
+ JMSConsumer jmsConsumer = jmsContext.createConsumer(destination);
+
+ AtomicInteger receivedMessageCount = new AtomicInteger();
+ AtomicInteger exceptionCount = new AtomicInteger();
+ CountDownLatch countDownLatch = new CountDownLatch(2);
+
+ jmsConsumer.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ countDownLatch.countDown();
+ receivedMessageCount.incrementAndGet();
+ try {
+ switch(ackMode) {
+ case Session.CLIENT_ACKNOWLEDGE: message.acknowledge(); break;
+ case ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE: message.acknowledge(); break;
+ default: break;
+ }
+ } catch (JMSException e) {
+ exceptionCount.incrementAndGet();
+ }
+ }
+ });
+ jmsContext.start();
+
+ Message message = ActiveMQJMS2TestSupport.generateMessage(jmsContext, "text", messagePayload);
+
+ List<String> sentMessageIds = new LinkedList<>();
+ for(int deliveryMode : Arrays.asList(DeliveryMode.NON_PERSISTENT, DeliveryMode.PERSISTENT)) {
+ MessageData sendMessageData = new MessageData();
+ sendMessageData.setMessage(message).setDeliveryMode(deliveryMode);
+ sentMessageIds.add(ActiveMQJMS2TestSupport.sendMessage(jmsContext, destination, sendMessageData));
+ }
+
+ // For session transacted ack we ack after all messages are sent
+ switch(ackMode) {
+ case ActiveMQSession.SESSION_TRANSACTED:
+ assertEquals(Long.valueOf(0), Long.valueOf(localQueueViewMBean.getEnqueueCount()));
+ jmsContext.commit();
+ assertEquals(Long.valueOf(2), Long.valueOf(localQueueViewMBean.getEnqueueCount()));
+ break;
+ default:
+ assertEquals(Long.valueOf(2), Long.valueOf(localQueueViewMBean.getEnqueueCount()));
+ break;
+ }
+
+ countDownLatch.await(5, TimeUnit.SECONDS);
+
+ assertEquals(Integer.valueOf(2), Integer.valueOf(receivedMessageCount.get()));
+ assertEquals(Integer.valueOf(0), Integer.valueOf(exceptionCount.get()));
+
+ // For session transacted we ack after all messages are received
+ switch(ackMode) {
+ case ActiveMQSession.SESSION_TRANSACTED:
+ assertEquals(Long.valueOf(0), Long.valueOf(localQueueViewMBean.getDequeueCount()));
+ jmsContext.commit();
+ break;
+ default: break;
+ }
+ jmsConsumer.close();
+
+ assertTrue("DequeueCount = 2 and QueueSize = 0 expected", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return localQueueViewMBean.getDequeueCount() == 2l && localQueueViewMBean.getQueueSize() == 0l;
+ }
+ }, 5000l, 100l));
+
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2MessageTypesTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2MessageTypesTest.java
new file mode 100644
index 0000000..465c067
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2MessageTypesTest.java
@@ -0,0 +1,597 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms2;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.IllegalStateRuntimeException;
+import javax.jms.JMSConsumer;
+import javax.jms.JMSContext;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.activemq.util.Wait;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
+public class ActiveMQJMS2MessageTypesTest extends ActiveMQJMS2TestBase {
+
+ private final Set<Integer> VALID_PRIORITIES = Set.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+ private final String clientID;
+ private final String destinationType;
+ private final String messagePayload;
+ private final String messageType;
+
+ public ActiveMQJMS2MessageTypesTest(String destinationType, String messageType) {
+ this.clientID = destinationType + "-" + messageType;
+ this.destinationType = destinationType;
+ this.messagePayload = "Test message payload";
+ this.messageType = messageType;
+ }
+
+ @Parameterized.Parameters(name="destinationType={0}, messageType={1}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ {"queue", "bytes"},
+ {"queue", "map"},
+ {"queue", "object"},
+ {"queue", "stream"},
+ {"queue", "text"},
+ {"topic", "bytes"},
+ {"topic", "map"},
+ {"topic", "object"},
+ {"topic", "stream"},
+ {"topic", "text"},
+ {"temp-queue", "bytes"},
+ {"temp-queue", "map"},
+ {"temp-queue", "object"},
+ {"temp-queue", "stream"},
+ {"temp-queue", "text"},
+ {"temp-topic", "bytes"},
+ {"temp-topic", "map"},
+ {"temp-topic", "object"},
+ {"temp-topic", "stream"},
+ {"temp-topic", "text"},
+ });
+ }
+
+ @Test
+ public void testMessageDeliveryMode() {
+
+ try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) {
+ assertNotNull(jmsContext);
+ Destination destination = ActiveMQJMS2TestSupport.generateDestination(jmsContext, destinationType, methodNameDestinationName);
+ assertNotNull(destination);
+ JMSConsumer jmsConsumer = jmsContext.createConsumer(destination);
+ assertNotNull(jmsConsumer);
+
+ Message message = ActiveMQJMS2TestSupport.generateMessage(jmsContext, messageType, messagePayload);
+
+ List<String> sentMessageIds = new LinkedList<>();
+ for(int deliveryMode : Arrays.asList(DeliveryMode.NON_PERSISTENT, DeliveryMode.PERSISTENT)) {
+ MessageData sendMessageData = new MessageData();
+ sendMessageData.setMessage(message).setDeliveryMode(deliveryMode);
+ sentMessageIds.add(ActiveMQJMS2TestSupport.sendMessage(jmsContext, destination, sendMessageData));
+ }
+
+ List<Message> recvMessages = new CopyOnWriteArrayList<>();
+ AtomicBoolean receivedExpected = new AtomicBoolean(false);
+ AtomicInteger exceptionCount = new AtomicInteger();
+
+ jmsConsumer.setMessageListener(new MessageListener() {
+ private boolean recvNonPersistent = false;
+ private boolean recvPersistent = false;
+
+ @Override
+ public void onMessage(Message message) {
+ recvMessages.add(message);
+ try {
+ switch(message.getJMSDeliveryMode()) {
+ case DeliveryMode.NON_PERSISTENT: recvNonPersistent = true; break;
+ case DeliveryMode.PERSISTENT: recvPersistent = true; break;
+ default: break;
+ }
+
+ if(recvNonPersistent && recvPersistent) {
+ receivedExpected.set(true);
+ }
+ } catch (JMSException e) {
+ exceptionCount.incrementAndGet();
+ }
+ }
+ });
+ jmsContext.start();
+
+ assertTrue("Expected to receive both DeliveryModes", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return receivedExpected.get();
+ }
+ }, 5000l, 100l));
+ jmsConsumer.close();
+ jmsContext.stop();
+
+ assertEquals(Integer.valueOf(2), Integer.valueOf(recvMessages.size()));
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testMessageDeliveryModeInvalid() {
+
+ try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) {
+ assertNotNull(jmsContext);
+ Destination destination = ActiveMQJMS2TestSupport.generateDestination(jmsContext, destinationType, methodNameDestinationName);
+ assertNotNull(destination);
+ jmsContext.start();
+
+ Message message = ActiveMQJMS2TestSupport.generateMessage(jmsContext, messageType, messagePayload);
+ MessageData sendMessageData = new MessageData();
+ sendMessageData.setMessage(message).setDeliveryMode(99);
+ boolean caught = false;
+ try {
+ ActiveMQJMS2TestSupport.sendMessage(jmsContext, destination, sendMessageData);
+ fail("IlegalStateRuntimeException expected");
+ } catch (IllegalStateRuntimeException e) {
+ assertEquals("unknown delivery mode: 99", e.getMessage());
+ caught = true;
+ }
+ assertTrue(caught);
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testMessagePriority() {
+
+ try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) {
+ assertNotNull(jmsContext);
+ Destination destination = ActiveMQJMS2TestSupport.generateDestination(jmsContext, destinationType, methodNameDestinationName);
+ assertNotNull(destination);
+ JMSConsumer jmsConsumer = jmsContext.createConsumer(destination);
+ assertNotNull(jmsConsumer);
+
+ Message message = ActiveMQJMS2TestSupport.generateMessage(jmsContext, messageType, messagePayload);
+ List<String> sentMessageIds = new LinkedList<>();
+ for(int priority : VALID_PRIORITIES) {
+ MessageData sendMessageData = new MessageData();
+ sendMessageData.setMessage(message).setPriority(priority);
+ sentMessageIds.add(ActiveMQJMS2TestSupport.sendMessage(jmsContext, destination, sendMessageData));
+ }
+
+ final List<Message> recvMessages = new CopyOnWriteArrayList<>();
+ final CountDownLatch countDownLatch = new CountDownLatch(10);
+
+ jmsConsumer.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ recvMessages.add(message);
+ countDownLatch.countDown();
+ }
+ });
+ jmsContext.start();
+
+ assertTrue("Expected to receive 10 messages", countDownLatch.await(5, TimeUnit.SECONDS));
+ jmsConsumer.close();
+ jmsContext.stop();
+
+ // Check that exactly 10 messages have been received
+ assertEquals(Integer.valueOf(10), Integer.valueOf(recvMessages.size()));
+
+ int validatedCount = 0;
+ for(int validatedPriority : VALID_PRIORITIES) {
+ for(javax.jms.Message tmpMessage : recvMessages) {
+ if(tmpMessage.getJMSPriority() == validatedPriority) {
+ MessageData messageData = new MessageData();
+ messageData.setMessageType(messageType).setMessagePayload(messagePayload).setPriority(validatedPriority);
+ ActiveMQJMS2TestSupport.validateMessageData(tmpMessage, messageData);
+ validatedCount++;
+ }
+ }
+ }
+ assertEquals(Integer.valueOf(10), Integer.valueOf(validatedCount));
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testMessagePriorityInvalidLower() {
+
+ try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) {
+ assertNotNull(jmsContext);
+ Destination destination = ActiveMQJMS2TestSupport.generateDestination(jmsContext, destinationType, methodNameDestinationName);
+ assertNotNull(destination);
+
+ Message message = ActiveMQJMS2TestSupport.generateMessage(jmsContext, messageType, messagePayload);
+ MessageData sendMessageData = new MessageData();
+ sendMessageData.setMessage(message).setPriority(-1);
+ boolean caught = false;
+ try {
+ ActiveMQJMS2TestSupport.sendMessage(jmsContext, destination, sendMessageData);
+ fail("IlegalStateRuntimeException expected");
+ } catch (IllegalStateRuntimeException e) {
+ assertEquals("default priority must be a value between 0 and 9", e.getMessage());
+ caught = true;
+ }
+ assertTrue(caught);
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testMessagePriorityInvalidHigher() {
+
+ try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) {
+ assertNotNull(jmsContext);
+ Destination destination = ActiveMQJMS2TestSupport.generateDestination(jmsContext, destinationType, methodNameDestinationName);
+ assertNotNull(destination);
+
+ Message message = ActiveMQJMS2TestSupport.generateMessage(jmsContext, messageType, messagePayload);
+ MessageData sendMessageData = new MessageData();
+ sendMessageData.setMessage(message).setPriority(10);
+ boolean caught = false;
+ try {
+ ActiveMQJMS2TestSupport.sendMessage(jmsContext, destination, sendMessageData);
+ fail("IlegalStateRuntimeException expected");
+ } catch (IllegalStateRuntimeException e) {
+ assertEquals("default priority must be a value between 0 and 9", e.getMessage());
+ caught = true;
+ }
+ assertTrue(caught);
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testMessageTimestampTimeToLive() {
+
+ long timeToLive = 900000l;
+
+ try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) {
+ assertNotNull(jmsContext);
+ Destination destination = ActiveMQJMS2TestSupport.generateDestination(jmsContext, destinationType, methodNameDestinationName);
+ assertNotNull(destination);
+ JMSConsumer jmsConsumer = jmsContext.createConsumer(destination);
+ assertNotNull(jmsConsumer);
+
+ Message message = ActiveMQJMS2TestSupport.generateMessage(jmsContext, messageType, messagePayload);
+
+ List<String> sentMessageIds = new LinkedList<>();
+ long messageExpiration = Long.MIN_VALUE;
+ long messageTimestamp = Long.MIN_VALUE;
+
+ MessageData sendMessageData = new MessageData();
+ sendMessageData.setMessage(message).setTimeToLive(timeToLive);
+ sentMessageIds.add(ActiveMQJMS2TestSupport.sendMessage(jmsContext, destination, sendMessageData));
+ messageExpiration = message.getJMSExpiration();
+ messageTimestamp = message.getJMSTimestamp();
+ assertNotEquals(Long.MIN_VALUE, messageExpiration);
+ assertNotEquals(Long.MIN_VALUE, messageTimestamp);
+
+ final List<Message> recvMessages = new CopyOnWriteArrayList<>();
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+
+ jmsConsumer.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ recvMessages.add(message);
+ countDownLatch.countDown();
+ }
+ });
+ jmsContext.start();
+
+ assertTrue("Expected to receive 1 message", countDownLatch.await(5l, TimeUnit.SECONDS));
+ jmsConsumer.close();
+ jmsContext.stop();
+
+ // Check that exactly 1 messages have been received
+ assertEquals(Integer.valueOf(1), Integer.valueOf(recvMessages.size()));
+
+ int validatedCount = 0;
+ for(javax.jms.Message tmpMessage : recvMessages) {
+ MessageData recvMessageData = new MessageData();
+ recvMessageData.setMessageType(messageType).setMessagePayload(messagePayload).setExpiration(messageExpiration).setTimestamp(messageTimestamp);
+ ActiveMQJMS2TestSupport.validateMessageData(tmpMessage, recvMessageData);
+ validatedCount++;
+ }
+ assertEquals(Integer.valueOf(1), Integer.valueOf(validatedCount));
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testMessageDisableTimestamp() {
+
+ long timeToLive = 900000l;
+
+ try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) {
+ assertNotNull(jmsContext);
+ Destination destination = ActiveMQJMS2TestSupport.generateDestination(jmsContext, destinationType, methodNameDestinationName);
+ assertNotNull(destination);
+ JMSConsumer jmsConsumer = jmsContext.createConsumer(destination);
+ assertNotNull(jmsConsumer);
+
+ Message message = ActiveMQJMS2TestSupport.generateMessage(jmsContext, messageType, messagePayload);
+
+ List<String> sentMessageIds = new LinkedList<>();
+ long messageExpiration = Long.MIN_VALUE;
+ long messageTimestamp = Long.MIN_VALUE;
+ MessageData sendMessageData = new MessageData();
+ sendMessageData.setMessage(message).setTimeToLive(timeToLive).setDisableMessageTimestamp(true);
+ sentMessageIds.add(ActiveMQJMS2TestSupport.sendMessage(jmsContext, destination, sendMessageData));
+ messageExpiration = message.getJMSExpiration();
+ messageTimestamp = message.getJMSTimestamp();
+ assertEquals(0l, messageTimestamp);
+ assertNotEquals(Long.MIN_VALUE, messageExpiration);
+
+ final List<Message> recvMessages = new CopyOnWriteArrayList<>();
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+
+ jmsConsumer.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ recvMessages.add(message);
+ countDownLatch.countDown();
+ }
+ });
+ jmsContext.start();
+
+ assertTrue("Expected to receive 1 message", countDownLatch.await(5l, TimeUnit.SECONDS));
+ jmsConsumer.close();
+ jmsContext.stop();
+
+ for(javax.jms.Message tmpMessage : recvMessages) {
+ MessageData recvMessageData = new MessageData();
+ recvMessageData.setMessagePayload(messagePayload).setMessageType(messageType).setExpiration(messageExpiration).setTimestamp(messageTimestamp).setDisableMessageTimestamp(true);
+ ActiveMQJMS2TestSupport.validateMessageData(tmpMessage, recvMessageData);
+ }
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testMessageNonQOSHeaders() {
+
+ String jmsCorrelationID = UUID.randomUUID().toString();
+
+ try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) {
+ assertNotNull(jmsContext);
+ Destination destination = ActiveMQJMS2TestSupport.generateDestination(jmsContext, destinationType, methodNameDestinationName);
+ assertNotNull(destination);
+ JMSConsumer jmsConsumer = jmsContext.createConsumer(destination);
+ assertNotNull(jmsConsumer);
+
+ Message message = ActiveMQJMS2TestSupport.generateMessage(jmsContext, messageType, messagePayload);
+ Destination jmsReplyTo = ActiveMQJMS2TestSupport.generateDestination(jmsContext, destinationType, methodNameDestinationName + ".REPLYTO");
+ String jmsType = message.getClass().getName();
+
+ List<String> sentMessageIds = new LinkedList<>();
+ MessageData sendMessageData = new MessageData();
+ sendMessageData.setMessage(message).setCorrelationID(jmsCorrelationID).setReplyTo(jmsReplyTo).setJMSType(jmsType);
+ sentMessageIds.add(ActiveMQJMS2TestSupport.sendMessage(jmsContext, destination, sendMessageData));
+
+ final List<Message> recvMessages = new CopyOnWriteArrayList<>();
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+
+ jmsConsumer.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ recvMessages.add(message);
+ countDownLatch.countDown();
+ }
+ });
+ jmsContext.start();
+
+ assertTrue("Expected to receive 1 message", countDownLatch.await(5l, TimeUnit.SECONDS));
+ jmsConsumer.close();
+ jmsContext.stop();
+
+ for(javax.jms.Message tmpMessage : recvMessages) {
+ MessageData recvMessageData = new MessageData();
+ recvMessageData.setMessagePayload(messagePayload).setMessageType(messageType).setCorrelationID(jmsCorrelationID).setReplyTo(jmsReplyTo).setJMSType(jmsType);
+ ActiveMQJMS2TestSupport.validateMessageData(tmpMessage, recvMessageData);
+ }
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testMessageDisableMessageID() {
+
+ try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) {
+ assertNotNull(jmsContext);
+ Destination destination = ActiveMQJMS2TestSupport.generateDestination(jmsContext, destinationType, methodNameDestinationName);
+ assertNotNull(destination);
+ JMSConsumer jmsConsumer = jmsContext.createConsumer(destination);
+ assertNotNull(jmsConsumer);
+
+ Message message = ActiveMQJMS2TestSupport.generateMessage(jmsContext, messageType, messagePayload);
+ String jmsMessageID = (ActiveMQJMS2TestSupport.sendMessage(jmsContext, destination, message));
+
+ final List<Message> recvMessages = new CopyOnWriteArrayList<>();
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+
+ jmsConsumer.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ recvMessages.add(message);
+ countDownLatch.countDown();
+ }
+ });
+ jmsContext.start();
+
+ assertTrue("Expected to receive 1 message", countDownLatch.await(5l, TimeUnit.SECONDS));
+ jmsConsumer.close();
+ jmsContext.stop();
+
+ for(javax.jms.Message tmpMessage : recvMessages) {
+ MessageData messageData = new MessageData();
+ messageData.setMessageType(messageType).setMessagePayload(messagePayload).setMessageID(jmsMessageID);
+ ActiveMQJMS2TestSupport.validateMessageData(tmpMessage, messageData);
+ }
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testTopicDurableSubscriber() {
+ // Skip if this is not a topic test run
+ if(!"topic".equals(destinationType)) {
+ return;
+ }
+
+ try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) {
+ assertNotNull(jmsContext);
+ jmsContext.setClientID(clientID);
+
+ Destination destination = ActiveMQJMS2TestSupport.generateDestination(jmsContext, destinationType, methodNameDestinationName);
+ assertNotNull(destination);
+ assertTrue(destination instanceof Topic);
+ Topic topic = Topic.class.cast(destination);
+
+ JMSConsumer jmsConsumerRegister = jmsContext.createDurableConsumer(topic, methodNameDestinationName);
+ assertNotNull(jmsConsumerRegister);
+ jmsContext.start();
+ jmsConsumerRegister.close();
+
+ Message message = ActiveMQJMS2TestSupport.generateMessage(jmsContext, messageType, messagePayload);
+ String jmsMessageID = (ActiveMQJMS2TestSupport.sendMessage(jmsContext, destination, message));
+
+ JMSConsumer jmsConsumerDurable = jmsContext.createDurableConsumer(topic, methodNameDestinationName);
+ assertNotNull(jmsConsumerDurable);
+
+ final List<Message> recvMessages = new CopyOnWriteArrayList<>();
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+ jmsConsumerDurable.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ recvMessages.add(message);
+ countDownLatch.countDown();
+ }
+ });
+
+ assertTrue("Expected to receive 1 message", countDownLatch.await(5l, TimeUnit.SECONDS));
+ jmsConsumerDurable.close();
+ jmsContext.stop();
+
+ for(javax.jms.Message tmpMessage : recvMessages) {
+ MessageData messageData = new MessageData();
+ messageData.setMessageType(messageType).setMessagePayload(messagePayload).setMessageID(jmsMessageID);
+ ActiveMQJMS2TestSupport.validateMessageData(tmpMessage, messageData);
+ }
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testTopicDurableSubscriberSelector() {
+ // Skip if this is not a topic test run
+ if(!"topic".equals(destinationType)) {
+ return;
+ }
+
+ try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) {
+ assertNotNull(jmsContext);
+ jmsContext.setClientID(clientID);
+
+ Destination destination = ActiveMQJMS2TestSupport.generateDestination(jmsContext, destinationType, methodNameDestinationName);
+ assertNotNull(destination);
+ assertTrue(destination instanceof Topic);
+ Topic topic = Topic.class.cast(destination);
+
+ JMSConsumer jmsConsumerRegister = jmsContext.createDurableConsumer(topic, methodNameDestinationName, "JMSPriority=7", false);
+ assertNotNull(jmsConsumerRegister);
+ assertEquals("JMSPriority=7", jmsConsumerRegister.getMessageSelector());
+ jmsContext.start();
+
+ jmsConsumerRegister.close();
+
+ Message message = ActiveMQJMS2TestSupport.generateMessage(jmsContext, messageType, messagePayload);
+
+ List<String> sentMessageIds = new LinkedList<>();
+ String matchMessageId = null;
+ for(int priority=0; priority<10; priority++) {
+ MessageData sendMessageData = new MessageData();
+ sendMessageData.setMessage(message).setPriority(priority);
+ String sentMessageId = ActiveMQJMS2TestSupport.sendMessage(jmsContext, destination, sendMessageData);
+ sentMessageIds.add(sentMessageId);
+ if(priority == 7) {
+ matchMessageId = sentMessageId;
+ }
+ }
+
+ JMSConsumer jmsConsumerDurable = jmsContext.createDurableConsumer(topic, methodNameDestinationName, "JMSPriority=7", false);
+ assertNotNull(jmsConsumerDurable);
+ assertEquals("JMSPriority=7", jmsConsumerDurable.getMessageSelector());
+
+ final List<Message> recvMessages = new CopyOnWriteArrayList<>();
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+ jmsConsumerDurable.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ recvMessages.add(message);
+ countDownLatch.countDown();
+ }
+ });
+
+ assertTrue("Expected to receive 1 message", countDownLatch.await(5l, TimeUnit.SECONDS));
+ jmsConsumerDurable.close();
+ jmsContext.stop();
+
+ for(javax.jms.Message tmpMessage : recvMessages) {
+ MessageData recvMessageData = new MessageData();
+ recvMessageData.setMessageType(messageType).setMessagePayload(messagePayload).setMessageID(matchMessageId);
+ ActiveMQJMS2TestSupport.validateMessageData(tmpMessage, recvMessageData);
+ }
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ProducerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ProducerTest.java
new file mode 100644
index 0000000..84bc8c4
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ProducerTest.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms2;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.time.ZonedDateTime;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.jms.Destination;
+import javax.jms.JMSConsumer;
+import javax.jms.JMSContext;
+import javax.jms.JMSException;
+import javax.jms.JMSProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQMessageProducerSupport;
+import org.junit.Test;
+
+public class ActiveMQJMS2ProducerTest extends ActiveMQJMS2TestBase {
+
+ private static final String PROPERTY_NAME_VALID="ValidName";
+ private static final String PROPERTY_VALUE_VALID="valid value";
+
+ private static final Set<String> PROPERTY_NAMES_INVALID = Set.of("contains+plus+sign", "123startswithnumber", "contains blank space", "has-dash", "with&");
+ private static final Set<Object> PROPERTY_VALUES_INVALID = Set.of(new HashSet<String>(), new HashMap<Integer, String>(), ZonedDateTime.now());
+
+ @Test
+ public void testInvalidPropertyNameNullEmpty() {
+ try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) {
+ assertNotNull(jmsContext);
+ JMSProducer jmsProducer = jmsContext.createProducer();
+ jmsContext.start();
+
+ boolean caught = false;
+ try {
+ jmsProducer.setProperty(null, PROPERTY_VALUE_VALID);
+ } catch (Exception e) {
+ assertEquals("Invalid JMS property name must not be null or empty", e.getMessage());
+ caught = true;
+ }
+ assertTrue(caught);
+ caught = false;
+
+ try {
+ jmsProducer.setProperty("", PROPERTY_VALUE_VALID);
+ } catch (Exception e) {
+ assertEquals("Invalid JMS property name must not be null or empty", e.getMessage());
+ caught = true;
+ }
+ assertTrue(caught);
+ caught = false;
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testInvalidPropertyNameKeyword() {
+ try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) {
+ assertNotNull(jmsContext);
+ JMSProducer jmsProducer = jmsContext.createProducer();
+ jmsContext.start();
+
+ for(String propertyName : ActiveMQMessageProducerSupport.JMS_PROPERTY_NAMES_DISALLOWED) {
+ boolean caught = false;
+ try {
+ jmsProducer.setProperty(propertyName, PROPERTY_VALUE_VALID);
+ } catch (Exception e) {
+ assertEquals("Invalid JMS property: " + propertyName + " name is in disallowed list", e.getMessage());
+ caught = true;
+ }
+ assertTrue("Expected exception for propertyName: " + propertyName, caught);
+ }
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testInvalidPropertyNameInvalidJavaIdentifier() {
+ try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) {
+ assertNotNull(jmsContext);
+ JMSProducer jmsProducer = jmsContext.createProducer();
+ jmsContext.start();
+
+ for(String propertyName : PROPERTY_NAMES_INVALID) {
+ boolean caught = false;
+ try {
+ jmsProducer.setProperty(propertyName, PROPERTY_VALUE_VALID);
+ } catch (Exception e) {
+ assertTrue(e.getMessage().startsWith("Invalid JMS property: " + propertyName + " name"));
+ assertTrue(e.getMessage().contains("invalid character"));
+ caught = true;
+ }
+ assertTrue("Expected exception for propertyName: " + propertyName, caught);
+ }
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testInvalidPropertyNameInvalidValueClassType() {
+ try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) {
+ assertNotNull(jmsContext);
+ JMSProducer jmsProducer = jmsContext.createProducer();
+ jmsContext.start();
+
+ for(Object propertyValue : PROPERTY_VALUES_INVALID) {
+ boolean caught = false;
+ try {
+ jmsProducer.setProperty(PROPERTY_NAME_VALID, propertyValue);
+ } catch (Exception e) {
+ assertEquals("Invalid JMS property: " + PROPERTY_NAME_VALID + " value class: " + propertyValue.getClass().getName() + " is not permitted by specification", e.getMessage());
+ caught = true;
+ }
+ assertTrue("Expected exception for propertyValue: " + propertyValue, caught);
+ }
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testDisableTimestamp() {
+ try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) {
+ assertNotNull(jmsContext);
+ Destination destination = jmsContext.createQueue(methodNameDestinationName);
+ JMSProducer jmsProducer1 = jmsContext.createProducer();
+ jmsProducer1.setDisableMessageTimestamp(true);
+ jmsContext.start();
+
+ String textBody = "Test-" + methodNameDestinationName;
+ jmsProducer1.send(destination, textBody);
+ verifyTimestamp(jmsContext, destination, textBody, true);
+
+ // Re-enable timestatmp
+ jmsProducer1.setDisableMessageTimestamp(false);
+ jmsProducer1.send(destination, textBody);
+ verifyTimestamp(jmsContext, destination, textBody, false);
+
+ // Create second producer from same Context
+ JMSProducer jmsProducer2 = jmsContext.createProducer();
+ jmsProducer2.setDisableMessageTimestamp(true);
+ jmsProducer2.send(destination, textBody);
+ verifyTimestamp(jmsContext, destination, textBody, true);
+
+ // Re-confirm jmsProducer1 setting remains
+ jmsProducer1.send(destination, textBody);
+ verifyTimestamp(jmsContext, destination, textBody, false);
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+
+ protected static void verifyTimestamp(JMSContext jmsContext, Destination destination, String expectedTextBody, boolean expectTimestampZero) throws JMSException {
+ try(JMSConsumer jmsConsumer = jmsContext.createConsumer(destination)) {
+ javax.jms.Message message = jmsConsumer.receive(1000l);
+ assertNotNull(message);
+ assertTrue(TextMessage.class.isAssignableFrom(message.getClass()));
+ assertEquals(expectedTextBody, TextMessage.class.cast(message).getText());
+
+ if(expectTimestampZero) {
+ assertEquals(Long.valueOf(0), Long.valueOf(message.getJMSTimestamp()));
+ } else {
+ assertNotEquals(Long.valueOf(0), Long.valueOf(message.getJMSTimestamp()));
+ }
+ }
+ }
+}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2TestBase.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2TestBase.java
new file mode 100644
index 0000000..d15af01
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2TestBase.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms2;
+
+import java.lang.management.ManagementFactory;
+import java.util.LinkedList;
+import java.util.List;
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.JMX;
+import javax.management.MBeanServer;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.jmx.TopicViewMBean;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+
+public abstract class ActiveMQJMS2TestBase {
+
+ public static final String DEFAULT_JMX_DOMAIN_NAME = "org.apache.activemq";
+ public static final String DEFAULT_JMX_BROKER_NAME = "localhost";
+
+ public static final String DEFAULT_JMS_USER = "admin";
+ public static final String DEFAULT_JMS_PASS = "admin";
+
+ protected static ActiveMQConnectionFactory activemqConnectionFactory = null;
+
+ @Rule public TestName testName = new TestName();
+
+ // Control session
+ protected Connection connection = null;
+ protected Session session = null;
+ protected MessageProducer messageProducer = null;
+
+ protected String methodNameDestinationName = null;
+ protected MBeanServer mbeanServer = null;
+
+ @BeforeClass
+ public static void setUpClass() {
+ activemqConnectionFactory = new ActiveMQConnectionFactory("vm://localhost?marshal=false&broker.persistent=false");
+ List<String> newTrustedPackages = new LinkedList<>();
+ newTrustedPackages.addAll(activemqConnectionFactory.getTrustedPackages());
+ newTrustedPackages.add(ActiveMQJMS2TestBase.class.getPackageName());
+ activemqConnectionFactory.setTrustedPackages(newTrustedPackages);
+ }
+
+ @AfterClass
+ public static void tearDownClass() {
+ activemqConnectionFactory = null;
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ connection = activemqConnectionFactory.createConnection();
+ connection.start();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ methodNameDestinationName = "AMQ.JMS2." + cleanParameterizedMethodName(testName.getMethodName().toUpperCase());
+ messageProducer = session.createProducer(session.createQueue(methodNameDestinationName));
+ mbeanServer = ManagementFactory.getPlatformMBeanServer();
+ }
+
+ @After
+ public void tearDown() {
+ if(messageProducer != null) {
+ try { messageProducer.close(); } catch (Exception e) { } finally { messageProducer = null; }
+ }
+
+ if(session != null) {
+ try { session.close(); } catch (Exception e) { } finally { session = null; }
+ }
+
+ if(connection != null) {
+ try { connection.close(); } catch (Exception e) { } finally { connection = null; }
+ }
+
+ methodNameDestinationName = null;
+ }
+
+ protected DestinationViewMBean getDestinationViewMBean(String destinationType, ActiveMQDestination destination) throws Exception {
+ switch(destinationType) {
+ case "queue": return getQueueViewMBean(destination);
+ case "topic": return getTopicViewMBean(destination);
+ case "temp-queue": return getTempQueueViewMBean(destination);
+ case "temp-topic": return getTempTopicViewMBean(destination);
+ default: throw new IllegalStateException("Unsupported destinationType: " + destinationType);
+ }
+ }
+
+ protected QueueViewMBean getQueueViewMBean(ActiveMQDestination destination) throws Exception {
+ return JMX.newMBeanProxy(mbeanServer, BrokerMBeanSupport.createDestinationName(BrokerMBeanSupport.createBrokerObjectName(DEFAULT_JMX_DOMAIN_NAME, DEFAULT_JMX_BROKER_NAME).toString(), destination), QueueViewMBean.class);
+ }
+
+ protected TopicViewMBean getTopicViewMBean(ActiveMQDestination destination) throws Exception {
+ return JMX.newMBeanProxy(mbeanServer, BrokerMBeanSupport.createDestinationName(BrokerMBeanSupport.createBrokerObjectName(DEFAULT_JMX_DOMAIN_NAME, DEFAULT_JMX_BROKER_NAME).toString(), destination), TopicViewMBean.class);
+ }
+
+ protected TopicViewMBean getTempQueueViewMBean(ActiveMQDestination destination) throws Exception {
+ return JMX.newMBeanProxy(mbeanServer, BrokerMBeanSupport.createDestinationName(BrokerMBeanSupport.createBrokerObjectName(DEFAULT_JMX_DOMAIN_NAME, DEFAULT_JMX_BROKER_NAME).toString(), destination), TopicViewMBean.class);
+ }
+
+ protected TopicViewMBean getTempTopicViewMBean(ActiveMQDestination destination) throws Exception {
+ return JMX.newMBeanProxy(mbeanServer, BrokerMBeanSupport.createDestinationName(BrokerMBeanSupport.createBrokerObjectName(DEFAULT_JMX_DOMAIN_NAME, DEFAULT_JMX_BROKER_NAME).toString(), destination), TopicViewMBean.class);
+ }
+
+ private static String cleanParameterizedMethodName(String methodName) {
+ // clean up parameterized method string: TESTMESSAGETIMESTAMPTIMETOLIVE[DESTINATIONTYPE=QUEUE, MESSAGETYPE=BYTES]
+ // returns: TESTMESSAGETIMESTAMPTIMETOLIVE.QUEUE.BYTES
+
+ if(methodName == null || (!methodName.contains("[") && !methodName.contains("]"))) {
+ return methodName;
+ }
+
+ String[] step1 = methodName.split("\\[", 2);
+ String[] step2 = step1[1].split("\\]", 2);
+ String[] step3 = step2[0].split(",", 16);
+
+ return step1[0] + "." + step3[0].split("=", 2)[1] + "." + step3[1].split("=", 2)[1];
+ }
+}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2TestObjectMessagePayload.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2TestObjectMessagePayload.java
new file mode 100644
index 0000000..dc3bd86
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2TestObjectMessagePayload.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms2;
+
+import java.io.Serializable;
+
+public class ActiveMQJMS2TestObjectMessagePayload implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private String payload;
+
+ public ActiveMQJMS2TestObjectMessagePayload(String payload) {
+ this.payload = payload;
+ }
+
+ public String getPayload() {
+ return payload;
+ }
+}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2TestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2TestSupport.java
new file mode 100644
index 0000000..6008f97
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2TestSupport.java
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms2;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.JMSContext;
+import javax.jms.JMSException;
+import javax.jms.JMSProducer;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.QueueBrowser;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+public class ActiveMQJMS2TestSupport {
+
+ protected static final Set<String> PROPERTY_NAMES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList("JMS2_BOOLEAN_MIN", "JMS2_BOOLEAN_MAX", "JMS2_BYTE_MIN", "JMS2_BYTE_MAX",
+ "JMS2_DOUBLE_MIN", "JMS2_DOUBLE_MAX", "JMS2_INT_MIN", "JMS2_INT_MAX", "JMS2_FLOAT_MIN", "JMS2_FLOAT_MAX", "JMS2_LONG_MIN",
+ "JMS2_LONG_MAX", "JMS2_SHORT_MIN", "JMS2_SHORT_MAX", "JMS2_STRING_VAL")));
+
+ private ActiveMQJMS2TestSupport() {}
+
+ protected static Destination generateDestination(JMSContext jmsContext, String destinationType, String destinationName) throws JMSException {
+ Destination destination = null;
+ switch(destinationType) {
+ case "queue":
+ destination = jmsContext.createQueue(destinationName);
+ break;
+ case "topic":
+ destination = jmsContext.createTopic(destinationName);
+ break;
+ case "temp-queue":
+ destination = jmsContext.createTemporaryQueue();
+ break;
+ case "temp-topic":
+ destination = jmsContext.createTemporaryTopic();
+ break;
+ default:
+ fail("Unsupported destinationType:" + destinationType);
+ }
+ assertNotNull(destination);
+ return destination;
+ }
+
+ protected static Message generateMessage(JMSContext jmsContext, String messageType, String payload) throws JMSException {
+ assertNotNull(messageType);
+ Message tmpMessage = null;
+ switch(messageType) {
+ case "bytes":
+ BytesMessage bytesMessage = jmsContext.createBytesMessage();
+ bytesMessage.writeBytes(payload.getBytes(StandardCharsets.UTF_8));
+ tmpMessage = bytesMessage;
+ break;
+ case "map":
+ MapMessage mapMessage = jmsContext.createMapMessage();
+ mapMessage.setString("payload", payload);
+ tmpMessage = mapMessage;
+ break;
+ case "object":
+ tmpMessage = jmsContext.createObjectMessage(new ActiveMQJMS2TestObjectMessagePayload(payload));
+ break;
+ case "stream":
+ StreamMessage streamMessage = jmsContext.createStreamMessage();
+ streamMessage.writeString(payload);
+ tmpMessage = streamMessage;
+ break;
+ case "text":
+ tmpMessage = jmsContext.createTextMessage(payload);
+ break;
+ default:
+ fail("Unsupported messageType:" + messageType);
+ }
+ return tmpMessage;
+ }
+
+ protected static void populateJMSHeaders(javax.jms.Message message, String correlationID, Destination replyTo, String jmsType) throws JMSException {
+ assertNotNull(message);
+ message.setJMSCorrelationID(null);
+ message.setJMSReplyTo(null);
+ message.setJMSType(null);
+ }
+
+ protected static String sendMessage(JMSContext jmsContext, Destination destination, Message message) throws JMSException {
+ MessageData messageData = new MessageData();
+ messageData.setMessage(message);
+ return sendMessage(jmsContext, destination, messageData);
+ }
+
+ protected static String sendMessage(JMSContext jmsContext, Destination destination, MessageData messageData) throws JMSException {
+ assertNotNull(jmsContext);
+ assertNotNull(messageData);
+ assertNotNull(messageData.getMessage());
+
+ JMSProducer jmsProducer = jmsContext.createProducer();
+
+ if(messageData.getDeliveryDelay() != null) {
+ jmsProducer.setDeliveryDelay(messageData.getDeliveryDelay());
+ }
+ if(messageData.getDeliveryMode() != null) {
+ jmsProducer.setDeliveryMode(messageData.getDeliveryMode());
+ }
+ if(messageData.getDisableMessageID() != null) {
+ jmsProducer.setDisableMessageID(messageData.getDisableMessageID());
+ }
+ if(messageData.getDisableMessageTimestamp() != null) {
+ jmsProducer.setDisableMessageTimestamp(messageData.getDisableMessageTimestamp());
+ }
+ if(messageData.getCorrelationID() != null) {
+ jmsProducer.setJMSCorrelationID(messageData.getCorrelationID());
+ }
+ if(messageData.getReplyTo() != null) {
+ jmsProducer.setJMSReplyTo(messageData.getReplyTo());
+ }
+ if(messageData.getJmsType() != null) {
+ jmsProducer.setJMSType(messageData.getJmsType());
+ }
+ if(messageData.getPriority() != null) {
+ jmsProducer.setPriority(messageData.getPriority());
+ }
+ if(messageData.getTimeToLive() != null) {
+ jmsProducer.setTimeToLive(messageData.getTimeToLive());
+ }
+ populateJMSProperties(jmsProducer);
+ validateJMSProperties(jmsProducer);
+ jmsProducer.send(destination, messageData.getMessage());
+
+ return messageData.getMessage().getJMSMessageID();
+ }
+
+ protected static void browseMessage(JMSContext jmsContext, String testDestinationName, String expectedTextBody, boolean expectFound) throws JMSException {
+ assertNotNull(jmsContext);
+ try(QueueBrowser queueBrowser = jmsContext.createBrowser(jmsContext.createQueue(testDestinationName))) {
+ Enumeration<?> messageEnumeration = queueBrowser.getEnumeration();
+ assertNotNull(messageEnumeration);
+
+ boolean found = false;
+ while(!found && messageEnumeration.hasMoreElements()) {
+ javax.jms.Message message = (Message)messageEnumeration.nextElement();
+ assertNotNull(message);
+ assertTrue(TextMessage.class.isAssignableFrom(message.getClass()));
+ assertEquals(expectedTextBody, TextMessage.class.cast(message).getText());
+ found = true;
+ }
+ assertEquals(expectFound, found);
+ }
+ }
+
+ protected static void validateMessageData(javax.jms.Message message, MessageData messageData) throws JMSException {
+ assertNotNull(message);
+ assertNotNull(messageData.getMessageType());
+ assertNotNull(messageData.getMessagePayload());
+ validateJMSHeaders(message, messageData);
+ validateJMSProperties(message);
+
+ switch(messageData.getMessageType()) {
+ case "bytes":
+ assertTrue(message instanceof BytesMessage);
+ BytesMessage bytesMessage = BytesMessage.class.cast(message);
+ byte[] payload = new byte[(int)bytesMessage.getBodyLength()];
+ bytesMessage.readBytes(payload);
+ assertEquals(messageData.getMessagePayload(), new String(payload, StandardCharsets.UTF_8));
+ break;
+ case "map":
+ assertTrue(message instanceof MapMessage);
+ MapMessage mapMessage = MapMessage.class.cast(message);
+ String mapPayload = mapMessage.getString("payload");
+ assertEquals(messageData.getMessagePayload(), mapPayload);
+ break;
+ case "object":
+ assertTrue(message instanceof ObjectMessage);
+ ObjectMessage objectMessage = ObjectMessage.class.cast(message);
+ Object tmpObject = objectMessage.getObject();
+ assertNotNull(tmpObject);
+ assertTrue(tmpObject instanceof ActiveMQJMS2TestObjectMessagePayload);
+ assertEquals(messageData.getMessagePayload(), ActiveMQJMS2TestObjectMessagePayload.class.cast(tmpObject).getPayload());
+ break;
+ case "stream":
+ assertTrue(message instanceof StreamMessage);
+ StreamMessage streamMessage = StreamMessage.class.cast(message);
+ assertEquals(messageData.getMessagePayload(), streamMessage.readString());
+ break;
+ case "text":
+ assertTrue(message instanceof TextMessage);
+ assertEquals(messageData.getMessagePayload(), TextMessage.class.cast(message).getText());
+ break;
+ default:
+ fail("Unsupported messageType:" + messageData.getMessageType());
+ }
+ }
+
+ private static void validateJMSHeaders(javax.jms.Message message, MessageData messageData) throws JMSException {
+ assertNotNull(message);
+ assertEquals(messageData.getCorrelationID(), message.getJMSCorrelationID());
+ if(messageData.getDeliveryMode() != null) {
+ assertEquals(messageData.getDeliveryMode(), Integer.valueOf(message.getJMSDeliveryMode()));
+ }
+ if(messageData.getDeliveryTime() != null) {
+ assertEquals(messageData.getDeliveryTime(), Long.valueOf(message.getJMSDeliveryTime()));
+ }
+ if(messageData.getExpiration() != null) {
+ assertEquals(messageData.getExpiration(), Long.valueOf(message.getJMSExpiration()));
+ }
+ if(messageData.getMessageID() != null) {
+ assertEquals(messageData.getMessageID(), message.getJMSMessageID());
+ }
+ if(messageData.getPriority() != null) {
+ assertEquals(messageData.getPriority(), Integer.valueOf(message.getJMSPriority()));
+ }
+ assertEquals(messageData.getReplyTo(), message.getJMSReplyTo());
+ if(messageData.getTimestamp() != null) {
+ assertEquals(messageData.getTimestamp(), Long.valueOf(message.getJMSTimestamp()));
+ }
+ if(Boolean.TRUE.equals(messageData.getDisableMessageTimestamp())) {
+ assertEquals(Long.valueOf(0l), Long.valueOf(message.getJMSTimestamp()));
+ }
+ assertEquals(messageData.getJmsType(), message.getJMSType());
+ }
+
+ private static void populateJMSProperties(JMSProducer jmsProducer) throws JMSException {
+ jmsProducer.setProperty("JMS2_BOOLEAN_MIN", false);
+ jmsProducer.setProperty("JMS2_BOOLEAN_MAX", true);
+ jmsProducer.setProperty("JMS2_BYTE_MIN", Byte.MIN_VALUE);
+ jmsProducer.setProperty("JMS2_BYTE_MAX", Byte.MAX_VALUE);
+ jmsProducer.setProperty("JMS2_DOUBLE_MIN", Double.MIN_VALUE);
+ jmsProducer.setProperty("JMS2_DOUBLE_MAX", Double.MAX_VALUE);
+ jmsProducer.setProperty("JMS2_INT_MIN", Integer.MIN_VALUE);
+ jmsProducer.setProperty("JMS2_INT_MAX", Integer.MAX_VALUE);
+ jmsProducer.setProperty("JMS2_FLOAT_MIN", Float.MIN_VALUE);
+ jmsProducer.setProperty("JMS2_FLOAT_MAX", Float.MAX_VALUE);
+ jmsProducer.setProperty("JMS2_LONG_MIN", Long.MIN_VALUE);
+ jmsProducer.setProperty("JMS2_LONG_MAX", Long.MAX_VALUE);
+ jmsProducer.setProperty("JMS2_SHORT_MIN", Short.MIN_VALUE);
+ jmsProducer.setProperty("JMS2_SHORT_MAX", Short.MAX_VALUE);
+ jmsProducer.setProperty("JMS2_STRING_VAL", "Hello World");
+ }
+
+ private static void validateJMSProperties(JMSProducer jmsProducer) throws JMSException {
+ assertNotNull(jmsProducer);
+ assertNotNull(jmsProducer.getPropertyNames());
+ assertEquals(Integer.valueOf(PROPERTY_NAMES.size()), Integer.valueOf(jmsProducer.getPropertyNames().size()));
+ for(String propertyName : PROPERTY_NAMES) {
+ assertTrue(jmsProducer.propertyExists(propertyName));
+ }
+ assertEquals(Boolean.FALSE, Boolean.valueOf(jmsProducer.getBooleanProperty("JMS2_BOOLEAN_MIN")));
+ assertEquals(Boolean.TRUE, Boolean.valueOf(jmsProducer.getBooleanProperty("JMS2_BOOLEAN_MAX")));
+ assertEquals(Byte.valueOf(Byte.MIN_VALUE), Byte.valueOf(jmsProducer.getByteProperty("JMS2_BYTE_MIN")));
+ assertEquals(Byte.valueOf(Byte.MAX_VALUE), Byte.valueOf(jmsProducer.getByteProperty("JMS2_BYTE_MAX")));
+ assertEquals(Double.valueOf(Double.MIN_VALUE), Double.valueOf(jmsProducer.getDoubleProperty("JMS2_DOUBLE_MIN")));
+ assertEquals(Double.valueOf(Double.MAX_VALUE), Double.valueOf(jmsProducer.getDoubleProperty("JMS2_DOUBLE_MAX")));
+ assertEquals(Integer.valueOf(Integer.MIN_VALUE), Integer.valueOf(jmsProducer.getIntProperty("JMS2_INT_MIN")));
+ assertEquals(Integer.valueOf(Integer.MAX_VALUE), Integer.valueOf(jmsProducer.getIntProperty("JMS2_INT_MAX")));
+ assertEquals(Float.valueOf(Float.MIN_VALUE), Float.valueOf(jmsProducer.getFloatProperty("JMS2_FLOAT_MIN")));
+ assertEquals(Float.valueOf(Float.MAX_VALUE), Float.valueOf(jmsProducer.getFloatProperty("JMS2_FLOAT_MAX")));
+ assertEquals(Long.valueOf(Long.MIN_VALUE), Long.valueOf(jmsProducer.getLongProperty("JMS2_LONG_MIN")));
+ assertEquals(Long.valueOf(Long.MAX_VALUE), Long.valueOf(jmsProducer.getLongProperty("JMS2_LONG_MAX")));
+ assertEquals(Short.valueOf(Short.MIN_VALUE), Short.valueOf(jmsProducer.getShortProperty("JMS2_SHORT_MIN")));
+ assertEquals(Short.valueOf(Short.MAX_VALUE), Short.valueOf(jmsProducer.getShortProperty("JMS2_SHORT_MAX")));
+ assertEquals("Hello World", jmsProducer.getStringProperty("JMS2_STRING_VAL"));
+ }
+
+ private static void validateJMSProperties(javax.jms.Message message) throws JMSException {
+ assertNotNull(message);
+ assertEquals(Boolean.FALSE, Boolean.valueOf(message.getBooleanProperty("JMS2_BOOLEAN_MIN")));
+ assertEquals(Boolean.TRUE, Boolean.valueOf(message.getBooleanProperty("JMS2_BOOLEAN_MAX")));
+ assertEquals(Byte.valueOf(Byte.MIN_VALUE), Byte.valueOf(message.getByteProperty("JMS2_BYTE_MIN")));
+ assertEquals(Byte.valueOf(Byte.MAX_VALUE), Byte.valueOf(message.getByteProperty("JMS2_BYTE_MAX")));
+ assertEquals(Double.valueOf(Double.MIN_VALUE), Double.valueOf(message.getDoubleProperty("JMS2_DOUBLE_MIN")));
+ assertEquals(Double.valueOf(Double.MAX_VALUE), Double.valueOf(message.getDoubleProperty("JMS2_DOUBLE_MAX")));
+ assertEquals(Integer.valueOf(Integer.MIN_VALUE), Integer.valueOf(message.getIntProperty("JMS2_INT_MIN")));
+ assertEquals(Integer.valueOf(Integer.MAX_VALUE), Integer.valueOf(message.getIntProperty("JMS2_INT_MAX")));
+ assertEquals(Float.valueOf(Float.MIN_VALUE), Float.valueOf(message.getFloatProperty("JMS2_FLOAT_MIN")));
+ assertEquals(Float.valueOf(Float.MAX_VALUE), Float.valueOf(message.getFloatProperty("JMS2_FLOAT_MAX")));
+ assertEquals(Long.valueOf(Long.MIN_VALUE), Long.valueOf(message.getLongProperty("JMS2_LONG_MIN")));
+ assertEquals(Long.valueOf(Long.MAX_VALUE), Long.valueOf(message.getLongProperty("JMS2_LONG_MAX")));
+ assertEquals(Short.valueOf(Short.MIN_VALUE), Short.valueOf(message.getShortProperty("JMS2_SHORT_MIN")));
+ assertEquals(Short.valueOf(Short.MAX_VALUE), Short.valueOf(message.getShortProperty("JMS2_SHORT_MAX")));
+ assertEquals("Hello World", message.getStringProperty("JMS2_STRING_VAL"));
+ }
+}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/MessageData.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/MessageData.java
new file mode 100644
index 0000000..4e84bef
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/MessageData.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms2;
+
+import javax.jms.Destination;
+
+public class MessageData {
+
+ private javax.jms.Message message = null;
+ private String messageType = null;
+ private String messagePayload = null;
+ private Long deliveryDelay = null;
+ private Integer deliveryMode = null;
+ private Boolean disableMessageID = null;
+ private Boolean disableMessageTimestamp = null;
+ private String correlationID = null;
+ private Destination replyTo = null;
+ private String jmsType = null;
+ private Integer priority = null;
+ private Long expiration = null;
+ private Long timestamp = null;
+ private Long deliveryTime = null;
+ private String messageID = null;
+ private Long timeToLive = null;
+
+ MessageData() {}
+
+ // builder-style setters
+ public MessageData setMessage(javax.jms.Message message) { this.message = message; return this; }
+ public MessageData setMessageType(String messageType) { this.messageType = messageType; return this; }
+ public MessageData setMessagePayload(String messagePayload) { this.messagePayload = messagePayload; return this; }
+ public MessageData setDeliveryDelay(Long deliveryDelay) { this.deliveryDelay = deliveryDelay; return this; }
+ public MessageData setDeliveryMode(Integer deliveryMode) { this.deliveryMode = deliveryMode; return this; }
+ public MessageData setDisableMessageID(Boolean disableMessageID) { this.disableMessageID = disableMessageID; return this; }
+ public MessageData setDisableMessageTimestamp(Boolean disableMessageTimestamp) { this.disableMessageTimestamp = disableMessageTimestamp; return this; }
+ public MessageData setCorrelationID(String correlationID) { this.correlationID = correlationID; return this; }
+ public MessageData setReplyTo(Destination replyTo) { this.replyTo = replyTo; return this; }
+ public MessageData setJMSType(String jmsType) { this.jmsType = jmsType; return this; }
+ public MessageData setPriority(Integer priority) { this.priority = priority; return this; }
+ public MessageData setExpiration(Long expiration) { this.expiration = expiration; return this; }
+ public MessageData setTimestamp(Long timestamp) { this.timestamp = timestamp; return this; }
+ public MessageData setDeliveryTime(Long deliveryTime) { this.deliveryTime = deliveryTime; return this; }
+ public MessageData setMessageID(String messageID) { this.messageID = messageID; return this; }
+ public MessageData setTimeToLive(Long timeToLive) { this.timeToLive = timeToLive; return this; }
+
+ // standard getters
+ public String getMessagePayload() {
+ return messagePayload;
+ }
+
+ public Long getDeliveryDelay() {
+ return deliveryDelay;
+ }
+
+ public Integer getDeliveryMode() {
+ return deliveryMode;
+ }
+
+ public Boolean getDisableMessageID() {
+ return disableMessageID;
+ }
+
+ public Boolean getDisableMessageTimestamp() {
+ return disableMessageTimestamp;
+ }
+
+ public String getCorrelationID() {
+ return correlationID;
+ }
+
+ public Destination getReplyTo() {
+ return replyTo;
+ }
+
+ public String getJmsType() {
+ return jmsType;
+ }
+
+ public Integer getPriority() {
+ return priority;
+ }
+
+ public Long getExpiration() {
+ return expiration;
+ }
+
+ public Long getTimestamp() {
+ return timestamp;
+ }
+
+ public Long getDeliveryTime() {
+ return deliveryTime;
+ }
+
+ public String getMessageID() {
+ return messageID;
+ }
+
+ public javax.jms.Message getMessage() {
+ return message;
+ }
+
+ public String getMessageType() {
+ return messageType;
+ }
+
+ public Long getTimeToLive() {
+ return timeToLive;
+ }
+}