blob: e9ceecf22ea63a2f0bf624c489b58940771ec265 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.model;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.security.AccessControlException;
import java.security.Principal;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener;
import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.security.AccessControl;
import org.apache.qpid.server.security.Result;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.server.store.preferences.PreferenceStore;
import org.apache.qpid.server.store.preferences.PreferenceStoreUpdater;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.transport.AbstractAMQPConnection;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
import org.apache.qpid.test.utils.QpidTestCase;
public class VirtualHostTest extends QpidTestCase
{
// Access-control mock handed to the virtual host node mock in setUp(); the ACL tests stub it to deny operations.
private final AccessControl _mockAccessControl = BrokerTestHelper.createAccessControlMock();
// Broker mock obtained from BrokerTestHelper in setUp().
private Broker _broker;
// Task executor started in setUp() and stopped in tearDown(); returned by both the broker and node mocks.
private TaskExecutor _taskExecutor;
// Mocked parent node passed to every virtual host created by createVirtualHost(...).
private VirtualHostNode _virtualHostNode;
// Mocked configuration store; tests verify update/remove/create calls against it.
private DurableConfigurationStore _configStore;
// The virtual host most recently created by createVirtualHost(...); closed in tearDown() when non-null.
private VirtualHost<?> _virtualHost;
// Listener wrapping _configStore; registered on each created virtual host.
private StoreConfigurationChangeListener _storeConfigurationChangeListener;
// Mocked preference store returned by the node mock's createPreferenceStore().
private PreferenceStore _preferenceStore;
/**
 * Builds the fixture shared by all tests: a broker mock, a started
 * {@link CurrentThreadTaskExecutor}, a mocked {@link VirtualHostNode} wired to the
 * broker, and mocked configuration and preference stores.
 *
 * @throws Exception if the superclass set-up fails
 */
@Override
protected void setUp() throws Exception
{
    super.setUp();

    _broker = BrokerTestHelper.createBrokerMock();

    _taskExecutor = new CurrentThreadTaskExecutor();
    _taskExecutor.start();
    when(_broker.getTaskExecutor()).thenReturn(_taskExecutor);
    when(_broker.getChildExecutor()).thenReturn(_taskExecutor);

    Principal systemPrincipal = ((SystemPrincipalSource)_broker).getSystemPrincipal();
    _virtualHostNode = BrokerTestHelper.mockWithSystemPrincipalAndAccessControl(VirtualHostNode.class, systemPrincipal, _mockAccessControl);

    // Virtualhost needs the EventLogger from the SystemContext.
    // (This stubbing was previously issued twice; once is sufficient.)
    when(_virtualHostNode.getParent(Broker.class)).thenReturn(_broker);
    when(_virtualHostNode.getCategoryClass()).thenReturn(VirtualHostNode.class);
    when(_virtualHostNode.isDurable()).thenReturn(true);

    _configStore = mock(DurableConfigurationStore.class);
    _storeConfigurationChangeListener = new StoreConfigurationChangeListener(_configStore);
    when(_virtualHostNode.getConfigurationStore()).thenReturn(_configStore);

    ConfiguredObjectFactory objectFactory = _broker.getObjectFactory();
    when(_virtualHostNode.getModel()).thenReturn(objectFactory.getModel());
    when(_virtualHostNode.getTaskExecutor()).thenReturn(_taskExecutor);
    when(_virtualHostNode.getChildExecutor()).thenReturn(_taskExecutor);

    _preferenceStore = mock(PreferenceStore.class);
    when(_virtualHostNode.createPreferenceStore()).thenReturn(_preferenceStore);
}
/**
 * Releases the fixture: stops the task executor, then closes the last virtual host
 * created by the test (if any), and finally delegates to the superclass.
 * Each later step runs even when an earlier one throws.
 *
 * @throws Exception if any of the clean-up steps fails
 */
@Override
public void tearDown() throws Exception
{
    try
    {
        try
        {
            _taskExecutor.stopImmediately();
        }
        finally
        {
            // Only tests that went through createVirtualHost(...) have a host to close.
            final boolean hostWasCreated = _virtualHost != null;
            if (hostWasCreated)
            {
                _virtualHost.close();
            }
        }
    }
    finally
    {
        super.tearDown();
    }
}
/**
 * A freshly created virtual host has an id, the requested name, is ACTIVE, and
 * results in an {@code update(true, ...)} call on the configuration store.
 */
public void testNewVirtualHost()
{
    final String expectedName = getName();
    final VirtualHost<?> created = createVirtualHost(expectedName);

    assertNotNull("Unexpected id", created.getId());
    assertEquals("Unexpected name", expectedName, created.getName());
    assertEquals("Unexpected state", State.ACTIVE, created.getState());

    final UUID hostId = created.getId();
    final String hostType = created.getType();
    verify(_configStore).update(eq(true), matchesRecord(hostId, hostType));
}
/**
 * Deleting an ACTIVE virtual host moves it to DELETED, removes its record from the
 * configuration store and notifies the preference store via {@code onDelete()}.
 */
public void testDeleteVirtualHost()
{
    final VirtualHost<?> host = createVirtualHost(getName());
    assertEquals("Unexpected state", State.ACTIVE, host.getState());

    host.delete();

    assertEquals("Unexpected state", State.DELETED, host.getState());

    final UUID hostId = host.getId();
    final String hostType = host.getType();
    verify(_configStore).remove(matchesRecord(hostId, hostType));
    verify(_preferenceStore).onDelete();
}
/**
 * Cycles a virtual host through ACTIVE -> STOPPED -> ACTIVE and checks the
 * resulting interactions with the configuration and preference stores.
 */
public void testStopAndStartVirtualHost()
{
    final VirtualHost<?> host = createVirtualHost(getName());
    assertEquals("Unexpected state", State.ACTIVE, host.getState());

    // Stop: the preference store must be closed.
    host.stop();
    assertEquals("Unexpected state", State.STOPPED, host.getState());
    verify(_preferenceStore).close();

    // Start again.
    host.start();
    assertEquals("Unexpected state", State.ACTIVE, host.getState());

    final UUID hostId = host.getId();
    final String hostType = host.getType();
    // One update(true, ...) in total, and two update(false, ...) calls.
    verify(_configStore, times(1)).update(eq(true), matchesRecord(hostId, hostType));
    verify(_configStore, times(2)).update(eq(false), matchesRecord(hostId, hostType));
    // The preference store is opened once on creation and once on restart.
    verify(_preferenceStore, times(2)).openAndLoad(any(PreferenceStoreUpdater.class));
}
/**
 * After a stop the virtual host reports no queue or exchange children; after a
 * restart, during which the mocked store replays the previously captured records,
 * the same number of queues and exchanges is present again.
 */
public void testRestartingVirtualHostRecoversChildren()
{
    String virtualHostName = getName();
    VirtualHost<?> virtualHost = createVirtualHost(virtualHostName);
    assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState());
    final ConfiguredObjectRecord virtualHostCor = virtualHost.asObjectRecord();

