// blob: ab21a7a11a0a7d47a98247d2aa56c32f150906bf [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.model.adapter;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.management.BufferPoolMXBean;
import java.lang.management.LockInfo;
import java.lang.management.ManagementFactory;
import java.lang.management.MonitorInfo;
import java.lang.management.PlatformManagedObject;
import java.lang.management.RuntimeMXBean;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.charset.Charset;
import java.security.AccessControlException;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.security.auth.Subject;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.logging.QpidLoggerTurboFilter;
import org.apache.qpid.server.logging.StartupAppender;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.configuration.CommonProperties;
import org.apache.qpid.server.BrokerOptions;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogRecorder;
import org.apache.qpid.server.logging.messages.BrokerMessages;
import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.model.*;
import org.apache.qpid.server.plugin.ConfigurationSecretEncrypterFactory;
import org.apache.qpid.server.plugin.PluggableFactoryLoader;
import org.apache.qpid.server.plugin.QpidServiceLoader;
import org.apache.qpid.server.plugin.SystemNodeCreator;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.auth.manager.SimpleAuthenticationManager;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.FileBasedSettings;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.virtualhost.VirtualHostPropertiesNodeCreator;
import org.apache.qpid.util.SystemUtils;
public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> implements Broker<BrokerAdapter>, StatisticsGatherer
{
// Logger for this class.
private static final Logger LOGGER = LoggerFactory.getLogger(BrokerAdapter.class);
// Shape a model version string must have: digits, a dot, digits.
private static final Pattern MODEL_VERSION_PATTERN = Pattern.compile("^\\d+\\.\\d+$");
// Same text as the name the constructor gives to the management-mode authentication provider.
public static final String MANAGEMENT_MODE_AUTHENTICATION = "MANAGEMENT_MODE_AUTHENTICATION";
// Attributes for which validateChange() rejects negative values.
private String[] POSITIVE_NUMERIC_ATTRIBUTES = { CONNECTION_SESSION_COUNT_LIMIT,
CONNECTION_HEART_BEAT_DELAY, STATISTICS_REPORTING_PERIOD };
// Parent system configuration handed to the constructor.
private SystemConfig<?> _parent;
// Destination of operational log messages; can be replaced through setEventLogger().
private EventLogger _eventLogger;
private LogRecorder _logRecorder;
private final SecurityManager _securityManager;
// Assigned in the constructor only when the parent is in management mode.
private AuthenticationProvider<?> _managementModeAuthenticationProvider;
// Created by initialiseStatisticsReporting() when the reporting period is positive; cancelled in onClose().
private Timer _reportingTimer;
// Broker-wide counters fed by registerMessageDelivered()/registerMessageReceived().
private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
/** Flags used to control the reporting of flow to disk. Protected by this */
private boolean _totalMessageSizeExceedThresholdReported = false, _totalMessageSizeWithinThresholdReported = true;
// Backing fields of the managed attributes exposed by the getters further down.
@ManagedAttributeField
private int _connection_sessionCountLimit;
@ManagedAttributeField
private int _connection_heartBeatDelay;
@ManagedAttributeField
private boolean _connection_closeWhenNoRoute;
@ManagedAttributeField
private int _statisticsReportingPeriod;
@ManagedAttributeField
private boolean _statisticsReportingResetEnabled;
@ManagedAttributeField
private boolean _messageCompressionEnabled;
// The afterSet value names the private method postEncrypterProviderSet() of this class.
@ManagedAttributeField(afterSet = "postEncrypterProviderSet")
private String _confidentialConfigurationEncryptionProvider;
// True when a system node creator of type VirtualHostPropertiesNodeCreator.TYPE was found at construction.
private final boolean _virtualHostPropertiesNodeEnabled;
// Snapshot of the BrokerLogger children taken in beforeClose() and stopped in onClose().
private Collection<BrokerLogger> _brokerLoggersToClose;
// Overwritten in postResolve() with the validated context value.
private int _networkBufferSize = DEFAULT_NETWORK_BUFFER_SIZE;
// Captured once when the instance is created (note the historical misspelling "Hize").
private final long _maximumHeapHize = Runtime.getRuntime().maxMemory();
private final long _maximumDirectMemorySize = getMaxDirectMemorySize();
// The platform buffer pool named "direct", or null when none is registered.
private final BufferPoolMXBean _bufferPoolMXBean;
// JVM input arguments as reported by the RuntimeMXBean at construction.
private final List<String> _jvmArguments;
/**
 * Builds the broker as a child of the given system configuration.
 * <p>
 * Besides wiring the parent, event logger and security manager, the constructor creates a
 * {@link SimpleAuthenticationManager} holding the management-mode user when the parent is in
 * management mode, records whether the virtual host properties system node creator is present,
 * installs the configured secret encrypter (if one is named in the attributes), creates the
 * broker-wide statistics counters and looks up the platform buffer pool named "direct".
 *
 * @param attributes attribute values the broker is created with
 * @param parent the owning system configuration
 */
@ManagedObjectFactoryConstructor
public BrokerAdapter(Map<String, Object> attributes,
                     SystemConfig parent)
{
    super(parentsMap(parent), attributes);
    _parent = parent;
    _eventLogger = parent.getEventLogger();
    _securityManager = new SecurityManager(this, parent.isManagementMode());

    if (parent.isManagementMode())
    {
        final Map<String, Object> providerAttributes = new HashMap<>();
        providerAttributes.put(NAME, "MANAGEMENT_MODE_AUTHENTICATION");
        providerAttributes.put(ID, UUID.randomUUID());

        final SimpleAuthenticationManager managementModeManager =
                new SimpleAuthenticationManager(providerAttributes, this);
        managementModeManager.addUser(BrokerOptions.MANAGEMENT_MODE_USER_NAME,
                                      _parent.getManagementModePassword());
        _managementModeAuthenticationProvider = managementModeManager;
    }

    final QpidServiceLoader serviceLoader = new QpidServiceLoader();
    final Set<String> nodeCreatorTypes = serviceLoader.getInstancesByType(SystemNodeCreator.class).keySet();
    _virtualHostPropertiesNodeEnabled = nodeCreatorTypes.contains(VirtualHostPropertiesNodeCreator.TYPE);

    final Object configuredEncrypter = attributes.get(CONFIDENTIAL_CONFIGURATION_ENCRYPTION_PROVIDER);
    if (configuredEncrypter != null)
    {
        updateEncrypter(String.valueOf(configuredEncrypter));
    }

    _messagesDelivered = new StatisticsCounter("messages-delivered");
    _dataDelivered = new StatisticsCounter("bytes-delivered");
    _messagesReceived = new StatisticsCounter("messages-received");
    _dataReceived = new StatisticsCounter("bytes-received");

    // Pick the first pool called "direct"; stays null when the JVM exposes none.
    BufferPoolMXBean directPool = null;
    final List<BufferPoolMXBean> pools = ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
    for (int i = 0; i < pools.size() && directPool == null; i++)
    {
        final BufferPoolMXBean candidate = pools.get(i);
        if (candidate.getName().equals("direct"))
        {
            directPool = candidate;
        }
    }
    _bufferPoolMXBean = directPool;

    _jvmArguments = ManagementFactory.getRuntimeMXBean().getInputArguments();
}
/**
 * Installs the secret encrypter that corresponds to the given provider type.
 * <p>
 * A {@code null} or blank type clears the encrypter. Otherwise the matching
 * {@link ConfigurationSecretEncrypterFactory} is looked up and asked to create the encrypter.
 *
 * @param encryptionProviderType type name of the encrypter factory, may be {@code null}
 * @throws IllegalConfigurationException if no factory is registered under the given type
 */
private void updateEncrypter(final String encryptionProviderType)
{
    final boolean noProvider = encryptionProviderType == null || "".equals(encryptionProviderType.trim());
    if (noProvider)
    {
        setEncrypter(null);
        return;
    }

    final PluggableFactoryLoader<ConfigurationSecretEncrypterFactory> loader =
            new PluggableFactoryLoader<>(ConfigurationSecretEncrypterFactory.class);
    final ConfigurationSecretEncrypterFactory encrypterFactory = loader.get(encryptionProviderType);
    if (encrypterFactory == null)
    {
        throw new IllegalConfigurationException("Unknown Configuration Secret Encryption method "
                                                + encryptionProviderType);
    }
    setEncrypter(encrypterFactory.createEncrypter(this));
}
/**
 * Validates the network buffer size taken from the context and initialises the
 * {@link QpidByteBuffer} pool with that size and the configured pool size.
 *
 * @throws IllegalConfigurationException if the buffer size is missing or below
 *         {@code MINIMUM_NETWORK_BUFFER_SIZE}
 */
@Override
protected void postResolve()
{
    super.postResolve();

    final Integer configuredSize = getContextValue(Integer.class, NETWORK_BUFFER_SIZE);
    final boolean acceptable = configuredSize != null && configuredSize >= MINIMUM_NETWORK_BUFFER_SIZE;
    if (!acceptable)
    {
        throw new IllegalConfigurationException(NETWORK_BUFFER_SIZE + " is set to unacceptable value '" +
                                                configuredSize + "'. Must be larger than " + MINIMUM_NETWORK_BUFFER_SIZE + ".");
    }
    _networkBufferSize = configuredSize;

    final int bufferPoolSize = getContextValue(Integer.class, BROKER_DIRECT_BYTE_BUFFER_POOL_SIZE);
    QpidByteBuffer.initialisePool(_networkBufferSize, bufferPoolSize);
}
/**
 * Detaches and stops the {@link StartupAppender} on the root logback logger, if one is attached,
 * and then logs the configuration store location (or "N/A" when the parent is not file based).
 */
@Override
protected void postResolveChildren()
{
    super.postResolveChildren();

    final ch.qos.logback.classic.Logger root =
            (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
    final StartupAppender appender = (StartupAppender) root.getAppender(StartupAppender.class.getName());
    if (appender != null)
    {
        root.detachAppender(appender);
        appender.stop();
    }

    final SystemConfig systemConfig = getParent(SystemConfig.class);
    final String storeLocation;
    if (systemConfig instanceof FileBasedSettings)
    {
        storeLocation = ((FileBasedSettings) systemConfig).getStorePath();
    }
    else
    {
        storeLocation = "N/A";
    }
    _eventLogger.message(BrokerMessages.CONFIG(storeLocation));
}
/**
 * Checks the broker's own configuration. Every failed check calls {@code deleted()} before
 * throwing.
 * <p>
 * The checks, in order: the model version must be present, must look like
 * {@code <digits>.<digits>}, must share the major version of {@link BrokerModel} and must not
 * have a higher minor version; the broker must be durable; and at most one access control
 * provider may exist.
 *
 * @throws IllegalConfigurationException for a missing, malformed or incompatible model version
 * @throws IllegalArgumentException for a non-durable broker or more than one access control provider
 */
@Override
public void onValidate()
{
    super.onValidate();

    final String configuredVersion = (String) getActualAttributes().get(Broker.MODEL_VERSION);
    if (configuredVersion == null)
    {
        deleted();
        throw new IllegalConfigurationException("Broker " + Broker.MODEL_VERSION + " must be specified");
    }

    final boolean wellFormed = MODEL_VERSION_PATTERN.matcher(configuredVersion).matches();
    if (!wellFormed)
    {
        deleted();
        throw new IllegalConfigurationException("Broker " + Broker.MODEL_VERSION + " is specified in incorrect format: "
                                                + configuredVersion);
    }

    final int dot = configuredVersion.indexOf('.');
    final int major = Integer.parseInt(configuredVersion.substring(0, dot));
    final int minor = Integer.parseInt(configuredVersion.substring(dot + 1));
    final boolean compatible = major == BrokerModel.MODEL_MAJOR_VERSION
                               && minor <= BrokerModel.MODEL_MINOR_VERSION;
    if (!compatible)
    {
        deleted();
        throw new IllegalConfigurationException("The model version '" + configuredVersion
                                                + "' in configuration is incompatible with the broker model version '" + BrokerModel.MODEL_VERSION + "'");
    }

    if (!isDurable())
    {
        deleted();
        throw new IllegalArgumentException(getClass().getSimpleName() + " must be durable");
    }

    final Collection<AccessControlProvider<?>> providers = getAccessControlProviders();
    final int providerCount = providers == null ? 0 : providers.size();
    if (providerCount > 1)
    {
        deleted();
        throw new IllegalArgumentException("At most one AccessControlProvider can be defined");
    }
}
/**
 * Rejects attribute changes the broker does not allow: a model version different from
 * {@link BrokerModel#MODEL_VERSION}, and negative values for any attribute listed in
 * {@code POSITIVE_NUMERIC_ATTRIBUTES}.
 *
 * @param proxyForValidation object carrying the proposed attribute values
 * @param changedAttributes names of the attributes being changed
 * @throws IllegalConfigurationException if one of the proposed values is not acceptable
 */
@Override
protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes)
{
    super.validateChange(proxyForValidation, changedAttributes);
    final Broker proposed = (Broker) proxyForValidation;

    final boolean modelVersionAltered = changedAttributes.contains(MODEL_VERSION)
                                        && !BrokerModel.MODEL_VERSION.equals(proposed.getModelVersion());
    if (modelVersionAltered)
    {
        throw new IllegalConfigurationException("Cannot change the model version");
    }

    for (int i = 0; i < POSITIVE_NUMERIC_ATTRIBUTES.length; i++)
    {
        final String attributeName = POSITIVE_NUMERIC_ATTRIBUTES[i];
        if (!changedAttributes.contains(attributeName))
        {
            continue;
        }
        final Number proposedValue = (Number) proposed.getAttribute(attributeName);
        if (proposedValue == null || proposedValue.longValue() >= 0)
        {
            continue;
        }
        throw new IllegalConfigurationException(
                "Only positive integer value can be specified for the attribute "
                + attributeName);
    }
}
/**
 * State transition from UNINITIALIZED to ACTIVE.
 * <p>
 * Outside management mode the activation runs immediately. In management mode it is deferred
 * until the management-mode authentication provider has been opened.
 *
 * @return a future for the activation
 */
@StateTransition( currentState = State.UNINITIALIZED, desiredState = State.ACTIVE )
private ListenableFuture<Void> activate()
{
    if (!_parent.isManagementMode())
    {
        performActivation();
        return Futures.immediateFuture(null);
    }

    final Runnable activation = new Runnable()
    {
        @Override
        public void run()
        {
            performActivation();
        }
    };
    return doAfter(_managementModeAuthenticationProvider.openAsync(), activation);
}
/**
 * Starts closing the broker's parent after authorising the SHUTDOWN operation and logging
 * the operation name. The close is requested asynchronously and not awaited here.
 */
@Override
public void initiateShutdown()
{
// Authorisation comes first so an unauthorised caller neither logs nor triggers the close.
_securityManager.authorise(Operation.SHUTDOWN, this);
getEventLogger().message(BrokerMessages.OPERATION("initiateShutdown"));
_parent.closeAsync();
}
/**
 * Completes broker activation.
 * <p>
 * All children are inspected for the ERRORED state; each one found is logged and the whole list
 * is reported through the event logger. Outside management mode, startup is aborted when the
 * context variable {@code BROKER_FAIL_STARTUP_WITH_ERRORED_CHILD} is set and such children
 * exist. Otherwise statistics reporting is started, the management mode credentials are logged
 * when applicable, and the state becomes ACTIVE.
 *
 * @throws IllegalStateException if errored children must fail the startup
 */
private void performActivation()
{
    final List<ConfiguredObject<?>> erroredChildren = new ArrayList<>();
    for (final Class<? extends ConfiguredObject> childType : getModel().getChildTypes(getCategoryClass()))
    {
        final Collection<? extends ConfiguredObject> childrenOfType = getChildren(childType);
        if (childrenOfType == null)
        {
            continue;
        }
        for (final ConfiguredObject<?> child : childrenOfType)
        {
            if (child.getState() != State.ERRORED)
            {
                continue;
            }
            LOGGER.warn("Broker child object '{}' of type '{}' is {}",
                        child.getName(), childType.getSimpleName(), State.ERRORED);
            erroredChildren.add(child);
        }
    }

    final boolean anyChildErrored = !erroredChildren.isEmpty();
    if (anyChildErrored)
    {
        getEventLogger().message(BrokerMessages.FAILED_CHILDREN(erroredChildren.toString()));
    }

    final boolean failStartupOnErroredChild = getContextValue(Boolean.class,
                                                              BROKER_FAIL_STARTUP_WITH_ERRORED_CHILD);
    if (!_parent.isManagementMode() && failStartupOnErroredChild && anyChildErrored)
    {
        throw new IllegalStateException(String.format("Broker context variable %s is set and the broker has %s children",
                                                      BROKER_FAIL_STARTUP_WITH_ERRORED_CHILD, State.ERRORED));
    }

    initialiseStatisticsReporting();

    if (isManagementMode())
    {
        _eventLogger.message(BrokerMessages.MANAGEMENT_MODE(BrokerOptions.MANAGEMENT_MODE_USER_NAME,
                                                            _parent.getManagementModePassword()));
    }
    setState(State.ACTIVE);
}
/**
 * Schedules the periodic statistics report when a positive reporting period is configured.
 * <p>
 * The period attribute is multiplied by 1000 to obtain milliseconds. The first report is
 * scheduled after half a period and subsequent ones at fixed rate, on a daemon timer.
 */
private void initialiseStatisticsReporting()
{
    // Multiply as long: doing it in int (as before) wrapped around for periods above
    // Integer.MAX_VALUE / 1000 seconds, yielding a bogus or non-positive delay.
    final long reportPeriodMillis = ((Number) getAttribute(Broker.STATISTICS_REPORTING_PERIOD)).longValue() * 1000L;
    final boolean reset = (Boolean) getAttribute(Broker.STATISTICS_REPORTING_RESET_ENABLED);

    /* add a timer task to report statistics if generation is enabled for broker or virtualhosts */
    if (reportPeriodMillis > 0L)
    {
        _reportingTimer = new Timer("Statistics-Reporting", true);
        StatisticsReportingTask task = new StatisticsReportingTask(reset, _eventLogger);
        _reportingTimer.scheduleAtFixedRate(task, reportPeriodMillis / 2, reportPeriodMillis);
    }
}
/** Returns the build version reported by {@link CommonProperties#getBuildVersion()}. */
@Override
public String getBuildVersion()
{
return CommonProperties.getBuildVersion();
}
/** Returns the operating system description produced by {@link SystemUtils#getOSString()}. */
@Override
public String getOperatingSystem()
{
return SystemUtils.getOSString();
}
/**
 * Returns the Java vendor followed by the runtime version, separated by a space.
 * The "java.version" property is used when "java.runtime.version" is not set.
 */
@Override
public String getPlatform()
{
return System.getProperty("java.vendor") + " "
+ System.getProperty("java.runtime.version", System.getProperty("java.version"));
}
/** Returns the process id reported by {@link SystemUtils#getProcessPid()}; onOpen() treats a null result as unknown. */
@Override
public String getProcessPid()
{
return SystemUtils.getProcessPid();
}
/** Returns the release version reported by {@link CommonProperties#getReleaseVersion()}. */
@Override
public String getProductVersion()
{
return CommonProperties.getReleaseVersion();
}
/** Returns the number of processors currently available to the JVM. */
@Override
public int getNumberOfCores()
{
return Runtime.getRuntime().availableProcessors();
}
/** Returns the current value of the connection session count limit attribute. */
@Override
public int getConnection_sessionCountLimit()
{
return _connection_sessionCountLimit;
}
/** Returns the current value of the connection heartbeat delay attribute. */
@Override
public int getConnection_heartBeatDelay()
{
return _connection_heartBeatDelay;
}
/** Returns the current value of the close-when-no-route connection attribute. */
@Override
public boolean getConnection_closeWhenNoRoute()
{
return _connection_closeWhenNoRoute;
}
/**
 * Returns the statistics reporting period attribute. initialiseStatisticsReporting()
 * multiplies the same attribute by 1000 to obtain milliseconds.
 */
@Override
public int getStatisticsReportingPeriod()
{
return _statisticsReportingPeriod;
}
/** Returns the current value of the statistics-reporting-reset-enabled attribute. */
@Override
public boolean getStatisticsReportingResetEnabled()
{
return _statisticsReportingResetEnabled;
}
/** Returns the current value of the message compression enabled attribute. */
@Override
public boolean isMessageCompressionEnabled()
{
return _messageCompressionEnabled;
}
/** Returns the configured secret encryption provider type; updateEncrypter() treats null or blank as "none". */
@Override
public String getConfidentialConfigurationEncryptionProvider()
{
return _confidentialConfigurationEncryptionProvider;
}
/** Returns the model version compiled into the broker ({@link BrokerModel#MODEL_VERSION}), not the configured attribute value. */
@Override
public String getModelVersion()
{
return BrokerModel.MODEL_VERSION;
}
/** Returns the broker's children of category {@link VirtualHostNode}, as provided by getChildren(). */
@Override
public Collection<VirtualHostNode<?>> getVirtualHostNodes()
{
// Raw intermediate variable bridges the generic mismatch between getChildren() and the return type.
Collection children = getChildren(VirtualHostNode.class);
return children;
}
/** Returns the broker's children of category {@link Port}, as provided by getChildren(). */
public Collection<Port<?>> getPorts()
{
// Raw intermediate variable bridges the generic mismatch between getChildren() and the return type.
Collection children = getChildren(Port.class);
return children;
}
/** Returns the broker's children of category {@link AuthenticationProvider}, as provided by getChildren(). */
public Collection<AuthenticationProvider<?>> getAuthenticationProviders()
{
// Raw intermediate variable bridges the generic mismatch between getChildren() and the return type.
Collection children = getChildren(AuthenticationProvider.class);
return children;
}
/**
 * Splits the broker-wide flow-to-disk threshold between the virtual hosts.
 * <p>
 * The threshold is read from the context variable {@code BROKER_FLOW_TO_DISK_THRESHOLD}. A
 * FLOW_TO_DISK_ACTIVE / FLOW_TO_DISK_INACTIVE message is logged when the summed queue depth
 * crosses the threshold; the two report flags ensure each crossing is logged once.
 * <p>
 * When no messages are held, every virtual host receives an equal share. Otherwise half of the
 * threshold is distributed in proportion to each host's queue depth and the other half equally.
 * The proportional part is computed in floating point, so it may differ from exact integer
 * arithmetic by a byte.
 */
@Override
public synchronized void assignTargetSizes()
{
    long totalTarget = getContextValue(Long.class, BROKER_FLOW_TO_DISK_THRESHOLD);
    LOGGER.debug("Assigning target sizes based on total target {}", totalTarget);

    long totalSize = 0L;
    Collection<VirtualHostNode<?>> vhns = getVirtualHostNodes();
    Map<VirtualHost<?, ?, ?>, Long> vhs = new HashMap<>();
    for (VirtualHostNode<?> vhn : vhns)
    {
        VirtualHost<?, ?, ?> vh = vhn.getVirtualHost();
        if (vh != null)
        {
            long totalQueueDepthBytes = vh.getTotalQueueDepthBytes();
            vhs.put(vh, totalQueueDepthBytes);
            totalSize += totalQueueDepthBytes;
        }
    }

    if (totalSize > totalTarget && !_totalMessageSizeExceedThresholdReported)
    {
        _eventLogger.message(BrokerMessages.FLOW_TO_DISK_ACTIVE(totalSize / 1024, totalTarget / 1024));
        _totalMessageSizeExceedThresholdReported = true;
        _totalMessageSizeWithinThresholdReported = false;
    }
    else if (totalSize <= totalTarget && !_totalMessageSizeWithinThresholdReported)
    {
        _eventLogger.message(BrokerMessages.FLOW_TO_DISK_INACTIVE(totalSize / 1024, totalTarget / 1024));
        _totalMessageSizeWithinThresholdReported = true;
        _totalMessageSizeExceedThresholdReported = false;
    }

    if (vhs.isEmpty())
    {
        // Nothing to distribute; also avoids dividing by zero below.
        return;
    }

    final long proportionalShare = (long) ((double) totalTarget / (double) vhs.size());
    for (Map.Entry<VirtualHost<?, ?, ?>, Long> entry : vhs.entrySet())
    {
        long virtualHostTotalQueueSize = entry.getValue();
        final long size;
        if (totalSize == 0)
        {
            size = proportionalShare;
        }
        else
        {
            // Scale by the host's fraction of the total instead of multiplying the two byte
            // counts: totalTarget * virtualHostTotalQueueSize exceeds Long.MAX_VALUE once both
            // reach a few gigabytes, which made the previous long arithmetic wrap around.
            final double fraction = (double) virtualHostTotalQueueSize / (double) totalSize;
            final long queueSizeBasedShare = (long) (((double) totalTarget / 2.0) * fraction);
            size = queueSizeBasedShare + (proportionalShare / 2);
        }

        if (LOGGER.isDebugEnabled())
        {
            LOGGER.debug("Assigning target size {} to vhost {}", size, entry.getKey());
        }
        entry.getKey().setTargetSize(size);
    }
}
/**
 * Logs the startup banner when the broker is opened: release and build version, platform
 * details, maximum heap and direct memory, and the process id when it is known. Finally the
 * flow-to-disk target sizes are assigned.
 */
@Override
protected void onOpen()
{
    super.onOpen();
    final EventLogger eventLogger = getEventLogger();

    eventLogger.message(BrokerMessages.STARTUP(CommonProperties.getReleaseVersion(),
                                               CommonProperties.getBuildVersion()));

    final String javaVendor = System.getProperty("java.vendor");
    final String javaRuntimeVersion = System.getProperty("java.runtime.version",
                                                         System.getProperty("java.version"));
    eventLogger.message(BrokerMessages.PLATFORM(javaVendor,
                                                javaRuntimeVersion,
                                                SystemUtils.getOSName(),
                                                SystemUtils.getOSVersion(),
                                                SystemUtils.getOSArch(),
                                                String.valueOf(getNumberOfCores())));

    final long maxDirect = getMaxDirectMemorySize();
    final long maxHeap = Runtime.getRuntime().maxMemory();
    eventLogger.message(BrokerMessages.MAX_MEMORY(maxHeap, maxDirect));

    final String pid = SystemUtils.getProcessPid();
    if (pid != null)
    {
        eventLogger.message(BrokerMessages.PROCESS(pid));
    }

    assignTargetSizes();
}
/** Returns the broker's children of category {@link GroupProvider}, as provided by getChildren(). */
@Override
public Collection<GroupProvider<?>> getGroupProviders()
{
// Raw intermediate variable bridges the generic mismatch between getChildren() and the return type.
Collection children = getChildren(GroupProvider.class);
return children;
}
/**
 * Creates a virtual host node through the object factory and, once it exists, starts it under a
 * subject that carries added system rights.
 *
 * @param attributes attributes of the node to create
 * @return a future yielding the created node
 */
private ListenableFuture<VirtualHostNode> createVirtualHostNodeAsync(Map<String, Object> attributes)
        throws AccessControlException, IllegalArgumentException
{
    return doAfter(getObjectFactory().createAsync(VirtualHostNode.class, attributes, this),
                   new CallableWithArgument<ListenableFuture<VirtualHostNode>, VirtualHostNode>()
                   {
                       @Override
                       public ListenableFuture<VirtualHostNode> call(final VirtualHostNode virtualHostNode)
                               throws Exception
                       {
                           final PrivilegedAction<Object> startNode = new PrivilegedAction<Object>()
                           {
                               @Override
                               public Object run()
                               {
                                   virtualHostNode.start();
                                   return null;
                               }
                           };
                           // permission has already been granted to create the virtual host
                           // disable further access check on other operations, e.g. create exchange
                           Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), startNode);
                           return Futures.immediateFuture(virtualHostNode);
                       }
                   });
}
/** Returns the total of the data-received counter. */
@Override
public long getBytesIn()
{
return getDataReceiptStatistics().getTotal();
}
/** Returns the total of the data-delivered counter. */
@Override
public long getBytesOut()
{
return getDataDeliveryStatistics().getTotal();
}
/** Returns the total of the messages-received counter. */
@Override
public long getMessagesIn()
{
return getMessageReceiptStatistics().getTotal();
}
/** Returns the total of the messages-delivered counter. */
@Override
public long getMessagesOut()
{
return getMessageDeliveryStatistics().getTotal();
}
/**
 * Creates a child of the given category. Virtual host nodes take the dedicated path that also
 * starts the node; every other category is created directly by the object factory.
 *
 * @param childClass category of the child to create
 * @param attributes attributes of the new child
 * @param otherParents not used by this implementation
 * @return a future yielding the created child
 */
