| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.systests.jms_1_1.extensions.autocreation; |
| |
| import static org.hamcrest.CoreMatchers.anyOf; |
| import static org.hamcrest.CoreMatchers.equalTo; |
| import static org.hamcrest.CoreMatchers.is; |
| import static org.hamcrest.CoreMatchers.not; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| import static org.junit.Assume.assumeThat; |
| |
| import java.io.IOException; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Map; |
| |
| import javax.jms.Connection; |
| import javax.jms.JMSException; |
| import javax.jms.Message; |
| import javax.jms.MessageConsumer; |
| import javax.jms.MessageProducer; |
| import javax.jms.Queue; |
| import javax.jms.Session; |
| import javax.jms.TextMessage; |
| import javax.jms.Topic; |
| |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import org.junit.Test; |
| |
| import org.apache.qpid.server.exchange.ExchangeDefaults; |
| import org.apache.qpid.server.model.AlternateBinding; |
| import org.apache.qpid.server.model.Exchange; |
| import org.apache.qpid.server.model.Protocol; |
| import org.apache.qpid.server.virtualhost.NodeAutoCreationPolicy; |
| import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost; |
| import org.apache.qpid.systests.JmsTestBase; |
| |
| public class NodeAutoCreationPolicyTest extends JmsTestBase |
| { |
| private static final String DEAD_LETTER_QUEUE_SUFFIX = "_DLQ"; |
| private static final String DEAD_LETTER_EXCHANGE_SUFFIX = "_DLE"; |
| private static final String AUTO_CREATION_POLICIES = createAutoCreationPolicies(); |
| |
| |
| private static String createAutoCreationPolicies() |
| { |
| ObjectMapper mapper = new ObjectMapper(); |
| try |
| { |
| |
| NodeAutoCreationPolicy[] policies = new NodeAutoCreationPolicy[] { |
| new NodeAutoCreationPolicy() |
| { |
| @Override |
| public String getPattern() |
| { |
| return "fooQ.*"; |
| } |
| |
| @Override |
| public boolean isCreatedOnPublish() |
| { |
| return true; |
| } |
| |
| @Override |
| public boolean isCreatedOnConsume() |
| { |
| return true; |
| } |
| |
| @Override |
| public String getNodeType() |
| { |
| return "Queue"; |
| } |
| |
| @Override |
| public Map<String, Object> getAttributes() |
| { |
| return Collections.emptyMap(); |
| } |
| }, |
| new NodeAutoCreationPolicy() |
| { |
| @Override |
| public String getPattern() |
| { |
| return "barE.*"; |
| } |
| |
| @Override |
| public boolean isCreatedOnPublish() |
| { |
| return true; |
| } |
| |
| @Override |
| public boolean isCreatedOnConsume() |
| { |
| return false; |
| } |
| |
| @Override |
| public String getNodeType() |
| { |
| return "Exchange"; |
| } |
| |
| @Override |
| public Map<String, Object> getAttributes() |
| { |
| return Collections.singletonMap(Exchange.TYPE, "fanout"); |
| } |
| }, |
| |
| new NodeAutoCreationPolicy() |
| { |
| @Override |
| public String getPattern() |
| { |
| return ".*" + DEAD_LETTER_QUEUE_SUFFIX; |
| } |
| |
| @Override |
| public boolean isCreatedOnPublish() |
| { |
| return true; |
| } |
| |
| @Override |
| public boolean isCreatedOnConsume() |
| { |
| return true; |
| } |
| |
| @Override |
| public String getNodeType() |
| { |
| return "Queue"; |
| } |
| |
| @Override |
| public Map<String, Object> getAttributes() |
| { |
| return Collections.emptyMap(); |
| } |
| }, |
| |
| new NodeAutoCreationPolicy() |
| { |
| @Override |
| public String getPattern() |
| { |
| return ".*" + DEAD_LETTER_EXCHANGE_SUFFIX; |
| } |
| |
| @Override |
| public boolean isCreatedOnPublish() |
| { |
| return true; |
| } |
| |
| @Override |
| public boolean isCreatedOnConsume() |
| { |
| return false; |
| } |
| |
| @Override |
| public String getNodeType() |
| { |
| return "Exchange"; |
| } |
| |
| @Override |
| public Map<String, Object> getAttributes() |
| { |
| return Collections.singletonMap(Exchange.TYPE, ExchangeDefaults.FANOUT_EXCHANGE_CLASS); |
| } |
| } |
| }; |
| |
| return mapper.writeValueAsString(Arrays.asList(policies)); |
| } |
| catch (IOException e) |
| { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| private void updateAutoCreationPolicies() throws Exception |
| { |
| updateEntityUsingAmqpManagement(getVirtualHostName(), "org.apache.qpid.VirtualHost", Collections.singletonMap(QueueManagingVirtualHost.NODE_AUTO_CREATION_POLICIES, AUTO_CREATION_POLICIES)); |
| } |
| |
| @Test |
| public void testSendingToQueuePattern() throws Exception |
| { |
| updateAutoCreationPolicies(); |
| |
| Connection connection = getConnection(); |
| try |
| { |
| connection.start(); |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| final Queue queue = session.createQueue(getProtocol() == Protocol.AMQP_1_0 |
| ? "fooQueue" |
| : "ADDR: fooQueue ; { assert: never, node: { type: queue } }"); |
| final MessageProducer producer = session.createProducer(queue); |
| producer.send(session.createTextMessage("Hello world!")); |
| |
| final MessageConsumer consumer = session.createConsumer(queue); |
| Message received = consumer.receive(getReceiveTimeout()); |
| assertNotNull(received); |
| assertTrue(received instanceof TextMessage); |
| assertEquals("Hello world!", ((TextMessage) received).getText()); |
| } |
| finally |
| { |
| connection.close(); |
| } |
| } |
| |
| @Test |
| public void testSendingToNonMatchingQueuePattern() throws Exception |
| { |
| updateAutoCreationPolicies(); |
| |
| Connection connection = getConnection(); |
| try |
| { |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| final Queue queue = session.createQueue(getProtocol() == Protocol.AMQP_1_0 |
| ? "foQueue" |
| : "ADDR: foQueue ; { assert: never, node: { type: queue } }"); |
| try |
| { |
| session.createProducer(queue); |
| fail("Creating producer should fail"); |
| } |
| catch (JMSException e) |
| { |
| // pass |
| } |
| } |
| finally |
| { |
| connection.close(); |
| } |
| } |
| |
| @Test |
| public void testSendingToExchangePattern() throws Exception |
| { |
| updateAutoCreationPolicies(); |
| |
| Connection connection = getConnection(); |
| try |
| { |
| connection.start(); |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| final Topic topic = session.createTopic(getProtocol() == Protocol.AMQP_1_0 |
| ? "barExchange/foo" |
| : "ADDR: barExchange/foo ; { assert: never, node: { type: topic } }"); |
| final MessageProducer producer = session.createProducer(topic); |
| producer.send(session.createTextMessage("Hello world!")); |
| |
| final MessageConsumer consumer = session.createConsumer(topic); |
| Message received = consumer.receive(getReceiveTimeout() / 4); |
| assertNull(received); |
| |
| producer.send(session.createTextMessage("Hello world2!")); |
| received = consumer.receive(getReceiveTimeout()); |
| |
| assertNotNull(received); |
| |
| assertTrue(received instanceof TextMessage); |
| assertEquals("Hello world2!", ((TextMessage) received).getText()); |
| } |
| finally |
| { |
| connection.close(); |
| } |
| } |
| |
| @Test |
| public void testSendingToNonMatchingTopicPattern() throws Exception |
| { |
| updateAutoCreationPolicies(); |
| |
| Connection connection = getConnection(); |
| try |
| { |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| final Topic topic = session.createTopic(getProtocol() == Protocol.AMQP_1_0 |
| ? "baa" |
| : "ADDR: baa ; { assert: never, node: { type: topic } }"); |
| try |
| { |
| session.createProducer(topic); |
| fail("Creating producer should fail"); |
| } |
| catch (JMSException e) |
| { |
| // pass |
| } |
| } |
| finally |
| { |
| connection.close(); |
| } |
| } |
| |
| @Test |
| public void testSendingToQueuePatternBURL() throws Exception |
| { |
| assumeThat("Qpid JMS Client does not support BURL syntax", |
| getProtocol(), |
| is(not(equalTo(Protocol.AMQP_1_0)))); |
| updateAutoCreationPolicies(); |
| |
| Connection connection = getConnection(); |
| try |
| { |
| connection.start(); |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| final Queue queue = session.createQueue("BURL:direct:///fooQ/fooQ"); |
| final MessageProducer producer = session.createProducer(queue); |
| producer.send(session.createTextMessage("Hello world!")); |
| |
| final MessageConsumer consumer = session.createConsumer(queue); |
| Message received = consumer.receive(getReceiveTimeout()); |
| assertNotNull(received); |
| assertTrue(received instanceof TextMessage); |
| assertEquals("Hello world!", ((TextMessage) received).getText()); |
| } |
| finally |
| { |
| connection.close(); |
| } |
| } |
| |
| @Test |
| public void testSendingToNonMatchingQueuePatternBURL() throws Exception |
| { |
| assumeThat("Using AMQP 0-8..0-9-1 to test BURL syntax", |
| getProtocol(), |
| is(not(anyOf(equalTo(Protocol.AMQP_1_0), equalTo(Protocol.AMQP_0_10))))); |
| updateAutoCreationPolicies(); |
| |
| Connection connection = getConnectionBuilder().setSyncPublish(true).build(); |
| try |
| { |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| final Queue queue = session.createQueue("BURL:direct:///fo/fo"); |
| try |
| { |
| final MessageProducer producer = session.createProducer(queue); |
| producer.send(session.createTextMessage("Hello world!")); |
| |
| fail("Sending a message should fail"); |
| } |
| catch (JMSException e) |
| { |
| // pass |
| } |
| } |
| finally |
| { |
| connection.close(); |
| } |
| } |
| |
| @Test |
| public void testQueueAlternateBindingCreation() throws Exception |
| { |
| updateAutoCreationPolicies(); |
| |
| String queueName = getTestName(); |
| String deadLetterQueueName = queueName + DEAD_LETTER_QUEUE_SUFFIX; |
| |
| final Map<String, Object> attributes = new HashMap<>(); |
| Map<String, Object> expectedAlternateBinding = |
| Collections.singletonMap(AlternateBinding.DESTINATION, deadLetterQueueName); |
| attributes.put(org.apache.qpid.server.model.Queue.ALTERNATE_BINDING, |
| new ObjectMapper().writeValueAsString(expectedAlternateBinding)); |
| createEntityUsingAmqpManagement(queueName, "org.apache.qpid.Queue", attributes); |
| |
| Map<String, Object> queueAttributes = readEntityUsingAmqpManagement(queueName, "org.apache.qpid.Queue", true); |
| |
| Object actualAlternateBinding = queueAttributes.get(org.apache.qpid.server.model.Queue.ALTERNATE_BINDING); |
| Map<String, Object> actualAlternateBindingMap = convertIfNecessary(actualAlternateBinding); |
| assertEquals("Unexpected alternate binding", |
| new HashMap<>(expectedAlternateBinding), |
| new HashMap<>(actualAlternateBindingMap)); |
| |
| Map<String, Object> dlqAttributes = |
| readEntityUsingAmqpManagement(deadLetterQueueName, "org.apache.qpid.Queue", true); |
| assertNotNull("Cannot get dead letter queue", dlqAttributes); |
| } |
| |
| @Test |
| public void testExchangeAlternateBindingCreation() throws Exception |
| { |
| updateAutoCreationPolicies(); |
| |
| String exchangeName = getTestName(); |
| String deadLetterExchangeName = exchangeName + DEAD_LETTER_EXCHANGE_SUFFIX; |
| |
| final Map<String, Object> attributes = new HashMap<>(); |
| Map<String, Object> expectedAlternateBinding = |
| Collections.singletonMap(AlternateBinding.DESTINATION, deadLetterExchangeName); |
| attributes.put(Exchange.ALTERNATE_BINDING, new ObjectMapper().writeValueAsString(expectedAlternateBinding)); |
| attributes.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); |
| createEntityUsingAmqpManagement(exchangeName, "org.apache.qpid.DirectExchange", attributes); |
| |
| Map<String, Object> exchangeAttributes = readEntityUsingAmqpManagement(exchangeName, "org.apache.qpid.Exchange", true); |
| |
| Object actualAlternateBinding = exchangeAttributes.get(Exchange.ALTERNATE_BINDING); |
| Map<String, Object> actualAlternateBindingMap = convertIfNecessary(actualAlternateBinding); |
| assertEquals("Unexpected alternate binding", |
| new HashMap<>(expectedAlternateBinding), |
| new HashMap<>(actualAlternateBindingMap)); |
| |
| Map<String, Object> dlqExchangeAttributes = readEntityUsingAmqpManagement( |
| deadLetterExchangeName, |
| "org.apache.qpid.FanoutExchange", |
| true); |
| assertNotNull("Cannot get dead letter exchange", dlqExchangeAttributes); |
| } |
| |
| @SuppressWarnings("unchecked") |
| private Map<String, Object> convertIfNecessary(final Object actualAlternateBinding) throws IOException |
| { |
| Map<String, Object> actualAlternateBindingMap; |
| if (actualAlternateBinding instanceof String) |
| { |
| actualAlternateBindingMap = new ObjectMapper().readValue((String)actualAlternateBinding, Map.class); |
| } |
| else |
| { |
| actualAlternateBindingMap = (Map<String, Object>) actualAlternateBinding; |
| } |
| return actualAlternateBindingMap; |
| } |
| } |