    // Give virtualhost a queue and an exchange
    Queue queue = virtualHost.createChild(Queue.class, Collections.<String, Object>singletonMap(Queue.NAME, "myQueue"));
    final ConfiguredObjectRecord queueCor = queue.asObjectRecord();

    Map<String, Object> exchangeArgs = new HashMap<>();
    exchangeArgs.put(Exchange.NAME, "myExchange");
    exchangeArgs.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
    // The exchange's record is picked up by the loop over all exchanges below.
    virtualHost.createChild(Exchange.class, exchangeArgs);

    assertEquals("Unexpected number of queues before stop", 1, virtualHost.getChildren(Queue.class).size());
    // "myExchange" plus four others that already exist on the host (presumably the defaults).
    assertEquals("Unexpected number of exchanges before stop", 5, virtualHost.getChildren(Exchange.class).size());

    // Snapshot every record that the store should replay on restart.
    final List<ConfiguredObjectRecord> allObjects = new ArrayList<>();
    allObjects.add(virtualHostCor);
    allObjects.add(queueCor);
    for (Exchange e : virtualHost.getChildren(Exchange.class))
    {
        allObjects.add(e.asObjectRecord());
    }

    virtualHost.stop();
    assertEquals("Unexpected state", State.STOPPED, virtualHost.getState());
    assertEquals("Unexpected number of queues after stop", 0, virtualHost.getChildren(Queue.class).size());
    assertEquals("Unexpected number of exchanges after stop", 0, virtualHost.getChildren(Exchange.class).size());