@SuppressWarnings("unchecked")
@Override
public <C extends ConfiguredObject> ListenableFuture<C> addChildAsync(final Class<C> childClass, final Map<String, Object> attributes, final ConfiguredObject... otherParents)
{
    if (childClass != VirtualHostNode.class)
    {
        return getObjectFactory().createAsync(childClass, attributes, this);
    }
    return (ListenableFuture<C>) createVirtualHostNodeAsync(attributes);
}
/**
 * Takes a snapshot of the current {@link BrokerLogger} children before the close proceeds, so
 * that onClose() can stop them.
 */
@Override
protected ListenableFuture<Void> beforeClose()
{
    final Collection<BrokerLogger> loggerSnapshot = new ArrayList<BrokerLogger>(getChildren(BrokerLogger.class));
    _brokerLoggersToClose = loggerSnapshot;
    return super.beforeClose();
}
/**
 * Releases broker resources on close: cancels the statistics timer if one was started, logs
 * the STOPPED message, stops the broker loggers captured by beforeClose() and finally
 * uninstalls the Qpid turbo filter from the root logging context.
 */
@Override
protected void onClose()
{
    if (_reportingTimer != null)
    {
        _reportingTimer.cancel();
    }

    _eventLogger.message(BrokerMessages.STOPPED());

    try
    {
        // The snapshot is only populated by beforeClose(); tolerate its absence instead of
        // failing the close with a NullPointerException.
        if (_brokerLoggersToClose != null)
        {
            for (BrokerLogger<?> logger : _brokerLoggersToClose)
            {
                logger.stopLogging();
            }
        }
    }
    finally
    {
        // uninstall Qpid turbo filter
        QpidLoggerTurboFilter.uninstallFromRootContext();
    }
}
/** Returns the security manager created in the constructor. */
@Override
public SecurityManager getSecurityManager()
{
return _securityManager;
}
/**
 * Looks through the virtual host nodes for a virtual host with the given name.
 *
 * @param name name to look for
 * @return the first matching virtual host, or {@code null} when none matches
 */
