blob: b38d9d7bd26dca8bfc56189d47a10c8198ba2d82 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.store;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.File;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTestCase
{
private static final String EXCHANGE_NAME = "exchangeName";
private static final String EXCHANGE = org.apache.qpid.server.model.Exchange.class.getSimpleName();
private static final String BINDING = org.apache.qpid.server.model.Binding.class.getSimpleName();
private static final String QUEUE = Queue.class.getSimpleName();
private static final UUID ANY_UUID = UUID.randomUUID();
private static final Map ANY_MAP = new HashMap();
private String _storePath;
private String _storeName;
private ConfiguredObjectRecordHandler _handler;
private static final String ROUTING_KEY = "routingKey";
private static final String QUEUE_NAME = "queueName";
private Map<String,Object> _bindingArgs;
private UUID _queueId;
private UUID _exchangeId;
private DurableConfigurationStore _configStore;
protected Map<String, Object> _configurationStoreSettings;
public void setUp() throws Exception
{
super.setUp();
_configurationStoreSettings = new HashMap<String, Object>();
_queueId = UUIDGenerator.generateRandomUUID();
_exchangeId = UUIDGenerator.generateRandomUUID();
_storeName = getName();
_storePath = TMP_FOLDER + File.separator + _storeName;
_configurationStoreSettings.put(MessageStore.STORE_PATH, _storePath);
FileUtils.delete(new File(_storePath), true);
setTestSystemProperty("QPID_WORK", TMP_FOLDER);
_handler = mock(ConfiguredObjectRecordHandler.class);
when(_handler.handle(any(ConfiguredObjectRecord.class))).thenReturn(true);
_bindingArgs = new HashMap<String, Object>();
String argKey = AMQPFilterTypes.JMS_SELECTOR.toString();
String argValue = "some selector expression";
_bindingArgs.put(argKey, argValue);
reopenStore();
}
public void tearDown() throws Exception
{
try
{
closeConfigStore();
FileUtils.delete(new File(_storePath), true);
}
finally
{
super.tearDown();
}
}
public void testCreateExchange() throws Exception
{
ExchangeImpl exchange = createTestExchange();
DurableConfigurationStoreHelper.createExchange(_configStore, exchange);
reopenStore();
verify(_handler).handle(matchesRecord(_exchangeId, EXCHANGE,
map( org.apache.qpid.server.model.Exchange.NAME, getName(),
org.apache.qpid.server.model.Exchange.TYPE, getName()+"Type",
org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.name())));
}
private Map<String,Object> map(Object... vals)
{
Map<String,Object> map = new HashMap<String, Object>();
boolean isValue = false;
String key = null;
for(Object obj : vals)
{
if(isValue)
{
map.put(key,obj);
}
else
{
key = (String) obj;
}
isValue = !isValue;
}
return map;
}
public void testRemoveExchange() throws Exception
{
ExchangeImpl exchange = createTestExchange();
DurableConfigurationStoreHelper.createExchange(_configStore, exchange);
DurableConfigurationStoreHelper.removeExchange(_configStore, exchange);
reopenStore();
verify(_handler, never()).handle(any(ConfiguredObjectRecord.class));
}
public void testBindQueue() throws Exception
{
ExchangeImpl<?> exchange = createTestExchange();
AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false, null);
BindingImpl binding = new BindingImpl(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue,
exchange, _bindingArgs);
DurableConfigurationStoreHelper.createExchange(_configStore, exchange);
DurableConfigurationStoreHelper.createQueue(_configStore, queue);
DurableConfigurationStoreHelper.createBinding(_configStore, binding);
reopenStore();
Map<String,Object> map = new HashMap<String, Object>();
map.put(Binding.NAME, ROUTING_KEY);
map.put(Binding.ARGUMENTS,_bindingArgs);
Map<String,UUID> parents = new HashMap<String, UUID>();
parents.put(Exchange.class.getSimpleName(), exchange.getId());
parents.put(Queue.class.getSimpleName(), queue.getId());
verify(_handler).handle(matchesRecord(binding.getId(), BINDING, map, parents));
}
private ConfiguredObjectRecord matchesRecord(UUID id,
String type,
Map<String, Object> attributes,
final Map<String, UUID> parents)
{
return argThat(new ConfiguredObjectMatcher(id, type, attributes, parents));
}
private ConfiguredObjectRecord matchesRecord(UUID id, String type, Map<String, Object> attributes)
{
return argThat(new ConfiguredObjectMatcher(id, type, attributes, ANY_MAP));
}
private static class ConfiguredObjectMatcher extends ArgumentMatcher<ConfiguredObjectRecord>
{
private final Map<String,Object> _matchingMap;
private final UUID _id;
private final String _name;
private final Map<String,UUID> _parents;
private ConfiguredObjectMatcher(final UUID id, final String type, final Map<String, Object> matchingMap, Map<String,UUID> parents)
{
_id = id;
_name = type;
_matchingMap = matchingMap;
_parents = parents;
}
@Override
public boolean matches(final Object argument)
{
if(argument instanceof ConfiguredObjectRecord)
{
ConfiguredObjectRecord binding = (ConfiguredObjectRecord) argument;
Map<String,Object> arg = new HashMap<String, Object>(binding.getAttributes());
arg.remove("createdBy");
arg.remove("createdTime");
return (_id == ANY_UUID || _id.equals(binding.getId()))
&& _name.equals(binding.getType())
&& (_matchingMap == ANY_MAP || arg.equals(_matchingMap))
&& (_parents == ANY_MAP || matchesParents(binding));
}
return false;
}
private boolean matchesParents(ConfiguredObjectRecord binding)
{
Map<String, ConfiguredObjectRecord> bindingParents = binding.getParents();
if(bindingParents.size() != _parents.size())
{
return false;
}
for(Map.Entry<String,UUID> entry : _parents.entrySet())
{
if(!bindingParents.get(entry.getKey()).getId().equals(entry.getValue()))
{
return false;
}
}
return true;
}
}
public void testUnbindQueue() throws Exception
{
ExchangeImpl<?> exchange = createTestExchange();
DurableConfigurationStoreHelper.createExchange(_configStore, exchange);
AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false, null);
BindingImpl binding = new BindingImpl(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue,
exchange, _bindingArgs);
DurableConfigurationStoreHelper.createBinding(_configStore, binding);
DurableConfigurationStoreHelper.removeBinding(_configStore, binding);
reopenStore();
verify(_handler, never()).handle(matchesRecord(ANY_UUID, BINDING,
ANY_MAP));
}
public void testCreateQueueAMQQueue() throws Exception
{
AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, null);
DurableConfigurationStoreHelper.createQueue(_configStore, queue);
reopenStore();
Map<String, Object> queueAttributes = new HashMap<String, Object>();
queueAttributes.put(Queue.NAME, getName());
queueAttributes.put(Queue.OWNER, getName()+"Owner");
queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
}
public void testCreateQueueAMQQueueFieldTable() throws Exception
{
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE);
attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10);
AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes);
DurableConfigurationStoreHelper.createQueue(_configStore, queue);
reopenStore();
Map<String,Object> queueAttributes = new HashMap<String, Object>();
queueAttributes.put(Queue.NAME, getName());
queueAttributes.put(Queue.OWNER, getName()+"Owner");
queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
queueAttributes.putAll(attributes);
verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
}
public void testCreateQueueAMQQueueWithAlternateExchange() throws Exception
{
ExchangeImpl alternateExchange = createTestAlternateExchange();
AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, alternateExchange, null);
DurableConfigurationStoreHelper.createQueue(_configStore, queue);
reopenStore();
Map<String, Object> queueAttributes = new HashMap<String, Object>();
queueAttributes.put(Queue.NAME, getName());
queueAttributes.put(Queue.OWNER, getName()+"Owner");
queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
}
private ExchangeImpl createTestAlternateExchange()
{
UUID exchUuid = UUID.randomUUID();
ExchangeImpl alternateExchange = mock(ExchangeImpl.class);
when(alternateExchange.getId()).thenReturn(exchUuid);
return alternateExchange;
}
public void testUpdateQueueExclusivity() throws Exception
{
// create queue
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE);
attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10);
AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes);
DurableConfigurationStoreHelper.createQueue(_configStore, queue);
// update the queue to have exclusive=false
queue = createTestQueue(getName(), getName() + "Owner", false, attributes);
DurableConfigurationStoreHelper.updateQueue(_configStore, queue);
reopenStore();
Map<String,Object> queueAttributes = new HashMap<String, Object>();
queueAttributes.put(Queue.NAME, getName());
queueAttributes.putAll(attributes);
verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
}
public void testUpdateQueueAlternateExchange() throws Exception
{
// create queue
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE);
attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10);
AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes);
DurableConfigurationStoreHelper.createQueue(_configStore, queue);
// update the queue to have exclusive=false
ExchangeImpl alternateExchange = createTestAlternateExchange();
queue = createTestQueue(getName(), getName() + "Owner", false, alternateExchange, attributes);
DurableConfigurationStoreHelper.updateQueue(_configStore, queue);
reopenStore();
Map<String,Object> queueAttributes = new HashMap<String, Object>();
queueAttributes.put(Queue.NAME, getName());
queueAttributes.putAll(attributes);
queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
}
public void testRemoveQueue() throws Exception
{
// create queue
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE);
attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10);
AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes);
DurableConfigurationStoreHelper.createQueue(_configStore, queue);
// remove queue
DurableConfigurationStoreHelper.removeQueue(_configStore,queue);
reopenStore();
verify(_handler, never()).handle(any(ConfiguredObjectRecord.class));
}
private AMQQueue createTestQueue(String queueName,
String queueOwner,
boolean exclusive,
final Map<String, Object> arguments) throws StoreException
{
return createTestQueue(queueName, queueOwner, exclusive, null, arguments);
}
private AMQQueue createTestQueue(String queueName,
String queueOwner,
boolean exclusive,
ExchangeImpl alternateExchange,
final Map<String, Object> arguments) throws StoreException
{
AMQQueue<?> queue = mock(AMQQueue.class);
when(queue.getName()).thenReturn(queueName);
when(queue.isExclusive()).thenReturn(exclusive);
when(queue.getId()).thenReturn(_queueId);
when(queue.getAlternateExchange()).thenReturn(alternateExchange);
final org.apache.qpid.server.virtualhost.VirtualHost vh = mock(org.apache.qpid.server.virtualhost.VirtualHost.class);
when(vh.getSecurityManager()).thenReturn(mock(SecurityManager.class));
when(queue.getVirtualHost()).thenReturn(vh);
final Map<String,Object> attributes = arguments == null ? new LinkedHashMap<String, Object>() : new LinkedHashMap<String, Object>(arguments);
attributes.put(Queue.NAME, queueName);
if(alternateExchange != null)
{
attributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange);
}
if(exclusive)
{
when(queue.getOwner()).thenReturn(queueOwner);
attributes.put(Queue.OWNER, queueOwner);
attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER);
}
when(queue.getAvailableAttributes()).thenReturn(attributes.keySet());
final ArgumentCaptor<String> requestedAttribute = ArgumentCaptor.forClass(String.class);
when(queue.getAttribute(requestedAttribute.capture())).then(
new Answer()
{
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable
{
String attrName = requestedAttribute.getValue();
return attributes.get(attrName);
}
});
when(queue.getActualAttributes()).thenReturn(attributes);
ConfiguredObjectRecord objectRecord = mock(ConfiguredObjectRecord.class);
when(objectRecord.getId()).thenReturn(_queueId);
when(objectRecord.getType()).thenReturn(Queue.class.getSimpleName());
when(objectRecord.getAttributes()).thenReturn(attributes);
when(queue.asObjectRecord()).thenReturn(objectRecord);
return queue;
}
private ExchangeImpl createTestExchange()
{
ExchangeImpl exchange = mock(ExchangeImpl.class);
Map<String,Object> actualAttributes = new HashMap<String, Object>();
actualAttributes.put("name", getName());
actualAttributes.put("type", getName() + "Type");
actualAttributes.put("lifetimePolicy", LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
when(exchange.getName()).thenReturn(getName());
when(exchange.getTypeName()).thenReturn(getName() + "Type");
when(exchange.isAutoDelete()).thenReturn(true);
when(exchange.getId()).thenReturn(_exchangeId);
ConfiguredObjectRecord exchangeRecord = mock(ConfiguredObjectRecord.class);
when(exchangeRecord.getId()).thenReturn(_exchangeId);
when(exchangeRecord.getType()).thenReturn(Exchange.class.getSimpleName());
when(exchangeRecord.getAttributes()).thenReturn(actualAttributes);
when(exchange.asObjectRecord()).thenReturn(exchangeRecord);
when(exchange.getExchangeType()).thenReturn(mock(ExchangeType.class));
when(exchange.getEventLogger()).thenReturn(new EventLogger());
return exchange;
}
private void reopenStore() throws Exception
{
closeConfigStore();
_configStore = createConfigStore();
ConfiguredObject<?> parent = mock(ConfiguredObject.class);
when(parent.getName()).thenReturn("testName");
_configStore.openConfigurationStore(parent, _configurationStoreSettings);
_configStore.visitConfiguredObjectRecords(_handler);
}
protected abstract DurableConfigurationStore createConfigStore() throws Exception;
protected void closeConfigStore() throws Exception
{
if (_configStore != null)
{
_configStore.closeConfigurationStore();
}
}
}