blob: f0ecfb6407cc76ac8c7f0789f71b5bc53ea8484f [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.store;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.isA;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.times;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.configuration.Configuration;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.SystemOutMessageLogger;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.TestLogActor;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MockStoredMessage;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
import org.apache.qpid.server.store.Transaction.Record;
import org.apache.qpid.server.store.derby.DerbyMessageStore;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
public class DurableConfigurationStoreTest extends QpidTestCase
{
private static final String EXCHANGE_NAME = "exchangeName";
private String _storePath;
private String _storeName;
private MessageStore _store;
private Configuration _configuration;
private ConfigurationRecoveryHandler _recoveryHandler;
private QueueRecoveryHandler _queueRecoveryHandler;
private ExchangeRecoveryHandler _exchangeRecoveryHandler;
private BindingRecoveryHandler _bindingRecoveryHandler;
private MessageStoreRecoveryHandler _messageStoreRecoveryHandler;
private StoredMessageRecoveryHandler _storedMessageRecoveryHandler;
private TransactionLogRecoveryHandler _logRecoveryHandler;
private TransactionLogRecoveryHandler.QueueEntryRecoveryHandler _queueEntryRecoveryHandler;
private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler _dtxRecordRecoveryHandler;
private Exchange _exchange = mock(Exchange.class);
private static final String ROUTING_KEY = "routingKey";
private static final String QUEUE_NAME = "queueName";
private FieldTable _bindingArgs;
private UUID _queueId;
private UUID _exchangeId;
public void setUp() throws Exception
{
super.setUp();
_queueId = UUIDGenerator.generateRandomUUID();
_exchangeId = UUIDGenerator.generateRandomUUID();
_storeName = getName();
_storePath = TMP_FOLDER + File.separator + _storeName;
FileUtils.delete(new File(_storePath), true);
setTestSystemProperty("QPID_WORK", TMP_FOLDER);
_configuration = mock(Configuration.class);
_recoveryHandler = mock(ConfigurationRecoveryHandler.class);
_queueRecoveryHandler = mock(QueueRecoveryHandler.class);
_exchangeRecoveryHandler = mock(ExchangeRecoveryHandler.class);
_bindingRecoveryHandler = mock(BindingRecoveryHandler.class);
_storedMessageRecoveryHandler = mock(StoredMessageRecoveryHandler.class);
_logRecoveryHandler = mock(TransactionLogRecoveryHandler.class);
_messageStoreRecoveryHandler = mock(MessageStoreRecoveryHandler.class);
_queueEntryRecoveryHandler = mock(TransactionLogRecoveryHandler.QueueEntryRecoveryHandler.class);
_dtxRecordRecoveryHandler = mock(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler.class);
when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler);
when(_recoveryHandler.begin(isA(MessageStore.class))).thenReturn(_exchangeRecoveryHandler);
when(_exchangeRecoveryHandler.completeExchangeRecovery()).thenReturn(_queueRecoveryHandler);
when(_queueRecoveryHandler.completeQueueRecovery()).thenReturn(_bindingRecoveryHandler);
when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler);
when(_queueEntryRecoveryHandler.completeQueueEntryRecovery()).thenReturn(_dtxRecordRecoveryHandler);
when(_exchange.getNameShortString()).thenReturn(AMQShortString.valueOf(EXCHANGE_NAME));
when(_exchange.getId()).thenReturn(_exchangeId);
when(_configuration.getString(eq(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY), anyString())).thenReturn(
_storePath);
_bindingArgs = new FieldTable();
AMQShortString argKey = AMQPFilterTypes.JMS_SELECTOR.getValue();
String argValue = "some selector expression";
_bindingArgs.put(argKey, argValue);
reopenStore();
}
public void tearDown() throws Exception
{
FileUtils.delete(new File(_storePath), true);
super.tearDown();
}
public void testCreateExchange() throws Exception
{
Exchange exchange = createTestExchange();
_store.createExchange(exchange);
reopenStore();
verify(_exchangeRecoveryHandler).exchange(_exchangeId, getName(), getName() + "Type", true);
}
public void testRemoveExchange() throws Exception
{
Exchange exchange = createTestExchange();
_store.createExchange(exchange);
_store.removeExchange(exchange);
reopenStore();
verify(_exchangeRecoveryHandler, never()).exchange(any(UUID.class), anyString(), anyString(), anyBoolean());
}
public void testBindQueue() throws Exception
{
AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false);
Binding binding = new Binding(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue,
_exchange, FieldTable.convertToMap(_bindingArgs));
_store.bindQueue(binding);
reopenStore();
ByteBuffer argsAsBytes = ByteBuffer.wrap(_bindingArgs.getDataAsBytes());
verify(_bindingRecoveryHandler).binding(binding.getId(), _exchange.getId(), queue.getId(), ROUTING_KEY, argsAsBytes);
}
public void testUnbindQueue() throws Exception
{
AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false);
Binding binding = new Binding(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue,
_exchange, FieldTable.convertToMap(_bindingArgs));
_store.bindQueue(binding);
_store.unbindQueue(binding);
reopenStore();
verify(_bindingRecoveryHandler, never()).binding(any(UUID.class), any(UUID.class), any(UUID.class), anyString(),
isA(ByteBuffer.class));
}
public void testCreateQueueAMQQueue() throws Exception
{
AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true);
_store.createQueue(queue);
reopenStore();
verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", true, null, null);
}
public void testCreateQueueAMQQueueFieldTable() throws Exception
{
AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true);
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put("x-qpid-dlq-enabled", Boolean.TRUE);
attributes.put("x-qpid-maximum-delivery-count", new Integer(10));
FieldTable arguments = FieldTable.convertToFieldTable(attributes);
_store.createQueue(queue, arguments);
reopenStore();
verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", true, arguments, null);
}
public void testCreateQueueAMQQueueWithAlternateExchange() throws Exception
{
Exchange alternateExchange = createTestAlternateExchange();
AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, alternateExchange);
_store.createQueue(queue);
reopenStore();
verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", true, null, alternateExchange.getId());
}
private Exchange createTestAlternateExchange()
{
UUID exchUuid = UUID.randomUUID();
Exchange alternateExchange = mock(Exchange.class);
when(alternateExchange.getId()).thenReturn(exchUuid);
return alternateExchange;
}
public void testUpdateQueueExclusivity() throws Exception
{
// create queue
AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true);
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put("x-qpid-dlq-enabled", Boolean.TRUE);
attributes.put("x-qpid-maximum-delivery-count", new Integer(10));
FieldTable arguments = FieldTable.convertToFieldTable(attributes);
_store.createQueue(queue, arguments);
// update the queue to have exclusive=false
queue = createTestQueue(getName(), getName() + "Owner", false);
_store.updateQueue(queue);
reopenStore();
verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", false, arguments, null);
}
public void testUpdateQueueAlternateExchange() throws Exception
{
// create queue
AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true);
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put("x-qpid-dlq-enabled", Boolean.TRUE);
attributes.put("x-qpid-maximum-delivery-count", new Integer(10));
FieldTable arguments = FieldTable.convertToFieldTable(attributes);
_store.createQueue(queue, arguments);
// update the queue to have exclusive=false
Exchange alternateExchange = createTestAlternateExchange();
queue = createTestQueue(getName(), getName() + "Owner", false, alternateExchange);
_store.updateQueue(queue);
reopenStore();
verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", false, arguments, alternateExchange.getId());
}
public void testRemoveQueue() throws Exception
{
// create queue
AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true);
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put("x-qpid-dlq-enabled", Boolean.TRUE);
attributes.put("x-qpid-maximum-delivery-count", new Integer(10));
FieldTable arguments = FieldTable.convertToFieldTable(attributes);
_store.createQueue(queue, arguments);
// remove queue
_store.removeQueue(queue);
reopenStore();
verify(_queueRecoveryHandler, never()).queue(any(UUID.class), anyString(), anyString(), anyBoolean(),
any(FieldTable.class), any(UUID.class));
}
private AMQQueue createTestQueue(String queueName, String queueOwner, boolean exclusive) throws AMQStoreException
{
return createTestQueue(queueName, queueOwner, exclusive, null);
}
private AMQQueue createTestQueue(String queueName, String queueOwner, boolean exclusive, Exchange alternateExchange) throws AMQStoreException
{
AMQQueue queue = mock(AMQQueue.class);
when(queue.getName()).thenReturn(queueName);
when(queue.getNameShortString()).thenReturn(AMQShortString.valueOf(queueName));
when(queue.getOwner()).thenReturn(AMQShortString.valueOf(queueOwner));
when(queue.isExclusive()).thenReturn(exclusive);
when(queue.getId()).thenReturn(_queueId);
when(queue.getAlternateExchange()).thenReturn(alternateExchange);
return queue;
}
private Exchange createTestExchange()
{
Exchange exchange = mock(Exchange.class);
when(exchange.getNameShortString()).thenReturn(AMQShortString.valueOf(getName()));
when(exchange.getName()).thenReturn(getName());
when(exchange.getTypeShortString()).thenReturn(AMQShortString.valueOf(getName() + "Type"));
when(exchange.isAutoDelete()).thenReturn(true);
when(exchange.getId()).thenReturn(_exchangeId);
return exchange;
}
private void reopenStore() throws Exception
{
if (_store != null)
{
_store.close();
}
_store = createStore();
_store.configureConfigStore(_storeName, _recoveryHandler, _configuration);
_store.configureMessageStore(_storeName, _messageStoreRecoveryHandler, _logRecoveryHandler, _configuration);
_store.activate();
}
protected MessageStore createStore() throws Exception
{
String storeClass = System.getProperty(MESSAGE_STORE_CLASS_NAME_KEY);
if (storeClass == null)
{
storeClass = DerbyMessageStore.class.getName();
}
CurrentActor.set(new TestLogActor(new SystemOutMessageLogger()));
MessageStore messageStore = (MessageStore) Class.forName(storeClass).newInstance();
return messageStore;
}
public void testRecordXid() throws Exception
{
Record enqueueRecord = getTestRecord(1);
Record dequeueRecord = getTestRecord(2);
Record[] enqueues = { enqueueRecord };
Record[] dequeues = { dequeueRecord };
byte[] globalId = new byte[] { 1 };
byte[] branchId = new byte[] { 2 };
Transaction transaction = _store.newTransaction();
transaction.recordXid(1l, globalId, branchId, enqueues, dequeues);
transaction.commitTran();
reopenStore();
verify(_dtxRecordRecoveryHandler).dtxRecord(1l, globalId, branchId, enqueues, dequeues);
transaction = _store.newTransaction();
transaction.removeXid(1l, globalId, branchId);
transaction.commitTran();
reopenStore();
verify(_dtxRecordRecoveryHandler, times(1)).dtxRecord(1l, globalId, branchId, enqueues, dequeues);
}
private Record getTestRecord(long messageNumber)
{
UUID queueId1 = UUIDGenerator.generateRandomUUID();
TransactionLogResource queue1 = mock(TransactionLogResource.class);
when(queue1.getId()).thenReturn(queueId1);
EnqueableMessage message1 = mock(EnqueableMessage.class);
when(message1.isPersistent()).thenReturn(true);
when(message1.getMessageNumber()).thenReturn(messageNumber);
when(message1.getStoredMessage()).thenReturn(new MockStoredMessage(messageNumber));
Record enqueueRecord = new TestRecord(queue1, message1);
return enqueueRecord;
}
private static class TestRecord implements Record
{
private TransactionLogResource _queue;
private EnqueableMessage _message;
public TestRecord(TransactionLogResource queue, EnqueableMessage message)
{
super();
_queue = queue;
_message = message;
}
@Override
public TransactionLogResource getQueue()
{
return _queue;
}
@Override
public EnqueableMessage getMessage()
{
return _message;
}
@Override
public int hashCode()
{
final int prime = 31;
int result = 1;
result = prime * result + ((_message == null) ? 0 : new Long(_message.getMessageNumber()).hashCode());
result = prime * result + ((_queue == null) ? 0 : _queue.getId().hashCode());
return result;
}
@Override
public boolean equals(Object obj)
{
if (this == obj)
{
return true;
}
if (obj == null)
{
return false;
}
if (!(obj instanceof Record))
{
return false;
}
Record other = (Record) obj;
if (_message == null && other.getMessage() != null)
{
return false;
}
if (_queue == null && other.getQueue() != null)
{
return false;
}
if (_message.getMessageNumber() != other.getMessage().getMessageNumber())
{
return false;
}
return _queue.getId().equals(other.getQueue().getId());
}
}
}