    // Setup an answer that will return the configured object records
    doAnswer(new Answer<Object>()
    {
        // Shared across invocations, so the records are handed out only once.
        final Iterator<ConfiguredObjectRecord> corIterator = allObjects.iterator();

        @Override
        public Object answer(final InvocationOnMock invocation) throws Throwable
        {
            ConfiguredObjectRecordHandler handler = (ConfiguredObjectRecordHandler) invocation.getArguments()[0];
            while (corIterator.hasNext())
            {
                handler.handle(corIterator.next());
            }
            return null;
        }
    }).when(_configStore).reload(any(ConfiguredObjectRecordHandler.class));

    virtualHost.start();
    assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState());
    assertEquals("Unexpected number of queues after restart", 1, virtualHost.getChildren(Queue.class).size());
    assertEquals("Unexpected number of exchanges after restart", 5, virtualHost.getChildren(Exchange.class).size());
}
/**
 * A queue recovered after a virtual host restart must still be written to the
 * configuration store when one of its attributes is changed.
 */
public void testModifyDurableChildAfterRestartingVirtualHost()
{
    String virtualHostName = getName();
    VirtualHost<?> virtualHost = createVirtualHost(virtualHostName);
    final ConfiguredObjectRecord virtualHostCor = virtualHost.asObjectRecord();

    // Give virtualhost a queue
    Queue queue = virtualHost.createChild(Queue.class, Collections.<String, Object>singletonMap(Queue.NAME, "myQueue"));
    final ConfiguredObjectRecord queueCor = queue.asObjectRecord();

    // Records the mocked store will replay when the host is restarted.
    final List<ConfiguredObjectRecord> allObjects = new ArrayList<>();
    allObjects.add(virtualHostCor);
    allObjects.add(queueCor);

    virtualHost.stop();
    assertEquals("Unexpected state", State.STOPPED, virtualHost.getState());

    // Setup an answer that will return the configured object records
    doAnswer(new Answer<Object>()
    {
        final Iterator<ConfiguredObjectRecord> corIterator = allObjects.iterator();

        @Override
        public Object answer(final InvocationOnMock invocation) throws Throwable
        {
            ConfiguredObjectRecordHandler handler = (ConfiguredObjectRecordHandler) invocation.getArguments()[0];
            while (corIterator.hasNext())
            {
                handler.handle(corIterator.next());
            }
            return null;
        }
    }).when(_configStore).reload(any(ConfiguredObjectRecordHandler.class));

    virtualHost.start();
    assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState());

    final Collection<Queue> queues = virtualHost.getChildren(Queue.class);
    assertEquals("Unexpected number of queues after restart", 1, queues.size());

    final Queue recoveredQueue = queues.iterator().next();
    recoveredQueue.setAttributes(Collections.singletonMap(ConfiguredObject.DESCRIPTION, "testDescription"));

    // Take the record from the recovered queue (the object that was actually modified),
    // not from the pre-restart instance.
    final ConfiguredObjectRecord recoveredQueueCor = recoveredQueue.asObjectRecord();
    verify(_configStore).update(eq(false), matchesRecord(recoveredQueueCor.getId(), recoveredQueueCor.getType()));
}
/**
 * Stopping a virtual host with a registered connection closes that connection
 * asynchronously and leaves the host with a connection count of zero.
 */
public void testStopVirtualHost_ClosesConnections()
{
    String virtualHostName = getName();
    VirtualHost<?> virtualHost = createVirtualHost(virtualHostName);
    assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState());

    assertEquals("Unexpected number of connections before connection registered", 0, virtualHost.getConnectionCount());

    // Raw type kept deliberately so that the untyped immediateFuture(null) is accepted by thenReturn.
    AMQPConnection modelConnection = mock(AMQPConnection.class);
    when(modelConnection.closeAsync()).thenReturn(Futures.immediateFuture(null));
    virtualHost.registerConnection(modelConnection);

    assertEquals("Unexpected number of connections after connection registered", 1, virtualHost.getConnectionCount());

    virtualHost.stop();

    assertEquals("Unexpected state", State.STOPPED, virtualHost.getState());
    assertEquals("Unexpected number of connections after virtualhost stopped",
                 0,
                 virtualHost.getConnectionCount());
    verify(modelConnection).closeAsync();
}
/**
 * Deleting a virtual host with a registered connection closes that connection
 * asynchronously and leaves the host with a connection count of zero.
 */