@Override
public VirtualHost<?,?,?> findVirtualHostByName(String name)
{
    for (VirtualHostNode<?> node : getChildren(VirtualHostNode.class))
    {
        final VirtualHost<?, ?, ?> candidate = node.getVirtualHost();
        if (candidate == null)
        {
            // node currently has no virtual host
            continue;
        }
        if (candidate.getName().equals(name))
        {
            return candidate;
        }
    }
    return null;
}
/**
 * Finds the virtual host node flagged as default.
 *
 * @return the first node for which {@code isDefaultVirtualHostNode()} is true, or {@code null}
 */
@Override
public VirtualHostNode findDefautVirtualHostNode()
{
    // Iterate over a copy of the node collection, as the original did.
    final Collection<VirtualHostNode<?>> snapshot = new ArrayList<>(getVirtualHostNodes());
    for (VirtualHostNode candidate : snapshot)
    {
        if (candidate.isDefaultVirtualHostNode())
        {
            return candidate;
        }
    }
    return null;
}
/** Returns the broker's children of category {@link KeyStore}, as provided by getChildren(). */
@Override
public Collection<KeyStore<?>> getKeyStores()
{
// Raw intermediate variable bridges the generic mismatch between getChildren() and the return type.
Collection children = getChildren(KeyStore.class);
return children;
}
/** Returns the broker's children of category {@link TrustStore}, as provided by getChildren(). */
@Override
public Collection<TrustStore<?>> getTrustStores()
{
// Raw intermediate variable bridges the generic mismatch between getChildren() and the return type.
Collection children = getChildren(TrustStore.class);
return children;
}
/** Returns whether the parent system configuration reports management mode. */
@Override
public boolean isManagementMode()
{
return _parent.isManagementMode();
}
/** Returns the broker's children of category {@link AccessControlProvider}; onValidate() allows at most one. */
@Override
public Collection<AccessControlProvider<?>> getAccessControlProviders()
{
// Raw intermediate variable bridges the generic mismatch between getChildren() and the return type.
Collection children = getChildren(AccessControlProvider.class);
return children;
}
/** Returns the event logger currently in use: the parent's logger unless replaced through setEventLogger(). */
@Override
public EventLogger getEventLogger()
{
return _eventLogger;
}
/**
 * Replaces the event logger used by the broker.
 *
 * @param eventLogger the logger to use from now on; stored without a null check
 */
