This closes #3269
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index bd17817..25cb4ab 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -464,6 +464,7 @@
context.incrementSettle();
+ RoutingType routingType = null;
if (address != null) {
message.setAddress(address);
} else {
@@ -474,10 +475,15 @@
rejectMessage(context, delivery, Symbol.valueOf("failed"), "Missing 'to' field for message sent to an anonymous producer");
return;
}
+
+ routingType = message.getRoutingType();
}
//here check queue-autocreation
- RoutingType routingType = context.getRoutingType(receiver, address);
+ if (routingType == null) {
+ routingType = context.getRoutingType(receiver, address);
+ }
+
if (!checkAddressAndAutocreateIfPossible(address, routingType)) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
}
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index bd5551c..99ade07 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -32,6 +32,7 @@
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
+import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.message.Message;
@@ -612,6 +613,25 @@
}
/**
+ * Attempts to retrieve the message body as a String from an AmqpValue body.
+ *
+ * @return the string
+ * @throws NoSuchElementException if the body is not an AmqpValue holding a String.
+ */
+ public String getText() throws NoSuchElementException {
+ Section body = getWrappedMessage().getBody();
+ if (body instanceof AmqpValue) {
+ AmqpValue value = (AmqpValue) body;
+
+ if (value.getValue() instanceof String) {
+ return (String) value.getValue();
+ }
+ }
+
+ throw new NoSuchElementException("Message does not contain a String body");
+ }
+
+ /**
* Sets a byte array value into the body of an outgoing Message, throws
* an exception if this is an incoming message instance.
*
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpAnonymousRelayTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpAnonymousRelayTest.java
index 1743624..98938d7 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpAnonymousRelayTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpAnonymousRelayTest.java
@@ -18,6 +18,12 @@
import java.util.concurrent.TimeUnit;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.AddressQueryResult;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
@@ -28,6 +34,10 @@
public class AmqpAnonymousRelayTest extends AmqpClientTestSupport {
+ private static final String AUTO_CREATION_QUEUE_PREFIX = "AmqpAnonymousRelayTest-AutoCreateQueues.";
+ private static final String AUTO_CREATION_TOPIC_PREFIX = "AmqpAnonymousRelayTest-AutoCreateTopics.";
+
+ // Disable auto-creation in the general config created by the superclass, we add specific prefixed areas with it enabled
@Override
protected boolean isAutoCreateQueues() {
return false;
@@ -38,6 +48,232 @@
return false;
}
+ // Adds, on top of the superclass policy, auto-creation address settings for the queue (ANYCAST) and topic (MULTICAST) prefixes
+ @Override
+ protected void configureAddressPolicy(ActiveMQServer server) {
+ super.configureAddressPolicy(server);
+
+ AddressSettings autoCreateQueueAddressSettings = new AddressSettings();
+ autoCreateQueueAddressSettings.setAutoCreateQueues(true);
+ autoCreateQueueAddressSettings.setAutoCreateAddresses(true);
+ autoCreateQueueAddressSettings.setDefaultAddressRoutingType(RoutingType.ANYCAST);
+ autoCreateQueueAddressSettings.setDefaultQueueRoutingType(RoutingType.ANYCAST);
+
+ server.getConfiguration().getAddressesSettings().put(AUTO_CREATION_QUEUE_PREFIX + "#", autoCreateQueueAddressSettings);
+
+ AddressSettings autoCreateTopicAddressSettings = new AddressSettings();
+ autoCreateTopicAddressSettings.setAutoCreateQueues(true);
+ autoCreateTopicAddressSettings.setAutoCreateAddresses(true);
+ autoCreateTopicAddressSettings.setDefaultAddressRoutingType(RoutingType.MULTICAST);
+ autoCreateTopicAddressSettings.setDefaultQueueRoutingType(RoutingType.MULTICAST);
+
+ server.getConfiguration().getAddressesSettings().put(AUTO_CREATION_TOPIC_PREFIX + "#", autoCreateTopicAddressSettings);
+ }
+
+ @Test(timeout = 60000)
+ public void testSendMessageOnAnonymousProducerCausesQueueAutoCreation() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+
+ // We use an address in the QUEUE prefixed auto-creation area to ensure the broker picks this up
+ // and creates a queue, in the absence of any other message annotation / terminus capability config.
+ String queueName = AUTO_CREATION_QUEUE_PREFIX + getQueueName();
+ try {
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createAnonymousSender();
+ AmqpMessage message = new AmqpMessage();
+ message.setAddress(queueName);
+ message.setText(getTestName());
+
+ AddressQueryResult addressQueryResult = server.addressQuery(SimpleString.toSimpleString(queueName));
+ assertFalse(addressQueryResult.isExists());
+
+ sender.send(message);
+ sender.close();
+
+ addressQueryResult = server.addressQuery(SimpleString.toSimpleString(queueName));
+ assertTrue(addressQueryResult.isExists());
+ assertTrue(addressQueryResult.getRoutingTypes().contains(RoutingType.ANYCAST));
+ assertTrue(addressQueryResult.isAutoCreated());
+
+ // Create a receiver and verify it can consume the message from the auto-created queue
+ AmqpReceiver receiver = session.createReceiver(queueName);
+ receiver.flow(1);
+ AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
+ assertNotNull("Should have read message", received);
+ assertEquals(getTestName(), received.getText());
+ received.accept();
+
+ receiver.close();
+ } finally {
+ connection.close();
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testSendMessageOnAnonymousProducerCausesTopicAutoCreation() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+
+ // We use an address in the TOPIC prefixed auto-creation area to ensure the broker picks this up
+ // and creates a topic, in the absence of any other message annotation / terminus capability config.
+ String topicName = AUTO_CREATION_TOPIC_PREFIX + getTopicName();
+
+ try {
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createAnonymousSender();
+ AmqpMessage message = new AmqpMessage();
+
+ message.setAddress(topicName);
+ message.setText("creating-topic-address");
+
+ AddressQueryResult addressQueryResult = server.addressQuery(SimpleString.toSimpleString(topicName));
+ assertFalse(addressQueryResult.isExists());
+
+ sender.send(message);
+
+ addressQueryResult = server.addressQuery(SimpleString.toSimpleString(topicName));
+ assertTrue(addressQueryResult.isExists());
+ assertTrue(addressQueryResult.getRoutingTypes().contains(RoutingType.MULTICAST));
+ assertTrue(addressQueryResult.isAutoCreated());
+
+ // Create 2 receivers and verify they can both consume a new message sent to the auto-created topic
+ AmqpReceiver receiver1 = session.createReceiver(topicName);
+ AmqpReceiver receiver2 = session.createReceiver(topicName);
+ receiver1.flow(1);
+ receiver2.flow(1);
+
+ AmqpMessage message2 = new AmqpMessage();
+ message2.setAddress(topicName);
+ message2.setText(getTestName());
+
+ sender.send(message2);
+
+ AmqpMessage received1 = receiver1.receive(10, TimeUnit.SECONDS);
+ assertNotNull("Should have read message", received1);
+ assertEquals(getTestName(), received1.getText());
+ received1.accept();
+
+ AmqpMessage received2 = receiver2.receive(10, TimeUnit.SECONDS);
+ assertNotNull("Should have read message", received2);
+ assertEquals(getTestName(), received2.getText());
+ received2.accept();
+
+ receiver1.close();
+ receiver2.close();
+ sender.close();
+ } finally {
+ connection.close();
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testSendMessageOnAnonymousProducerWithDestinationTypeAnnotationCausesQueueAutoCreation() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+
+ try {
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createAnonymousSender();
+ AmqpMessage message = new AmqpMessage();
+ message.setMessageAnnotation(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION.toString(), AMQPMessageSupport.QUEUE_TYPE);
+
+ // We deliberately use the TOPIC prefixed auto-creation area, not the QUEUE prefix, to ensure
+ // we get a queue because the broker inspects the value we send on the message, and not just
+ // because it was taken as a default from the address settings.
+ String queueName = AUTO_CREATION_TOPIC_PREFIX + getQueueName();
+
+ message.setAddress(queueName);
+ message.setText(getTestName());
+
+ AddressQueryResult addressQueryResult = server.addressQuery(SimpleString.toSimpleString(queueName));
+ assertFalse(addressQueryResult.isExists());
+
+ sender.send(message);
+ sender.close();
+
+ addressQueryResult = server.addressQuery(SimpleString.toSimpleString(queueName));
+ assertTrue(addressQueryResult.isExists());
+ assertTrue(addressQueryResult.getRoutingTypes().contains(RoutingType.ANYCAST));
+ assertTrue(addressQueryResult.isAutoCreated());
+
+ // Create a receiver and verify it can consume the message from the auto-created queue
+ AmqpReceiver receiver = session.createReceiver(queueName);
+ receiver.flow(1);
+ AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
+ assertNotNull("Should have read message", received);
+ assertEquals(getTestName(), received.getText());
+ received.accept();
+
+ receiver.close();
+ } finally {
+ connection.close();
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testSendMessageOnAnonymousProducerWithDestinationTypeAnnotationCausesTopicAutoCreation() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+
+ try {
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createAnonymousSender();
+ AmqpMessage message = new AmqpMessage();
+ message.setMessageAnnotation(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION.toString(), AMQPMessageSupport.TOPIC_TYPE);
+
+ // We deliberately use the QUEUE prefixed auto-creation area, not the TOPIC prefix, to ensure
+ // we get a topic because the broker inspects the value we send on the message, and not just
+ // because it was taken as a default from the address settings.
+ String topicName = AUTO_CREATION_QUEUE_PREFIX + getTopicName();
+ message.setAddress(topicName);
+ message.setText("creating-topic-address");
+
+ AddressQueryResult addressQueryResult = server.addressQuery(SimpleString.toSimpleString(topicName));
+ assertFalse(addressQueryResult.isExists());
+
+ sender.send(message);
+
+ addressQueryResult = server.addressQuery(SimpleString.toSimpleString(topicName));
+ assertTrue(addressQueryResult.isExists());
+ assertTrue(addressQueryResult.getRoutingTypes().contains(RoutingType.MULTICAST));
+ assertTrue(addressQueryResult.isAutoCreated());
+
+ // Create 2 receivers and verify they can both consume a new message sent to the auto-created topic
+ AmqpReceiver receiver1 = session.createReceiver(topicName);
+ AmqpReceiver receiver2 = session.createReceiver(topicName);
+ receiver1.flow(1);
+ receiver2.flow(1);
+
+ AmqpMessage message2 = new AmqpMessage();
+ message2.setMessageAnnotation(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION.toString(), AMQPMessageSupport.TOPIC_TYPE);
+ message2.setAddress(topicName);
+ message2.setText(getTestName());
+
+ sender.send(message2);
+
+ AmqpMessage received1 = receiver1.receive(10, TimeUnit.SECONDS);
+ assertNotNull("Should have read message", received1);
+ assertEquals(getTestName(), received1.getText());
+ received1.accept();
+
+ AmqpMessage received2 = receiver2.receive(10, TimeUnit.SECONDS);
+ assertNotNull("Should have read message", received2);
+ assertEquals(getTestName(), received2.getText());
+ received2.accept();
+
+ receiver1.close();
+ receiver2.close();
+ sender.close();
+ } finally {
+ connection.close();
+ }
+ }
+
@Test(timeout = 60000)
public void testSendMessageOnAnonymousRelayLinkUsingMessageTo() throws Exception {
AmqpClient client = createAmqpClient();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageProducerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageProducerTest.java
index 2125ed8..fddc19c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageProducerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageProducerTest.java
@@ -37,6 +37,35 @@
public class JMSMessageProducerTest extends JMSClientTestSupport {
@Test(timeout = 30000)
+ public void testAnonymousProducerWithQueueAutoCreation() throws Exception {
+ Connection connection = createConnection();
+
+ try {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String queueName = UUID.randomUUID().toString() + ":" + getQueueName();
+ Queue queue = session.createQueue(queueName);
+ MessageProducer p = session.createProducer(null);
+
+ TextMessage message = session.createTextMessage();
+ message.setText(getTestName());
+ // This will auto-create the address, and be retained for subsequent consumption
+ p.send(queue, message);
+
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+ p.send(queue, message);
+ Message msg = consumer.receive(2000);
+ assertNotNull(msg);
+ assertTrue(msg instanceof TextMessage);
+ assertEquals(getTestName(), ((TextMessage)msg).getText());
+ consumer.close();
+ }
+ } finally {
+ connection.close();
+ }
+ }
+
+ @Test(timeout = 30000)
public void testAnonymousProducer() throws Exception {
Connection connection = createConnection();
@@ -71,25 +100,32 @@
}
@Test(timeout = 30000)
- public void testAnonymousProducerWithAutoCreation() throws Exception {
+ public void testAnonymousProducerWithTopicAutoCreation() throws Exception {
Connection connection = createConnection();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic topic = session.createTopic(UUID.randomUUID().toString());
+ String topicName = UUID.randomUUID().toString() + ":" + getQueueName();
+ Topic topic = session.createTopic(topicName);
MessageProducer p = session.createProducer(null);
TextMessage message = session.createTextMessage();
- message.setText("hello");
- // this will auto-create the address
+ message.setText("creating-topic-address");
+ // This will auto-create the address, but msg will be discarded as there are no consumers
p.send(topic, message);
{
+ // This will create a new consumer on the topic address, verifying it can attach
+ // and then receive a further sent message
MessageConsumer consumer = session.createConsumer(topic);
- p.send(topic, message);
+ Message message2 = message = session.createTextMessage(getTestName());
+
+ p.send(topic, message2);
+
Message msg = consumer.receive(2000);
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
+ assertEquals(getTestName(), ((TextMessage)msg).getText());
consumer.close();
}
} finally {