public void testDeleteVirtualHost_ClosesConnections()
{
    String virtualHostName = getName();
    VirtualHost<?> virtualHost = createVirtualHost(virtualHostName);
    assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState());

    assertEquals("Unexpected number of connections before connection registered",
                 0,
                 virtualHost.getConnectionCount());

    // Raw type kept deliberately so that the untyped immediateFuture(null) is accepted by thenReturn.
    AMQPConnection modelConnection = mock(AMQPConnection.class);
    when(modelConnection.closeAsync()).thenReturn(Futures.immediateFuture(null));
    virtualHost.registerConnection(modelConnection);

    assertEquals("Unexpected number of connections after connection registered",
                 1,
                 virtualHost.getConnectionCount());

    virtualHost.delete();

    assertEquals("Unexpected state", State.DELETED, virtualHost.getState());
    assertEquals("Unexpected number of connections after virtualhost deleted",
                 0,
                 virtualHost.getConnectionCount());
    verify(modelConnection).closeAsync();
}
/**
 * Creating a durable queue yields a named queue with an id and triggers an
 * {@code update(true, ...)} call for it on the configuration store.
 */
public void testCreateDurableQueue()
{
    final VirtualHost<?> host = createVirtualHost(getName());

    final String expectedQueueName = "myQueue";
    final Map<String, Object> queueAttributes = new HashMap<>();
    queueAttributes.put(Queue.NAME, expectedQueueName);
    queueAttributes.put(Queue.DURABLE, Boolean.TRUE);

    final Queue durableQueue = host.createChild(Queue.class, queueAttributes);

    assertNotNull(durableQueue.getId());
    assertEquals(expectedQueueName, durableQueue.getName());
    verify(_configStore).update(eq(true), matchesRecord(durableQueue.getId(), durableQueue.getType()));
}
/**
 * Creating a non-durable queue yields a named queue with an id but must not write
 * the queue to the configuration store.
 */
public void testCreateNonDurableQueue()
{
    String virtualHostName = getName();
    VirtualHost<?> virtualHost = createVirtualHost(virtualHostName);

    String queueName = "myQueue";
    Map<String, Object> arguments = new HashMap<>();
    arguments.put(Queue.NAME, queueName);
    arguments.put(Queue.DURABLE, Boolean.FALSE);

    Queue queue = virtualHost.createChild(Queue.class, arguments);

    assertNotNull(queue.getId());
    assertEquals(queueName, queue.getName());

    verify(_configStore, never()).create(matchesRecord(queue.getId(), queue.getType()));
    // The durable counterpart of this test shows that new objects are persisted through
    // update(true, ...); check that path as well, otherwise the assertion above is vacuous.
    verify(_configStore, never()).update(eq(true), matchesRecord(queue.getId(), queue.getType()));
}
// *************** VH Access Control Tests ***************
/**
 * When the access control denies UPDATE, changing an attribute must raise
 * {@link AccessControlException} and no {@code update(false, ...)} may reach the store.
 */
public void testUpdateDeniedByACL()
{
    final VirtualHost<?> host = createVirtualHost(getName());

    final Map<String, Object> noAttributes = Collections.<String,Object>emptyMap();
    when(_mockAccessControl.authorise(null, Operation.UPDATE, host, noAttributes)).thenReturn(Result.DENIED);

    assertNull(host.getDescription());

    final Map<String, Object> descriptionChange =
            Collections.<String, Object>singletonMap(VirtualHost.DESCRIPTION, "My description");
    try
    {
        host.setAttributes(descriptionChange);
        fail("Exception not thrown");
    }
    catch (AccessControlException expected)
    {
        // PASS
    }

    verify(_configStore, never()).update(eq(false), matchesRecord(host.getId(), host.getType()));
}
/**
 * When the access control denies UPDATE, stopping the virtual host must raise
 * {@link AccessControlException} and no {@code update(false, ...)} may reach the store.
 */