@Override
public void setEventLogger(final EventLogger eventLogger)
{
_eventLogger = eventLogger;
}
/**
 * Logs a FATAL_ERROR message carrying the exception's message text.
 *
 * @param e the exception being reported; only its message is logged, not the stack trace
 */
@Override
protected void onExceptionInOpen(RuntimeException e)
{
_eventLogger.message(BrokerMessages.FATAL_ERROR(e.getMessage()));
}
/**
 * Records one delivered message of the given size in the broker-wide delivery counters.
 *
 * @param messageSize value added to the data-delivered counter
 */
public void registerMessageDelivered(long messageSize)
{
_messagesDelivered.registerEvent(1L);
_dataDelivered.registerEvent(messageSize);
}
/**
 * Records one received message of the given size in the broker-wide receipt counters.
 *
 * @param messageSize value added to the data-received counter
 * @param timestamp timestamp passed on to both counters
 */
public void registerMessageReceived(long messageSize, long timestamp)
{
_messagesReceived.registerEvent(1L, timestamp);
_dataReceived.registerEvent(messageSize, timestamp);
}
/** Returns the broker-wide counter of received messages. */
public StatisticsCounter getMessageReceiptStatistics()
{
return _messagesReceived;
}
/** Returns the broker-wide counter of received data. */
public StatisticsCounter getDataReceiptStatistics()
{
return _dataReceived;
}
/** Returns the broker-wide counter of delivered messages. */
public StatisticsCounter getMessageDeliveryStatistics()
{
return _messagesDelivered;
}
/** Returns the broker-wide counter of delivered data. */
public StatisticsCounter getDataDeliveryStatistics()
{
return _dataDelivered;
}
/**
 * Resets the four broker-wide counters and then the statistics of every virtual host that is
 * a {@link VirtualHostImpl}.
 */
