blob: 652c9aaf47b4d838be98d6265cdbf9a03c117c13 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.model.adapter;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.security.auth.Subject;
import org.apache.qpid.server.configuration.updater.TaskExecutorImpl;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.BrokerImpl;
import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.BrokerTestHelper;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.SystemConfig;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.model.preferences.Preference;
import org.apache.qpid.server.model.preferences.PreferenceFactory;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
import org.apache.qpid.server.security.auth.manager.SimpleAuthenticationManager;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.preferences.PreferenceStore;
import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost;
import org.apache.qpid.server.virtualhostnode.TestVirtualHostNode;
import org.apache.qpid.test.utils.QpidTestCase;
public class BrokerImplTest extends QpidTestCase
{
private TaskExecutorImpl _taskExecutor;
private SystemConfig _systemConfig;
private BrokerImpl _brokerImpl;
private PreferenceStore _preferenceStore;
@Override
public void setUp() throws Exception
{
super.setUp();
_taskExecutor = new TaskExecutorImpl();
_taskExecutor.start();
_preferenceStore = mock(PreferenceStore.class);
_systemConfig = BrokerTestHelper.mockWithSystemPrincipal(SystemConfig.class, mock(Principal.class));
when(_systemConfig.getTaskExecutor()).thenReturn(_taskExecutor);
when(_systemConfig.getChildExecutor()).thenReturn(_taskExecutor);
when(_systemConfig.getModel()).thenReturn(BrokerModel.getInstance());
when(_systemConfig.getEventLogger()).thenReturn(new EventLogger());
when(_systemConfig.getCategoryClass()).thenReturn(SystemConfig.class);
when(_systemConfig.createPreferenceStore()).thenReturn(_preferenceStore);
}
@Override
public void tearDown() throws Exception
{
try
{
if (_brokerImpl != null)
{
_brokerImpl.close();
}
if (_taskExecutor != null)
{
_taskExecutor.stopImmediately();
}
}
finally
{
super.tearDown();
}
}
public void testAssignTargetSizes_NoQueueDepth() throws Exception
{
doAssignTargetSizeTest(new long[] {0, 0}, 1024 * 1024 * 1024);
}
public void testAssignTargetSizes_OneQueue() throws Exception
{
doAssignTargetSizeTest(new long[] {37}, 1024 * 1024 * 1024);
}
public void testAssignTargetSizes_ThreeQueues() throws Exception
{
doAssignTargetSizeTest(new long[] {37, 47, 0}, 1024 * 1024 * 1024);
}
public void testAssignTargetSizes_QueuesOversize() throws Exception
{
int flowToDiskThreshold = 1024 * 1024 * 1024;
doAssignTargetSizeTest(new long[] {flowToDiskThreshold / 2, flowToDiskThreshold / 2 , 1024},
flowToDiskThreshold);
}
public void testNetworkBufferSize()
{
Map<String, Object> attributes = new HashMap<>();
attributes.put(Broker.NAME, "Broker");
attributes.put(Broker.MODEL_VERSION, BrokerModel.MODEL_VERSION);
attributes.put(Broker.DURABLE, true);
// testing successful case is not possible because of the static nature of QpidByteBuffer which *should* be unrelated
// testing unsuccessful case
attributes.put(Broker.CONTEXT, Collections.singletonMap(Broker.NETWORK_BUFFER_SIZE, Broker.MINIMUM_NETWORK_BUFFER_SIZE - 1));
_brokerImpl = new BrokerImpl(attributes, _systemConfig);
_brokerImpl.open();
assertEquals("Broker open should fail with network buffer size less then minimum",
State.ERRORED,
_brokerImpl.getState());
assertEquals("Unexpected buffer size",
Broker.DEFAULT_NETWORK_BUFFER_SIZE,
_brokerImpl.getNetworkBufferSize());
}
public void testPurgeUser() throws Exception
{
final String testUsername = "testUser";
final String testPassword = "testPassword";
// setup broker
Map<String, Object> brokerAttributes = new HashMap<>();
brokerAttributes.put("name", "Broker");
brokerAttributes.put(Broker.MODEL_VERSION, BrokerModel.MODEL_VERSION);
brokerAttributes.put(Broker.DURABLE, true);
_brokerImpl = new BrokerImpl(brokerAttributes, _systemConfig);
_brokerImpl.open();
// setup auth provider with testuser
final Map<String, Object> authProviderAttributes = new HashMap<>();
authProviderAttributes.put(ConfiguredObject.NAME, "testAuthProvider");
authProviderAttributes.put(ConfiguredObject.TYPE, "Simple");
SimpleAuthenticationManager authenticationProvider = new SimpleAuthenticationManager(authProviderAttributes, _brokerImpl);
authenticationProvider.create();
authenticationProvider.addUser(testUsername, testPassword);
// setup preference owned by testuser
final Map<String, Object> preferenceAttributes = new HashMap<>();
UUID preferenceId = UUID.randomUUID();
preferenceAttributes.put(Preference.ID_ATTRIBUTE, preferenceId);
preferenceAttributes.put(Preference.NAME_ATTRIBUTE, "testPref");
preferenceAttributes.put(Preference.TYPE_ATTRIBUTE, "X-testPrefType");
preferenceAttributes.put(Preference.VALUE_ATTRIBUTE, Collections.EMPTY_MAP);
Subject testUserSubject = new Subject();
testUserSubject.getPrincipals()
.add(new AuthenticatedPrincipal(new UsernamePrincipal(testUsername, authenticationProvider)));
testUserSubject.setReadOnly();
final Collection<Preference> preferences =
Collections.singleton(PreferenceFactory.fromAttributes(_brokerImpl, preferenceAttributes));
Subject.doAs(testUserSubject, new PrivilegedAction<Void>()
{
@Override
public Void run()
{
try
{
_brokerImpl.getUserPreferences().updateOrAppend(preferences).get(10, TimeUnit.SECONDS);
}
catch (InterruptedException | ExecutionException | TimeoutException e)
{
e.printStackTrace();
fail("Failed to put preference:");
}
return null;
}
});
// test pre-conditions
Collection<Preference> preferencesBeforePurge = getPreferencesAs(testUserSubject);
assertEquals("Unexpected number of preferences before userPurge", 1, preferencesBeforePurge.size());
assertEquals("Unexpected preference before userPurge", preferenceId, preferencesBeforePurge.iterator().next().getId());
assertTrue("User was not valid before userPurge", authenticationProvider.getUsers().containsKey(testUsername));
_brokerImpl.purgeUser(authenticationProvider, testUsername);
// test post-conditions
Collection<Preference> preferencesAfterPurge = getPreferencesAs(testUserSubject);
assertEquals("Preferences were not deleted during userPurge", Collections.EMPTY_SET, preferencesAfterPurge);
assertEquals("User was not deleted from authentication Provider", Collections.EMPTY_MAP, authenticationProvider.getUsers());
verify(_preferenceStore).replace(Collections.singleton(preferenceId), Collections.EMPTY_SET);
}
private Collection<Preference> getPreferencesAs(final Subject testUserSubject)
{
return Subject.doAs(testUserSubject, new PrivilegedAction<Collection<Preference>>()
{
@Override
public Collection<Preference> run()
{
Collection<Preference> preferences = null;
try
{
preferences = _brokerImpl.getUserPreferences().getPreferences().get(10, TimeUnit.SECONDS);
}
catch (InterruptedException | ExecutionException | TimeoutException e)
{
e.printStackTrace();
fail("Failed to put preference:");
}
return preferences;
}
});
}
private void doAssignTargetSizeTest(final long[] virtualHostQueueSizes, final long flowToDiskThreshold)
{
Map<String, Object> attributes = new HashMap<>();
attributes.put("name", "Broker");
attributes.put(Broker.MODEL_VERSION, BrokerModel.MODEL_VERSION);
attributes.put(Broker.DURABLE, true);
attributes.put("context", Collections.singletonMap(Broker.BROKER_FLOW_TO_DISK_THRESHOLD, flowToDiskThreshold));
_brokerImpl = new BrokerImpl(attributes, _systemConfig);
_brokerImpl.open();
assertEquals("Unexpected broker state", State.ACTIVE, _brokerImpl.getState());
for(int i=0; i < virtualHostQueueSizes.length; i++)
{
createVhnWithVh(_brokerImpl, i, virtualHostQueueSizes[i]);
}
long totalAssignedTargetSize = 0;
for(VirtualHostNode<?> vhn : _brokerImpl.getVirtualHostNodes())
{
long targetSize = vhn.getVirtualHost().getTargetSize();
assertTrue("A virtualhost's target size cannot be zero", targetSize > 0);
totalAssignedTargetSize += targetSize;
}
long diff = Math.abs(flowToDiskThreshold - totalAssignedTargetSize);
long tolerance = _brokerImpl.getVirtualHostNodes().size() * 2;
assertTrue(String.format("Assigned target size not within expected tolerance. Diff %d Tolerance %d", diff, tolerance), diff < tolerance);
}
private void createVhnWithVh(final BrokerImpl brokerImpl, int nameSuffix, final long totalQueueSize)
{
final Map<String, Object> vhnAttributes = new HashMap<>();
vhnAttributes.put(VirtualHostNode.TYPE, TestVirtualHostNode.VIRTUAL_HOST_NODE_TYPE);
vhnAttributes.put(VirtualHostNode.NAME, "testVhn" + nameSuffix);
final DurableConfigurationStore store = mock(DurableConfigurationStore.class);
TestVirtualHostNode vhn = new TestVirtualHostNode(brokerImpl, vhnAttributes, store);
vhn.create();
final Map<String, Object> vhAttributes = new HashMap<>();
vhAttributes.put(VirtualHost.TYPE, TestMemoryVirtualHost.VIRTUAL_HOST_TYPE);
vhAttributes.put(VirtualHost.NAME, "testVh" + nameSuffix);
TestMemoryVirtualHost vh = new TestMemoryVirtualHost(vhAttributes, vhn)
{
@Override
public long getTotalQueueDepthBytes()
{
return totalQueueSize;
}
};
vh.create();
}
}