public void testStopDeniedByACL()
{
    final VirtualHost<?> host = createVirtualHost(getName());

    final Map<String, Object> noAttributes = Collections.<String,Object>emptyMap();
    when(_mockAccessControl.authorise(null, Operation.UPDATE, host, noAttributes)).thenReturn(Result.DENIED);

    try
    {
        host.stop();
        fail("Exception not thrown");
    }
    catch (AccessControlException expected)
    {
        // PASS
    }

    verify(_configStore, never()).update(eq(false), matchesRecord(host.getId(), host.getType()));
}
/**
 * When the access control denies DELETE, deleting the virtual host must raise
 * {@link AccessControlException} and the store's {@code remove} must not be called for it.
 */
public void testDeleteDeniedByACL()
{
    final VirtualHost<?> host = createVirtualHost(getName());

    final Map<String, Object> noAttributes = Collections.<String,Object>emptyMap();
    when(_mockAccessControl.authorise(null, Operation.DELETE, host, noAttributes)).thenReturn(Result.DENIED);

    try
    {
        host.delete();
        fail("Exception not thrown");
    }
    catch (AccessControlException expected)
    {
        // PASS
    }

    verify(_configStore, never()).remove(matchesRecord(host.getId(), host.getType()));
}
/**
 * A connection that is already registered gets blocked when the virtual host
 * receives the {@code PERSISTENT_MESSAGE_SIZE_OVERFULL} event.
 */
public void testExistingConnectionBlocking()
{
    final VirtualHost<?> virtualHost = createVirtualHost(getTestName());
    final AbstractAMQPConnection registered = mock(AbstractAMQPConnection.class);
    virtualHost.registerConnection(registered);

    // The host is delivered the store event through its EventListener view.
    final EventListener hostAsListener = (EventListener) virtualHost;
    hostAsListener.event(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);

    verify(registered).block();
}
/**
 * Creation must be rejected with {@link IllegalConfigurationException} for a negative
 * number of selectors, a negative connection thread pool size, and a number of
 * selectors equal to the default connection thread pool size.
 *
 * @throws Exception on unexpected failure
 */
public void testCreateValidation() throws Exception
{
    final Map<String, Object> negativeSelectors =
            Collections.<String, Object>singletonMap(VirtualHost.NUMBER_OF_SELECTORS, "-1");
    final Map<String, Object> negativePoolSize =
            Collections.<String, Object>singletonMap(VirtualHost.CONNECTION_THREAD_POOL_SIZE, "-1");
    final Map<String, Object> selectorsEqualToPoolSize =
            Collections.<String, Object>singletonMap(VirtualHost.NUMBER_OF_SELECTORS, VirtualHost.DEFAULT_VIRTUALHOST_CONNECTION_THREAD_POOL_SIZE);

    try
    {
        createVirtualHost(getTestName(), negativeSelectors);
        fail("Exception not thrown for negative number of selectors");
    }
    catch (IllegalConfigurationException expected)
    {
        // pass
    }

    try
    {
        createVirtualHost(getTestName(), negativePoolSize);
        fail("Exception not thrown for negative connection thread pool size");
    }
    catch (IllegalConfigurationException expected)
    {
        // pass
    }

    try
    {
        createVirtualHost(getTestName(), selectorsEqualToPoolSize);
        fail("Exception not thrown for number of selectors equal to connection thread pool size");
    }
    catch (IllegalConfigurationException expected)
    {
        // pass
    }
}
/**
 * Attribute changes on an existing virtual host must be rejected with
 * {@link IllegalConfigurationException} for a negative number of selectors, a negative
 * connection thread pool size, and a number of selectors equal to the default
 * connection thread pool size.
 *
 * @throws Exception on unexpected failure
 */