public void resetStatistics()
{
    final StatisticsCounter[] brokerCounters =
            { _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived };
    for (StatisticsCounter counter : brokerCounters)
    {
        counter.reset();
    }

    for (VirtualHostNode<?> node : getChildren(VirtualHostNode.class))
    {
        final VirtualHost<?, ?, ?> host = node.getVirtualHost();
        if (!(host instanceof VirtualHostImpl))
        {
            continue;
        }
        ((VirtualHostImpl) host).resetStatistics();
    }
}
/**
 * Timer task that logs the broker-wide statistics followed by the statistics of each virtual
 * host, optionally resetting all counters afterwards. The work runs as the "Statistics" system
 * task subject.
 */
private class StatisticsReportingTask extends TimerTask
{
    private final int DELIVERED = 0;
    private final int RECEIVED = 1;

    private final boolean _reset;
    private final EventLogger _logger;
    private final Subject _subject;

    /**
     * @param reset whether the counters are reset after each report
     * @param logger stored but not consulted; reports go through the broker's current event logger
     */
    public StatisticsReportingTask(boolean reset, EventLogger logger)
    {
        _reset = reset;
        _logger = logger;
        _subject = SecurityManager.getSystemTaskSubject("Statistics");
    }

    @Override
    public void run()
    {
        final PrivilegedAction<Object> report = new PrivilegedAction<Object>()
        {
            @Override
            public Object run()
            {
                reportStatistics();
                return null;
            }
        };
        Subject.doAs(_subject, report);
    }

