blob: c1928954784a2804b481bcd4616a3e09ec7c553a [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.systests.jms_1_1.extensions.autocreation;
import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeThat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Test;
import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.model.AlternateBinding;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.virtualhost.NodeAutoCreationPolicy;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
import org.apache.qpid.systests.JmsTestBase;
public class NodeAutoCreationPolicyTest extends JmsTestBase
{
private static final String DEAD_LETTER_QUEUE_SUFFIX = "_DLQ";
private static final String DEAD_LETTER_EXCHANGE_SUFFIX = "_DLE";
private static final String AUTO_CREATION_POLICIES = createAutoCreationPolicies();
private static final String TEST_MESSAGE = "Hello world!";
private static final String VALID_QUEUE_NAME = "fooQueue";
private static final String TYPE_QUEUE = "queue";
private static final String TYPE_TOPIC = "topic";
private static String createAutoCreationPolicies()
{
ObjectMapper mapper = new ObjectMapper();
try
{
NodeAutoCreationPolicy[] policies = new NodeAutoCreationPolicy[] {
new NodeAutoCreationPolicy()
{
@Override
public String getPattern()
{
return "fooQ.*";
}
@Override
public boolean isCreatedOnPublish()
{
return true;
}
@Override
public boolean isCreatedOnConsume()
{
return true;
}
@Override
public String getNodeType()
{
return "Queue";
}
@Override
public Map<String, Object> getAttributes()
{
return Collections.emptyMap();
}
},
new NodeAutoCreationPolicy()
{
@Override
public String getPattern()
{
return "barE.*";
}
@Override
public boolean isCreatedOnPublish()
{
return true;
}
@Override
public boolean isCreatedOnConsume()
{
return false;
}
@Override
public String getNodeType()
{
return "Exchange";
}
@Override
public Map<String, Object> getAttributes()
{
return Collections.singletonMap(Exchange.TYPE, "fanout");
}
},
new NodeAutoCreationPolicy()
{
@Override
public String getPattern()
{
return ".*" + DEAD_LETTER_QUEUE_SUFFIX;
}
@Override
public boolean isCreatedOnPublish()
{
return true;
}
@Override
public boolean isCreatedOnConsume()
{
return true;
}
@Override
public String getNodeType()
{
return "Queue";
}
@Override
public Map<String, Object> getAttributes()
{
return Collections.emptyMap();
}
},
new NodeAutoCreationPolicy()
{
@Override
public String getPattern()
{
return ".*" + DEAD_LETTER_EXCHANGE_SUFFIX;
}
@Override
public boolean isCreatedOnPublish()
{
return true;
}
@Override
public boolean isCreatedOnConsume()
{
return false;
}
@Override
public String getNodeType()
{
return "Exchange";
}
@Override
public Map<String, Object> getAttributes()
{
return Collections.singletonMap(Exchange.TYPE, ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
}
}
};
return mapper.writeValueAsString(Arrays.asList(policies));
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}
private void updateAutoCreationPolicies() throws Exception
{
updateEntityUsingAmqpManagement(getVirtualHostName(), "org.apache.qpid.VirtualHost", Collections.singletonMap(QueueManagingVirtualHost.NODE_AUTO_CREATION_POLICIES, AUTO_CREATION_POLICIES));
}
@Test
public void testSendingToQueuePattern() throws Exception
{
updateAutoCreationPolicies();
Connection connection = getConnection();
try
{
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue queue = session.createQueue(getDestinationAddress(VALID_QUEUE_NAME, TYPE_QUEUE));
final MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage(TEST_MESSAGE));
final MessageConsumer consumer = session.createConsumer(queue);
Message received = consumer.receive(getReceiveTimeout());
assertNotNull(received);
assertTrue(received instanceof TextMessage);
assertEquals(TEST_MESSAGE, ((TextMessage) received).getText());
}
finally
{
connection.close();
}
}
@Test
public void testConcurrentQueueCreation() throws Exception
{
updateAutoCreationPolicies();
final String destination = getDestinationAddress(VALID_QUEUE_NAME, TYPE_QUEUE);
final int numberOfActors = 3;
final Connection[] connections = new Connection[numberOfActors];
try
{
final Session[] sessions = new Session[numberOfActors];
for (int i = 0; i < numberOfActors; i++)
{
final Connection connection = getConnection();
connections[i] = connection;
sessions[i] = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
final List<CompletableFuture<MessageProducer>> futures = new ArrayList<>(numberOfActors);
final ExecutorService executorService = Executors.newFixedThreadPool(numberOfActors);
try
{
Stream.of(sessions)
.forEach(session -> futures.add(CompletableFuture.supplyAsync(() -> publishMessage(session,
destination),
executorService)));
final CompletableFuture<Void> combinedFuture =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[numberOfActors]));
combinedFuture.get(getReceiveTimeout(), TimeUnit.MILLISECONDS);
}
finally
{
executorService.shutdown();
}
final Connection connection = getConnection();
try
{
connection.start();
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue queue = session.createQueue(destination);
final MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < numberOfActors; i++)
{
Message received = consumer.receive(getReceiveTimeout());
assertNotNull(received);
assertTrue(received instanceof TextMessage);
assertEquals(TEST_MESSAGE, ((TextMessage) received).getText());
}
}
finally
{
connection.close();
}
}
finally
{
for (Connection connection : connections)
{
if (connection != null)
{
connection.close();
}
}
}
}
private MessageProducer publishMessage(final Session session, final String destination)
{
try
{
final Queue queue = session.createQueue(destination);
final MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage(TEST_MESSAGE));
return producer;
}
catch (JMSException e)
{
throw new IllegalStateException(e);
}
}
@Test
public void testSendingToNonMatchingQueuePattern() throws Exception
{
updateAutoCreationPolicies();
Connection connection = getConnection();
try
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue queue = session.createQueue(getDestinationAddress("foQueue", TYPE_QUEUE));
try
{
session.createProducer(queue);
fail("Creating producer should fail");
}
catch (JMSException e)
{
// pass
}
}
finally
{
connection.close();
}
}
@Test
public void testSendingToExchangePattern() throws Exception
{
updateAutoCreationPolicies();
Connection connection = getConnection();
try
{
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Topic topic = session.createTopic(getDestinationAddress("barExchange/foo", TYPE_TOPIC));
final MessageProducer producer = session.createProducer(topic);
producer.send(session.createTextMessage(TEST_MESSAGE));
final MessageConsumer consumer = session.createConsumer(topic);
Message received = consumer.receive(getReceiveTimeout() / 4);
assertNull(received);
producer.send(session.createTextMessage("Hello world2!"));
received = consumer.receive(getReceiveTimeout());
assertNotNull(received);
assertTrue(received instanceof TextMessage);
assertEquals("Hello world2!", ((TextMessage) received).getText());
}
finally
{
connection.close();
}
}
@Test
public void testSendingToNonMatchingTopicPattern() throws Exception
{
updateAutoCreationPolicies();
Connection connection = getConnection();
try
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Topic topic = session.createTopic(getDestinationAddress("baa", TYPE_TOPIC));
try
{
session.createProducer(topic);
fail("Creating producer should fail");
}
catch (JMSException e)
{
// pass
}
}
finally
{
connection.close();
}
}
@Test
public void testSendingToQueuePatternBURL() throws Exception
{
assumeThat("Qpid JMS Client does not support BURL syntax",
getProtocol(),
is(not(equalTo(Protocol.AMQP_1_0))));
updateAutoCreationPolicies();
Connection connection = getConnection();
try
{
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue queue = session.createQueue("BURL:direct:///fooQ/fooQ");
final MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage(TEST_MESSAGE));
final MessageConsumer consumer = session.createConsumer(queue);
Message received = consumer.receive(getReceiveTimeout());
assertNotNull(received);
assertTrue(received instanceof TextMessage);
assertEquals(TEST_MESSAGE, ((TextMessage) received).getText());
}
finally
{
connection.close();
}
}
@Test
public void testSendingToNonMatchingQueuePatternBURL() throws Exception
{
assumeThat("Using AMQP 0-8..0-9-1 to test BURL syntax",
getProtocol(),
is(not(anyOf(equalTo(Protocol.AMQP_1_0), equalTo(Protocol.AMQP_0_10)))));
updateAutoCreationPolicies();
Connection connection = getConnectionBuilder().setSyncPublish(true).build();
try
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue queue = session.createQueue("BURL:direct:///fo/fo");
try
{
final MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage(TEST_MESSAGE));
fail("Sending a message should fail");
}
catch (JMSException e)
{
// pass
}
}
finally
{
connection.close();
}
}
@Test
public void testQueueAlternateBindingCreation() throws Exception
{
updateAutoCreationPolicies();
String queueName = getTestName();
String deadLetterQueueName = queueName + DEAD_LETTER_QUEUE_SUFFIX;
final Map<String, Object> attributes = new HashMap<>();
Map<String, Object> expectedAlternateBinding =
Collections.singletonMap(AlternateBinding.DESTINATION, deadLetterQueueName);
attributes.put(org.apache.qpid.server.model.Queue.ALTERNATE_BINDING,
new ObjectMapper().writeValueAsString(expectedAlternateBinding));
createEntityUsingAmqpManagement(queueName, "org.apache.qpid.Queue", attributes);
Map<String, Object> queueAttributes = readEntityUsingAmqpManagement(queueName, "org.apache.qpid.Queue", true);
Object actualAlternateBinding = queueAttributes.get(org.apache.qpid.server.model.Queue.ALTERNATE_BINDING);
Map<String, Object> actualAlternateBindingMap = convertIfNecessary(actualAlternateBinding);
assertEquals("Unexpected alternate binding",
new HashMap<>(expectedAlternateBinding),
new HashMap<>(actualAlternateBindingMap));
Map<String, Object> dlqAttributes =
readEntityUsingAmqpManagement(deadLetterQueueName, "org.apache.qpid.Queue", true);
assertNotNull("Cannot get dead letter queue", dlqAttributes);
}
@Test
public void testExchangeAlternateBindingCreation() throws Exception
{
updateAutoCreationPolicies();
String exchangeName = getTestName();
String deadLetterExchangeName = exchangeName + DEAD_LETTER_EXCHANGE_SUFFIX;
final Map<String, Object> attributes = new HashMap<>();
Map<String, Object> expectedAlternateBinding =
Collections.singletonMap(AlternateBinding.DESTINATION, deadLetterExchangeName);
attributes.put(Exchange.ALTERNATE_BINDING, new ObjectMapper().writeValueAsString(expectedAlternateBinding));
attributes.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
createEntityUsingAmqpManagement(exchangeName, "org.apache.qpid.DirectExchange", attributes);
Map<String, Object> exchangeAttributes = readEntityUsingAmqpManagement(exchangeName, "org.apache.qpid.Exchange", true);
Object actualAlternateBinding = exchangeAttributes.get(Exchange.ALTERNATE_BINDING);
Map<String, Object> actualAlternateBindingMap = convertIfNecessary(actualAlternateBinding);
assertEquals("Unexpected alternate binding",
new HashMap<>(expectedAlternateBinding),
new HashMap<>(actualAlternateBindingMap));
Map<String, Object> dlqExchangeAttributes = readEntityUsingAmqpManagement(
deadLetterExchangeName,
"org.apache.qpid.FanoutExchange",
true);
assertNotNull("Cannot get dead letter exchange", dlqExchangeAttributes);
}
@SuppressWarnings("unchecked")
private Map<String, Object> convertIfNecessary(final Object actualAlternateBinding) throws IOException
{
Map<String, Object> actualAlternateBindingMap;
if (actualAlternateBinding instanceof String)
{
actualAlternateBindingMap = new ObjectMapper().readValue((String)actualAlternateBinding, Map.class);
}
else
{
actualAlternateBindingMap = (Map<String, Object>) actualAlternateBinding;
}
return actualAlternateBindingMap;
}
private String getDestinationAddress(final String name, final String type)
{
return getProtocol() == Protocol.AMQP_1_0
? name
: String.format("ADDR: %s; { assert: never, node: { type: %s } }", name, type);
}
}