blob: d49f0586ba513b61dd39efaee21a467951746b2a [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.store;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.qpid.AMQException;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.exchange.ExchangeType;
import org.apache.qpid.server.exchange.TopicExchange;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.server.queue.AMQPriorityQueue;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.ConflationQueue;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.InternalBrokerBaseCase;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.util.FileUtils;
import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* This tests the MessageStores by using the available interfaces.
*
* For persistent stores, it validates that Exchanges, Queues, Bindings and
* Messages are persisted and recovered correctly.
*/
public class MessageStoreTest extends InternalBrokerBaseCase
{
public static final int DEFAULT_PRIORTY_LEVEL = 5;
public static final String SELECTOR_VALUE = "Test = 'MST'";
public static final String LVQ_KEY = "MST-LVQ-KEY";
private AMQShortString nonDurableExchangeName = new AMQShortString("MST-NonDurableDirectExchange");
private AMQShortString directExchangeName = new AMQShortString("MST-DirectExchange");
private AMQShortString topicExchangeName = new AMQShortString("MST-TopicExchange");
private AMQShortString durablePriorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue-Durable");
private AMQShortString durableTopicQueueName = new AMQShortString("MST-TopicQueue-Durable");
private AMQShortString priorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue");
private AMQShortString topicQueueName = new AMQShortString("MST-TopicQueue");
private AMQShortString durableExclusiveQueueName = new AMQShortString("MST-Queue-Durable-Exclusive");
private AMQShortString durablePriorityQueueName = new AMQShortString("MST-PriorityQueue-Durable");
private AMQShortString durableLastValueQueueName = new AMQShortString("MST-LastValueQueue-Durable");
private AMQShortString durableQueueName = new AMQShortString("MST-Queue-Durable");
private AMQShortString priorityQueueName = new AMQShortString("MST-PriorityQueue");
private AMQShortString queueName = new AMQShortString("MST-Queue");
private AMQShortString directRouting = new AMQShortString("MST-direct");
private AMQShortString topicRouting = new AMQShortString("MST-topic");
private AMQShortString queueOwner = new AMQShortString("MST");
protected PropertiesConfiguration _config;
public void setUp() throws Exception
{
super.setUp();
String storePath = System.getProperty("QPID_WORK") + "/" + getName();
_config = new PropertiesConfiguration();
_config.addProperty("store.class", getTestProfileMessageStoreClassName());
_config.addProperty("store.environment-path", storePath);
cleanup(new File(storePath));
reloadVirtualHost();
}
protected void reloadVirtualHost()
{
VirtualHost original = getVirtualHost();
if (getVirtualHost() != null)
{
try
{
getVirtualHost().close();
getVirtualHost().getApplicationRegistry().
getVirtualHostRegistry().unregisterVirtualHost(getVirtualHost());
}
catch (Exception e)
{
e.printStackTrace();
fail(e.getMessage());
}
}
try
{
setVirtualHost(ApplicationRegistry.getInstance().createVirtualHost(new VirtualHostConfiguration(getClass().getName(), _config)));
}
catch (Exception e)
{
e.printStackTrace();
fail(e.getMessage());
}
assertTrue("Virtualhost has not changed, reload was not successful", original != getVirtualHost());
}
/**
* Old MessageStoreTest segment which runs against both persistent and non-persistent stores
* creating queues, exchanges and bindings and then verifying message delivery to them.
*/
public void testQueueExchangeAndBindingCreation() throws Exception
{
assertEquals("Should not be any existing queues", 0, getVirtualHost().getQueueRegistry().getQueues().size());
createAllQueues();
createAllTopicQueues();
//Register Non-Durable DirectExchange
Exchange nonDurableExchange = createExchange(DirectExchange.TYPE, nonDurableExchangeName, false);
bindAllQueuesToExchange(nonDurableExchange, directRouting);
//Register DirectExchange
Exchange directExchange = createExchange(DirectExchange.TYPE, directExchangeName, true);
bindAllQueuesToExchange(directExchange, directRouting);
//Register TopicExchange
Exchange topicExchange = createExchange(TopicExchange.TYPE, topicExchangeName, true);
bindAllTopicQueuesToExchange(topicExchange, topicRouting);
//Send Message To NonDurable direct Exchange = persistent
sendMessageOnExchange(nonDurableExchange, directRouting, true);
// and non-persistent
sendMessageOnExchange(nonDurableExchange, directRouting, false);
//Send Message To direct Exchange = persistent
sendMessageOnExchange(directExchange, directRouting, true);
// and non-persistent
sendMessageOnExchange(directExchange, directRouting, false);
//Send Message To topic Exchange = persistent
sendMessageOnExchange(topicExchange, topicRouting, true);
// and non-persistent
sendMessageOnExchange(topicExchange, topicRouting, false);
//Ensure all the Queues have four messages (one transient, one persistent) x 2 exchange routings
validateMessageOnQueues(4, true);
//Ensure all the topics have two messages (one transient, one persistent)
validateMessageOnTopics(2, true);
assertEquals("Not all queues correctly registered",
10, getVirtualHost().getQueueRegistry().getQueues().size());
}
/**
* Tests message persistence by running the testQueueExchangeAndBindingCreation() method above
* before reloading the virtual host and ensuring that the persistent messages were restored.
*
* More specific testing of message persistence is left to store-specific unit testing.
*/
public void testMessagePersistence() throws Exception
{
testQueueExchangeAndBindingCreation();
reloadVirtualHost();
//Validate durable queues and subscriptions still have the persistent messages
validateMessageOnQueues(2, false);
validateMessageOnTopics(1, false);
}
/**
* Tests message removal by running the testMessagePersistence() method above before
* clearing the queues, reloading the virtual host, and ensuring that the persistent
* messages were removed from the queues.
*/
public void testMessageRemoval() throws Exception
{
testMessagePersistence();
QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
assertEquals("Incorrect number of queues registered after recovery",
6, queueRegistry.getQueues().size());
//clear the queue
queueRegistry.getQueue(durableQueueName).clearQueue();
//check the messages are gone
validateMessageOnQueue(durableQueueName, 0);
//reload and verify messages arent restored
reloadVirtualHost();
validateMessageOnQueue(durableQueueName, 0);
}
/**
* Tests queue persistence by creating a selection of queues with differing properties, both
* durable and non durable, and ensuring that following the recovery process the correct queues
* are present and any property manipulations (eg queue exclusivity) are correctly recovered.
*/
public void testQueuePersistence() throws Exception
{
assertEquals("Should not be any existing queues",
0, getVirtualHost().getQueueRegistry().getQueues().size());
//create durable and non durable queues/topics
createAllQueues();
createAllTopicQueues();
//reload the virtual host, prompting recovery of the queues/topics
reloadVirtualHost();
QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
assertEquals("Incorrect number of queues registered after recovery",
6, queueRegistry.getQueues().size());
//Validate the non-Durable Queues were not recovered.
assertNull("Non-Durable queue still registered:" + priorityQueueName,
queueRegistry.getQueue(priorityQueueName));
assertNull("Non-Durable queue still registered:" + queueName,
queueRegistry.getQueue(queueName));
assertNull("Non-Durable queue still registered:" + priorityTopicQueueName,
queueRegistry.getQueue(priorityTopicQueueName));
assertNull("Non-Durable queue still registered:" + topicQueueName,
queueRegistry.getQueue(topicQueueName));
//Validate normally expected properties of Queues/Topics
validateDurableQueueProperties();
//Update the durable exclusive queue's exclusivity and verify it is persisted and recovered correctly
setQueueExclusivity(false);
validateQueueExclusivityProperty(false);
//Reload the Virtualhost to recover the queues again
reloadVirtualHost();
//verify the change was persisted and recovered correctly
validateQueueExclusivityProperty(false);
}
/**
* Tests queue removal by creating a durable queue, verifying it recovers, and
* then removing it from the store, and ensuring that following the second reload
* process it is not recovered.
*/
public void testDurableQueueRemoval() throws Exception
{
//Register Durable Queue
createQueue(durableQueueName, false, true, false, false);
QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
assertEquals("Incorrect number of queues registered before recovery",
1, queueRegistry.getQueues().size());
reloadVirtualHost();
queueRegistry = getVirtualHost().getQueueRegistry();
assertEquals("Incorrect number of queues registered after first recovery",
1, queueRegistry.getQueues().size());
//test that removing the queue means it is not recovered next time
getVirtualHost().getDurableConfigurationStore().removeQueue(queueRegistry.getQueue(durableQueueName));
reloadVirtualHost();
queueRegistry = getVirtualHost().getQueueRegistry();
assertEquals("Incorrect number of queues registered after second recovery",
0, queueRegistry.getQueues().size());
assertNull("Durable queue was not removed:" + durableQueueName,
queueRegistry.getQueue(durableQueueName));
}
/**
* Tests exchange persistence by creating a selection of exchanges, both durable
* and non durable, and ensuring that following the recovery process the correct
* durable exchanges are still present.
*/
public void testExchangePersistence() throws Exception
{
int origExchangeCount = getVirtualHost().getExchangeRegistry().getExchangeNames().size();
Map<AMQShortString, Exchange> oldExchanges = createExchanges();
assertEquals("Incorrect number of exchanges registered before recovery",
origExchangeCount + 3, getVirtualHost().getExchangeRegistry().getExchangeNames().size());
reloadVirtualHost();
//verify the exchanges present after recovery
validateExchanges(origExchangeCount, oldExchanges);
}
/**
* Tests exchange removal by creating a durable exchange, verifying it recovers, and
* then removing it from the store, and ensuring that following the second reload
* process it is not recovered.
*/
public void testDurableExchangeRemoval() throws Exception
{
int origExchangeCount = getVirtualHost().getExchangeRegistry().getExchangeNames().size();
createExchange(DirectExchange.TYPE, directExchangeName, true);
ExchangeRegistry exchangeRegistry = getVirtualHost().getExchangeRegistry();
assertEquals("Incorrect number of exchanges registered before recovery",
origExchangeCount + 1, exchangeRegistry.getExchangeNames().size());
reloadVirtualHost();
exchangeRegistry = getVirtualHost().getExchangeRegistry();
assertEquals("Incorrect number of exchanges registered after first recovery",
origExchangeCount + 1, exchangeRegistry.getExchangeNames().size());
//test that removing the exchange means it is not recovered next time
getVirtualHost().getDurableConfigurationStore().removeExchange(exchangeRegistry.getExchange(directExchangeName));
reloadVirtualHost();
exchangeRegistry = getVirtualHost().getExchangeRegistry();
assertEquals("Incorrect number of exchanges registered after second recovery",
origExchangeCount, exchangeRegistry.getExchangeNames().size());
assertNull("Durable exchange was not removed:" + directExchangeName,
exchangeRegistry.getExchange(directExchangeName));
}
/**
* Tests binding persistence by creating a selection of queues and exchanges, both durable
* and non durable, then adding bindings with and without selectors before reloading the
* virtual host and verifying that following the recovery process the correct durable
* bindings (those for durable queues to durable exchanges) are still present.
*/
public void testBindingPersistence() throws Exception
{
int origExchangeCount = getVirtualHost().getExchangeRegistry().getExchangeNames().size();
createAllQueues();
createAllTopicQueues();
Map<AMQShortString, Exchange> exchanges = createExchanges();
Exchange nonDurableExchange = exchanges.get(nonDurableExchangeName);
Exchange directExchange = exchanges.get(directExchangeName);
Exchange topicExchange = exchanges.get(topicExchangeName);
bindAllQueuesToExchange(nonDurableExchange, directRouting);
bindAllQueuesToExchange(directExchange, directRouting);
bindAllTopicQueuesToExchange(topicExchange, topicRouting);
assertEquals("Incorrect number of exchanges registered before recovery",
origExchangeCount + 3, getVirtualHost().getExchangeRegistry().getExchangeNames().size());
reloadVirtualHost();
validateExchanges(origExchangeCount, exchanges);
validateBindingProperties();
}
/**
* Tests binding removal by creating a durable exchange, and queue, binding them together,
* recovering to verify the persistence, then removing it from the store, and ensuring
* that following the second reload process it is not recovered.
*/
public void testDurableBindingRemoval() throws Exception
{
QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
//create durable queue and exchange, bind them
Exchange exch = createExchange(DirectExchange.TYPE, directExchangeName, true);
createQueue(durableQueueName, false, true, false, false);
bindQueueToExchange(exch, directRouting, queueRegistry.getQueue(durableQueueName), false, null);
assertEquals("Incorrect number of bindings registered before recovery",
1, queueRegistry.getQueue(durableQueueName).getBindings().size());
//verify binding is actually normally recovered
reloadVirtualHost();
queueRegistry = getVirtualHost().getQueueRegistry();
assertEquals("Incorrect number of bindings registered after first recovery",
1, queueRegistry.getQueue(durableQueueName).getBindings().size());
ExchangeRegistry exchangeRegistry = getVirtualHost().getExchangeRegistry();
exch = exchangeRegistry.getExchange(directExchangeName);
assertNotNull("Exchange was not recovered", exch);
//remove the binding and verify result after recovery
unbindQueueFromExchange(exch, directRouting, queueRegistry.getQueue(durableQueueName), false, null);
reloadVirtualHost();
queueRegistry = getVirtualHost().getQueueRegistry();
assertEquals("Incorrect number of bindings registered after second recovery",
0, queueRegistry.getQueue(durableQueueName).getBindings().size());
}
/**
* Validates that the durable exchanges are still present, the non durable exchange is not,
* and that the new exchanges are not the same objects as the provided list (i.e. that the
* reload actually generated new exchange objects)
*/
private void validateExchanges(int originalNumExchanges, Map<AMQShortString, Exchange> oldExchanges)
{
ExchangeRegistry registry = getVirtualHost().getExchangeRegistry();
assertTrue(directExchangeName + " exchange NOT reloaded",
registry.getExchangeNames().contains(directExchangeName));
assertTrue(topicExchangeName + " exchange NOT reloaded",
registry.getExchangeNames().contains(topicExchangeName));
assertTrue(nonDurableExchangeName + " exchange reloaded",
!registry.getExchangeNames().contains(nonDurableExchangeName));
//check the old exchange objects are not the same as the new exchanges
assertTrue(directExchangeName + " exchange NOT reloaded",
registry.getExchange(directExchangeName) != oldExchanges.get(directExchangeName));
assertTrue(topicExchangeName + " exchange NOT reloaded",
registry.getExchange(topicExchangeName) != oldExchanges.get(topicExchangeName));
// There should only be the original exchanges + our 2 recovered durable exchanges
assertEquals("Incorrect number of exchanges available",
originalNumExchanges + 2, registry.getExchangeNames().size());
}
/** Validates the Durable queues and their properties are as expected following recovery */
private void validateBindingProperties()
{
QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
assertEquals("Incorrect number of (durable) queues following recovery", 6, queueRegistry.getQueues().size());
validateBindingProperties(queueRegistry.getQueue(durablePriorityQueueName).getBindings(), false);
validateBindingProperties(queueRegistry.getQueue(durablePriorityTopicQueueName).getBindings(), true);
validateBindingProperties(queueRegistry.getQueue(durableQueueName).getBindings(), false);
validateBindingProperties(queueRegistry.getQueue(durableTopicQueueName).getBindings(), true);
validateBindingProperties(queueRegistry.getQueue(durableExclusiveQueueName).getBindings(), false);
}
/**
* Validate that each queue is bound only once following recovery (i.e. that bindings for non durable
* queues or to non durable exchanges are not recovered), and if a selector should be present
* that it is and contains the correct value
*
* @param bindings the set of bindings to validate
* @param useSelectors if set, check the binding has a JMS_SELECTOR argument and the correct value for it
*/
private void validateBindingProperties(List<Binding> bindings, boolean useSelectors)
{
assertEquals("Each queue should only be bound once.", 1, bindings.size());
Binding binding = bindings.get(0);
if (useSelectors)
{
assertTrue("Binding does not contain a Selector argument.",
binding.getArguments().containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()));
assertEquals("The binding selector argument is incorrect", SELECTOR_VALUE,
binding.getArguments().get(AMQPFilterTypes.JMS_SELECTOR.getValue()).toString());
}
}
private void setQueueExclusivity(boolean exclusive) throws AMQException
{
QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
AMQQueue queue = queueRegistry.getQueue(durableExclusiveQueueName);
queue.setExclusive(exclusive);
}
private void validateQueueExclusivityProperty(boolean expected)
{
QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
AMQQueue queue = queueRegistry.getQueue(durableExclusiveQueueName);
assertEquals("Queue exclusivity was incorrect", queue.isExclusive(), expected);
}
private void validateDurableQueueProperties()
{
QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
validateQueueProperties(queueRegistry.getQueue(durablePriorityQueueName), true, true, false, false);
validateQueueProperties(queueRegistry.getQueue(durablePriorityTopicQueueName), true, true, false, false);
validateQueueProperties(queueRegistry.getQueue(durableQueueName), false, true, false, false);
validateQueueProperties(queueRegistry.getQueue(durableTopicQueueName), false, true, false, false);
validateQueueProperties(queueRegistry.getQueue(durableExclusiveQueueName), false, true, true, false);
validateQueueProperties(queueRegistry.getQueue(durableLastValueQueueName), false, true, true, true);
}
private void validateQueueProperties(AMQQueue queue, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue)
{
if(usePriority || lastValueQueue)
{
assertNotSame("Queues cant be both Priority and LastValue based", usePriority, lastValueQueue);
}
if (usePriority)
{
assertEquals("Queue is no longer a Priority Queue", AMQPriorityQueue.class, queue.getClass());
assertEquals("Priority Queue does not have set priorities",
DEFAULT_PRIORTY_LEVEL, ((AMQPriorityQueue) queue).getPriorities());
}
else if (lastValueQueue)
{
assertEquals("Queue is no longer a LastValue Queue", ConflationQueue.class, queue.getClass());
assertEquals("LastValue Queue Key has changed", LVQ_KEY, ((ConflationQueue) queue).getConflationKey());
}
else
{
assertEquals("Queue is not 'simple'", SimpleAMQQueue.class, queue.getClass());
}
assertEquals("Queue owner is not as expected", queueOwner, queue.getOwner());
assertEquals("Queue durability is not as expected", durable, queue.isDurable());
assertEquals("Queue exclusivity is not as expected", exclusive, queue.isExclusive());
}
/**
* Delete the Store Environment path
*
* @param environmentPath The configuration that contains the store environment path.
*/
private void cleanup(File environmentPath)
{
if (environmentPath.exists())
{
FileUtils.delete(environmentPath, true);
}
}
private void sendMessageOnExchange(Exchange exchange, AMQShortString routingKey, boolean deliveryMode)
{
//Set MessagePersistence
BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
properties.setDeliveryMode(deliveryMode ? Integer.valueOf(2).byteValue() : Integer.valueOf(1).byteValue());
FieldTable headers = properties.getHeaders();
headers.setString("Test", "MST");
properties.setHeaders(headers);
MessagePublishInfo messageInfo = new TestMessagePublishInfo(exchange, false, false, routingKey);
final IncomingMessage currentMessage;
currentMessage = new IncomingMessage(messageInfo);
currentMessage.setExchange(exchange);
ContentHeaderBody headerBody = new ContentHeaderBody(BasicConsumeBodyImpl.CLASS_ID,0,properties,0l);
try
{
currentMessage.setContentHeaderBody(headerBody);
}
catch (AMQException e)
{
fail(e.getMessage());
}
currentMessage.setExpiration();
MessageMetaData mmd = currentMessage.headersReceived(System.currentTimeMillis());
currentMessage.setStoredMessage(getVirtualHost().getMessageStore().addMessage(mmd));
currentMessage.getStoredMessage().flushToStore();
currentMessage.route();
// check and deliver if header says body length is zero
if (currentMessage.allContentReceived())
{
ServerTransaction trans = new AutoCommitTransaction(getVirtualHost().getMessageStore());
final List<? extends BaseQueue> destinationQueues = currentMessage.getDestinationQueues();
trans.enqueue(currentMessage.getDestinationQueues(), currentMessage, new ServerTransaction.Action() {
public void postCommit()
{
try
{
AMQMessage message = new AMQMessage(currentMessage.getStoredMessage());
for(BaseQueue queue : destinationQueues)
{
queue.enqueue(message);
}
}
catch (AMQException e)
{
e.printStackTrace();
}
}
public void onRollback()
{
//To change body of implemented methods use File | Settings | File Templates.
}
}, 0L);
}
}
private void createAllQueues()
{
//Register Durable Priority Queue
createQueue(durablePriorityQueueName, true, true, false, false);
//Register Durable Simple Queue
createQueue(durableQueueName, false, true, false, false);
//Register Durable Exclusive Simple Queue
createQueue(durableExclusiveQueueName, false, true, true, false);
//Register Durable LastValue Queue
createQueue(durableLastValueQueueName, false, true, true, true);
//Register NON-Durable Priority Queue
createQueue(priorityQueueName, true, false, false, false);
//Register NON-Durable Simple Queue
createQueue(queueName, false, false, false, false);
}
private void createAllTopicQueues()
{
//Register Durable Priority Queue
createQueue(durablePriorityTopicQueueName, true, true, false, false);
//Register Durable Simple Queue
createQueue(durableTopicQueueName, false, true, false, false);
//Register NON-Durable Priority Queue
createQueue(priorityTopicQueueName, true, false, false, false);
//Register NON-Durable Simple Queue
createQueue(topicQueueName, false, false, false, false);
}
private void createQueue(AMQShortString queueName, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue)
{
FieldTable queueArguments = null;
if(usePriority || lastValueQueue)
{
assertNotSame("Queues cant be both Priority and LastValue based", usePriority, lastValueQueue);
}
if (usePriority)
{
queueArguments = new FieldTable();
queueArguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL);
}
if (lastValueQueue)
{
queueArguments = new FieldTable();
queueArguments.put(new AMQShortString(AMQQueueFactory.QPID_LAST_VALUE_QUEUE_KEY), LVQ_KEY);
}
AMQQueue queue = null;
//Ideally we would be able to use the QueueDeclareHandler here.
try
{
queue = AMQQueueFactory.createAMQQueueImpl(queueName, durable, queueOwner, false, exclusive,
getVirtualHost(), queueArguments);
validateQueueProperties(queue, usePriority, durable, exclusive, lastValueQueue);
if (queue.isDurable() && !queue.isAutoDelete())
{
getVirtualHost().getDurableConfigurationStore().createQueue(queue, queueArguments);
}
}
catch (AMQException e)
{
fail(e.getMessage());
}
getVirtualHost().getQueueRegistry().registerQueue(queue);
}
private Map<AMQShortString, Exchange> createExchanges()
{
Map<AMQShortString, Exchange> exchanges = new HashMap<AMQShortString, Exchange>();
//Register non-durable DirectExchange
exchanges.put(nonDurableExchangeName, createExchange(DirectExchange.TYPE, nonDurableExchangeName, false));
//Register durable DirectExchange and TopicExchange
exchanges.put(directExchangeName ,createExchange(DirectExchange.TYPE, directExchangeName, true));
exchanges.put(topicExchangeName,createExchange(TopicExchange.TYPE, topicExchangeName, true));
return exchanges;
}
private Exchange createExchange(ExchangeType<?> type, AMQShortString name, boolean durable)
{
Exchange exchange = null;
try
{
exchange = type.newInstance(getVirtualHost(), name, durable, 0, false);
}
catch (AMQException e)
{
fail(e.getMessage());
}
try
{
getVirtualHost().getExchangeRegistry().registerExchange(exchange);
if (durable)
{
getVirtualHost().getDurableConfigurationStore().createExchange(exchange);
}
}
catch (AMQException e)
{
fail(e.getMessage());
}
return exchange;
}
private void bindAllQueuesToExchange(Exchange exchange, AMQShortString routingKey)
{
FieldTable queueArguments = new FieldTable();
queueArguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL);
QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durablePriorityQueueName), false, queueArguments);
bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durableQueueName), false, null);
bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(priorityQueueName), false, queueArguments);
bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(queueName), false, null);
bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durableExclusiveQueueName), false, null);
}
private void bindAllTopicQueuesToExchange(Exchange exchange, AMQShortString routingKey)
{
FieldTable queueArguments = new FieldTable();
queueArguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL);
QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durablePriorityTopicQueueName), true, queueArguments);
bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durableTopicQueueName), true, null);
bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(priorityTopicQueueName), true, queueArguments);
bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(topicQueueName), true, null);
}
protected void bindQueueToExchange(Exchange exchange, AMQShortString routingKey, AMQQueue queue, boolean useSelector, FieldTable queueArguments)
{
FieldTable bindArguments = null;
if (useSelector)
{
bindArguments = new FieldTable();
bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), SELECTOR_VALUE );
}
try
{
getVirtualHost().getBindingFactory().addBinding(String.valueOf(routingKey), queue, exchange, FieldTable.convertToMap(bindArguments));
}
catch (Exception e)
{
fail(e.getMessage());
}
}
protected void unbindQueueFromExchange(Exchange exchange, AMQShortString routingKey, AMQQueue queue, boolean useSelector, FieldTable queueArguments)
{
FieldTable bindArguments = null;
if (useSelector)
{
bindArguments = new FieldTable();
bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), SELECTOR_VALUE );
}
try
{
getVirtualHost().getBindingFactory().removeBinding(String.valueOf(routingKey), queue, exchange, FieldTable.convertToMap(bindArguments));
}
catch (Exception e)
{
fail(e.getMessage());
}
}
private void validateMessageOnTopics(long messageCount, boolean allQueues)
{
validateMessageOnQueue(durablePriorityTopicQueueName, messageCount);
validateMessageOnQueue(durableTopicQueueName, messageCount);
if (allQueues)
{
validateMessageOnQueue(priorityTopicQueueName, messageCount);
validateMessageOnQueue(topicQueueName, messageCount);
}
}
private void validateMessageOnQueues(long messageCount, boolean allQueues)
{
validateMessageOnQueue(durablePriorityQueueName, messageCount);
validateMessageOnQueue(durableQueueName, messageCount);
if (allQueues)
{
validateMessageOnQueue(priorityQueueName, messageCount);
validateMessageOnQueue(queueName, messageCount);
}
}
private void validateMessageOnQueue(AMQShortString queueName, long messageCount)
{
AMQQueue queue = getVirtualHost().getQueueRegistry().getQueue(queueName);
assertNotNull("Queue(" + queueName + ") not correctly registered:", queue);
assertEquals("Incorrect Message count on queue:" + queueName, messageCount, queue.getMessageCount());
}
private class TestMessagePublishInfo implements MessagePublishInfo
{
Exchange _exchange;
boolean _immediate;
boolean _mandatory;
AMQShortString _routingKey;
TestMessagePublishInfo(Exchange exchange, boolean immediate, boolean mandatory, AMQShortString routingKey)
{
_exchange = exchange;
_immediate = immediate;
_mandatory = mandatory;
_routingKey = routingKey;
}
public AMQShortString getExchange()
{
return _exchange.getNameShortString();
}
public void setExchange(AMQShortString exchange)
{
//no-op
}
public boolean isImmediate()
{
return _immediate;
}
public boolean isMandatory()
{
return _mandatory;
}
public AMQShortString getRoutingKey()
{
return _routingKey;
}
}
}