    /** Emits one full report; any exception is logged as a warning and not propagated. */
    protected void reportStatistics()
    {
        try
        {
            reportBrokerStatistics();

            for (VirtualHostNode<?> node : getChildren(VirtualHostNode.class))
            {
                final VirtualHost<?, ?, ?> host = node.getVirtualHost();
                if (host instanceof VirtualHostImpl)
                {
                    reportVirtualHostStatistics(host.getName(), (VirtualHostImpl) host);
                }
            }

            if (_reset)
            {
                resetStatistics();
            }
        }
        catch(Exception e)
        {
            LOGGER.warn("Unexpected exception occurred while reporting the statistics", e);
        }
    }

    /** Logs the broker-wide data and message statistics, delivered first and received second. */
    private void reportBrokerStatistics()
    {
        _eventLogger.message(BrokerMessages.STATS_DATA(DELIVERED, _dataDelivered.getPeak() / 1024.0, _dataDelivered.getTotal()));
        _eventLogger.message(BrokerMessages.STATS_MSGS(DELIVERED, _messagesDelivered.getPeak(), _messagesDelivered.getTotal()));
        _eventLogger.message(BrokerMessages.STATS_DATA(RECEIVED, _dataReceived.getPeak() / 1024.0, _dataReceived.getTotal()));
        _eventLogger.message(BrokerMessages.STATS_MSGS(RECEIVED, _messagesReceived.getPeak(), _messagesReceived.getTotal()));
    }

    /** Logs the statistics of one virtual host through that host's own event logger. */
    private void reportVirtualHostStatistics(final String name, final VirtualHostImpl vhostImpl)
    {
        final StatisticsCounter hostDataDelivered = vhostImpl.getDataDeliveryStatistics();
        final StatisticsCounter hostMessagesDelivered = vhostImpl.getMessageDeliveryStatistics();
        final StatisticsCounter hostDataReceived = vhostImpl.getDataReceiptStatistics();
        final StatisticsCounter hostMessagesReceived = vhostImpl.getMessageReceiptStatistics();
        final EventLogger hostLogger = vhostImpl.getEventLogger();

        hostLogger.message(VirtualHostMessages.STATS_DATA(name,
                                                          DELIVERED,
                                                          hostDataDelivered.getPeak() / 1024.0,
                                                          hostDataDelivered.getTotal()));
        hostLogger.message(VirtualHostMessages.STATS_MSGS(name,
                                                          DELIVERED,
                                                          hostMessagesDelivered.getPeak(),
                                                          hostMessagesDelivered.getTotal()));
        hostLogger.message(VirtualHostMessages.STATS_DATA(name,
                                                          RECEIVED,
                                                          hostDataReceived.getPeak() / 1024.0,
                                                          hostDataReceived.getTotal()));
        hostLogger.message(VirtualHostMessages.STATS_MSGS(name,
                                                          RECEIVED,
                                                          hostMessagesReceived.getPeak(),
                                                          hostMessagesReceived.getTotal()));
    }
}
/** Returns whether a system node creator of type {@code VirtualHostPropertiesNodeCreator.TYPE} was found at construction. */
@Override
public boolean isVirtualHostPropertiesNodeEnabled()
{
return _virtualHostPropertiesNodeEnabled;
}
/** Returns the authentication provider created for management mode; the constructor only creates it when the parent is in management mode. */
public AuthenticationProvider<?> getManagementModeAuthenticationProvider()
{
return _managementModeAuthenticationProvider;
}
/**
 * Hook named by the {@code afterSet} value on the encryption provider field: installs the
 * encrypter for the new provider and then forces an update of all secure attributes.
 */
