blob: 07d10a8fb6c243a26ab6ec95fa13c10efa2eee48 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.store;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.File;
import java.security.Principal;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.filter.AMQPFilterTypes;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.BrokerTestHelper;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.server.util.FileUtils;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
import org.apache.qpid.test.utils.UnitTestBase;
public abstract class AbstractDurableConfigurationStoreTestCase extends UnitTestBase
{
private static final String EXCHANGE = org.apache.qpid.server.model.Exchange.class.getSimpleName();
private static final String QUEUE = Queue.class.getSimpleName();
private static final UUID ANY_UUID = UUID.randomUUID();
private static final Map ANY_MAP = new HashMap();
private static final String STANDARD = "standard";
private String _storePath;
private String _storeName;
private ConfiguredObjectRecordHandler _handler;
private Map<String,Object> _bindingArgs;
private UUID _queueId;
private UUID _exchangeId;
private DurableConfigurationStore _configStore;
private ConfiguredObjectFactoryImpl _factory;
private VirtualHostNode<?> _parent;
private ConfiguredObjectRecord _rootRecord;
@Before
public void setUp() throws Exception
{
_queueId = UUIDGenerator.generateRandomUUID();
_exchangeId = UUIDGenerator.generateRandomUUID();
_factory = new ConfiguredObjectFactoryImpl(BrokerModel.getInstance());
_storeName = getTestName();
_storePath = TMP_FOLDER + File.separator + _storeName;
FileUtils.delete(new File(_storePath), true);
_handler = mock(ConfiguredObjectRecordHandler.class);
_bindingArgs = new HashMap<>();
String argKey = AMQPFilterTypes.JMS_SELECTOR.toString();
String argValue = "some selector expression";
_bindingArgs.put(argKey, argValue);
_parent = createVirtualHostNode(_storePath, _factory);
_configStore = createConfigStore();
_configStore.init(_parent);
_configStore.openConfigurationStore(new ConfiguredObjectRecordHandler()
{
@Override
public void handle(final ConfiguredObjectRecord record)
{
}
});
_rootRecord = new ConfiguredObjectRecordImpl(UUID.randomUUID(), VirtualHost.class.getSimpleName(), Collections.<String, Object>singletonMap(ConfiguredObject.NAME, "vhost"));
_configStore.create(_rootRecord);
}
protected VirtualHostNode<?> getVirtualHostNode()
{
return _parent;
}
protected DurableConfigurationStore getConfigurationStore()
{
return _configStore;
}
protected abstract VirtualHostNode createVirtualHostNode(String storeLocation, ConfiguredObjectFactory factory);
@After
public void tearDown() throws Exception
{
try
{
closeConfigStore();
}
finally
{
if (_storePath != null)
{
FileUtils.delete(new File(_storePath), true);
}
}
}
@Test
public void testCloseIsIdempotent() throws Exception
{
_configStore.closeConfigurationStore();
// Second close should be accepted without exception
_configStore.closeConfigurationStore();
}
@Test
public void testCreateExchange() throws Exception
{
Exchange<?> exchange = createTestExchange();
_configStore.create(exchange.asObjectRecord());
reopenStore();
_configStore.openConfigurationStore(_handler);
verify(_handler).handle(matchesRecord(_exchangeId, EXCHANGE,
map( org.apache.qpid.server.model.Exchange.NAME, getTestName(),
org.apache.qpid.server.model.Exchange.TYPE, getTestName()+"Type",
org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.name())));
}
protected Map<String,Object> map(Object... vals)
{
Map<String,Object> map = new HashMap<>();
boolean isValue = false;
String key = null;
for(Object obj : vals)
{
if(isValue)
{
map.put(key,obj);
}
else
{
key = (String) obj;
}
isValue = !isValue;
}
return map;
}
@Test
public void testRemoveExchange() throws Exception
{
Exchange<?> exchange = createTestExchange();
_configStore.create(exchange.asObjectRecord());
_configStore.remove(exchange.asObjectRecord());
reopenStore();
verify(_handler, never()).handle(any(ConfiguredObjectRecord.class));
}
protected ConfiguredObjectRecord matchesRecord(UUID id, String type, Map<String, Object> attributes)
{
return argThat(new ConfiguredObjectMatcher(id, type, attributes, ANY_MAP));
}
protected ConfiguredObjectRecord matchesRecord(UUID id, String type, Map<String, Object> attributes, Map<String,UUID> parents)
{
return argThat(new ConfiguredObjectMatcher(id, type, attributes, parents));
}
private static class ConfiguredObjectMatcher implements ArgumentMatcher<ConfiguredObjectRecord>
{
private final Map<String,Object> _matchingMap;
private final UUID _id;
private final String _name;
private final Map<String,UUID> _parents;
private ConfiguredObjectMatcher(final UUID id, final String type, final Map<String, Object> matchingMap, Map<String,UUID> parents)
{
_id = id;
_name = type;
_matchingMap = matchingMap;
_parents = parents;
}
@Override
public boolean matches(final ConfiguredObjectRecord binding)
{
Map<String,Object> arg = new HashMap<>(binding.getAttributes());
arg.remove("createdBy");
arg.remove("createdTime");
arg.remove("lastUpdatedTime");
arg.remove("lastUpdatedBy");
return (_id == ANY_UUID || _id.equals(binding.getId()))
&& _name.equals(binding.getType())
&& (_matchingMap == ANY_MAP || arg.equals(_matchingMap))
&& (_parents == ANY_MAP || matchesParents(binding));
}
private boolean matchesParents(ConfiguredObjectRecord binding)
{
Map<String, UUID> bindingParents = binding.getParents();
if(bindingParents.size() != _parents.size())
{
return false;
}
for(Map.Entry<String,UUID> entry : _parents.entrySet())
{
if(!bindingParents.get(entry.getKey()).equals(entry.getValue()))
{
return false;
}
}
return true;
}
}
@Test
public void testCreateQueue() throws Exception
{
Queue<?> queue = createTestQueue(getTestName(), getTestName() + "Owner", true, null);
_configStore.create(queue.asObjectRecord());
reopenStore();
_configStore.openConfigurationStore(_handler);
Map<String, Object> queueAttributes = new HashMap<>();
queueAttributes.put(Queue.NAME, getTestName());
queueAttributes.put(Queue.OWNER, getTestName() + "Owner");
queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
queueAttributes.put(Queue.TYPE, STANDARD);
verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
}
@Test
public void testUpdateQueue() throws Exception
{
Map<String, Object> attributes = new HashMap<>();
attributes.put(Queue.TYPE, STANDARD);
Queue<?> queue = createTestQueue(getTestName(), getTestName() + "Owner", true, attributes);
_configStore.create(queue.asObjectRecord());
// update the queue to have exclusive=false
queue = createTestQueue(getTestName(), getTestName() + "Owner", false, attributes);
_configStore.update(false, queue.asObjectRecord());
reopenStore();
_configStore.openConfigurationStore(_handler);
Map<String,Object> queueAttributes = new HashMap<>();
queueAttributes.put(Queue.NAME, getTestName());
queueAttributes.putAll(attributes);
verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
}
@Test
public void testRemoveQueue() throws Exception
{
Queue<?> queue = createTestQueue(getTestName(), getTestName() + "Owner", true, Collections.emptyMap());
_configStore.create(queue.asObjectRecord());
_configStore.remove(queue.asObjectRecord());
reopenStore();
verify(_handler, never()).handle(any(ConfiguredObjectRecord.class));
}
private Queue<?> createTestQueue(String queueName,
String queueOwner,
boolean exclusive,
final Map<String, Object> arguments) throws StoreException
{
Queue queue = BrokerTestHelper.mockWithSystemPrincipal(Queue.class, mock(Principal.class));
when(queue.getName()).thenReturn(queueName);
when(queue.isExclusive()).thenReturn(exclusive);
when(queue.getId()).thenReturn(_queueId);
when(queue.getType()).thenReturn(STANDARD);
when(queue.getCategoryClass()).thenReturn(Queue.class);
when(queue.isDurable()).thenReturn(true);
TaskExecutor taskExecutor = CurrentThreadTaskExecutor.newStartedInstance();
when(queue.getTaskExecutor()).thenReturn(taskExecutor);
when(queue.getChildExecutor()).thenReturn(taskExecutor);
final QueueManagingVirtualHost vh = mock(QueueManagingVirtualHost.class);
when(queue.getVirtualHost()).thenReturn(vh);
final Map<String,Object> attributes = arguments == null ? new LinkedHashMap<>() : new LinkedHashMap<>(arguments);
attributes.put(Queue.NAME, queueName);
attributes.put(Queue.TYPE, STANDARD);
if(exclusive)
{
when(queue.getOwner()).thenReturn(queueOwner);
attributes.put(Queue.OWNER, queueOwner);
attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER);
}
when(queue.getAvailableAttributes()).thenReturn(attributes.keySet());
final ArgumentCaptor<String> requestedAttribute = ArgumentCaptor.forClass(String.class);
when(queue.getAttribute(requestedAttribute.capture())).then(
new Answer()
{
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable
{
String attrName = requestedAttribute.getValue();
return attributes.get(attrName);
}
});
when(queue.getActualAttributes()).thenReturn(attributes);
when(queue.getObjectFactory()).thenReturn(_factory);
when(queue.getModel()).thenReturn(_factory.getModel());
ConfiguredObjectRecord objectRecord = mock(ConfiguredObjectRecord.class);
when(objectRecord.getId()).thenReturn(_queueId);
when(objectRecord.getType()).thenReturn(Queue.class.getSimpleName());
when(objectRecord.getAttributes()).thenReturn(attributes);
when(objectRecord.getParents()).thenReturn(Collections.singletonMap(_rootRecord.getType(), _rootRecord.getId()));
when(queue.asObjectRecord()).thenReturn(objectRecord);
return queue;
}
private Exchange<?> createTestExchange()
{
Exchange exchange = mock(Exchange.class);
Map<String,Object> actualAttributes = new HashMap<>();
actualAttributes.put("name", getTestName());
actualAttributes.put("type", getTestName() + "Type");
actualAttributes.put("lifetimePolicy", LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
when(exchange.getName()).thenReturn(getTestName());
when(exchange.getType()).thenReturn(getTestName() + "Type");
when(exchange.isAutoDelete()).thenReturn(true);
when(exchange.getId()).thenReturn(_exchangeId);
when(exchange.getCategoryClass()).thenReturn(Exchange.class);
when(exchange.isDurable()).thenReturn(true);
when(exchange.getObjectFactory()).thenReturn(_factory);
when(exchange.getModel()).thenReturn(_factory.getModel());
TaskExecutor taskExecutor = CurrentThreadTaskExecutor.newStartedInstance();
when(exchange.getTaskExecutor()).thenReturn(taskExecutor);
when(exchange.getChildExecutor()).thenReturn(taskExecutor);
ConfiguredObjectRecord exchangeRecord = mock(ConfiguredObjectRecord.class);
when(exchangeRecord.getId()).thenReturn(_exchangeId);
when(exchangeRecord.getType()).thenReturn(Exchange.class.getSimpleName());
when(exchangeRecord.getAttributes()).thenReturn(actualAttributes);
when(exchangeRecord.getParents()).thenReturn(Collections.singletonMap(_rootRecord.getType(), _rootRecord.getId()));
when(exchange.asObjectRecord()).thenReturn(exchangeRecord);
when(exchange.getEventLogger()).thenReturn(new EventLogger());
return exchange;
}
private void reopenStore() throws Exception
{
closeConfigStore();
_configStore = createConfigStore();
_configStore.init(_parent);
}
protected abstract DurableConfigurationStore createConfigStore() throws Exception;
protected void closeConfigStore() throws Exception
{
if (_configStore != null)
{
_configStore.closeConfigurationStore();
}
}
}