| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.activemq.artemis.tests.integration.amqp; |
| |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.activemq.artemis.api.core.RoutingType; |
| import org.apache.activemq.artemis.api.core.SimpleString; |
| import org.apache.activemq.artemis.core.server.ActiveMQServer; |
| import org.apache.activemq.artemis.core.server.AddressQueryResult; |
| import org.apache.activemq.artemis.core.settings.impl.AddressSettings; |
| import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport; |
| import org.apache.activemq.transport.amqp.client.AmqpClient; |
| import org.apache.activemq.transport.amqp.client.AmqpConnection; |
| import org.apache.activemq.transport.amqp.client.AmqpMessage; |
| import org.apache.activemq.transport.amqp.client.AmqpReceiver; |
| import org.apache.activemq.transport.amqp.client.AmqpSender; |
| import org.apache.activemq.transport.amqp.client.AmqpSession; |
| import org.junit.Test; |
| |
| public class AmqpAnonymousRelayTest extends AmqpClientTestSupport { |
| |
| private static final String AUTO_CREATION_QUEUE_PREFIX = "AmqpAnonymousRelayTest-AutoCreateQueues."; |
| private static final String AUTO_CREATION_TOPIC_PREFIX = "AmqpAnonymousRelayTest-AutoCreateTopics."; |
| |
| // Disable auto-creation in the general config created by the superclass, we add specific prefixed areas with it enabled |
| @Override |
| protected boolean isAutoCreateQueues() { |
| return false; |
| } |
| |
| @Override |
| protected boolean isAutoCreateAddresses() { |
| return false; |
| } |
| |
| // Additional address configuration for auto creation of queues and topics |
| @Override |
| protected void configureAddressPolicy(ActiveMQServer server) { |
| super.configureAddressPolicy(server); |
| |
| AddressSettings autoCreateQueueAddressSettings = new AddressSettings(); |
| autoCreateQueueAddressSettings.setAutoCreateQueues(true); |
| autoCreateQueueAddressSettings.setAutoCreateAddresses(true); |
| autoCreateQueueAddressSettings.setDefaultAddressRoutingType(RoutingType.ANYCAST); |
| autoCreateQueueAddressSettings.setDefaultQueueRoutingType(RoutingType.ANYCAST); |
| |
| server.getConfiguration().getAddressesSettings().put(AUTO_CREATION_QUEUE_PREFIX + "#", autoCreateQueueAddressSettings); |
| |
| AddressSettings autoCreateTopicAddressSettings = new AddressSettings(); |
| autoCreateTopicAddressSettings.setAutoCreateQueues(true); |
| autoCreateTopicAddressSettings.setAutoCreateAddresses(true); |
| autoCreateTopicAddressSettings.setDefaultAddressRoutingType(RoutingType.MULTICAST); |
| autoCreateTopicAddressSettings.setDefaultQueueRoutingType(RoutingType.MULTICAST); |
| |
| server.getConfiguration().getAddressesSettings().put(AUTO_CREATION_TOPIC_PREFIX + "#", autoCreateTopicAddressSettings); |
| } |
| |
| @Test(timeout = 60000) |
| public void testSendMessageOnAnonymousProducerCausesQueueAutoCreation() throws Exception { |
| AmqpClient client = createAmqpClient(); |
| AmqpConnection connection = addConnection(client.connect()); |
| |
| // We use an address in the QUEUE prefixed auto-creation area to ensure the broker picks this up |
| // and creates a queue, in the absense of any other message annotation / terminus capability config. |
| String queueName = AUTO_CREATION_QUEUE_PREFIX + getQueueName(); |
| try { |
| AmqpSession session = connection.createSession(); |
| |
| AmqpSender sender = session.createAnonymousSender(); |
| AmqpMessage message = new AmqpMessage(); |
| message.setAddress(queueName); |
| message.setText(getTestName()); |
| |
| AddressQueryResult addressQueryResult = server.addressQuery(SimpleString.toSimpleString(queueName)); |
| assertFalse(addressQueryResult.isExists()); |
| |
| sender.send(message); |
| sender.close(); |
| |
| addressQueryResult = server.addressQuery(SimpleString.toSimpleString(queueName)); |
| assertTrue(addressQueryResult.isExists()); |
| assertTrue(addressQueryResult.getRoutingTypes().contains(RoutingType.ANYCAST)); |
| assertTrue(addressQueryResult.isAutoCreated()); |
| |
| // Create a receiver and verify it can consume the message from the auto-created queue |
| AmqpReceiver receiver = session.createReceiver(queueName); |
| receiver.flow(1); |
| AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS); |
| assertNotNull("Should have read message", received); |
| assertEquals(getTestName(), received.getText()); |
| received.accept(); |
| |
| receiver.close(); |
| } finally { |
| connection.close(); |
| } |
| } |
| |
| @Test(timeout = 60000) |
| public void testSendMessageOnAnonymousProducerCausesTopicAutoCreation() throws Exception { |
| AmqpClient client = createAmqpClient(); |
| AmqpConnection connection = addConnection(client.connect()); |
| |
| // We use an address in the TOPIC prefixed auto-creation area to ensure the broker picks this up |
| // and creates a topic, in the absense of any other message annotation / terminus capability config. |
| String topicName = AUTO_CREATION_TOPIC_PREFIX + getTopicName(); |
| |
| try { |
| AmqpSession session = connection.createSession(); |
| |
| AmqpSender sender = session.createAnonymousSender(); |
| AmqpMessage message = new AmqpMessage(); |
| |
| message.setAddress(topicName); |
| message.setText("creating-topic-address"); |
| |
| AddressQueryResult addressQueryResult = server.addressQuery(SimpleString.toSimpleString(topicName)); |
| assertFalse(addressQueryResult.isExists()); |
| |
| sender.send(message); |
| |
| addressQueryResult = server.addressQuery(SimpleString.toSimpleString(topicName)); |
| assertTrue(addressQueryResult.isExists()); |
| assertTrue(addressQueryResult.getRoutingTypes().contains(RoutingType.MULTICAST)); |
| assertTrue(addressQueryResult.isAutoCreated()); |
| |
| // Create 2 receivers and verify they can both consume a new message sent to the auto-created topic |
| AmqpReceiver receiver1 = session.createReceiver(topicName); |
| AmqpReceiver receiver2 = session.createReceiver(topicName); |
| receiver1.flow(1); |
| receiver2.flow(1); |
| |
| AmqpMessage message2 = new AmqpMessage(); |
| message2.setAddress(topicName); |
| message2.setText(getTestName()); |
| |
| sender.send(message2); |
| |
| AmqpMessage received1 = receiver1.receive(10, TimeUnit.SECONDS); |
| assertNotNull("Should have read message", received1); |
| assertEquals(getTestName(), received1.getText()); |
| received1.accept(); |
| |
| AmqpMessage received2 = receiver2.receive(10, TimeUnit.SECONDS); |
| assertNotNull("Should have read message", received2); |
| assertEquals(getTestName(), received2.getText()); |
| received1.accept(); |
| |
| receiver1.close(); |
| receiver2.close(); |
| sender.close(); |
| } finally { |
| connection.close(); |
| } |
| } |
| |
| @Test(timeout = 60000) |
| public void testSendMessageOnAnonymousProducerWithDestinationTypeAnnotationCausesQueueAutoCreation() throws Exception { |
| AmqpClient client = createAmqpClient(); |
| AmqpConnection connection = addConnection(client.connect()); |
| |
| try { |
| AmqpSession session = connection.createSession(); |
| |
| AmqpSender sender = session.createAnonymousSender(); |
| AmqpMessage message = new AmqpMessage(); |
| message.setMessageAnnotation(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION.toString(), AMQPMessageSupport.QUEUE_TYPE); |
| |
| // We deliberately use the TOPIC prefixed auto-creation area, not the QUEUE prefix, to ensure |
| // we get a queue because the broker inspects the value we send on the message, and not just |
| // because it was taken as a default from the address settings. |
| String queueName = AUTO_CREATION_TOPIC_PREFIX + getQueueName(); |
| |
| message.setAddress(queueName); |
| message.setText(getTestName()); |
| |
| AddressQueryResult addressQueryResult = server.addressQuery(SimpleString.toSimpleString(queueName)); |
| assertFalse(addressQueryResult.isExists()); |
| |
| sender.send(message); |
| sender.close(); |
| |
| addressQueryResult = server.addressQuery(SimpleString.toSimpleString(queueName)); |
| assertTrue(addressQueryResult.isExists()); |
| assertTrue(addressQueryResult.getRoutingTypes().contains(RoutingType.ANYCAST)); |
| assertTrue(addressQueryResult.isAutoCreated()); |
| |
| // Create a receiver and verify it can consume the message from the auto-created queue |
| AmqpReceiver receiver = session.createReceiver(queueName); |
| receiver.flow(1); |
| AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS); |
| assertNotNull("Should have read message", received); |
| assertEquals(getTestName(), received.getText()); |
| received.accept(); |
| |
| receiver.close(); |
| } finally { |
| connection.close(); |
| } |
| } |
| |
| @Test(timeout = 60000) |
| public void testSendMessageOnAnonymousProducerWithDestinationTypeAnnotationCausesTopicAutoCreation() throws Exception { |
| AmqpClient client = createAmqpClient(); |
| AmqpConnection connection = addConnection(client.connect()); |
| |
| try { |
| AmqpSession session = connection.createSession(); |
| |
| AmqpSender sender = session.createAnonymousSender(); |
| AmqpMessage message = new AmqpMessage(); |
| message.setMessageAnnotation(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION.toString(), AMQPMessageSupport.TOPIC_TYPE); |
| |
| // We deliberately use the QUEUE prefixed auto-creation area, not the TOPIC prefix, to ensure |
| // we get a topic because the broker inspects the value we send on the message, and not just |
| // because it was taken as a default from the address settings. |
| String topicName = AUTO_CREATION_QUEUE_PREFIX + getTopicName(); |
| message.setAddress(topicName); |
| message.setText("creating-topic-address"); |
| |
| AddressQueryResult addressQueryResult = server.addressQuery(SimpleString.toSimpleString(topicName)); |
| assertFalse(addressQueryResult.isExists()); |
| |
| sender.send(message); |
| |
| addressQueryResult = server.addressQuery(SimpleString.toSimpleString(topicName)); |
| assertTrue(addressQueryResult.isExists()); |
| assertTrue(addressQueryResult.getRoutingTypes().contains(RoutingType.MULTICAST)); |
| assertTrue(addressQueryResult.isAutoCreated()); |
| |
| // Create 2 receivers and verify they can both consume a new message sent to the auto-created topic |
| AmqpReceiver receiver1 = session.createReceiver(topicName); |
| AmqpReceiver receiver2 = session.createReceiver(topicName); |
| receiver1.flow(1); |
| receiver2.flow(1); |
| |
| AmqpMessage message2 = new AmqpMessage(); |
| message2.setMessageAnnotation(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION.toString(), AMQPMessageSupport.TOPIC_TYPE); |
| message2.setAddress(topicName); |
| message2.setText(getTestName()); |
| |
| sender.send(message2); |
| |
| AmqpMessage received1 = receiver1.receive(10, TimeUnit.SECONDS); |
| assertNotNull("Should have read message", received1); |
| assertEquals(getTestName(), received1.getText()); |
| received1.accept(); |
| |
| AmqpMessage received2 = receiver2.receive(10, TimeUnit.SECONDS); |
| assertNotNull("Should have read message", received2); |
| assertEquals(getTestName(), received2.getText()); |
| received1.accept(); |
| |
| receiver1.close(); |
| receiver2.close(); |
| sender.close(); |
| } finally { |
| connection.close(); |
| } |
| } |
| |
| @Test(timeout = 60000) |
| public void testSendMessageOnAnonymousRelayLinkUsingMessageTo() throws Exception { |
| AmqpClient client = createAmqpClient(); |
| AmqpConnection connection = addConnection(client.connect()); |
| |
| try { |
| AmqpSession session = connection.createSession(); |
| |
| AmqpSender sender = session.createAnonymousSender(); |
| AmqpMessage message = new AmqpMessage(); |
| |
| message.setAddress(getQueueName()); |
| message.setMessageId("msg" + 1); |
| message.setText("Test-Message"); |
| |
| sender.send(message); |
| sender.close(); |
| |
| AmqpReceiver receiver = session.createReceiver(getQueueName()); |
| receiver.flow(1); |
| AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS); |
| assertNotNull("Should have read message", received); |
| assertEquals("msg1", received.getMessageId()); |
| received.accept(); |
| |
| receiver.close(); |
| } finally { |
| connection.close(); |
| } |
| } |
| |
| @Test(timeout = 60000) |
| public void testSendMessageFailsOnAnonymousRelayLinkWhenNoToValueSet() throws Exception { |
| AmqpClient client = createAmqpClient(); |
| AmqpConnection connection = addConnection(client.connect()); |
| |
| try { |
| AmqpSession session = connection.createSession(); |
| |
| AmqpSender sender = session.createAnonymousSender(); |
| AmqpMessage message = new AmqpMessage(); |
| |
| message.setMessageId("msg" + 1); |
| message.setText("Test-Message"); |
| |
| try { |
| sender.send(message); |
| fail("Should not be able to send, message should be rejected"); |
| } catch (Exception ex) { |
| ex.printStackTrace(); |
| } finally { |
| sender.close(); |
| } |
| } finally { |
| connection.close(); |
| } |
| } |
| |
| @Test(timeout = 60000) |
| public void testSendMessageFailsOnAnonymousRelayWhenToFieldHasNonExistingAddress() throws Exception { |
| AmqpClient client = createAmqpClient(); |
| AmqpConnection connection = addConnection(client.connect()); |
| |
| try { |
| AmqpSession session = connection.createSession(); |
| |
| AmqpSender sender = session.createAnonymousSender(); |
| AmqpMessage message = new AmqpMessage(); |
| |
| message.setAddress("exampleQueu-not-in-service"); |
| message.setMessageId("msg" + 1); |
| message.setText("Test-Message"); |
| |
| try { |
| sender.send(message); |
| fail("Should not be able to send, message should be rejected"); |
| } catch (Exception ex) { |
| ex.printStackTrace(); |
| } finally { |
| sender.close(); |
| } |
| } finally { |
| connection.close(); |
| } |
| } |
| } |