public void testChangeValidation() throws Exception
{
    final VirtualHost<?> host = createVirtualHost(getTestName());

    final Map<String, Object> negativeSelectors =
            Collections.<String, Object>singletonMap(VirtualHost.NUMBER_OF_SELECTORS, "-1");
    final Map<String, Object> negativePoolSize =
            Collections.<String, Object>singletonMap(VirtualHost.CONNECTION_THREAD_POOL_SIZE, "-1");
    final Map<String, Object> selectorsEqualToPoolSize =
            Collections.<String, Object>singletonMap(VirtualHost.NUMBER_OF_SELECTORS, VirtualHost.DEFAULT_VIRTUALHOST_CONNECTION_THREAD_POOL_SIZE);

    try
    {
        host.setAttributes(negativeSelectors);
        fail("Exception not thrown for negative number of selectors");
    }
    catch (IllegalConfigurationException expected)
    {
        // pass
    }

    try
    {
        host.setAttributes(negativePoolSize);
        fail("Exception not thrown for negative connection thread pool size");
    }
    catch (IllegalConfigurationException expected)
    {
        // pass
    }

    try
    {
        host.setAttributes(selectorsEqualToPoolSize);
        fail("Exception not thrown for number of selectors equal to connection thread pool size");
    }
    catch (IllegalConfigurationException expected)
    {
        // pass
    }
}
/**
 * Registering a connection raises the connection count from 0 to 1 and makes the
 * connection visible through {@code getConnections()}.
 *
 * @throws Exception on unexpected failure
 */
public void testRegisterConnection() throws Exception
{
    final VirtualHost<?> host = createVirtualHost("sdf");
    final AMQPConnection<?> mockConnection = getMockConnection();

    assertEquals("unexpected number of connections before test", 0, host.getConnectionCount());

    host.registerConnection(mockConnection);

    assertEquals("unexpected number of connections after registerConnection", 1, host.getConnectionCount());
    assertEquals("unexpected connection object", Collections.singleton(mockConnection), host.getConnections());
}
/**
 * Stopping a virtual host calls both {@code stopConnection()} and
 * {@code closeAsync()} on a registered connection.
 *
 * @throws Exception on unexpected failure
 */
public void testStopVirtualhostClosesConnections() throws Exception
{
    final VirtualHost<?> host = createVirtualHost("sdf");
    final AMQPConnection<?> mockConnection = getMockConnection();

    host.registerConnection(mockConnection);
    assertEquals("unexpected number of connections after registerConnection", 1, host.getConnectionCount());
    assertEquals("unexpected connection object", Collections.singleton(mockConnection), host.getConnections());

    host.stop();

    verify(mockConnection).stopConnection();
    verify(mockConnection).closeAsync();
}
/**
 * A stopped virtual host refuses connection registration with
 * {@link VirtualHostUnavailableException}; once started again it accepts the connection.
 *
 * @throws Exception on unexpected failure
 */
public void testRegisterConnectionOnStoppedVirtualhost() throws Exception
{
    final VirtualHost<?> host = createVirtualHost("sdf");
    final AMQPConnection<?> mockConnection = getMockConnection();

    // While stopped: registration is rejected and nothing is counted.
    host.stop();
    try
    {
        host.registerConnection(mockConnection);
        fail("exception not thrown");
    }
    catch (VirtualHostUnavailableException expected)
    {
        // pass
    }
    assertEquals("unexpected number of connections", 0, host.getConnectionCount());

    // After restart: registration succeeds.
    host.start();
    host.registerConnection(mockConnection);
    assertEquals("unexpected number of connections", 1, host.getConnectionCount());
}
/**
 * Creates a mocked connection whose {@code closeAsync()} returns an already
 * completed future.
 *
 * @return the stubbed connection mock
 */
private AMQPConnection<?> getMockConnection()
{
    // Typed local is required so that thenReturn receives a ListenableFuture<Void>.
    final ListenableFuture<Void> completedClose = Futures.immediateFuture(null);
    final AMQPConnection<?> mockConnection = mock(AMQPConnection.class);
    when(mockConnection.closeAsync()).thenReturn(completedClose);
    return mockConnection;
}
/**
 * Creates a test virtual host with the given name and no additional attributes.
 *
 * @param virtualHostName name for the new virtual host
 * @return the created virtual host
 */
private VirtualHost<?> createVirtualHost(final String virtualHostName)
{
    final Map<String, Object> noExtraAttributes = Collections.<String, Object>emptyMap();
    return createVirtualHost(virtualHostName, noExtraAttributes);
}
/**
 * Creates a {@link TestMemoryVirtualHost} under the mocked node, registers the store
 * change listener on it, and remembers it so that tearDown() can close it.
 *
 * @param virtualHostName name for the new virtual host
 * @param attributes extra attributes; applied after name and type, so they may override them
 * @return the created virtual host
 */
