blob: dec4de4cc6aee9fab450ac95bca60b6b5d5ad2a7 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.store;
import junit.framework.TestCase;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeType;
import org.apache.qpid.server.exchange.TopicExchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.MessageHandleFactory;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQPriorityQueue;
import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.queue.ExchangeBinding;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.AMQException;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.List;
/**
* This tests the MessageStores by using the available interfaces.
*
* This test validates that Exchanges, Queues, Bindings and Messages are persisted correctly.
*/
public class MessageStoreTest extends TestCase
{
private static final int DEFAULT_PRIORTY_LEVEL = 5;
private static final Logger _logger = LoggerFactory.getLogger(MessageStoreTest.class);
public void testMemoryMessageStore()
{
PropertiesConfiguration config = new PropertiesConfiguration();
config.addProperty("store.class", "org.apache.qpid.server.store.MemoryMessageStore");
runTestWithStore(config);
}
public void DISABLE_testDerbyMessageStore()
{
PropertiesConfiguration config = new PropertiesConfiguration();
config.addProperty("store.environment-path", "derbyDB_MST");
config.addProperty("store.class", "org.apache.qpid.server.store.DerbyMessageStore");
runTestWithStore(config);
}
private void reload(Configuration configuration)
{
if (_virtualHost != null)
{
try
{
_virtualHost.close();
}
catch (Exception e)
{
fail(e.getMessage());
}
}
try
{
_virtualHost = new VirtualHost(virtualHostName, configuration, null);
ApplicationRegistry.getInstance().getVirtualHostRegistry().registerVirtualHost(_virtualHost);
}
catch (Exception e)
{
e.printStackTrace();
fail(e.getMessage());
}
}
VirtualHost _virtualHost = null;
String virtualHostName = "MessageStoreTest";
AMQShortString nonDurableExchangeName = new AMQShortString("MST-NonDurableDirectExchange");
AMQShortString directExchangeName = new AMQShortString("MST-DirectExchange");
AMQShortString topicExchangeName = new AMQShortString("MST-TopicExchange");
AMQShortString queueOwner = new AMQShortString("MST");
AMQShortString durablePriorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue-Durable");
AMQShortString durableTopicQueueName = new AMQShortString("MST-TopicQueue-Durable");
AMQShortString priorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue");
AMQShortString topicQueueName = new AMQShortString("MST-TopicQueue");
AMQShortString durablePriorityQueueName = new AMQShortString("MST-PriorityQueue-Durable");
AMQShortString durableQueueName = new AMQShortString("MST-Queue-Durable");
AMQShortString priorityQueueName = new AMQShortString("MST-PriorityQueue");
AMQShortString queueName = new AMQShortString("MST-Queue");
AMQShortString directRouting = new AMQShortString("MST-direct");
AMQShortString topicRouting = new AMQShortString("MST-topic");
protected void setUp()
{
ApplicationRegistry.getInstance(1);
}
protected void tearDown()
{
ApplicationRegistry.remove(1);
}
protected void runTestWithStore(Configuration configuration)
{
//Ensure Environment Path is empty
cleanup(configuration);
//Load the Virtualhost with the required MessageStore
reload(configuration);
MessageStore messageStore = _virtualHost.getMessageStore();
createAllQueues();
createAllTopicQueues();
//Register Non-Durable DirectExchange
Exchange nonDurableExchange = createExchange(DirectExchange.TYPE, nonDurableExchangeName, false);
bindAllQueuesToExchange(nonDurableExchange, directRouting);
//Register DirectExchange
Exchange directExchange = createExchange(DirectExchange.TYPE, directExchangeName, true);
bindAllQueuesToExchange(directExchange, directRouting);
//Register TopicExchange
Exchange topicExchange = createExchange(TopicExchange.TYPE, topicExchangeName, true);
bindAllTopicQueuesToExchange(topicExchange, topicRouting);
//Send Message To NonDurable direct Exchange = persistent
sendMessageOnExchange(nonDurableExchange, directRouting, true);
// and non-persistent
sendMessageOnExchange(nonDurableExchange, directRouting, false);
//Send Message To direct Exchange = persistent
sendMessageOnExchange(directExchange, directRouting, true);
// and non-persistent
sendMessageOnExchange(directExchange, directRouting, false);
//Send Message To topic Exchange = persistent
sendMessageOnExchange(topicExchange, topicRouting, true);
// and non-persistent
sendMessageOnExchange(topicExchange, topicRouting, false);
//Ensure all the Queues have four messages (one transient, one persistent) x 2 exchange routings
validateMessageOnQueues(4, true);
//Ensure all the topics have two messages (one transient, one persistent)
validateMessageOnTopics(2, true);
assertEquals("Not all queues correctly registered", 8, _virtualHost.getQueueRegistry().getQueues().size());
if (!messageStore.isPersistent())
{
_logger.warn("Unable to test Persistent capabilities of messages store(" + messageStore.getClass() + ") as it is not capable of peristence.");
return;
}
//Reload the Virtualhost to test persistence
_logger.info("Reloading Virtualhost");
VirtualHost original = _virtualHost;
reload(configuration);
assertTrue("Virtualhost has not been reloaded", original != _virtualHost);
validateExchanges();
//Validate Durable Queues still have the persistentn message
validateMessageOnQueues(2, false);
//Validate Durable Queues still have the persistentn message
validateMessageOnTopics(1, false);
//Validate Properties of Binding
validateBindingProperties();
//Validate Properties of Queues
validateQueueProperties();
//Validate Non-Durable Queues are gone.
assertNull("Non-Durable queue still registered:" + priorityQueueName, _virtualHost.getQueueRegistry().getQueue(priorityQueueName));
assertNull("Non-Durable queue still registered:" + queueName, _virtualHost.getQueueRegistry().getQueue(queueName));
assertNull("Non-Durable queue still registered:" + priorityTopicQueueName, _virtualHost.getQueueRegistry().getQueue(priorityTopicQueueName));
assertNull("Non-Durable queue still registered:" + topicQueueName, _virtualHost.getQueueRegistry().getQueue(topicQueueName));
assertEquals("Not all queues correctly registered", 4, _virtualHost.getQueueRegistry().getQueues().size());
}
private void validateExchanges()
{
ExchangeRegistry registry = _virtualHost.getExchangeRegistry();
assertTrue(directExchangeName + " exchange NOT reloaded after failover",
registry.getExchangeNames().contains(directExchangeName));
assertTrue(topicExchangeName + " exchange NOT reloaded after failover",
registry.getExchangeNames().contains(topicExchangeName));
assertTrue(nonDurableExchangeName + " exchange reloaded after failover",
!registry.getExchangeNames().contains(nonDurableExchangeName));
// There are 5 required exchanges + our 2 durable queues
assertEquals("Incorrect number of exchanges available", 5 + 2, registry.getExchangeNames().size());
}
/** Validates that the Durable queues */
private void validateBindingProperties()
{
QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
validateBindingProperties(queueRegistry.getQueue(durablePriorityQueueName).getExchangeBindings(), false);
validateBindingProperties(queueRegistry.getQueue(durablePriorityTopicQueueName).getExchangeBindings(), true);
validateBindingProperties(queueRegistry.getQueue(durableQueueName).getExchangeBindings(), false);
validateBindingProperties(queueRegistry.getQueue(durableTopicQueueName).getExchangeBindings(), true);
}
/**
* Validate that each queue is bound once.
*
* @param bindings the set of bindings to validate
* @param useSelectors if set validate that the binding has a JMS_SELECTOR argument
*/
private void validateBindingProperties(List<ExchangeBinding> bindings, boolean useSelectors)
{
assertEquals("Each queue should only be bound once.", 1, bindings.size());
ExchangeBinding binding = bindings.get(0);
if (useSelectors)
{
assertTrue("Binding does not contain a Selector argument.",
binding.getArguments().containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()));
}
}
private void validateQueueProperties()
{
QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
validateQueueProperties(queueRegistry.getQueue(durablePriorityQueueName), true);
validateQueueProperties(queueRegistry.getQueue(durablePriorityTopicQueueName), true);
validateQueueProperties(queueRegistry.getQueue(durableQueueName), false);
validateQueueProperties(queueRegistry.getQueue(durableTopicQueueName), false);
}
private void validateQueueProperties(AMQQueue queue, boolean usePriority)
{
if (usePriority)
{
assertEquals("Queue is no longer a Priority Queue", AMQPriorityQueue.class, queue.getClass());
assertEquals("Priority Queue does not have set priorities", DEFAULT_PRIORTY_LEVEL, ((AMQPriorityQueue) queue).getPriorities());
}
else
{
assertEquals("Queue is no longer a Priority Queue", SimpleAMQQueue.class, queue.getClass());
}
}
/**
* Delete the Store Environment path
*
* @param configuration The configuration that contains the store environment path.
*/
private void cleanup(Configuration configuration)
{
String environment = configuration.getString("store.environment-path");
if (environment != null)
{
File environmentPath = new File(environment);
if (environmentPath.exists())
{
deleteDirectory(environmentPath);
}
}
}
private void deleteDirectory(File path)
{
if (path.isDirectory())
{
for (File file : path.listFiles())
{
deleteDirectory(file);
}
}
else
{
path.delete();
}
}
private void sendMessageOnExchange(Exchange directExchange, AMQShortString routingKey, boolean deliveryMode)
{
//Set MessagePersustebce
BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
properties.setDeliveryMode(deliveryMode ? Integer.valueOf(2).byteValue() : Integer.valueOf(1).byteValue());
FieldTable headers = properties.getHeaders();
headers.setString("Test", "MST");
properties.setHeaders(headers);
MessagePublishInfo messageInfo = new TestMessagePublishInfo(directExchange, false, false, routingKey);
IncomingMessage currentMessage = null;
try
{
currentMessage = new IncomingMessage(_virtualHost.getMessageStore().getNewMessageId(),
messageInfo,
new NonTransactionalContext(_virtualHost.getMessageStore(),
new StoreContext(), null, null),
new InternalTestProtocolSession());
}
catch (AMQException e)
{
fail(e.getMessage());
}
currentMessage.setMessageStore(_virtualHost.getMessageStore());
currentMessage.setExchange(directExchange);
ContentHeaderBody headerBody = new ContentHeaderBody();
headerBody.classId = BasicConsumeBodyImpl.CLASS_ID;
headerBody.bodySize = 0;
headerBody.properties = properties;
try
{
currentMessage.setContentHeaderBody(headerBody);
}
catch (AMQException e)
{
fail(e.getMessage());
}
currentMessage.setExpiration();
try
{
currentMessage.route();
}
catch (AMQException e)
{
fail(e.getMessage());
}
try
{
currentMessage.routingComplete(_virtualHost.getMessageStore(), new MessageHandleFactory());
}
catch (AMQException e)
{
fail(e.getMessage());
}
// check and deliver if header says body length is zero
if (currentMessage.allContentReceived())
{
try
{
currentMessage.deliverToQueues();
}
catch (AMQException e)
{
fail(e.getMessage());
}
}
}
private void createAllQueues()
{
//Register Durable Priority Queue
createQueue(durablePriorityQueueName, true, true);
//Register Durable Simple Queue
createQueue(durableQueueName, false, true);
//Register NON-Durable Priority Queue
createQueue(priorityQueueName, true, false);
//Register NON-Durable Simple Queue
createQueue(queueName, false, false);
}
private void createAllTopicQueues()
{
//Register Durable Priority Queue
createQueue(durablePriorityTopicQueueName, true, true);
//Register Durable Simple Queue
createQueue(durableTopicQueueName, false, true);
//Register NON-Durable Priority Queue
createQueue(priorityTopicQueueName, true, false);
//Register NON-Durable Simple Queue
createQueue(topicQueueName, false, false);
}
private Exchange createExchange(ExchangeType type, AMQShortString name, boolean durable)
{
Exchange exchange = null;
try
{
exchange = type.newInstance(_virtualHost, name, durable, 0, false);
}
catch (AMQException e)
{
fail(e.getMessage());
}
try
{
_virtualHost.getExchangeRegistry().registerExchange(exchange);
}
catch (AMQException e)
{
fail(e.getMessage());
}
return exchange;
}
private void createQueue(AMQShortString queueName, boolean usePriority, boolean durable)
{
FieldTable queueArguments = null;
if (usePriority)
{
queueArguments = new FieldTable();
queueArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, DEFAULT_PRIORTY_LEVEL);
}
AMQQueue queue = null;
//Ideally we would be able to use the QueueDeclareHandler here.
try
{
queue = AMQQueueFactory.createAMQQueueImpl(queueName, durable, queueOwner, false, _virtualHost,
queueArguments);
validateQueueProperties(queue, usePriority);
if (queue.isDurable() && !queue.isAutoDelete())
{
_virtualHost.getMessageStore().createQueue(queue, queueArguments);
}
}
catch (AMQException e)
{
fail(e.getMessage());
}
try
{
_virtualHost.getQueueRegistry().registerQueue(queue);
}
catch (AMQException e)
{
fail(e.getMessage());
}
}
private void bindAllQueuesToExchange(Exchange exchange, AMQShortString routingKey)
{
FieldTable queueArguments = new FieldTable();
queueArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, DEFAULT_PRIORTY_LEVEL);
QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durablePriorityQueueName), false, queueArguments);
bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durableQueueName), false, null);
bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(priorityQueueName), false, queueArguments);
bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(queueName), false, null);
}
private void bindAllTopicQueuesToExchange(Exchange exchange, AMQShortString routingKey)
{
FieldTable queueArguments = new FieldTable();
queueArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, DEFAULT_PRIORTY_LEVEL);
QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durablePriorityTopicQueueName), true, queueArguments);
bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durableTopicQueueName), true, null);
bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(priorityTopicQueueName), true, queueArguments);
bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(topicQueueName), true, null);
}
protected void bindQueueToExchange(Exchange exchange, AMQShortString routingKey, AMQQueue queue, boolean useSelector, FieldTable queueArguments)
{
try
{
exchange.registerQueue(queueName, queue, queueArguments);
}
catch (AMQException e)
{
fail(e.getMessage());
}
FieldTable bindArguments = null;
if (useSelector)
{
bindArguments = new FieldTable();
bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), "Test = 'MST'");
}
try
{
queue.bind(exchange, routingKey, bindArguments);
}
catch (AMQException e)
{
fail(e.getMessage());
}
}
private void validateMessage(long messageCount, boolean allQueues)
{
validateMessageOnTopics(messageCount, allQueues);
validateMessageOnQueues(messageCount, allQueues);
}
private void validateMessageOnTopics(long messageCount, boolean allQueues)
{
validateMessageOnQueue(durablePriorityTopicQueueName, messageCount);
validateMessageOnQueue(durableTopicQueueName, messageCount);
if (allQueues)
{
validateMessageOnQueue(priorityTopicQueueName, messageCount);
validateMessageOnQueue(topicQueueName, messageCount);
}
}
private void validateMessageOnQueues(long messageCount, boolean allQueues)
{
validateMessageOnQueue(durablePriorityQueueName, messageCount);
validateMessageOnQueue(durableQueueName, messageCount);
if (allQueues)
{
validateMessageOnQueue(priorityQueueName, messageCount);
validateMessageOnQueue(queueName, messageCount);
}
}
private void validateMessageOnQueue(AMQShortString queueName, long messageCount)
{
AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueName);
assertNotNull("Queue(" + queueName + ") not correctly registered:", queue);
assertEquals("Incorrect Message count on queue:" + queueName, messageCount, queue.getMessageCount());
}
private class TestMessagePublishInfo implements MessagePublishInfo
{
Exchange _exchange;
boolean _immediate;
boolean _mandatory;
AMQShortString _routingKey;
TestMessagePublishInfo(Exchange exchange, boolean immediate, boolean mandatory, AMQShortString routingKey)
{
_exchange = exchange;
_immediate = immediate;
_mandatory = mandatory;
_routingKey = routingKey;
}
public AMQShortString getExchange()
{
return _exchange.getName();
}
public void setExchange(AMQShortString exchange)
{
//no-op
}
public boolean isImmediate()
{
return _immediate;
}
public boolean isMandatory()
{
return _mandatory;
}
public AMQShortString getRoutingKey()
{
return _routingKey;
}
}
}