@SuppressWarnings("unused")
private void postEncrypterProviderSet()
{
updateEncrypter(_confidentialConfigurationEncryptionProvider);
forceUpdateAllSecureAttributes();
}
/** Returns the type names of all {@link ConfigurationSecretEncrypterFactory} instances found by a fresh service loader. */
@SuppressWarnings("unused")
public static Collection<String> getAvailableConfigurationEncrypters()
{
return (new QpidServiceLoader()).getInstancesByType(ConfigurationSecretEncrypterFactory.class).keySet();
}
/**
 * Determines the maximum amount of direct memory available to the JVM.
 * <p>
 * The value of the {@code MaxDirectMemorySize} VM option is first read reflectively through
 * {@code com.sun.management.HotSpotDiagnosticMXBean}. If that yields zero or the classes or
 * methods are unavailable, the JVM input arguments are scanned for
 * {@code -XX:MaxDirectMemorySize=<n>[k|m|g|t]}; the last occurrence wins. If no such argument
 * exists, the maximum heap size is returned.
 *
 * @return the maximum direct memory size in bytes
 * @throws ServerScopedRuntimeException if the reflective MXBean call itself throws
 * @throws IllegalArgumentException if the argument is present but evaluates to zero
 */
public static long getMaxDirectMemorySize()
{
    long maxMemory = 0;

    try
    {
        final ClassLoader loader = ClassLoader.getSystemClassLoader();
        final Class<?> diagnosticBeanType = Class.forName("com.sun.management.HotSpotDiagnosticMXBean", true, loader);
        final Class<?> optionType = Class.forName("com.sun.management.VMOption", true, loader);

        final Object diagnosticBean = ManagementFactory.getPlatformMXBean((Class<? extends PlatformManagedObject>) diagnosticBeanType);

        final Method optionLookup = diagnosticBeanType.getDeclaredMethod("getVMOption", String.class);
        final Object option = optionLookup.invoke(diagnosticBean, "MaxDirectMemorySize");

        final Method valueAccessor = optionType.getDeclaredMethod("getValue");
        final String optionValue = (String) valueAccessor.invoke(option);

        maxMemory = Long.parseLong(optionValue);
    }
    catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException e)
    {
        LOGGER.debug("Cannot determine direct memory max size using com.sun.management.HotSpotDiagnosticMXBean: " + e.getMessage());
    }
    catch (InvocationTargetException e)
    {
        throw new ServerScopedRuntimeException("Unexpected exception in evaluation of MaxDirectMemorySize with HotSpotDiagnosticMXBean", e.getTargetException());
    }

    if (maxMemory != 0)
    {
        return maxMemory;
    }

    // Fall back to parsing the command line arguments.
    final Pattern argumentPattern = Pattern.compile("^\\s*-XX:MaxDirectMemorySize\\s*=\\s*(\\d+)\\s*([KkMmGgTt]?)\\s*$");
    final List<String> jvmArguments = ManagementFactory.getRuntimeMXBean().getInputArguments();
    boolean argumentFound = false;

    for (String argument : jvmArguments)
    {
        final Matcher matcher = argumentPattern.matcher(argument);
        if (!matcher.matches())
        {
            continue;
        }
        argumentFound = true;

        final long amount = Long.parseLong(matcher.group(1));
        final String unit = matcher.group(2);
        final char unitChar = "".equals(unit) ? 0 : unit.charAt(0);

        final long multiplier;
        switch (unitChar)
        {
            case 'k':
            case 'K':
                multiplier = 1024L;
                break;
            case 'm':
            case 'M':
                multiplier = 1024L * 1024L;
                break;
            case 'g':
            case 'G':
                multiplier = 1024L * 1024L * 1024L;
                break;
            case 't':
            case 'T':
                multiplier = 1024L * 1024L * 1024L * 1024L;
                break;
            case 0:
                // no unit suffix: the amount is already in bytes
                multiplier = 1L;
                break;
            default:
                throw new IllegalStateException("Unexpected unit character in MaxDirectMemorySize argument : " + argument);
        }
        // no early exit: Oracle and IBM JVMs use the last value when argument is specified multiple times
        maxMemory = amount * multiplier;
    }

    if (maxMemory != 0)
    {
        return maxMemory;
    }
    if (argumentFound)
    {
        throw new IllegalArgumentException("Qpid Broker cannot operate with 0 direct memory. Please, set JVM argument MaxDirectMemorySize to non-zero value");
    }
    return Runtime.getRuntime().maxMemory();
}
/** Returns the network buffer size: the default until postResolve() replaces it with the validated context value. */
@Override
public int getNetworkBufferSize()
{
return _networkBufferSize;
}
/** Returns the thread count reported by the platform {@code ThreadMXBean}. */
@Override
public int getNumberOfLiveThreads()
{
return ManagementFactory.getThreadMXBean().getThreadCount();
}
/** Returns the value of {@code Runtime.maxMemory()} captured when this object was created. */
@Override
public long getMaximumHeapMemorySize()
{
return _maximumHeapHize;
}
/** Returns the difference between the runtime's total and free memory at the time of the call. */
@Override
public long getUsedHeapMemorySize()
{
return Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
}
/** Returns the result of getMaxDirectMemorySize() captured when this object was created. */
@Override
public long getMaximumDirectMemorySize()
{
return _maximumDirectMemorySize;
}
/**
 * Returns the memory used by the "direct" buffer pool.
 *
 * @return the pool's used memory, or -1 when no such pool was found at construction
 */
@Override
public long getUsedDirectMemorySize()
{
    return _bufferPoolMXBean == null ? -1 : _bufferPoolMXBean.getMemoryUsed();
}
/**
 * Returns the total capacity of the "direct" buffer pool.
 *
 * @return the pool's total capacity, or -1 when no such pool was found at construction
 */
@Override
public long getDirectMemoryTotalCapacity()
{
    return _bufferPoolMXBean == null ? -1 : _bufferPoolMXBean.getTotalCapacity();
}
/** Returns the pending-finalization object count reported by the platform {@code MemoryMXBean}. */
@Override
public int getNumberOfObjectsPendingFinalization()
{
return ManagementFactory.getMemoryMXBean().getObjectPendingFinalizationCount();
}
/** Returns the JVM input arguments captured at construction; the stored list is returned as is, without copying. */
@Override
public List<String> getJvmArguments()
{
return _jvmArguments;
}
/**
 * Requests a garbage collection via {@code System.gc()} after authorising the CONFIGURE
 * operation and logging the operation name.
 */
@Override
public void performGC()
{
// Authorisation comes first so an unauthorised caller neither logs nor triggers the GC request.
_securityManager.authorise(Operation.CONFIGURE, this);
getEventLogger().message(BrokerMessages.OPERATION("performGC"));
System.gc();
}
/**
 * Produces a dump of all threads, followed by a section for deadlocked threads when a
 * deadlock is detected.
 * <p>
 * The caller must be authorised for the CONFIGURE operation; the operation is logged.
 *
 * @param appendToLog when {@code true} the dump is also written to the log at WARN level
 * @return the dump wrapped as plain-text content
 */
