blob: 452e260d051e916594ec2caf0c3ef162545c384b [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.model;
import java.security.AccessControlException;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import javax.security.auth.Subject;
import javax.security.auth.login.AccountNotFoundException;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.configuration.CommonProperties;
import org.apache.qpid.server.BrokerOptions;
import org.apache.qpid.server.BrokerPrincipal;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutorImpl;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.BrokerMessages;
import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.model.preferences.Preference;
import org.apache.qpid.server.model.preferences.UserPreferences;
import org.apache.qpid.server.model.preferences.UserPreferencesImpl;
import org.apache.qpid.server.plugin.QpidServiceLoader;
import org.apache.qpid.server.plugin.SystemAddressSpaceCreator;
import org.apache.qpid.server.plugin.SystemNodeCreator;
import org.apache.qpid.server.security.AccessControl;
import org.apache.qpid.server.security.CompoundAccessControl;
import org.apache.qpid.server.security.Result;
import org.apache.qpid.server.security.SubjectFixedResultAccessControl;
import org.apache.qpid.server.security.SubjectFixedResultAccessControl.ResultCalculator;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
import org.apache.qpid.server.security.auth.manager.SimpleAuthenticationManager;
import org.apache.qpid.server.security.group.GroupPrincipal;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.FileBasedSettings;
import org.apache.qpid.server.store.preferences.PreferenceRecord;
import org.apache.qpid.server.store.preferences.PreferenceStore;
import org.apache.qpid.server.store.preferences.PreferenceStoreUpdaterImpl;
import org.apache.qpid.server.store.preferences.PreferencesRecoverer;
import org.apache.qpid.server.store.preferences.PreferencesRoot;
import org.apache.qpid.server.util.HousekeepingExecutor;
import org.apache.qpid.server.virtualhost.VirtualHostPropertiesNodeCreator;
import org.apache.qpid.util.SystemUtils;
@ManagedObject( category = false, type = "Broker" )
public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<BrokerImpl>, StatisticsGatherer
{
private static final Logger LOGGER = LoggerFactory.getLogger(BrokerImpl.class);
private static final Pattern MODEL_VERSION_PATTERN = Pattern.compile("^\\d+\\.\\d+$");
private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5;
public static final String MANAGEMENT_MODE_AUTHENTICATION = "MANAGEMENT_MODE_AUTHENTICATION";
private final AccessControl _systemUserAllowed = new SubjectFixedResultAccessControl(new ResultCalculator()
{
@Override
public Result getResult(final Subject subject)
{
return isSystemSubject(subject) ? Result.ALLOWED : Result.DEFER;
}
}, Result.DEFER);
private final BrokerPrincipal _principal;
private String[] POSITIVE_NUMERIC_ATTRIBUTES = { CONNECTION_SESSION_COUNT_LIMIT,
CONNECTION_HEART_BEAT_DELAY, STATISTICS_REPORTING_PERIOD };
private AuthenticationProvider<?> _managementModeAuthenticationProvider;
private Timer _reportingTimer;
private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
/** Flags used to control the reporting of flow to disk. Protected by this */
private boolean _totalMessageSizeExceedThresholdReported = false, _totalMessageSizeWithinThresholdReported = true;
@ManagedAttributeField
private int _connection_sessionCountLimit;
@ManagedAttributeField
private int _connection_heartBeatDelay;
@ManagedAttributeField
private boolean _connection_closeWhenNoRoute;
@ManagedAttributeField
private int _statisticsReportingPeriod;
@ManagedAttributeField
private boolean _statisticsReportingResetEnabled;
@ManagedAttributeField
private boolean _messageCompressionEnabled;
private PreferenceStore _preferenceStore;
private final boolean _virtualHostPropertiesNodeEnabled;
private Collection<BrokerLogger> _brokerLoggersToClose;
private int _networkBufferSize = DEFAULT_NETWORK_BUFFER_SIZE;
private final AddressSpaceRegistry _addressSpaceRegistry = new AddressSpaceRegistry();
private ConfigurationChangeListener _accessControlProviderListener = new AccessControlProviderListener();
private final AccessControl _accessControl;
private TaskExecutor _preferenceTaskExecutor;
private String _documentationUrl;
@ManagedObjectFactoryConstructor
public BrokerImpl(Map<String, Object> attributes,
SystemConfig parent)
{
super(parentsMap(parent), attributes, parent);
_principal = new BrokerPrincipal(this);
if (parent.isManagementMode())
{
Map<String,Object> authManagerAttrs = new HashMap<String, Object>();
authManagerAttrs.put(NAME,"MANAGEMENT_MODE_AUTHENTICATION");
authManagerAttrs.put(ID, UUID.randomUUID());
SimpleAuthenticationManager authManager = new SimpleAuthenticationManager(authManagerAttrs, this);
authManager.addUser(BrokerOptions.MANAGEMENT_MODE_USER_NAME, _parent.getManagementModePassword());
_managementModeAuthenticationProvider = authManager;
_accessControl = AccessControl.ALWAYS_ALLOWED;
}
else
{
_accessControl = new CompoundAccessControl(Collections.<AccessControl<?>>emptyList(), Result.ALLOWED);
}
QpidServiceLoader qpidServiceLoader = new QpidServiceLoader();
final Set<String> systemNodeCreatorTypes = qpidServiceLoader.getInstancesByType(SystemNodeCreator.class).keySet();
_virtualHostPropertiesNodeEnabled = systemNodeCreatorTypes.contains(VirtualHostPropertiesNodeCreator.TYPE);
_messagesDelivered = new StatisticsCounter("messages-delivered");
_dataDelivered = new StatisticsCounter("bytes-delivered");
_messagesReceived = new StatisticsCounter("messages-received");
_dataReceived = new StatisticsCounter("bytes-received");
}
private void registerSystemAddressSpaces()
{
QpidServiceLoader qpidServiceLoader = new QpidServiceLoader();
Iterable<SystemAddressSpaceCreator> factories = qpidServiceLoader.instancesOf(SystemAddressSpaceCreator.class);
for(SystemAddressSpaceCreator creator : factories)
{
creator.register(_addressSpaceRegistry);
}
}
@Override
protected void postResolve()
{
super.postResolve();
Integer networkBufferSize = getContextValue(Integer.class, NETWORK_BUFFER_SIZE);
if (networkBufferSize == null || networkBufferSize < MINIMUM_NETWORK_BUFFER_SIZE)
{
throw new IllegalConfigurationException(NETWORK_BUFFER_SIZE + " is set to unacceptable value '" +
networkBufferSize + "'. Must be larger than " + MINIMUM_NETWORK_BUFFER_SIZE + ".");
}
_networkBufferSize = networkBufferSize;
int poolSize = getContextValue(Integer.class, BROKER_DIRECT_BYTE_BUFFER_POOL_SIZE);
QpidByteBuffer.initialisePool(_networkBufferSize, poolSize);
}
@Override
protected void postResolveChildren()
{
super.postResolveChildren();
final SystemConfig parent = getParent(SystemConfig.class);
Runnable task = parent.getOnContainerResolveTask();
if(task != null)
{
task.run();
}
addChangeListener(_accessControlProviderListener);
for(AccessControlProvider aclProvider : getChildren(AccessControlProvider.class))
{
aclProvider.addChangeListener(_accessControlProviderListener);
}
_eventLogger.message(BrokerMessages.CONFIG(parent instanceof FileBasedSettings
? ((FileBasedSettings) parent).getStorePath()
: "N/A"));
}
@Override
public void onValidate()
{
super.onValidate();
String modelVersion = (String) getActualAttributes().get(Broker.MODEL_VERSION);
if (modelVersion == null)
{
deleted();
throw new IllegalConfigurationException("Broker " + Broker.MODEL_VERSION + " must be specified");
}
if (!MODEL_VERSION_PATTERN.matcher(modelVersion).matches())
{
deleted();
throw new IllegalConfigurationException("Broker " + Broker.MODEL_VERSION + " is specified in incorrect format: "
+ modelVersion);
}
int versionSeparatorPosition = modelVersion.indexOf(".");
String majorVersionPart = modelVersion.substring(0, versionSeparatorPosition);
int majorModelVersion = Integer.parseInt(majorVersionPart);
int minorModelVersion = Integer.parseInt(modelVersion.substring(versionSeparatorPosition + 1));
if (majorModelVersion != BrokerModel.MODEL_MAJOR_VERSION || minorModelVersion > BrokerModel.MODEL_MINOR_VERSION)
{
deleted();
throw new IllegalConfigurationException("The model version '" + modelVersion
+ "' in configuration is incompatible with the broker model version '" + BrokerModel.MODEL_VERSION + "'");
}
if(!isDurable())
{
deleted();
throw new IllegalArgumentException(getClass().getSimpleName() + " must be durable");
}
}
@Override
protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes)
{
super.validateChange(proxyForValidation, changedAttributes);
Broker updated = (Broker) proxyForValidation;
if (changedAttributes.contains(MODEL_VERSION) && !BrokerModel.MODEL_VERSION.equals(updated.getModelVersion()))
{
throw new IllegalConfigurationException("Cannot change the model version");
}
for (String attributeName : POSITIVE_NUMERIC_ATTRIBUTES)
{
if(changedAttributes.contains(attributeName))
{
Number value = (Number) updated.getAttribute(attributeName);
if (value != null && value.longValue() < 0)
{
throw new IllegalConfigurationException(
"Only positive integer value can be specified for the attribute "
+ attributeName);
}
}
}
}
@Override
protected void validateChildDelete(final ConfiguredObject<?> child)
{
super.validateChildDelete(child);
if(child instanceof AccessControlProvider && getChildren(AccessControlProvider.class).size() == 1)
{
String categoryName = child.getCategoryClass().getSimpleName();
throw new IllegalConfigurationException("The " + categoryName + " named '" + child.getName()
+ "' cannot be deleted as at least one " + categoryName
+ " must be present");
}
}
@StateTransition( currentState = State.UNINITIALIZED, desiredState = State.ACTIVE )
private ListenableFuture<Void> activate()
{
if(_parent.isManagementMode())
{
return doAfter(_managementModeAuthenticationProvider.openAsync(),
new Runnable()
{
@Override
public void run()
{
performActivation();
}
});
}
else
{
performActivation();
return Futures.immediateFuture(null);
}
}
@SuppressWarnings("unused")
@StateTransition( currentState = {State.ACTIVE, State.ERRORED}, desiredState = State.STOPPED )
private ListenableFuture<Void> doStop()
{
stopPreferenceTaskExecutor();
closePreferenceStore();
return Futures.immediateFuture(null);
}
private void closePreferenceStore()
{
PreferenceStore ps = _preferenceStore;
if (ps != null)
{
ps.close();
}
}
private void stopPreferenceTaskExecutor()
{
TaskExecutor preferenceTaskExecutor = _preferenceTaskExecutor;
if (preferenceTaskExecutor != null)
{
preferenceTaskExecutor.stop();
}
}
@Override
public void initiateShutdown()
{
getEventLogger().message(BrokerMessages.OPERATION("initiateShutdown"));
_parent.closeAsync();
}
@Override
public Map<String, Object> extractConfig(final boolean includeSecureAttributes)
{
return (new ConfigurationExtractor()).extractConfig(this, includeSecureAttributes);
}
private void performActivation()
{
boolean hasBrokerAnyErroredChildren = false;
List<ConfiguredObject<?>> failedChildren = new ArrayList<>();
for (final Class<? extends ConfiguredObject> childClass : getModel().getChildTypes(getCategoryClass()))
{
final Collection<? extends ConfiguredObject> children = getChildren(childClass);
if (children != null) {
for (final ConfiguredObject<?> child : children)
{
if (child.getState() == State.ERRORED )
{
hasBrokerAnyErroredChildren = true;
LOGGER.warn("Broker child object '{}' of type '{}' is {}",
child.getName(), childClass.getSimpleName(), State.ERRORED);
failedChildren.add(child);
}
}
}
}
if(!failedChildren.isEmpty())
{
getEventLogger().message(BrokerMessages.FAILED_CHILDREN(failedChildren.toString()));
}
_documentationUrl = getContextValue(String.class, QPID_DOCUMENTATION_URL);
final boolean brokerShutdownOnErroredChild = getContextValue(Boolean.class,
BROKER_FAIL_STARTUP_WITH_ERRORED_CHILD);
if (!_parent.isManagementMode() && brokerShutdownOnErroredChild && hasBrokerAnyErroredChildren)
{
throw new IllegalStateException(String.format("Broker context variable %s is set and the broker has %s children",
BROKER_FAIL_STARTUP_WITH_ERRORED_CHILD, State.ERRORED));
}
updateAccessControl();
initialiseStatisticsReporting();
_houseKeepingTaskExecutor = new HousekeepingExecutor("broker-" + getName() + "-pool",
getHousekeepingThreadCount(),
getSystemTaskSubject("Housekeeping", _principal));
final PreferenceStoreUpdaterImpl updater = new PreferenceStoreUpdaterImpl();
final Collection<PreferenceRecord> preferenceRecords = _preferenceStore.openAndLoad(updater);
_preferenceTaskExecutor = new TaskExecutorImpl("broker-" + getName() + "-preferences", null);
_preferenceTaskExecutor.start();
PreferencesRecoverer preferencesRecoverer = new PreferencesRecoverer(_preferenceTaskExecutor);
preferencesRecoverer.recoverPreferences(this, preferenceRecords, _preferenceStore);
if (isManagementMode())
{
_eventLogger.message(BrokerMessages.MANAGEMENT_MODE(BrokerOptions.MANAGEMENT_MODE_USER_NAME,
_parent.getManagementModePassword()));
}
setState(State.ACTIVE);
}
private void initialiseStatisticsReporting()
{
long report = ((Number)getAttribute(Broker.STATISTICS_REPORTING_PERIOD)).intValue() * 1000; // convert to ms
final boolean reset = (Boolean)getAttribute(Broker.STATISTICS_REPORTING_RESET_ENABLED);
/* add a timer task to report statistics if generation is enabled for broker or virtualhosts */
if (report > 0L)
{
_reportingTimer = new Timer("Statistics-Reporting", true);
StatisticsReportingTask task = new StatisticsReportingTask(reset, _eventLogger);
_reportingTimer.scheduleAtFixedRate(task, report / 2, report);
}
}
@Override
public int getConnection_sessionCountLimit()
{
return _connection_sessionCountLimit;
}
@Override
public int getConnection_heartBeatDelay()
{
return _connection_heartBeatDelay;
}
@Override
public boolean getConnection_closeWhenNoRoute()
{
return _connection_closeWhenNoRoute;
}
@Override
public int getStatisticsReportingPeriod()
{
return _statisticsReportingPeriod;
}
@Override
public boolean getStatisticsReportingResetEnabled()
{
return _statisticsReportingResetEnabled;
}
@Override
public boolean isMessageCompressionEnabled()
{
return _messageCompressionEnabled;
}
@Override
public Collection<VirtualHostNode<?>> getVirtualHostNodes()
{
Collection children = getChildren(VirtualHostNode.class);
return children;
}
public Collection<Port<?>> getPorts()
{
Collection children = getChildren(Port.class);
return children;
}
public Collection<AuthenticationProvider<?>> getAuthenticationProviders()
{
Collection children = getChildren(AuthenticationProvider.class);
return children;
}
@Override
public synchronized void assignTargetSizes()
{
long totalTarget = getContextValue(Long.class, BROKER_FLOW_TO_DISK_THRESHOLD);
LOGGER.debug("Assigning target sizes based on total target {}", totalTarget);
long totalSize = 0l;
Collection<VirtualHostNode<?>> vhns = getVirtualHostNodes();
Map<VirtualHost<?>, Long> vhs = new HashMap<>();
for (VirtualHostNode<?> vhn : vhns)
{
VirtualHost<?> vh = vhn.getVirtualHost();
if (vh != null)
{
long totalQueueDepthBytes = vh.getTotalQueueDepthBytes();
vhs.put(vh, totalQueueDepthBytes);
totalSize += totalQueueDepthBytes;
}
}
if (totalSize > totalTarget && !_totalMessageSizeExceedThresholdReported)
{
_eventLogger.message(BrokerMessages.FLOW_TO_DISK_ACTIVE(totalSize / 1024, totalTarget / 1024));
_totalMessageSizeExceedThresholdReported = true;
_totalMessageSizeWithinThresholdReported = false;
}
else if (totalSize <= totalTarget && !_totalMessageSizeWithinThresholdReported)
{
_eventLogger.message(BrokerMessages.FLOW_TO_DISK_INACTIVE(totalSize / 1024, totalTarget / 1024));
_totalMessageSizeWithinThresholdReported = true;
_totalMessageSizeExceedThresholdReported = false;
}
final long proportionalShare = (long) ((double) totalTarget / (double) vhs.size());
for (Map.Entry<VirtualHost<?>, Long> entry : vhs.entrySet())
{
long virtualHostTotalQueueSize = entry.getValue();
final long size;
if (totalSize == 0)
{
size = proportionalShare;
}
else
{
long queueSizeBasedShare = (totalTarget * virtualHostTotalQueueSize) / (2 * totalSize);
size = queueSizeBasedShare + (proportionalShare / 2);
}
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Assigning target size {} to vhost {}", size, entry.getKey());
}
entry.getKey().setTargetSize(size);
}
}
@Override
protected void onOpen()
{
super.onOpen();
PreferencesRoot preferencesRoot = getParent(SystemConfig.class);
_preferenceStore = preferencesRoot.createPreferenceStore();
getEventLogger().message(BrokerMessages.STARTUP(CommonProperties.getReleaseVersion(),
CommonProperties.getBuildVersion()));
getEventLogger().message(BrokerMessages.PLATFORM(System.getProperty("java.vendor"),
System.getProperty("java.runtime.version",
System.getProperty("java.version")),
SystemUtils.getOSName(),
SystemUtils.getOSVersion(),
SystemUtils.getOSArch(),
String.valueOf(getNumberOfCores())));
long directMemory = getMaxDirectMemorySize();
long heapMemory = Runtime.getRuntime().maxMemory();
getEventLogger().message(BrokerMessages.MAX_MEMORY(heapMemory, directMemory));
if (SystemUtils.getProcessPid() != null)
{
getEventLogger().message(BrokerMessages.PROCESS(SystemUtils.getProcessPid()));
}
registerSystemAddressSpaces();
assignTargetSizes();
}
@Override
public NamedAddressSpace getSystemAddressSpace(String name)
{
return _addressSpaceRegistry.getAddressSpace(name);
}
@Override
public Collection<GroupProvider<?>> getGroupProviders()
{
Collection children = getChildren(GroupProvider.class);
return children;
}
private ListenableFuture<VirtualHostNode> createVirtualHostNodeAsync(Map<String, Object> attributes)
throws AccessControlException, IllegalArgumentException
{
return doAfter(getObjectFactory().createAsync(VirtualHostNode.class, attributes, this),
new CallableWithArgument<ListenableFuture<VirtualHostNode>, VirtualHostNode>()
{
@Override
public ListenableFuture<VirtualHostNode> call(final VirtualHostNode virtualHostNode)
throws Exception
{
// permission has already been granted to create the virtual host
// disable further access check on other operations, e.g. create exchange
Subject.doAs(getSubjectWithAddedSystemRights(),
new PrivilegedAction<Object>()
{
@Override
public Object run()
{
virtualHostNode.start();
return null;
}
});
return Futures.immediateFuture(virtualHostNode);
}
});
}
@Override
public long getBytesIn()
{
return getDataReceiptStatistics().getTotal();
}
@Override
public long getBytesOut()
{
return getDataDeliveryStatistics().getTotal();
}
@Override
public long getMessagesIn()
{
return getMessageReceiptStatistics().getTotal();
}
@Override
public long getMessagesOut()
{
return getMessageDeliveryStatistics().getTotal();
}
@SuppressWarnings("unchecked")
@Override
public <C extends ConfiguredObject> ListenableFuture<C> addChildAsync(final Class<C> childClass, final Map<String, Object> attributes, final ConfiguredObject... otherParents)
{
if (childClass == VirtualHostNode.class)
{
return (ListenableFuture<C>) createVirtualHostNodeAsync(attributes);
}
else
{
return getObjectFactory().createAsync(childClass, attributes, this);
}
}
@Override
protected ListenableFuture<Void> beforeClose()
{
_brokerLoggersToClose = new ArrayList(getChildren(BrokerLogger.class));
return super.beforeClose();
}
@Override
protected void onClose()
{
if (_reportingTimer != null)
{
_reportingTimer.cancel();
}
shutdownHouseKeeping();
stopPreferenceTaskExecutor();
closePreferenceStore();
_eventLogger.message(BrokerMessages.STOPPED());
try
{
for (BrokerLogger<?> logger : _brokerLoggersToClose)
{
logger.stopLogging();
}
}
finally
{
Runnable task = _parent.getOnContainerCloseTask();
if(task != null)
{
task.run();
}
}
}
@Override
public UserPreferences createUserPreferences(final ConfiguredObject<?> object)
{
return new UserPreferencesImpl(_preferenceTaskExecutor, object, _preferenceStore, Collections.<Preference>emptySet());
}
private void updateAccessControl()
{
if(!isManagementMode())
{
List<AccessControlProvider> children = new ArrayList<>(getChildren(AccessControlProvider.class));
Collections.sort(children, CommonAccessControlProvider.ACCESS_CONTROL_PROVIDER_COMPARATOR);
List<AccessControl<?>> accessControls = new ArrayList<>(children.size()+1);
accessControls.add(_systemUserAllowed);
for(AccessControlProvider prov : children)
{
if(prov.getState() == State.ERRORED)
{
accessControls.clear();
accessControls.add(AccessControl.ALWAYS_DENIED);
break;
}
else if(prov.getState() == State.ACTIVE)
{
accessControls.add(prov.getAccessControl());
}
}
((CompoundAccessControl)_accessControl).setAccessControls(accessControls);
}
}
public AccessControl getAccessControl()
{
return _accessControl;
}
@Override
public VirtualHost<?> findVirtualHostByName(String name)
{
for (VirtualHostNode<?> virtualHostNode : getChildren(VirtualHostNode.class))
{
VirtualHost<?> virtualHost = virtualHostNode.getVirtualHost();
if (virtualHost != null && virtualHost.getName().equals(name))
{
return virtualHost;
}
}
return null;
}
@Override
public VirtualHostNode findDefautVirtualHostNode()
{
VirtualHostNode existingDefault = null;
Collection<VirtualHostNode<?>> virtualHostNodes = new ArrayList<>(getVirtualHostNodes());
for(VirtualHostNode node : virtualHostNodes)
{
if (node.isDefaultVirtualHostNode())
{
existingDefault = node;
break;
}
}
return existingDefault;
}
@Override
public Collection<KeyStore<?>> getKeyStores()
{
Collection children = getChildren(KeyStore.class);
return children;
}
@Override
public Collection<TrustStore<?>> getTrustStores()
{
Collection children = getChildren(TrustStore.class);
return children;
}
@Override
public boolean isManagementMode()
{
return _parent.isManagementMode();
}
@Override
public Collection<AccessControlProvider<?>> getAccessControlProviders()
{
Collection children = getChildren(AccessControlProvider.class);
return children;
}
@Override
protected void onExceptionInOpen(RuntimeException e)
{
_eventLogger.message(BrokerMessages.FATAL_ERROR(e.getMessage()));
}
public void registerMessageDelivered(long messageSize)
{
_messagesDelivered.registerEvent(1L);
_dataDelivered.registerEvent(messageSize);
}
public void registerMessageReceived(long messageSize, long timestamp)
{
_messagesReceived.registerEvent(1L, timestamp);
_dataReceived.registerEvent(messageSize, timestamp);
}
public StatisticsCounter getMessageReceiptStatistics()
{
return _messagesReceived;
}
public StatisticsCounter getDataReceiptStatistics()
{
return _dataReceived;
}
public StatisticsCounter getMessageDeliveryStatistics()
{
return _messagesDelivered;
}
public StatisticsCounter getDataDeliveryStatistics()
{
return _dataDelivered;
}
public void resetStatistics()
{
_messagesDelivered.reset();
_dataDelivered.reset();
_messagesReceived.reset();
_dataReceived.reset();
for (VirtualHostNode<?> virtualHostNode : getChildren(VirtualHostNode.class))
{
VirtualHost<?> virtualHost = virtualHostNode.getVirtualHost();
if (virtualHost != null)
{
virtualHost.resetStatistics();
}
}
}
private class StatisticsReportingTask extends TimerTask
{
private final int DELIVERED = 0;
private final int RECEIVED = 1;
private final boolean _reset;
private final EventLogger _logger;
private final Subject _subject;
public StatisticsReportingTask(boolean reset, EventLogger logger)
{
_reset = reset;
_logger = logger;
_subject = getSystemTaskSubject("Statistics");
}
public void run()
{
Subject.doAs(_subject, new PrivilegedAction<Object>()
{
@Override
public Object run()
{
reportStatistics();
return null;
}
});
}
protected void reportStatistics()
{
try
{
_eventLogger.message(BrokerMessages.STATS_DATA(DELIVERED, _dataDelivered.getPeak() / 1024.0, _dataDelivered.getTotal()));
_eventLogger.message(BrokerMessages.STATS_MSGS(DELIVERED, _messagesDelivered.getPeak(), _messagesDelivered.getTotal()));
_eventLogger.message(BrokerMessages.STATS_DATA(RECEIVED, _dataReceived.getPeak() / 1024.0, _dataReceived.getTotal()));
_eventLogger.message(BrokerMessages.STATS_MSGS(RECEIVED,
_messagesReceived.getPeak(),
_messagesReceived.getTotal()));
for (VirtualHostNode<?> virtualHostNode : getChildren(VirtualHostNode.class))
{
VirtualHost<?> virtualHost = virtualHostNode.getVirtualHost();
if (virtualHost != null)
{
String name = virtualHost.getName();
StatisticsCounter dataDelivered = virtualHost.getDataDeliveryStatistics();
StatisticsCounter messagesDelivered = virtualHost.getMessageDeliveryStatistics();
StatisticsCounter dataReceived = virtualHost.getDataReceiptStatistics();
StatisticsCounter messagesReceived = virtualHost.getMessageReceiptStatistics();
EventLogger logger = virtualHost.getEventLogger();
logger.message(VirtualHostMessages.STATS_DATA(name,
DELIVERED,
dataDelivered.getPeak() / 1024.0,
dataDelivered.getTotal()));
logger.message(VirtualHostMessages.STATS_MSGS(name,
DELIVERED,
messagesDelivered.getPeak(),
messagesDelivered.getTotal()));
logger.message(VirtualHostMessages.STATS_DATA(name,
RECEIVED,
dataReceived.getPeak() / 1024.0,
dataReceived.getTotal()));
logger.message(VirtualHostMessages.STATS_MSGS(name,
RECEIVED,
messagesReceived.getPeak(),
messagesReceived.getTotal()));
}
}
if (_reset)
{
resetStatistics();
}
}
catch(Exception e)
{
LOGGER.warn("Unexpected exception occurred while reporting the statistics", e);
}
}
}
@Override
public boolean isVirtualHostPropertiesNodeEnabled()
{
return _virtualHostPropertiesNodeEnabled;
}
public AuthenticationProvider<?> getManagementModeAuthenticationProvider()
{
return _managementModeAuthenticationProvider;
}
@Override
public int getNetworkBufferSize()
{
return _networkBufferSize;
}
public String getDocumentationUrl()
{
return _documentationUrl;
}
@Override
public Principal getUser()
{
return AuthenticatedPrincipal.getCurrentUser();
}
@Override
public Set<Principal> getGroups()
{
Subject currentSubject = Subject.getSubject(AccessController.getContext());
if (currentSubject == null)
{
return Collections.emptySet();
}
final Set<Principal> currentPrincipals = Collections.<Principal>unmodifiableSet(currentSubject.getPrincipals(GroupPrincipal.class));
return currentPrincipals;
}
@Override
public void purgeUser(final AuthenticationProvider<?> origin, final String username)
{
doPurgeUser(origin, username);
}
private void doPurgeUser(final AuthenticationProvider<?> origin, final String username)
{
// remove from AuthenticationProvider
if (origin instanceof PasswordCredentialManagingAuthenticationProvider)
{
try
{
((PasswordCredentialManagingAuthenticationProvider) origin).deleteUser(username);
}
catch (AccountNotFoundException e)
{
// pass
}
}
// remove from Groups
final Collection<GroupProvider> groupProviders = getChildren(GroupProvider.class);
for (GroupProvider<?> groupProvider : groupProviders)
{
final Collection<Group> groups = groupProvider.getChildren(Group.class);
for (Group<?> group : groups)
{
final Collection<GroupMember> members = group.getChildren(GroupMember.class);
for (GroupMember<?> member : members)
{
if (username.equals(member.getName()))
{
member.delete();
}
}
}
}
// remove Preferences from all ConfiguredObjects
Subject userSubject = new Subject(true,
Collections.singleton(new AuthenticatedPrincipal(new UsernamePrincipal(username, origin))),
Collections.EMPTY_SET,
Collections.EMPTY_SET);
java.util.Queue<ConfiguredObject<?>> configuredObjects = new LinkedList<>();
configuredObjects.add(BrokerImpl.this);
while (!configuredObjects.isEmpty())
{
final ConfiguredObject<?> currentObject = configuredObjects.poll();
final Collection<Class<? extends ConfiguredObject>> childClasses = getModel().getChildTypes(currentObject.getClass());
for (Class<? extends ConfiguredObject> childClass : childClasses)
{
final Collection<? extends ConfiguredObject> children = currentObject.getChildren(childClass);
for (ConfiguredObject child : children)
{
configuredObjects.add(child);
}
}
Subject.doAs(userSubject, new PrivilegedAction<Void>()
{
@Override
public Void run()
{
currentObject.getUserPreferences().delete(null, null, null);
return null;
}
});
}
}
protected void shutdownHouseKeeping()
{
if(_houseKeepingTaskExecutor != null)
{
_houseKeepingTaskExecutor.shutdown();
try
{
if (!_houseKeepingTaskExecutor.awaitTermination(HOUSEKEEPING_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS))
{
_houseKeepingTaskExecutor.shutdownNow();
}
}
catch (InterruptedException e)
{
LOGGER.warn("Interrupted during Housekeeping shutdown:", e);
Thread.currentThread().interrupt();
}
}
}
private class AddressSpaceRegistry implements SystemAddressSpaceCreator.AddressSpaceRegistry
{
private final ConcurrentMap<String, NamedAddressSpace> _systemAddressSpaces = new ConcurrentHashMap<>();
@Override
public void registerAddressSpace(final NamedAddressSpace addressSpace)
{
_systemAddressSpaces.put(addressSpace.getName(), addressSpace);
}
@Override
public void removeAddressSpace(final NamedAddressSpace addressSpace)
{
_systemAddressSpaces.remove(addressSpace.getName(), addressSpace);
}
@Override
public void removeAddressSpace(final String name)
{
_systemAddressSpaces.remove(name);
}
@Override
public NamedAddressSpace getAddressSpace(final String name)
{
return name == null ? null : _systemAddressSpaces.get(name);
}
@Override
public Broker<?> getBroker()
{
return BrokerImpl.this;
}
}
private final class AccessControlProviderListener implements ConfigurationChangeListener
{
private final Set<ConfiguredObject<?>> _bulkChanges = new HashSet<>();
@Override
public void stateChanged(final ConfiguredObject<?> object, final State oldState, final State newState)
{
}
@Override
public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
{
if(object.getCategoryClass() == Broker.class && child.getCategoryClass() == AccessControlProvider.class)
{
child.addChangeListener(this);
BrokerImpl.this.updateAccessControl();
}
}
@Override
public void childRemoved(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
{
if(object.getCategoryClass() == Broker.class && child.getCategoryClass() == AccessControlProvider.class)
{
BrokerImpl.this.updateAccessControl();
}
}
@Override
public void attributeSet(final ConfiguredObject<?> object,
final String attributeName,
final Object oldAttributeValue,
final Object newAttributeValue)
{
if(object.getCategoryClass() == AccessControlProvider.class && !_bulkChanges.contains(object))
{
BrokerImpl.this.updateAccessControl();
}
}
@Override
public void bulkChangeStart(final ConfiguredObject<?> object)
{
if(object.getCategoryClass() == AccessControlProvider.class)
{
_bulkChanges.add(object);
}
}
@Override
public void bulkChangeEnd(final ConfiguredObject<?> object)
{
if(object.getCategoryClass() == AccessControlProvider.class)
{
_bulkChanges.remove(object);
BrokerImpl.this.updateAccessControl();
}
}
}
}