private VirtualHost<?> createVirtualHost(final String virtualHostName, Map<String,Object> attributes)
{
    final Map<String, Object> effectiveAttributes = new HashMap<>();
    effectiveAttributes.put(VirtualHost.NAME, virtualHostName);
    effectiveAttributes.put(VirtualHost.TYPE, TestMemoryVirtualHost.VIRTUAL_HOST_TYPE);
    effectiveAttributes.putAll(attributes);

    final TestMemoryVirtualHost newHost = new TestMemoryVirtualHost(effectiveAttributes, _virtualHostNode);
    newHost.addChangeListener(_storeConfigurationChangeListener);
    newHost.create();

    // Fire the child added event on the node
    _storeConfigurationChangeListener.childAdded(_virtualHostNode, newHost);

    // Only reached when create() succeeded; a failed creation leaves the previous value untouched.
    _virtualHost = newHost;
    when(_virtualHostNode.getVirtualHost()).thenReturn(_virtualHost);
    return newHost;
}
/**
 * Builds a mocked protocol connection bound to the given virtual host. Delete tasks
 * passed to {@code addDeleteTask} are collected, and are run against the connection
 * when {@code sendConnectionCloseAsync} is invoked.
 *
 * @param virtualHost the host returned from the mock's {@code getAddressSpace()}
 * @return the stubbed connection mock
 */
private AbstractAMQPConnection createMockProtocolConnection(final VirtualHost<?> virtualHost)
{
    final AbstractAMQPConnection protocolConnection = mock(AbstractAMQPConnection.class);
    final List<Action<?>> deleteTasks = new ArrayList<>();
    final ArgumentCaptor<Action> taskCaptor = ArgumentCaptor.forClass(Action.class);

    // Remember each delete task handed to the connection.
    final Answer collectDeleteTask = new Answer()
    {
        @Override
        public Object answer(final InvocationOnMock invocation) throws Throwable
        {
            return deleteTasks.add(taskCaptor.getValue());
        }
    };
    doAnswer(collectDeleteTask).when(protocolConnection).addDeleteTask(taskCaptor.capture());

    when(protocolConnection.getAddressSpace()).thenReturn(virtualHost);

    // Closing the connection runs every collected delete task.
    final Answer runDeleteTasks = new Answer()
    {
        @Override
        public Object answer(final InvocationOnMock invocation) throws Throwable
        {
            for (Action task : deleteTasks)
            {
                task.performAction(protocolConnection);
            }
            return null;
        }
    };
    doAnswer(runDeleteTasks).when(protocolConnection).sendConnectionCloseAsync(any(AMQConstant.class), anyString());

    when(protocolConnection.getRemoteAddressString()).thenReturn("peer:1234");
    return protocolConnection;
}
/**
 * Registers a Mockito argument matcher for a {@link ConfiguredObjectRecord} based on
 * the given id and type (see {@link MinimalConfiguredObjectRecordMatcher}).
 *
 * @param id the record id to look for
 * @param type the record type to look for
 * @return the placeholder value produced by {@code argThat}
 */
private static ConfiguredObjectRecord matchesRecord(UUID id, String type)
{
    final MinimalConfiguredObjectRecordMatcher recordMatcher = new MinimalConfiguredObjectRecordMatcher(id, type);
    return argThat(recordMatcher);
}
/**
 * Matches a {@link ConfiguredObjectRecord} whose id equals the expected id OR whose
 * type equals the expected type.
 *
 * NOTE(review): the check is a logical OR, so a record matching on only one of the
 * two values is accepted. Confirm whether both were meant to be required before
 * tightening it, since existing verifications may depend on the current behaviour.
 */
private static class MinimalConfiguredObjectRecordMatcher extends ArgumentMatcher<ConfiguredObjectRecord>
{
    private final UUID _id;
    private final String _type;

    private MinimalConfiguredObjectRecordMatcher(UUID id, String type)
    {
        _id = id;
        _type = type;
    }

    @Override
    public boolean matches(Object argument)
    {
        final ConfiguredObjectRecord candidate = (ConfiguredObjectRecord) argument;
        if (_id.equals(candidate.getId()))
        {
            // Id matched; the type is not consulted.
            return true;
        }
        return _type.equals(candidate.getType());
    }
}
}