@Override
public Content getThreadStackTraces(boolean appendToLog)
{
    _securityManager.authorise(Operation.CONFIGURE, this);
    getEventLogger().message(BrokerMessages.OPERATION("getThreadStackTraces"));

    ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
    ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(true, true);
    StringBuilder threadDump = new StringBuilder();
    for (ThreadInfo threadInfo : threadInfos)
    {
        threadDump.append(getThreadStackTraces(threadInfo));
    }

    long[] deadLocks = threadMXBean.findDeadlockedThreads();
    if (deadLocks != null && deadLocks.length > 0)
    {
        // Ask for stack traces, locked monitors and synchronizers explicitly:
        // getThreadInfo(long[]) returns entries carrying none of them, which left the
        // deadlock section without any stack frames.
        ThreadInfo[] deadlockedThreads = threadMXBean.getThreadInfo(deadLocks, true, true);
        threadDump.append(System.lineSeparator()).append("Deadlock is detected!").append(System.lineSeparator());
        for (ThreadInfo threadInfo : deadlockedThreads)
        {
            // An element is null when the thread is no longer alive; skip it rather than
            // fail with a NullPointerException.
            if (threadInfo != null)
            {
                threadDump.append(getThreadStackTraces(threadInfo));
            }
        }
    }

    String threadStackTraces = threadDump.toString();
    if (appendToLog)
    {
        LOGGER.warn("Thread dump:{} {}", System.lineSeparator(), threadStackTraces);
    }
    return new ThreadStackContent(threadStackTraces);
}
/**
 * Produces a dump of the threads whose name contains a match for the given regular
 * expression. A {@code null} or empty expression selects every thread.
 * <p>
 * The caller must be authorised for the CONFIGURE operation; the operation is logged.
 *
 * @param threadNameFindExpression regular expression searched for in each thread name
 * @return the dump wrapped as plain-text content
 */
@Override
public Content findThreadStackTraces(String threadNameFindExpression)
{
    _securityManager.authorise(Operation.CONFIGURE, this);
    getEventLogger().message(BrokerMessages.OPERATION("findThreadStackTraces"));

    final ThreadInfo[] allThreads = ManagementFactory.getThreadMXBean().dumpAllThreads(true, true);
    final StringBuilder selectedDump = new StringBuilder();

    final Pattern nameFilter;
    if (threadNameFindExpression == null || threadNameFindExpression.equals(""))
    {
        nameFilter = null;
    }
    else
    {
        nameFilter = Pattern.compile(threadNameFindExpression);
    }

    for (ThreadInfo info : allThreads)
    {
        final boolean selected = nameFilter == null || nameFilter.matcher(info.getThreadName()).find();
        if (selected)
        {
            selectedDump.append(getThreadStackTraces(info));
        }
    }
    return new ThreadStackContent(selectedDump.toString());
}
/**
 * Formats a single thread as text: a header line (name, id, state, lock and owner details,
 * suspended / native markers), one line per stack frame annotated with the lock the thread is
 * blocked or waiting on and the monitors locked at that frame, and finally the locked
 * synchronizers, if any.
 *
 * @param threadInfo the thread snapshot to format
 * @return the formatted text, terminated by an empty line
 */
private String getThreadStackTraces(final ThreadInfo threadInfo)
{
    final String eol = System.lineSeparator();
    final StringBuilder out = new StringBuilder();

    out.append("\"").append(threadInfo.getThreadName()).append("\"")
       .append(" Id=").append(threadInfo.getThreadId())
       .append( " ").append(threadInfo.getThreadState());

    final String lockName = threadInfo.getLockName();
    if (lockName != null)
    {
        out.append(" on ").append(lockName);
    }
    final String lockOwnerName = threadInfo.getLockOwnerName();
    if (lockOwnerName != null)
    {
        out.append(" owned by \"").append(lockOwnerName)
           .append("\" Id=").append(threadInfo.getLockOwnerId());
    }
    if (threadInfo.isSuspended())
    {
        out.append(" (suspended)");
    }
    if (threadInfo.isInNative())
    {
        out.append(" (in native)");
    }
    out.append(eol);

    final StackTraceElement[] frames = threadInfo.getStackTrace();
    final LockInfo awaitedLock = threadInfo.getLockInfo();
    final MonitorInfo[] lockedMonitors = threadInfo.getLockedMonitors();
    for (int depth = 0; depth < frames.length; depth++)
    {
        out.append(" at ").append(frames[depth].toString()).append(eol);

        // The lock the thread is blocked or waiting on belongs to the top frame only.
        if (depth == 0 && awaitedLock != null)
        {
            final Thread.State state = threadInfo.getThreadState();
            if (state == Thread.State.BLOCKED)
            {
                out.append(" - blocked on ").append(awaitedLock).append(eol);
            }
            else if (state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING)
            {
                out.append(" - waiting on ").append(awaitedLock).append(eol);
            }
        }

        for (MonitorInfo monitor : lockedMonitors)
        {
            if (monitor.getLockedStackDepth() == depth)
            {
                out.append(" - locked ").append(monitor).append(eol);
            }
        }
    }

    final LockInfo[] synchronizers = threadInfo.getLockedSynchronizers();
    if (synchronizers.length > 0)
    {
        out.append(eol).append(" Number of locked synchronizers = ").append(synchronizers.length);
        out.append(eol);
        for (LockInfo synchronizer : synchronizers)
        {
            out.append(" - ").append(synchronizer);
            out.append(eol);
        }
    }
    out.append(eol);
    return out.toString();
}
/**
 * Plain-text content holding a thread dump. The text is written UTF-8 encoded and advertised
 * with a matching Content-Type header.
 */
public static class ThreadStackContent implements Content, CustomRestHeaders
{
    private final String _threadStackTraces;

    /**
     * @param threadStackTraces the dump text; may be {@code null}, in which case nothing is written
     */
    public ThreadStackContent(final String threadStackTraces)
    {
        _threadStackTraces = threadStackTraces;
    }

    /**
     * Writes the dump to the given stream as UTF-8 bytes.
     *
     * @param outputStream destination of the encoded text
     * @throws IOException if the stream rejects the write
     */
    @Override
    public void write(final OutputStream outputStream) throws IOException
    {
        if (_threadStackTraces == null)
        {
            return;
        }
        final byte[] encoded = _threadStackTraces.getBytes(Charset.forName("UTF-8"));
        outputStream.write(encoded);
    }

    /** Returns the value of the Content-Type header for this content. */
    @RestContentHeader("Content-Type")
    public String getContentType()
    {
        return "text/plain;charset=utf-8";
    }
}
}