blob: 8391c1a9ad44ebd0054cbfd76557fc664e17990d [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.virtualhost;
import static com.google.common.collect.Iterators.cycle;
import static java.util.Collections.newSetFromMap;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.security.AccessControlContext;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import javax.security.auth.Subject;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.updater.Task;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutorImpl;
import org.apache.qpid.server.exchange.DefaultDestination;
import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.Outcome;
import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDeletedException;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageNode;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfigurationExtractor;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.Content;
import org.apache.qpid.server.model.CustomRestHeaders;
import org.apache.qpid.server.model.DoOnConfigThread;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.ManageableMessage;
import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.NoFactoryForTypeException;
import org.apache.qpid.server.model.NotFoundException;
import org.apache.qpid.server.model.Param;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.RestContentHeader;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.StateTransition;
import org.apache.qpid.server.model.SystemConfig;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostAccessControlProvider;
import org.apache.qpid.server.model.VirtualHostLogger;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.model.preferences.UserPreferences;
import org.apache.qpid.server.model.preferences.UserPreferencesImpl;
import org.apache.qpid.server.plugin.ConnectionValidator;
import org.apache.qpid.server.plugin.QpidServiceLoader;
import org.apache.qpid.server.plugin.SystemNodeCreator;
import org.apache.qpid.server.pool.SuppressingInheritedAccessControlContextThreadFactory;
import org.apache.qpid.server.protocol.LinkModel;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueEntryIterator;
import org.apache.qpid.server.security.AccessControl;
import org.apache.qpid.server.security.CompoundAccessControl;
import org.apache.qpid.server.security.Result;
import org.apache.qpid.server.security.SubjectFixedResultAccessControl;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.SocketConnectionMetaData;
import org.apache.qpid.server.stats.StatisticsReportingTask;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreProvider;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.VirtualHostStoreUpgraderAndRecoverer;
import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
import org.apache.qpid.server.store.handler.MessageHandler;
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.apache.qpid.server.store.preferences.PreferenceRecord;
import org.apache.qpid.server.store.preferences.PreferenceStore;
import org.apache.qpid.server.store.preferences.PreferenceStoreUpdater;
import org.apache.qpid.server.store.preferences.PreferenceStoreUpdaterImpl;
import org.apache.qpid.server.store.preferences.PreferencesRecoverer;
import org.apache.qpid.server.store.preferences.PreferencesRoot;
import org.apache.qpid.server.store.serializer.MessageStoreSerializer;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.transport.NetworkConnectionScheduler;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.HousekeepingExecutor;
import org.apache.qpid.server.util.Strings;
public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> extends AbstractConfiguredObject<X>
implements QueueManagingVirtualHost<X>
{
// ---- Constants and loggers ----
// NOTE(review): the use of this key is outside this chunk; from its value it presumably names the
// setting that selects asynchronous message store recovery — confirm.
private static final String USE_ASYNC_RECOVERY = "use_async_message_store_recovery";
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractVirtualHost.class);
private static final Logger DIRECT_MEMORY_USAGE_LOGGER = LoggerFactory.getLogger("org.apache.qpid.server.directMemory.virtualhost");
// NOTE(review): the time unit is not visible in this chunk — confirm where the timeout is applied.
private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5;
// ---- State fixed at construction time ----
// Filled in onOpen() from the ConnectionValidator instances found by QpidServiceLoader.
private final Collection<ConnectionValidator> _connectionValidators = new ArrayList<>();
// Set view backed by a ConcurrentHashMap; returned directly by getConnections().
private final Set<AMQPConnection<?>> _connections = newSetFromMap(new ConcurrentHashMap<>());
private final AccessControlContext _housekeepingJobContext;
private final AccessControlContext _fileSystemSpaceCheckerJobContext;
private final AtomicBoolean _acceptsConnections = new AtomicBoolean(false);
private final ConcurrentMap<String, Cache> _caches = new ConcurrentHashMap<>();
// The parent of the virtual host node passed to the constructor.
private final Broker<?> _broker;
private final DtxRegistry _dtxRegistry;
// Handed to every SystemNodeCreator in registerSystemNodes().
private final SystemNodeRegistry _systemNodeRegistry = new SystemNodeRegistry();
private final AtomicLong _messagesIn = new AtomicLong();
private final AtomicLong _messagesOut = new AtomicLong();
private final AtomicLong _transactedMessagesIn = new AtomicLong();
private final AtomicLong _transactedMessagesOut = new AtomicLong();
private final AtomicLong _bytesIn = new AtomicLong();
private final AtomicLong _bytesOut = new AtomicLong();
private final AtomicLong _totalConnectionCount = new AtomicLong();
private final AtomicLong _maximumMessageSize = new AtomicLong();
private final AtomicBoolean _blocked = new AtomicBoolean();
private final Map<String, MessageDestination> _systemNodeDestinations =
Collections.synchronizedMap(new HashMap<>());
private final Map<String, MessageSource> _systemNodeSources =
Collections.synchronizedMap(new HashMap<>());
// Obtained from the SystemConfig (the broker's parent) in the constructor.
private final EventLogger _eventLogger;
private final VirtualHostNode<?> _virtualHostNode;
// Initial value is 100 * 1024 * 1024 (100 MiB if the unit is bytes — TODO confirm the unit).
private final AtomicLong _targetSize = new AtomicLong(100 * 1024 * 1024);
// Synchronized EnumSet of the BlockingType values currently recorded.
private final Set<BlockingType> _blockingReasons = Collections.synchronizedSet(EnumSet.noneOf(BlockingType.class));
private final VirtualHostPrincipal _principal;
// Registered on this object and on each VirtualHostAccessControlProvider child in postResolveChildren().
private final ConfigurationChangeListener _accessControlProviderListener = new AccessControlProviderListener();
// AccessControl.ALWAYS_ALLOWED in management mode, otherwise a CompoundAccessControl whose delegate
// list is replaced by updateAccessControl().
private final AccessControl _accessControl;
private final AtomicBoolean _directMemoryExceedsTargetReported = new AtomicBoolean();
// Yields ALLOWED when isSystemSubject(subject) is true and DEFER otherwise; Result.DEFER is also passed
// as the second constructor argument. updateAccessControl() places this first in the delegate list.
private final AccessControl _systemUserAllowed =
new SubjectFixedResultAccessControl(subject -> isSystemSubject(subject) ? Result.ALLOWED : Result.DEFER, Result.DEFER);
private final VirtualHostConnectionLimiter _connectionLimiter;
private final MessageDestination _defaultDestination;
private final FileSystemSpaceChecker _fileSystemSpaceChecker;
// ---- Mutable runtime state ----
private volatile TaskExecutor _preferenceTaskExecutor;
private volatile boolean _deleteRequested;
// Element type of _blockingReasons.
private enum BlockingType { STORE, FILESYSTEM };
// Created by createHousekeepingExecutor() when absent or terminated.
private volatile ScheduledThreadPoolExecutor _houseKeepingTaskExecutor;
private volatile ScheduledFuture<?> _statisticsReportingFuture;
// Assigned in onOpen() from createLinkRegistry(); may be null when no LinkRegistryFactory is found.
private volatile LinkRegistryModel _linkRegistry;
private MessageStoreLogSubject _messageStoreLogSubject;
private NetworkConnectionScheduler _networkConnectionScheduler;
// Set to true by onCreate() and overwritten by setFirstOpening(boolean).
private volatile boolean _createDefaultExchanges;
// ---- Fields annotated with @ManagedAttributeField ----
@ManagedAttributeField
private boolean _queue_deadLetterQueueEnabled;
@ManagedAttributeField
private long _housekeepingCheckPeriod;
@ManagedAttributeField
private long _storeTransactionIdleTimeoutClose;
@ManagedAttributeField
private long _storeTransactionIdleTimeoutWarn;
@ManagedAttributeField
private long _storeTransactionOpenTimeoutClose;
@ManagedAttributeField
private long _storeTransactionOpenTimeoutWarn;
@ManagedAttributeField
private int _housekeepingThreadCount;
@ManagedAttributeField
private int _connectionThreadPoolSize;
@ManagedAttributeField
private int _numberOfSelectors;
@ManagedAttributeField
private List<String> _enabledConnectionValidators;
@ManagedAttributeField
private List<String> _disabledConnectionValidators;
@ManagedAttributeField
private List<String> _globalAddressDomains;
@ManagedAttributeField
private List<NodeAutoCreationPolicy> _nodeAutoCreationPolicies;
@ManagedAttributeField
private volatile int _statisticsReportingPeriod;
// ---- Further runtime state; onOpen() assigns _messageStore, _fileSystemMaxUsagePercent,
// _preferenceStore, _flowToDiskCheckPeriod and _isDiscardGlobalSharedSubscriptionLinksOnDetach ----
private boolean _useAsyncRecoverer;
private MessageStore _messageStore;
private MessageStoreRecoverer _messageStoreRecoverer;
private int _fileSystemMaxUsagePercent;
private Collection<VirtualHostLogger> _virtualHostLoggersToClose;
private PreferenceStore _preferenceStore;
private long _flowToDiskCheckPeriod;
private volatile boolean _isDiscardGlobalSharedSubscriptionLinksOnDetach;
/**
 * Builds the virtual host beneath the given virtual host node.
 * <p>
 * The broker is taken from the node's parent and the system configuration from the broker's parent.
 * In management mode every access is allowed; otherwise an initially empty compound access control is
 * installed, whose delegates are supplied later by {@code updateAccessControl()}.
 *
 * @param attributes      the attributes handed to the superclass constructor
 * @param virtualHostNode the node that owns this virtual host
 */
public AbstractVirtualHost(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode)
{
    super(virtualHostNode, attributes);

    _virtualHostNode = virtualHostNode;
    _broker = (Broker<?>) virtualHostNode.getParent();
    _dtxRegistry = new DtxRegistry(this);

    final SystemConfig systemConfig = (SystemConfig) _broker.getParent();
    _eventLogger = systemConfig.getEventLogger();
    _principal = new VirtualHostPrincipal(this);

    if (!systemConfig.isManagementMode())
    {
        _accessControl = new CompoundAccessControl(List.of(), Result.DEFER);
    }
    else
    {
        _accessControl = AccessControl.ALWAYS_ALLOWED;
    }
    _defaultDestination = new DefaultDestination(this, _accessControl);

    // Both job contexts are created for the virtual host principal and labelled with the host name.
    final String hostName = getName();
    _housekeepingJobContext = getSystemTaskControllerContext("Housekeeping[" + hostName + "]", _principal);
    _fileSystemSpaceCheckerJobContext =
            getSystemTaskControllerContext("FileSystemSpaceChecker[" + getName() + "]", _principal);

    _fileSystemSpaceChecker = new FileSystemSpaceChecker();
    _connectionLimiter = new VirtualHostConnectionLimiter(this, _broker);
}
/**
 * Rebuilds the delegate list of the compound access control from the current
 * {@link VirtualHostAccessControlProvider} children. Does nothing in management mode.
 * <p>
 * The resulting order is: the system-user rule, the controllers of all ACTIVE providers (sorted with
 * {@code ACCESS_CONTROL_PROVIDER_COMPARATOR}), then the parent access control. If any provider is
 * ERRORED, everything collected so far is replaced by {@code ALWAYS_DENIED} (the parent access control
 * is still appended afterwards).
 */
private void updateAccessControl()
{
    final SystemConfig systemConfig = (SystemConfig) _broker.getParent();
    if (systemConfig.isManagementMode())
    {
        return;
    }

    final List<VirtualHostAccessControlProvider> providers =
            new ArrayList<>(getChildren(VirtualHostAccessControlProvider.class));
    LOGGER.debug("Updating access control list with {} provider children", providers.size());
    Collections.sort(providers, VirtualHostAccessControlProvider.ACCESS_CONTROL_PROVIDER_COMPARATOR);

    final List<AccessControl<?>> delegates = new ArrayList<>(providers.size() + 2);
    delegates.add(_systemUserAllowed);
    for (final VirtualHostAccessControlProvider provider : providers)
    {
        if (provider.getState() == State.ERRORED)
        {
            // A broken provider must not silently fall through to more permissive rules.
            delegates.clear();
            delegates.add(AccessControl.ALWAYS_DENIED);
            break;
        }
        if (provider.getState() == State.ACTIVE)
        {
            delegates.add(provider.getController());
        }
    }
    delegates.add(getParentAccessControl());

    ((CompoundAccessControl) _accessControl).setAccessControls(delegates);
}
/** Opens the connection limiter of this virtual host by delegating to {@code _connectionLimiter.open()}. */
private void openConnectionLimiter()
{
    this._connectionLimiter.open();
}
/** Activates the connection limiter of this virtual host by delegating to {@code _connectionLimiter.activate()}. */
private void activateConnectionLimiter()
{
    this._connectionLimiter.activate();
}
/** Closes the connection limiter of this virtual host by delegating to {@code _connectionLimiter.close()}. */
private void closeConnectionLimiter()
{
    this._connectionLimiter.close();
}
/**
 * Runs the superclass creation hook and then records that the default exchanges are to be created
 * for this virtual host.
 */
@Override
protected void onCreate()
{
    super.onCreate();
    this._createDefaultExchanges = true;
}
/**
 * Stores the supplied flag as the "create default exchanges" flag.
 *
 * @param firstOpening the value to store in {@code _createDefaultExchanges}
 */
@Override
public void setFirstOpening(boolean firstOpening)
{
    this._createDefaultExchanges = firstOpening;
}
/**
 * Validates the configuration of this virtual host.
 * <p>
 * After the superclass checks, the name and type must be non-blank, the object must be durable, every
 * global address domain and node auto-creation policy (when the respective list is present) must be
 * valid, the connection thread pool settings must be consistent, and the message store must be
 * openable.
 *
 * @throws IllegalConfigurationException if the name or type is missing (or a delegated check fails with it)
 * @throws IllegalArgumentException      if the object is not durable (or a delegated check fails with it)
 */
@Override
public void onValidate()
{
    super.onValidate();

    final String name = getName();
    if (name == null || name.trim().isEmpty())
    {
        throw new IllegalConfigurationException("Virtual host name must be specified");
    }

    final String type = getType();
    if (type == null || type.trim().isEmpty())
    {
        throw new IllegalConfigurationException("Virtual host type must be specified");
    }

    if (!isDurable())
    {
        throw new IllegalArgumentException(getClass().getSimpleName() + " must be durable");
    }

    if (getGlobalAddressDomains() != null)
    {
        for (final String globalAddressDomain : getGlobalAddressDomains())
        {
            validateGlobalAddressDomain(globalAddressDomain);
        }
    }

    if (getNodeAutoCreationPolicies() != null)
    {
        for (final NodeAutoCreationPolicy autoCreationPolicy : getNodeAutoCreationPolicies())
        {
            validateNodeAutoCreationPolicy(autoCreationPolicy);
        }
    }

    validateConnectionThreadPoolSettings(this);
    validateMessageStoreCreation();
}
/**
 * Validates a proposed attribute change.
 * <p>
 * All checks are made against {@code proxyForValidation}, which carries the proposed values, and only
 * for the attributes named in {@code changedAttributes}: global address domains, node auto-creation
 * policies, and the connection thread pool size / number of selectors.
 *
 * @param proxyForValidation the proxy exposing the proposed attribute values
 * @param changedAttributes  the names of the attributes being changed
 * @throws IllegalArgumentException      if a proposed domain or policy is invalid
 * @throws IllegalConfigurationException if the proposed thread pool settings are inconsistent
 */
@Override
protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes)
{
    super.validateChange(proxyForValidation, changedAttributes);
    final QueueManagingVirtualHost<?> virtualHost = (QueueManagingVirtualHost<?>) proxyForValidation;

    if (changedAttributes.contains(GLOBAL_ADDRESS_DOMAINS)
        && virtualHost.getGlobalAddressDomains() != null)
    {
        for (final String name : virtualHost.getGlobalAddressDomains())
        {
            validateGlobalAddressDomain(name);
        }
    }

    // The null check must look at the proposed value (the proxy), not at the current value of this
    // object; otherwise a proposed null list would be iterated and a proposed non-null list could be
    // skipped when the current value is null.
    if (changedAttributes.contains(NODE_AUTO_CREATION_POLICIES)
        && virtualHost.getNodeAutoCreationPolicies() != null)
    {
        for (final NodeAutoCreationPolicy policy : virtualHost.getNodeAutoCreationPolicies())
        {
            validateNodeAutoCreationPolicy(policy);
        }
    }

    if (changedAttributes.contains(CONNECTION_THREAD_POOL_SIZE) || changedAttributes.contains(NUMBER_OF_SELECTORS))
    {
        validateConnectionThreadPoolSettings(virtualHost);
    }
}
/**
 * Applies the attribute changes through the superclass and, when the statistics reporting period is
 * among the supplied attributes, re-initialises statistics reporting.
 *
 * @param attributes the attributes being changed, keyed by attribute name
 */
@Override
protected void changeAttributes(final Map<String, Object> attributes)
{
    super.changeAttributes(attributes);

    final boolean reportingPeriodSupplied = attributes.containsKey(STATISTICS_REPORTING_PERIOD);
    if (reportingPeriodSupplied)
    {
        initialiseStatisticsReporting();
    }
}
/**
 * @return the access control instance chosen for this virtual host in the constructor
 */
@Override
protected AccessControl getAccessControl()
{
    return this._accessControl;
}
/**
 * @return the access control provided by the superclass implementation of {@code getAccessControl()},
 *         bypassing the override in this class
 */
private AccessControl getParentAccessControl()
{
    final AccessControl inherited = super.getAccessControl();
    return inherited;
}
/**
 * After the superclass has resolved the children, registers the access control provider listener on
 * this virtual host and on each of its {@link VirtualHostAccessControlProvider} children.
 */
@Override
protected void postResolveChildren()
{
    super.postResolveChildren();
    addChangeListener(_accessControlProviderListener);

    final Collection<VirtualHostAccessControlProvider> providers =
            getChildren(VirtualHostAccessControlProvider.class);
    for (final VirtualHostAccessControlProvider provider : providers)
    {
        provider.addChangeListener(_accessControlProviderListener);
    }
}
/**
 * Validates a single node auto-creation policy.
 * <p>
 * The policy must have a pattern that compiles as a Java regular expression, a node type whose simple
 * class name (compared case-insensitively, after trimming) matches one of the child types of this
 * object's category, and must create nodes on consume, on publish, or both. A node type used for
 * creation on consume must implement {@link MessageSource}; one used for creation on publish must
 * implement {@link MessageDestination}.
 *
 * @param policy the policy to validate
 * @throws IllegalArgumentException if any of the above rules is violated, including a missing pattern
 *                                  or a missing node type
 */
private void validateNodeAutoCreationPolicy(final NodeAutoCreationPolicy policy)
{
    final String pattern = policy.getPattern();
    if (pattern == null)
    {
        throw new IllegalArgumentException("The 'pattern' attribute of a NodeAutoCreationPolicy MUST be supplied: " + policy);
    }

    try
    {
        Pattern.compile(pattern);
    }
    catch (PatternSyntaxException e)
    {
        // Keep the original exception as the cause so that the position of the syntax error is not lost.
        throw new IllegalArgumentException("The 'pattern' attribute of a NodeAutoCreationPolicy MUST be a valid "
                                           + "Java Regular Expression Pattern, the value '" + pattern + "' is not: " + policy,
                                           e);
    }

    final String nodeType = policy.getNodeType();
    if (nodeType == null)
    {
        // Without this check the trim() below would fail with a NullPointerException rather than a
        // validation error.
        throw new IllegalArgumentException("The 'nodeType' attribute of a NodeAutoCreationPolicy MUST be supplied: " + policy);
    }

    final String trimmedNodeType = nodeType.trim();
    Class<? extends ConfiguredObject> sourceClass = null;
    for (Class<? extends ConfiguredObject> childClass : getModel().getChildTypes(getCategoryClass()))
    {
        if (childClass.getSimpleName().equalsIgnoreCase(trimmedNodeType))
        {
            sourceClass = childClass;
            break;
        }
    }
    if (sourceClass == null)
    {
        throw new IllegalArgumentException("The node type of a NodeAutoCreationPolicy must be a valid child type "
                                           + "of a VirtualHost, '" + nodeType + "' is not.");
    }

    if (policy.isCreatedOnConsume() && !MessageSource.class.isAssignableFrom(sourceClass))
    {
        throw new IllegalArgumentException("A NodeAutoCreationPolicy which creates nodes on consume must have a "
                                           + "nodeType which implements MessageSource, '" + nodeType + "' does not.");
    }
    if (policy.isCreatedOnPublish() && !MessageDestination.class.isAssignableFrom(sourceClass))
    {
        throw new IllegalArgumentException("A NodeAutoCreationPolicy which creates nodes on publish must have a "
                                           + "nodeType which implements MessageDestination, '" + nodeType + "' does not.");
    }
    if (!(policy.isCreatedOnConsume() || policy.isCreatedOnPublish()))
    {
        throw new IllegalArgumentException("A NodeAutoCreationPolicy must create on consume, create on publish or both.");
    }
}
/**
 * Checks that a global address domain has the expected shape: a leading {@code /} (optionally doubled)
 * followed by one or more {@code /}-separated segments made of word characters, {@code -}, {@code :},
 * {@code .} and {@code $}.
 *
 * @param name the global address domain to check
 * @throws IllegalArgumentException if {@code name} does not match that shape
 */
private void validateGlobalAddressDomain(final String name)
{
    final String domainSyntax = "/(/?)([\\w_\\-:.\\$]+/)*[\\w_\\-:.\\$]+";
    final boolean wellFormed = name.matches(domainSyntax);
    if (wellFormed)
    {
        return;
    }
    throw new IllegalArgumentException("'" + name + "' is not a valid global address domain");
}
/**
 * @return the message store held in {@code _messageStore}; this field is assigned by {@code onOpen()}
 */
@Override
public MessageStore getMessageStore()
{
    return this._messageStore;
}
/**
 * Checks the connection thread pool settings of the given virtual host: both the pool size and the
 * number of selectors must be at least one, and the pool size must be strictly greater than the number
 * of selectors.
 *
 * @param virtualHost the object (this virtual host or a validation proxy) whose settings are checked
 * @throws IllegalConfigurationException if any of the rules is violated
 */
private void validateConnectionThreadPoolSettings(QueueManagingVirtualHost<?> virtualHost)
{
    final boolean poolSizeTooSmall = virtualHost.getConnectionThreadPoolSize() < 1;
    if (poolSizeTooSmall)
    {
        final String message = String.format("Thread pool size %d on VirtualHost %s must be greater than zero.",
                                             virtualHost.getConnectionThreadPoolSize(),
                                             getName());
        throw new IllegalConfigurationException(message);
    }

    final boolean tooFewSelectors = virtualHost.getNumberOfSelectors() < 1;
    if (tooFewSelectors)
    {
        final String message = String.format("Number of Selectors %d on VirtualHost %s must be greater than zero.",
                                             virtualHost.getNumberOfSelectors(),
                                             getName());
        throw new IllegalConfigurationException(message);
    }

    final boolean selectorsExhaustPool =
            virtualHost.getConnectionThreadPoolSize() <= virtualHost.getNumberOfSelectors();
    if (selectorsExhaustPool)
    {
        final String message = String.format("Number of Selectors %d on VirtualHost %s must be less than the connection pool size %d.",
                                             virtualHost.getNumberOfSelectors(),
                                             getName(),
                                             virtualHost.getConnectionThreadPoolSize());
        throw new IllegalConfigurationException(message);
    }
}
/**
 * Verifies that a message store can be created and opened for this virtual host.
 * <p>
 * A store is created with {@code createMessageStore()}; when one is returned it is opened against this
 * virtual host and closed again straight away. A failure to close is only logged.
 *
 * @throws IllegalConfigurationException if opening the store fails
 */
protected void validateMessageStoreCreation()
{
    final MessageStore trialStore = createMessageStore();
    if (trialStore == null)
    {
        return;
    }

    try
    {
        trialStore.openMessageStore(this);
    }
    catch (Exception openFailure)
    {
        throw new IllegalConfigurationException("Cannot open virtual host message store:" + openFailure.getMessage(),
                                                openFailure);
    }
    finally
    {
        // The store is closed whether or not opening succeeded.
        try
        {
            trialStore.closeMessageStore();
        }
        catch (Exception closeFailure)
        {
            LOGGER.warn("Failed to close database", closeFailure);
        }
    }
}
/**
 * Cleans up after a failed open. Following the superclass handling, the housekeeping executor, network
 * connection scheduler, message store, preference task executor, preference store, connection limiter
 * and virtual host loggers are shut down, in that order.
 *
 * @param e the exception that caused the open to fail
 */
@Override
protected void onExceptionInOpen(RuntimeException e)
{
    super.onExceptionInOpen(e);

    shutdownHouseKeeping();
    closeNetworkConnectionScheduler();
    closeMessageStore();
    stopPreferenceTaskExecutor();
    closePreferenceStore();
    closeConnectionLimiter();

    // A copy of the logger children is passed on, not the collection returned by getChildren().
    final List<VirtualHostLogger> loggersToStop = new ArrayList<>(getChildren(VirtualHostLogger.class));
    stopLogging(loggersToStop);
}
/**
 * Opens the virtual host: registers the system nodes, creates the message store and subscribes this
 * object to its PERSISTENT_MESSAGE_SIZE_OVERFULL / PERSISTENT_MESSAGE_SIZE_UNDERFULL events, reads
 * context values, selects the connection validators, creates the preference store, the link registry
 * and the housekeeping executor, and finally opens the connection limiter.
 */
@Override
protected void onOpen()
{
super.onOpen();
registerSystemNodes();
// NOTE(review): validateMessageStoreCreation() treats a null result of createMessageStore() as
// possible, whereas the next line dereferences it unconditionally — confirm null cannot occur here.
_messageStore = createMessageStore();
_messageStoreLogSubject = new MessageStoreLogSubject(getName(), _messageStore.getClass().getSimpleName());
_messageStore.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
_messageStore.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
_fileSystemMaxUsagePercent = getContextValue(Integer.class, Broker.STORE_FILESYSTEM_MAX_USAGE_PERCENT);
_flowToDiskCheckPeriod = getContextValue(Long.class, FLOW_TO_DISK_CHECK_PERIOD);
_isDiscardGlobalSharedSubscriptionLinksOnDetach = getContextValue(Boolean.class, DISCARD_GLOBAL_SHARED_SUBSCRIPTION_LINKS_ON_DETACH);
QpidServiceLoader serviceLoader = new QpidServiceLoader();
for(ConnectionValidator validator : serviceLoader.instancesOf(ConnectionValidator.class))
{
// Because && binds more tightly than ||, this condition is evaluated as
//   ((enabled.isEmpty() && disabled.isEmpty()) || !disabled.contains(type)) || enabled.contains(type)
// i.e. a validator is added whenever it is not in the disabled list or is in the enabled list.
// NOTE(review): the placement of the parentheses suggests that
//   (enabled.isEmpty() && (disabled.isEmpty() || !disabled.contains(type))) || enabled.contains(type)
// may have been intended, which would exclude unlisted validators once an enabled list is
// configured — confirm the intended semantics before changing.
if((_enabledConnectionValidators.isEmpty()
&& (_disabledConnectionValidators.isEmpty()) || !_disabledConnectionValidators.contains(validator.getType()))
|| _enabledConnectionValidators.contains(validator.getType()))
{
_connectionValidators.add(validator);
}
}
// The parent virtual host node is used as the factory for the preference store.
PreferencesRoot preferencesRoot = (VirtualHostNode) getParent();
_preferenceStore = preferencesRoot.createPreferenceStore();
_linkRegistry = createLinkRegistry();
createHousekeepingExecutor();
openConnectionLimiter();
}
/**
 * Creates the link registry for this virtual host from the single {@link LinkRegistryFactory} found
 * through the service loader.
 *
 * @return the registry created by the factory, or {@code null} when no factory is available
 * @throws RuntimeException if more than one factory is found
 */
LinkRegistryModel createLinkRegistry()
{
    final Iterator<LinkRegistryFactory> factories =
            (new QpidServiceLoader()).instancesOf(LinkRegistryFactory.class).iterator();
    if (!factories.hasNext())
    {
        return null;
    }

    final LinkRegistryFactory soleFactory = factories.next();
    if (factories.hasNext())
    {
        throw new RuntimeException("Found multiple implementations of LinkRegistry");
    }
    return soleFactory.create(this);
}
/**
 * Creates the housekeeping executor unless one already exists that has not terminated. The executor is
 * named after the virtual host, sized by {@code getHousekeepingThreadCount()} and given the
 * "Housekeeping" system task subject for this virtual host's principal.
 */
private void createHousekeepingExecutor()
{
    if (_houseKeepingTaskExecutor != null && !_houseKeepingTaskExecutor.isTerminated())
    {
        return;
    }

    final String poolName = "virtualhost-" + getName() + "-pool";
    _houseKeepingTaskExecutor = new HousekeepingExecutor(poolName,
                                                         getHousekeepingThreadCount(),
                                                         getSystemTaskSubject("Housekeeping", getPrincipal()));
}
/**
 * Guards operations that require an active virtual host.
 *
 * @throws IllegalStateException if the state of this virtual host is not {@code ACTIVE}
 */
private void checkVHostStateIsActive()
{
    if (getState() == State.ACTIVE)
    {
        return;
    }
    throw new IllegalStateException("The virtual host state of " + getState()
                                    + " does not permit this operation.");
}
/**
 * @return {@code true} if the state of this virtual host is {@code ACTIVE}
 */
@Override
public boolean isActive()
{
    return State.ACTIVE == getState();
}
/**
 * Lets every {@link SystemNodeCreator} found through the service loader register itself with the
 * system node registry of this virtual host.
 */
private void registerSystemNodes()
{
    final Iterable<SystemNodeCreator> creators = new QpidServiceLoader().instancesOf(SystemNodeCreator.class);
    for (final SystemNodeCreator nodeCreator : creators)
    {
        nodeCreator.register(_systemNodeRegistry);
    }
}
/**
 * Creates the message store for this virtual host.
 * <p>
 * Within this class the result is opened and closed again by validateMessageStoreCreation() (which
 * tolerates a {@code null} result) and is assigned to the _messageStore field by onOpen().
 *
 * @return the message store supplied by the concrete virtual host type
 */
protected abstract MessageStore createMessageStore();
/**
 * Creates the four standard exchanges (direct, topic, headers and fanout), running as the subject
 * returned by getSubjectWithAddedSystemRights().
 *
 * @return a future combining the four per-exchange futures via Futures.allAsList
 */
private ListenableFuture<List<Void>> createDefaultExchanges()
{
return Subject.doAs(getSubjectWithAddedSystemRights(), new PrivilegedAction<ListenableFuture<List<Void>>>()
{
@Override
public ListenableFuture<List<Void>> run()
{
List<ListenableFuture<Void>> standardExchangeFutures = new ArrayList<>();
standardExchangeFutures.add(addStandardExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS));
standardExchangeFutures.add(addStandardExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS));
standardExchangeFutures.add(addStandardExchange(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS));
standardExchangeFutures.add(addStandardExchange(ExchangeDefaults.FANOUT_EXCHANGE_NAME, ExchangeDefaults.FANOUT_EXCHANGE_CLASS));
return Futures.allAsList(standardExchangeFutures);
}
// Starts the asynchronous creation of one exchange and adapts its future to a Void future.
ListenableFuture<Void> addStandardExchange(String name, String type)
{
Map<String, Object> attributes = new HashMap<>();
attributes.put(Exchange.NAME, name);
attributes.put(Exchange.TYPE, type);
// The exchange id is generated from the exchange name and the virtual host name.
attributes.put(Exchange.ID, UUIDGenerator.generateExchangeUUID(name, getName()));
final ListenableFuture<Exchange<?>> future = addExchangeAsync(attributes);
final SettableFuture<Void> returnVal = SettableFuture.create();
// getTaskExecutor() is passed along with the callback — presumably the executor on which the
// callback runs; confirm against addFutureCallback.
addFutureCallback(future, new FutureCallback<>()
{
@Override
public void onSuccess(final Exchange<?> result)
{
try
{
childAdded(result);
returnVal.set(null);
}
catch (Throwable t)
{
// A failure in childAdded is reported through the returned future rather than thrown.
returnVal.setException(t);
}
}
@Override
public void onFailure(final Throwable t)
{
returnVal.setException(t);
}
}, getTaskExecutor());
return returnVal;
}
});
}
/**
 * @return the log subject held in {@code _messageStoreLogSubject}; this field is assigned by {@code onOpen()}
 */
protected MessageStoreLogSubject getMessageStoreLogSubject()
{
    return this._messageStoreLogSubject;
}
/**
 * @return the connection set of this virtual host; the internal set itself is returned, not a copy
 */
@Override
public Collection<? extends Connection<?>> getConnections()
{
    return this._connections;
}
/**
 * Looks up a connection of this virtual host by name.
 *
 * @param name the connection name to look for
 * @return the first connection encountered whose name equals {@code name}, or {@code null} if there is none
 */
@Override
public Connection<?> getConnection(String name)
{
    final Iterator<AMQPConnection<?>> candidates = _connections.iterator();
    while (candidates.hasNext())
    {
        final Connection<?> candidate = candidates.next();
        if (candidate.getName().equals(name))
        {
            return candidate;
        }
    }
    return null;
}
/**
 * Publishes a message supplied through the management interface.
 * <p>
 * The destination is looked up from the message address; when the address is {@code null} or no
 * destination is found, the default destination is used. The content may be absent, a map or list
 * (in which case neither MIME type nor encoding may be set) or a string. A string is either base64
 * decoded (content transfer encoding {@code base64}) or used as is (encoding absent, blank or
 * {@code identity}); in the latter case a supplied MIME type must be {@code text/*},
 * {@code application/json} or {@code application/xml} and must not carry a charset parameter.
 * The message is routed and sent within an auto-commit transaction.
 *
 * @param message the message description
 * @return the value returned by {@code RoutingResult.send} (presumably the number of enqueues — confirm)
 * @throws IllegalArgumentException if the content, MIME type or content transfer encoding is not acceptable
 */
@Override
public int publishMessage(@Param(name = "message") final ManageableMessage message)
{
    final String address = message.getAddress();
    MessageDestination destination =
            address == null ? getDefaultDestination() : getAttainedMessageDestination(address, true);
    if (destination == null)
    {
        destination = getDefaultDestination();
    }

    final AMQMessageHeader header = new MessageHeaderImpl(message);

    Serializable body = null;
    final Object messageContent = message.getContent();
    if (messageContent != null)
    {
        if (messageContent instanceof Map || messageContent instanceof List)
        {
            if (message.getMimeType() != null || message.getEncoding() != null)
            {
                throw new IllegalArgumentException("If the message content is provided as map or list, the mime type and encoding must be left unset");
            }
            body = (Serializable) messageContent;
        }
        else if (messageContent instanceof String)
        {
            final String contentTransferEncoding = message.getContentTransferEncoding();
            if ("base64".equalsIgnoreCase(contentTransferEncoding))
            {
                body = Strings.decodeBase64((String) messageContent);
            }
            else if (contentTransferEncoding == null
                     || contentTransferEncoding.trim().equals("")
                     || contentTransferEncoding.trim().equalsIgnoreCase("identity"))
            {
                String mimeType = message.getMimeType();
                // MIME types are protocol tokens, so they are lower-cased with Locale.ROOT; the default
                // locale could change the result (e.g. 'I' becomes a dotless i under a Turkish locale)
                // and make a valid type such as "APPLICATION/JSON" fail the checks below.
                if (mimeType != null
                    && !(mimeType = mimeType.trim().toLowerCase(java.util.Locale.ROOT)).equals(""))
                {
                    if (!(mimeType.startsWith("text/")
                          || Arrays.asList("application/json", "application/xml").contains(mimeType)))
                    {
                        throw new IllegalArgumentException(message.getMimeType()
                                                           + " is invalid as a MIME type for this message. "
                                                           + "Only MIME types of the text type can be used if a string is supplied as the content");
                    }
                    else if (mimeType.matches(".*;\\s*charset\\s*=.*"))
                    {
                        throw new IllegalArgumentException(message.getMimeType()
                                                           + " is invalid as a MIME type for this message. "
                                                           + "If a string is supplied as the content, the MIME type must not include a charset parameter");
                    }
                }
                body = (String) messageContent;
            }
            else
            {
                throw new IllegalArgumentException("contentTransferEncoding value '" + contentTransferEncoding + "' is invalid. The only valid values are base64 and identity");
            }
        }
        else
        {
            throw new IllegalArgumentException("The message content (if present) can only be a string, map or list");
        }
    }

    final InternalMessage internalMessage =
            InternalMessage.createMessage(getMessageStore(), header, body, message.isPersistent(), address);
    final AutoCommitTransaction txn = new AutoCommitTransaction(getMessageStore());

    // Instance properties derived from the management message: expiration (0 when unset) and
    // persistence come from the message; immediate, mandatory and redelivered are always false.
    final InstanceProperties instanceProperties = prop ->
    {
        switch (prop)
        {
            case EXPIRATION:
                Date expiration = message.getExpiration();
                return expiration == null ? 0 : expiration.getTime();
            case IMMEDIATE:
            case MANDATORY:
            case REDELIVERED:
                return false;
            case PERSISTENT:
                return message.isPersistent();
            default:
                return null;
        }
    };

    final RoutingResult<InternalMessage> result =
            destination.route(internalMessage, address, instanceProperties);
    return result.send(txn, null);
}
/**
 * Adds a child asynchronously through the superclass, but only while this virtual host is active.
 *
 * @param childClass the category class of the child to create
 * @param attributes the attributes of the child
 * @return the future provided by the superclass implementation
 * @throws IllegalStateException if the virtual host is not in the {@code ACTIVE} state
 */
@Override
protected <C extends ConfiguredObject> ListenableFuture<C> addChildAsync(Class<C> childClass,
                                                                         Map<String, Object> attributes)
{
    checkVHostStateIsActive();
    final ListenableFuture<C> childFuture = super.addChildAsync(childClass, attributes);
    return childFuture;
}
/**
 * @return the event logger obtained from the system configuration in the constructor
 */
@Override
public EventLogger getEventLogger()
{
    return this._eventLogger;
}
/**
 * Extracts the configuration of this virtual host with a {@link ConfigurationExtractor}. The
 * extraction is submitted through {@code doOnConfigThread} and awaited with {@code doSync}.
 *
 * @param includeSecureAttributes passed through to the extractor
 * @return the extracted configuration
 */
@Override
public Map<String, Object> extractConfig(final boolean includeSecureAttributes)
{
    final Task<ListenableFuture<Map<String, Object>>, RuntimeException> extraction =
            new Task<ListenableFuture<Map<String, Object>>, RuntimeException>()
            {
                @Override
                public ListenableFuture<Map<String, Object>> execute() throws RuntimeException
                {
                    final Map<String, Object> extracted =
                            new ConfigurationExtractor().extractConfig(AbstractVirtualHost.this,
                                                                       includeSecureAttributes);
                    return Futures.immediateFuture(extracted);
                }

                @Override
                public String getObject()
                {
                    return AbstractVirtualHost.this.toString();
                }

                @Override
                public String getAction()
                {
                    return "extractConfig";
                }

                @Override
                public String getArguments()
                {
                    return "includeSecureAttributes=" + includeSecureAttributes;
                }
            };
    return doSync(doOnConfigThread(extraction));
}
/**
 * @return a new content object that serialises the message store when it is written
 */
@Override
public Content exportMessageStore()
{
    final Content storeContent = new MessageStoreContent();
    return storeContent;
}
/**
 * Content object returned by exportMessageStore(): writes the serialised message store of the
 * enclosing virtual host and supplies the REST response headers for the download.
 */
private class MessageStoreContent implements Content, CustomRestHeaders
{
    /**
     * Serialises the message store to the given stream. The work is submitted through
     * {@code doOnConfigThread} and awaited with {@code doSync}. The store is opened for the duration
     * of the export and closed afterwards, also when serialisation fails.
     *
     * @param outputStream the stream that receives the serialised store
     * @throws IOException              if the export task fails with it
     * @throws IllegalArgumentException if the virtual host is not in the {@code STOPPED} state
     */
    @Override
    public void write(final OutputStream outputStream) throws IOException
    {
        doSync(doOnConfigThread(new Task<ListenableFuture<Void>, IOException>()
        {
            @Override
            public ListenableFuture<Void> execute() throws IOException
            {
                if (getState() != State.STOPPED)
                {
                    throw new IllegalArgumentException(
                            "The exportMessageStore operation can only be called when the virtual host is stopped");
                }

                _messageStore.openMessageStore(AbstractVirtualHost.this);
                try
                {
                    // Collect id -> name for every Queue record of the durable configuration store.
                    final Map<UUID, String> queueMap = new HashMap<>();
                    getDurableConfigurationStore().reload(record ->
                    {
                        if (record.getType().equals(Queue.class.getSimpleName()))
                        {
                            queueMap.put(record.getId(), (String) record.getAttributes().get(ConfiguredObject.NAME));
                        }
                    });
                    MessageStoreSerializer serializer = new QpidServiceLoader().getInstancesByType(MessageStoreSerializer.class).get(MessageStoreSerializer.LATEST);
                    MessageStore.MessageStoreReader reader = _messageStore.newMessageStoreReader();
                    serializer.serialize(queueMap, reader, outputStream);
                }
                finally
                {
                    _messageStore.closeMessageStore();
                }
                return Futures.immediateFuture(null);
            }

            @Override
            public String getObject()
            {
                return AbstractVirtualHost.this.toString();
            }

            @Override
            public String getAction()
            {
                return "exportMessageStore";
            }

            @Override
            public String getArguments()
            {
                return null;
            }
        }));
    }

    /** Nothing to release. */
    @Override
    public void release()
    {
    }

    /**
     * @return the value of the {@code Content-Type} header, {@code application/octet-stream}
     */
    @RestContentHeader("Content-Type")
    public String getContentType()
    {
        return "application/octet-stream";
    }

    /**
     * Builds the {@code Content-Disposition} header. It carries an ASCII-only {@code filename}
     * parameter and a URL-encoded UTF-8 {@code filename*} parameter, both derived from the virtual
     * host name and suffixed with {@code _messages.bin}.
     *
     * @return the header value
     */
    @SuppressWarnings("unused")
    @RestContentHeader("Content-Disposition")
    public String getContentDisposition()
    {
        try
        {
            String vhostName = getName();
            // replace all non-ascii and non-printable characters and all backslashes and percent encoded characters
            // as suggested by rfc6266 Appendix D; double quotes are replaced as well, because an
            // unescaped quote would terminate the quoted filename early and corrupt the header
            String asciiName = vhostName.replaceAll("[^\\x20-\\x7E]", "?")
                                        .replace('\\', '?')
                                        .replace('"', '?')
                                        .replaceAll("%[0-9a-fA-F]{2}", "?");
            // NOTE(review): the filename* value is emitted inside double quotes, whereas the RFC 6266
            // grammar defines it as an unquoted ext-value — confirm client behaviour before changing.
            String disposition = String.format("attachment; filename=\"%s_messages.bin\"; filename*=\"UTF-8''%s_messages.bin\"",
                                               asciiName,
                                               URLEncoder.encode(vhostName, StandardCharsets.UTF_8.name())
                                              );
            return disposition;
        }
        catch (UnsupportedEncodingException e)
        {
            throw new RuntimeException("JVM does not support UTF8", e);
        }
    }
}
/**
 * Imports messages into this virtual host's message store from the given source.
 * <p>
 * The source is turned into a URL (see {@code convertStringToURL}) and opened; a
 * {@link MessageStoreSerializer} is obtained for the stream and the import runs as a task on the
 * configuration thread. The task requires the virtual host to be STOPPED and the message store
 * to be empty, and closes the message store when it finishes.
 *
 * @param source URL or file path of the data to import
 * @throws IllegalConfigurationException if reading the source fails with an {@link IOException}
 */
@Override
public void importMessageStore(final String source)
{
    try
    {
        final URL sourceUrl = convertStringToURL(source);
        try (InputStream rawStream = sourceUrl.openStream();
             BufferedInputStream buffered = new BufferedInputStream(rawStream);
             DataInputStream dataStream = new DataInputStream(buffered))
        {
            final MessageStoreSerializer serializer = MessageStoreSerializer.FACTORY.newInstance(dataStream);
            final Task<ListenableFuture<Void>, IOException> importTask =
                    new Task<ListenableFuture<Void>, IOException>()
            {
                @Override
                public ListenableFuture<Void> execute() throws IOException
                {
                    if (getState() != State.STOPPED)
                    {
                        throw new IllegalArgumentException(
                                "The importMessageStore operation can only be called when the virtual host is stopped");
                    }

                    try
                    {
                        _messageStore.openMessageStore(AbstractVirtualHost.this);
                        checkMessageStoreEmpty();

                        // Collect name -> id for every queue record held in the configuration store.
                        final Map<String, UUID> queueIdsByName = new HashMap<>();
                        getDurableConfigurationStore().reload(record ->
                        {
                            if (record.getType().equals(Queue.class.getSimpleName()))
                            {
                                final String queueName = (String) record.getAttributes().get(ConfiguredObject.NAME);
                                queueIdsByName.put(queueName, record.getId());
                            }
                        });
                        serializer.deserialize(queueIdsByName, _messageStore, dataStream);
                    }
                    finally
                    {
                        _messageStore.closeMessageStore();
                    }
                    return Futures.immediateFuture(null);
                }

                @Override
                public String getObject()
                {
                    return AbstractVirtualHost.this.toString();
                }

                @Override
                public String getAction()
                {
                    return "importMessageStore";
                }

                @Override
                public String getArguments()
                {
                    final String protocol = sourceUrl.getProtocol();
                    // The payload of a data: URL is not reported; only its kind is.
                    if (protocol.equalsIgnoreCase("data"))
                    {
                        return "source=<data stream>";
                    }
                    final boolean reportable = protocol.equalsIgnoreCase("http")
                                               || protocol.equalsIgnoreCase("https")
                                               || protocol.equalsIgnoreCase("file");
                    return reportable ? "source=" + source : "source=<unknown source type>";
                }
            };
            doSync(doOnConfigThread(importTask));
        }
    }
    catch (IOException e)
    {
        throw new IllegalConfigurationException("Cannot convert '" + source + "' to a readable resource", e);
    }
}
/**
 * Verifies that the message store holds no messages, message instances or distributed
 * transactions. The three kinds are visited in that order and visiting stops as soon as the
 * handler reports that something was found.
 *
 * @throws IllegalArgumentException if the message store is not empty
 */
private void checkMessageStoreEmpty()
{
    final MessageStore.MessageStoreReader storeReader = _messageStore.newMessageStoreReader();
    final StoreEmptyCheckingHandler emptinessCheck = new StoreEmptyCheckingHandler();

    storeReader.visitMessages(emptinessCheck);
    if (emptinessCheck.isEmpty())
    {
        storeReader.visitMessageInstances(emptinessCheck);
    }
    if (emptinessCheck.isEmpty())
    {
        storeReader.visitDistributedTransactions(emptinessCheck);
    }

    if (!emptinessCheck.isEmpty())
    {
        throw new IllegalArgumentException("The message store is not empty");
    }
}
/**
 * Interprets the given string as a URL, falling back to treating it as a file path when it is
 * not a well-formed URL.
 *
 * @param source a URL or a file path
 * @return the URL for the source
 * @throws IllegalConfigurationException if the string cannot be converted either way
 */
private static URL convertStringToURL(final String source)
{
    try
    {
        return new URL(source);
    }
    catch (MalformedURLException notAUrl)
    {
        // Not a URL: try it as a path on the local file system.
        try
        {
            return new File(source).toURI().toURL();
        }
        catch (MalformedURLException notAFile)
        {
            throw new IllegalConfigurationException("Cannot convert " + source + " to a readable resource",
                                                    notAFile);
        }
    }
}
/**
 * Authorises the "connect" action and then consults every connection validator.
 *
 * @param connection the connection being created
 * @return {@code false} as soon as one validator rejects the connection, {@code true} otherwise
 */
@Override
public boolean authoriseCreateConnection(final AMQPConnection<?> connection)
{
    authorise(Operation.PERFORM_ACTION("connect"));

    for (ConnectionValidator connectionValidator : _connectionValidators)
    {
        final boolean accepted = connectionValidator.validateConnectionCreation(connection, this);
        if (!accepted)
        {
            return false;
        }
    }
    return true;
}
/**
 * (Re)starts periodic statistics reporting. Any previously scheduled reporting task is cancelled
 * without interrupting it; a new one is scheduled only when the reporting period is positive.
 * The period from {@code getStatisticsReportingPeriod()} is multiplied by 1000 and used as
 * milliseconds for both the initial delay and the repeat interval.
 */
private void initialiseStatisticsReporting()
{
    final long periodMillis = getStatisticsReportingPeriod() * 1000L;

    final ScheduledFuture<?> existingReport = _statisticsReportingFuture;
    if (existingReport != null)
    {
        existingReport.cancel(false);
    }

    if (periodMillis <= 0L)
    {
        return;
    }

    final StatisticsReportingTask reportingTask =
            new StatisticsReportingTask(this, getSystemTaskSubject("Statistics", _principal));
    _statisticsReportingFuture = _houseKeepingTaskExecutor.scheduleAtFixedRate(reportingTask,
                                                                               periodMillis,
                                                                               periodMillis,
                                                                               TimeUnit.MILLISECONDS);
}
/**
 * Schedules the {@link VirtualHostHouseKeepingTask} when the housekeeping check period is
 * positive; otherwise does nothing.
 */
private void initialiseHouseKeeping()
{
    final long checkPeriod = getHousekeepingCheckPeriod();
    if (checkPeriod <= 0L)
    {
        return;
    }
    scheduleHouseKeepingTask(checkPeriod, new VirtualHostHouseKeepingTask());
}
/**
 * Schedules the {@link FlowToDiskCheckingTask} when the flow-to-disk check period is positive;
 * otherwise does nothing.
 */
private void initialiseFlowToDiskChecking()
{
    final long checkPeriod = getFlowToDiskCheckPeriod();
    if (checkPeriod <= 0L)
    {
        return;
    }
    scheduleHouseKeepingTask(checkPeriod, new FlowToDiskCheckingTask());
}
/**
 * Shuts down the housekeeping executor, if there is one, and clears the field.
 * <p>
 * An orderly shutdown is requested first and given {@code HOUSEKEEPING_SHUTDOWN_TIMEOUT} seconds
 * to complete. If the tasks do not finish in time, or the waiting thread is interrupted, the
 * executor is shut down forcibly: once the field is cleared nothing else holds a reference that
 * could stop tasks that are still running. The interrupt status of the calling thread is
 * restored.
 */
private void shutdownHouseKeeping()
{
    // Work on a snapshot so that every step acts on the same executor instance.
    final ScheduledThreadPoolExecutor executor = _houseKeepingTaskExecutor;
    if (executor == null)
    {
        return;
    }

    executor.shutdown();
    try
    {
        if (!executor.awaitTermination(HOUSEKEEPING_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS))
        {
            executor.shutdownNow();
        }
    }
    catch (InterruptedException e)
    {
        LOGGER.warn("Interrupted during Housekeeping shutdown:", e);
        // Same outcome as the timeout path: do not leave tasks running on an unreachable executor.
        executor.shutdownNow();
        Thread.currentThread().interrupt();
    }
    finally
    {
        _houseKeepingTaskExecutor = null;
    }
}
/**
 * Closes the network connection scheduler, if one exists, and clears the field.
 */
private void closeNetworkConnectionScheduler()
{
    if (_networkConnectionScheduler == null)
    {
        return;
    }
    _networkConnectionScheduler.close();
    _networkConnectionScheduler = null;
}
/**
 * Registers a {@link HouseKeepingTask} for periodic execution on the housekeeping executor, so
 * that other broker components can have work run on their behalf.
 * <p>
 * The first run happens after half a period; subsequent runs follow at the full period. The
 * resulting future is handed to the task via {@code setFuture}.
 *
 * @param period How often this task should run, in ms.
 * @param task The task to run.
 */
@Override
public void scheduleHouseKeepingTask(long period, HouseKeepingTask task)
{
    final long initialDelay = period / 2;
    final ScheduledFuture<?> scheduled =
            _houseKeepingTaskExecutor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.MILLISECONDS);
    task.setFuture(scheduled);
}
/**
 * Schedules a one-shot task on the housekeeping executor.
 *
 * @param delay delay before the task runs, in milliseconds
 * @param task the task to run
 * @return the future returned by the executor for the scheduled task
 */
@Override
public ScheduledFuture<?> scheduleTask(long delay, Runnable task)
{
return _houseKeepingTaskExecutor.schedule(task, delay, TimeUnit.MILLISECONDS);
}
/**
 * Submits the given runnable to the housekeeping executor, wrapped in a {@link HouseKeepingTask}
 * constructed with the supplied name, this virtual host and the supplied access control context.
 *
 * @param name name given to the wrapping housekeeping task
 * @param task the work to run
 * @param context access control context passed to the wrapping housekeeping task
 */
@Override
public void executeTask(final String name, final Runnable task, AccessControlContext context)
{
_houseKeepingTaskExecutor.execute(new HouseKeepingTask(name, this, context)
{
@Override
public void execute()
{
task.run();
}
});
}
/**
 * @return the list held in {@code _enabledConnectionValidators} (the field itself, not a copy)
 */
@Override
public List<String> getEnabledConnectionValidators()
{
return _enabledConnectionValidators;
}
/**
 * @return the list held in {@code _disabledConnectionValidators} (the field itself, not a copy)
 */
@Override
public List<String> getDisabledConnectionValidators()
{
return _disabledConnectionValidators;
}
/**
 * @return the list held in {@code _globalAddressDomains}; callers in this class treat
 *         {@code null} as "no domains"
 */
@Override
public List<String> getGlobalAddressDomains()
{
return _globalAddressDomains;
}
/**
 * @return the list held in {@code _nodeAutoCreationPolicies} (the field itself, not a copy)
 */
@Override
public List<NodeAutoCreationPolicy> getNodeAutoCreationPolicies()
{
return _nodeAutoCreationPolicies;
}
/**
 * Resolves a message source by name. System node sources are consulted first, then queues
 * (including global-address-domain resolution), and finally the node auto-creation policies
 * applicable to consumption.
 *
 * @param name name or address of the source
 * @return the message source, or {@code null} if none exists and none could be created
 */
@Override
public MessageSource getAttainedMessageSource(final String name)
{
    final MessageSource systemSource = _systemNodeSources.get(name);
    if (systemSource != null)
    {
        return systemSource;
    }

    final MessageSource queueSource = getAttainedChildFromAddress(Queue.class, name);
    if (queueSource != null)
    {
        return queueSource;
    }

    return autoCreateNode(name, MessageSource.class, false);
}
/**
 * Attempts to create a node with the given name according to the node auto-creation policies.
 * <p>
 * A policy applies when its pattern matches the name and it permits creation for the requested
 * use (publish or consume). The policy's node type is matched, ignoring case, against the simple
 * names of this object's child types that are assignable to {@code clazz}; if several child types
 * match, the last one wins. The child is created with the policy's attributes, minus any id and
 * with the name set, using a subject that has added system rights.
 *
 * @param name name of the node to create
 * @param clazz required type of the node
 * @param publish {@code true} when the node is wanted for publishing, {@code false} for consuming
 * @return the created node, the existing node if one with that name already exists, or
 *         {@code null} if no policy produced a node
 */
private <T> T autoCreateNode(final String name, final Class<T> clazz, boolean publish)
{
    for (NodeAutoCreationPolicy policy : getNodeAutoCreationPolicies())
    {
        final String pattern = policy.getPattern();
        if (!name.matches(pattern))
        {
            continue;
        }
        final boolean createdForThisUse = publish ? policy.isCreatedOnPublish() : policy.isCreatedOnConsume();
        if (!createdForThisUse)
        {
            continue;
        }

        final String nodeType = policy.getNodeType();
        Class<? extends ConfiguredObject> matchedClass = null;
        for (Class<? extends ConfiguredObject> candidate : getModel().getChildTypes(getCategoryClass()))
        {
            final boolean sameTypeName = candidate.getSimpleName().equalsIgnoreCase(nodeType.trim());
            if (sameTypeName && clazz.isAssignableFrom(candidate))
            {
                matchedClass = candidate;
            }
        }
        if (matchedClass == null)
        {
            continue;
        }

        final Map<String, Object> attributes;
        if (policy.getAttributes() == null)
        {
            attributes = new HashMap<>();
        }
        else
        {
            attributes = new HashMap<>(policy.getAttributes());
        }
        attributes.remove(ConfiguredObject.ID);
        attributes.put(ConfiguredObject.NAME, name);

        final Class<? extends ConfiguredObject> childClass = matchedClass;
        try
        {
            final T created = Subject.doAs(getSubjectWithAddedSystemRights(), (PrivilegedAction<T>) () ->
                    (T) doSync(createChildAsync(childClass, attributes)));
            if (created != null)
            {
                return created;
            }
        }
        catch (AbstractConfiguredObject.DuplicateNameException e)
        {
            // A node with this name already exists: hand that one back.
            return (T) e.getExisting();
        }
        catch (RuntimeException e)
        {
            // Creation under this policy failed; log it and move on to the next policy.
            LOGGER.info("Unable to auto create a node named '{}' due to exception", name, e);
        }
    }
    return null;
}
/**
 * Looks up a queue by id via {@code awaitChildClassToAttainState}.
 *
 * @param id id of the queue
 * @return the result of the lookup, cast to {@code Queue}
 */
@Override
public Queue<?> getAttainedQueue(UUID id)
{
return (Queue<?>) awaitChildClassToAttainState(Queue.class, id);
}
/**
 * Looks up a queue by name via {@code awaitChildClassToAttainState}.
 *
 * @param name name of the queue
 * @return the result of the lookup, cast to {@code Queue}
 */
@Override
public Queue<?> getAttainedQueue(final String name)
{
return (Queue<?>) awaitChildClassToAttainState(Queue.class, name);
}
/**
 * @return the broker held in {@code _broker}
 */
@Override
public Broker<?> getBroker()
{
return _broker;
}
/**
 * Resolves a message destination by name. System node destinations are consulted first, then
 * exchanges, then queues; if nothing is found and creation is permitted, the node auto-creation
 * policies applicable to publishing are tried.
 *
 * @param name name or address of the destination
 * @param mayCreate whether the destination may be auto-created when it does not exist
 * @return the destination, or {@code null} if none exists and none was created
 */
@Override
public MessageDestination getAttainedMessageDestination(final String name, final boolean mayCreate)
{
    final MessageDestination systemDestination = _systemNodeDestinations.get(name);
    if (systemDestination != null)
    {
        return systemDestination;
    }

    final MessageDestination exchange = getAttainedChildFromAddress(Exchange.class, name);
    if (exchange != null)
    {
        return exchange;
    }

    final MessageDestination queue = getAttainedChildFromAddress(Queue.class, name);
    if (queue != null)
    {
        return queue;
    }

    return mayCreate ? autoCreateNode(name, MessageDestination.class, true) : null;
}
/**
 * @param name name of the system node destination
 * @return the entry of {@code _systemNodeDestinations} for that name, as returned by its {@code get}
 */
@Override
public MessageDestination getSystemDestination(final String name)
{
return _systemNodeDestinations.get(name);
}
/**
 * Asks every ACTIVE queue to reallocate its messages, on a housekeeping thread.
 *
 * @return a future for the submitted work; an already-completed future when there is no
 *         housekeeping executor or it rejects the submission
 */
@Override
public ListenableFuture<Void> reallocateMessages()
{
    // Snapshot the field: it is cleared when housekeeping is shut down.
    final ScheduledThreadPoolExecutor executor = _houseKeepingTaskExecutor;
    if (executor == null)
    {
        return Futures.immediateFuture(null);
    }

    try
    {
        final Future<Void> reallocation = executor.submit(() ->
        {
            final Collection<Queue> allQueues = getChildren(Queue.class);
            for (Queue queue : allQueues)
            {
                if (queue.getState() != State.ACTIVE)
                {
                    continue;
                }
                queue.reallocateMessages();
            }
            return null;
        });
        return JdkFutureAdapters.listenInPoolThread(reallocation);
    }
    catch (RejectedExecutionException e)
    {
        // A rejection from an executor that has been shut down is expected and not reported.
        if (!executor.isShutdown())
        {
            LOGGER.warn("Failed to schedule reallocation of messages", e);
        }
    }
    return Futures.immediateFuture(null);
}
/**
 * @return the sum of {@code getQueueDepthBytes()} over all child queues
 */
@Override
public long getTotalDepthOfQueuesBytes()
{
    final Collection<Queue> allQueues = getChildren(Queue.class);
    return allQueues.stream()
                    .mapToLong(Queue::getQueueDepthBytes)
                    .sum();
}
/**
 * @return the sum of {@code getQueueDepthMessages()} over all child queues
 */
@Override
public long getTotalDepthOfQueuesMessages()
{
    final Collection<Queue> allQueues = getChildren(Queue.class);
    return allQueues.stream()
                    .mapToLong(Queue::getQueueDepthMessages)
                    .sum();
}
/**
 * @return the in-memory size reported by the message store, or -1 when there is no message store
 */
@Override
public long getInMemoryMessageSize()
{
return _messageStore == null ? -1 : _messageStore.getInMemorySize();
}
/**
 * @return the bytes evacuated from memory as reported by the message store, or -1 when there is
 *         no message store
 */
@Override
public long getBytesEvacuatedFromMemory()
{
return _messageStore == null ? -1 : _messageStore.getBytesEvacuatedFromMemory();
}
/**
 * @return the in-memory message threshold, which is this virtual host's target size
 */
@Override
public long getInMemoryMessageThreshold()
{
return getTargetSize();
}
/**
 * Finds a child of the given class by address. The address is first used as the child's name;
 * failing that, for each global address domain {@code d} that the address starts with (as
 * {@code d/}), the remainder of the address is tried as the name.
 *
 * @param childClass class of the child to find
 * @param address name of the child, optionally prefixed with a global address domain
 * @return the first child found, or {@code null}
 */
@Override
public <T extends ConfiguredObject<?>> T getAttainedChildFromAddress(final Class<T> childClass,
                                                                     final String address)
{
    final T direct = awaitChildClassToAttainState(childClass, address);
    if (direct != null)
    {
        return direct;
    }

    final List<String> domains = getGlobalAddressDomains();
    if (domains == null)
    {
        return null;
    }

    for (String domain : domains)
    {
        if (!address.startsWith(domain + "/"))
        {
            continue;
        }
        final String localName = address.substring(domain.length() + 1);
        final T viaDomain = awaitChildClassToAttainState(childClass, localName);
        if (viaDomain != null)
        {
            return viaDomain;
        }
    }
    return null;
}
/**
 * @return the current value of {@code _maximumMessageSize}, which {@code registerMessageReceived}
 *         raises to the largest message size seen and {@code resetStatistics} sets back to zero
 */
@Override
public long getInboundMessageSizeHighWatermark()
{
return _maximumMessageSize.get();
}
/**
 * @return the destination held in {@code _defaultDestination}
 */
@Override
public MessageDestination getDefaultDestination()
{
return _defaultDestination;
}
/**
 * Resets the statistics of this virtual host: the connection, message and byte counters and the
 * message size high watermark are set to zero, and the message store (when present), the virtual
 * host loggers, the queues and the exchanges are asked to reset theirs.
 * <p>
 * The message store is treated as optional, as elsewhere in this class, so that a virtual host
 * without a store still gets its own counters and its children reset instead of failing half way.
 */
@Override
public void resetStatistics()
{
    _totalConnectionCount.set(0L);
    _maximumMessageSize.set(0L);
    _bytesIn.set(0L);
    _bytesOut.set(0L);
    _messagesIn.set(0L);
    _messagesOut.set(0L);
    _transactedMessagesIn.set(0L);
    _transactedMessagesOut.set(0L);

    final MessageStore messageStore = _messageStore;
    if (messageStore != null)
    {
        messageStore.resetStatistics();
    }

    getChildren(VirtualHostLogger.class).forEach(VirtualHostLogger::resetStatistics);
    getChildren(Queue.class).forEach(Queue::resetStatistics);
    getChildren(Exchange.class).forEach(Exchange::resetStatistics);
}
/**
 * Creates an exchange through the object factory and exposes the outcome as a future of
 * {@code Exchange<?>}. The callback that completes the returned future runs on the task executor.
 *
 * @param attributes attributes of the exchange to create
 * @return a future completed with the created exchange, or failed with the creation error
 */
private ListenableFuture<Exchange<?>> addExchangeAsync(Map<String,Object> attributes)
        throws ReservedExchangeNameException,
               NoFactoryForTypeException
{
    final SettableFuture<Exchange<?>> exchangeFuture = SettableFuture.create();
    final FutureCallback<Exchange> completion = new FutureCallback<Exchange>()
    {
        @Override
        public void onSuccess(final Exchange created)
        {
            exchangeFuture.set(created);
        }

        @Override
        public void onFailure(final Throwable cause)
        {
            exchangeFuture.setException(cause);
        }
    };
    addFutureCallback(getObjectFactory().createAsync(Exchange.class, attributes, this),
                      completion,
                      getTaskExecutor());
    return exchangeFuture;
}
/**
 * Strips a leading global address domain from a routing address.
 *
 * @param routingAddress the address to localise
 * @return the part after {@code domain/} for the first global address domain the address starts
 *         with, or the address unchanged when no domain matches or none are configured
 */
@Override
public String getLocalAddress(final String routingAddress)
{
    final List<String> domains = getGlobalAddressDomains();
    if (domains == null)
    {
        return routingAddress;
    }

    for (String domain : domains)
    {
        final String prefix = domain + "/";
        if (routingAddress.startsWith(prefix))
        {
            return routingAddress.substring(domain.length() + 1);
        }
    }
    return routingAddress;
}
/**
 * Delegates to {@code beforeDeleteOrClose()}, the preparation shared by close and delete.
 *
 * @return the future returned by {@code beforeDeleteOrClose()}
 */
@Override
protected ListenableFuture<Void> beforeClose()
{
return beforeDeleteOrClose();
}
/**
 * Delegates to {@code onCloseOrDelete()}, the shutdown sequence shared by close and delete.
 *
 * @return the future returned by {@code onCloseOrDelete()}
 */
@Override
protected ListenableFuture<Void> onClose()
{
return onCloseOrDelete();
}
/**
 * Delegates to {@code beforeDeleteOrClose()}, the preparation shared by close and delete.
 *
 * @return the future returned by {@code beforeDeleteOrClose()}
 */
@Override
protected ListenableFuture<Void> beforeDelete()
{
return beforeDeleteOrClose();
}
/**
 * Marks the virtual host as being deleted and runs the shared shutdown sequence, which then also
 * deletes the link registry, the message store and the preference store.
 *
 * @return the future returned by {@code onCloseOrDelete()}
 */
@Override
protected ListenableFuture<Void> onDelete()
{
// Must be set before onCloseOrDelete() runs: that method branches on this flag.
_deleteRequested = true;
return onCloseOrDelete();
}
/**
 * Preparation shared by close and delete: sets the state to UNAVAILABLE, takes a snapshot of the
 * virtual host loggers (stopped later by {@code onCloseOrDelete()}) and closes the connections.
 *
 * @return the future returned by {@code closeConnections()}
 */
private ListenableFuture<Void> beforeDeleteOrClose()
{
setState(State.UNAVAILABLE);
// Snapshot of the current logger children, kept for stopLogging() in onCloseOrDelete().
_virtualHostLoggersToClose = new ArrayList<>(getChildren(VirtualHostLogger.class));
//Stop Connections
return closeConnections();
}
/**
 * Shutdown sequence shared by close and delete.
 * <p>
 * Closes the DTX registry, stops housekeeping, closes the message store, stops the preference
 * task executor, closes the preference store and the network connection scheduler, logs the
 * CLOSE operational message, stops the loggers captured by {@code beforeDeleteOrClose()} and
 * closes the system node registry and the connection limiter. When a delete was requested the
 * link registry, the message store and the preference store are deleted as well.
 *
 * @return an already-completed future
 */
private ListenableFuture<Void> onCloseOrDelete()
{
_dtxRegistry.close();
shutdownHouseKeeping();
if (_deleteRequested)
{
deleteLinkRegistry();
}
closeMessageStore();
stopPreferenceTaskExecutor();
closePreferenceStore();
// The stores are deleted only after they have been closed above.
if (_deleteRequested)
{
deleteMessageStore();
deletePreferenceStore();
}
closeNetworkConnectionScheduler();
_eventLogger.message(VirtualHostMessages.CLOSE(getName()));
stopLogging(_virtualHostLoggersToClose);
_systemNodeRegistry.close();
closeConnectionLimiter();
return Futures.immediateFuture(null);
}
/**
 * Stops accepting connections, stops every registered connection and then closes them
 * asynchronously, removing each one from the registry as it is processed. The sweep is repeated
 * until the registry is empty.
 *
 * @return a future that completes when all collected close futures have completed
 */
private ListenableFuture<Void> closeConnections()
{
    if (LOGGER.isDebugEnabled())
    {
        LOGGER.debug("Closing connection registry : {} connection(s).", _connections.size());
    }
    _acceptsConnections.set(false);

    for (AMQPConnection<?> registered : _connections)
    {
        registered.stopConnection();
    }

    final List<ListenableFuture<Void>> closeFutures = new ArrayList<>();
    while (!_connections.isEmpty())
    {
        for (Iterator<AMQPConnection<?>> it = _connections.iterator(); it.hasNext(); )
        {
            final Connection<?> connection = it.next();
            try
            {
                closeFutures.add(connection.closeAsync());
            }
            catch (Exception e)
            {
                LOGGER.warn("Exception closing connection " + connection.getName() + " from " + connection.getRemoteAddress(), e);
            }
            finally
            {
                // Removed whether or not the close could be initiated, so the outer loop terminates.
                it.remove();
            }
        }
    }

    final ListenableFuture<List<Void>> allClosed = Futures.allAsList(closeFutures);
    return Futures.transform(allClosed, voids -> null, MoreExecutors.directExecutor());
}
/**
 * Closes the message store, if there is one. Any message store recoverer is cancelled first.
 * A {@link StoreException} raised while closing is logged, not propagated. Unless the
 * configuration store of the virtual host node is itself a {@link MessageStoreProvider}, the
 * CLOSED operational message is logged afterwards.
 */
private void closeMessageStore()
{
    if (getMessageStore() == null)
    {
        return;
    }

    try
    {
        if (_messageStoreRecoverer != null)
        {
            _messageStoreRecoverer.cancel();
        }
        getMessageStore().closeMessageStore();
    }
    catch (StoreException e)
    {
        LOGGER.error("Failed to close message store", e);
    }

    final boolean storeProvidedByConfigStore =
            _virtualHostNode.getConfigurationStore() instanceof MessageStoreProvider;
    if (!storeProvidedByConfigStore)
    {
        getEventLogger().message(getMessageStoreLogSubject(), MessageStoreMessages.CLOSED());
    }
}
/**
 * Records the delivery of one message: increments the outbound message count, adds the size to
 * the outbound byte count, forwards the event to the broker and then calls
 * {@code reportDirectMemoryBelowTargetIfReached()}.
 *
 * @param messageSize size of the delivered message
 */
@Override
public void registerMessageDelivered(long messageSize)
{
_messagesOut.incrementAndGet();
_bytesOut.addAndGet(messageSize);
_broker.registerMessageDelivered(messageSize);
reportDirectMemoryBelowTargetIfReached();
}
/**
 * Records the receipt of one message: increments the inbound message count, adds the size to the
 * inbound byte count, forwards the event to the broker, raises the message size high watermark
 * if this message is larger, and then calls {@code reportDirectMemoryAboveTargetIfExceeded()}.
 *
 * @param messageSize size of the received message
 */
@Override
public void registerMessageReceived(long messageSize)
{
    _messagesIn.incrementAndGet();
    _bytesIn.addAndGet(messageSize);
    _broker.registerMessageReceived(messageSize);

    // Compare-and-set retry loop: keep trying until the recorded maximum is at least messageSize.
    long highWatermark = _maximumMessageSize.get();
    while (highWatermark < messageSize)
    {
        _maximumMessageSize.compareAndSet(highWatermark, messageSize);
        highWatermark = _maximumMessageSize.get();
    }

    reportDirectMemoryAboveTargetIfExceeded();
}
/**
 * Increments the count of transacted messages received and forwards the event to the broker.
 */
@Override
public void registerTransactedMessageReceived()
{
_transactedMessagesIn.incrementAndGet();
_broker.registerTransactedMessageReceived();
}
/**
 * Increments the count of transacted messages delivered and forwards the event to the broker.
 */
@Override
public void registerTransactedMessageDelivered()
{
_transactedMessagesOut.incrementAndGet();
_broker.registerTransactedMessageDelivered();
}
/**
 * @return the number of messages received since statistics were last reset
 */
@Override
public long getMessagesIn()
{
return _messagesIn.get();
}
/**
 * @return the total size of messages received since statistics were last reset
 */
@Override
public long getBytesIn()
{
return _bytesIn.get();
}
/**
 * @return the number of messages delivered since statistics were last reset
 */
@Override
public long getMessagesOut()
{
return _messagesOut.get();
}
/**
 * @return the total size of messages delivered since statistics were last reset
 */
@Override
public long getBytesOut()
{
return _bytesOut.get();
}
/**
 * @return the number of transacted messages received since statistics were last reset
 */
@Override
public long getTransactedMessagesIn()
{
return _transactedMessagesIn.get();
}
/**
 * @return the number of transacted messages delivered since statistics were last reset
 */
@Override
public long getTransactedMessagesOut()
{
return _transactedMessagesOut.get();
}
/**
 * Obtains a sending link from the link registry. The lookup runs as a task on the configuration
 * thread and this call waits for its result.
 *
 * @param remoteContainerId container id of the remote peer
 * @param linkName name of the link
 * @return the link returned by the link registry
 */
@Override
public <T extends LinkModel> T getSendingLink(String remoteContainerId, String linkName)
{
    final Task<ListenableFuture<T>, RuntimeException> lookup =
            new Task<ListenableFuture<T>, RuntimeException>()
    {
        @Override
        public ListenableFuture<T> execute()
        {
            final T link = (T) _linkRegistry.getSendingLink(remoteContainerId, linkName);
            return Futures.immediateFuture(link);
        }

        @Override
        public String getObject()
        {
            return AbstractVirtualHost.this.toString();
        }

        @Override
        public String getAction()
        {
            return "getSendingLink";
        }

        @Override
        public String getArguments()
        {
            return String.format("remoteContainerId='%s', linkName='%s'", remoteContainerId, linkName);
        }
    };
    return doSync(doOnConfigThread(lookup));
}
/**
 * Obtains a receiving link from the link registry. The lookup runs as a task on the
 * configuration thread and this call waits for its result.
 *
 * @param remoteContainerId container id of the remote peer
 * @param linkName name of the link
 * @return the link returned by the link registry
 */
@Override
public <T extends LinkModel> T getReceivingLink(String remoteContainerId, String linkName)
{
    final Task<ListenableFuture<T>, RuntimeException> lookup =
            new Task<ListenableFuture<T>, RuntimeException>()
    {
        @Override
        public ListenableFuture<T> execute()
        {
            final T link = (T) _linkRegistry.getReceivingLink(remoteContainerId, linkName);
            return Futures.immediateFuture(link);
        }

        @Override
        public String getObject()
        {
            return AbstractVirtualHost.this.toString();
        }

        @Override
        public String getAction()
        {
            return "getReceivingLink";
        }

        @Override
        public String getArguments()
        {
            return String.format("remoteContainerId='%s', linkName='%s'", remoteContainerId, linkName);
        }
    };
    return doSync(doOnConfigThread(lookup));
}
/**
 * Finds the sending links whose container id and link name match the given patterns. The search
 * runs as a task on the configuration thread and this call waits for its result.
 *
 * @param containerIdPattern pattern applied to the remote container id
 * @param linkNamePattern pattern applied to the link name
 * @return the links returned by the link registry
 */
@Override
public <T extends LinkModel> Collection<T> findSendingLinks(final Pattern containerIdPattern,
                                                            final Pattern linkNamePattern)
{
    final Task<ListenableFuture<Collection<T>>, RuntimeException> search =
            new Task<ListenableFuture<Collection<T>>, RuntimeException>()
    {
        @Override
        public ListenableFuture<Collection<T>> execute()
        {
            return Futures.immediateFuture(_linkRegistry.findSendingLinks(containerIdPattern, linkNamePattern));
        }

        @Override
        public String getObject()
        {
            return AbstractVirtualHost.this.toString();
        }

        @Override
        public String getAction()
        {
            return "findSendingLinks";
        }

        @Override
        public String getArguments()
        {
            return String.format("containerIdPattern='%s', linkNamePattern='%s'", containerIdPattern, linkNamePattern);
        }
    };
    return doSync(doOnConfigThread(search));
}
/**
 * Passes the visitor to the link registry so it can visit the sending links. The visit runs as a
 * task on the configuration thread and this call waits for it to finish.
 *
 * @param visitor the visitor handed to the link registry
 */
@Override
public <T extends LinkModel> void visitSendingLinks(final LinkRegistryModel.LinkVisitor<T> visitor)
{
    final Task<ListenableFuture<Void>, RuntimeException> visit =
            new Task<ListenableFuture<Void>, RuntimeException>()
    {
        @Override
        public ListenableFuture<Void> execute()
        {
            _linkRegistry.visitSendingLinks(visitor);
            return Futures.immediateFuture(null);
        }

        @Override
        public String getObject()
        {
            return AbstractVirtualHost.this.toString();
        }

        @Override
        public String getAction()
        {
            return "visitSendingLinks";
        }

        @Override
        public String getArguments()
        {
            return String.format("visitor='%s'", visitor);
        }
    };
    doSync(doOnConfigThread(visit));
}
/**
 * @return the registry held in {@code _dtxRegistry}
 */
@Override
public DtxRegistry getDtxRegistry()
{
return _dtxRegistry;
}
/**
 * Records a reason for blocking and, if the virtual host was not already blocked, blocks every
 * registered connection. Runs while holding the monitor of {@code _connections}.
 *
 * @param blockingType the reason for blocking
 */
private void block(BlockingType blockingType)
{
    synchronized (_connections)
    {
        _blockingReasons.add(blockingType);

        final boolean newlyBlocked = _blocked.compareAndSet(false, true);
        if (!newlyBlocked)
        {
            return;
        }
        for (AMQPConnection<?> connection : _connections)
        {
            connection.block();
        }
    }
}
/**
 * Withdraws a reason for blocking and, once no reasons remain and the virtual host was blocked,
 * unblocks every registered connection. Runs while holding the monitor of {@code _connections}.
 *
 * @param blockingType the reason being withdrawn
 */
private void unblock(BlockingType blockingType)
{
    synchronized (_connections)
    {
        _blockingReasons.remove(blockingType);

        final boolean newlyUnblocked = _blockingReasons.isEmpty() && _blocked.compareAndSet(true, false);
        if (!newlyUnblocked)
        {
            return;
        }
        for (AMQPConnection<?> connection : _connections)
        {
            connection.unblock();
        }
    }
}
/**
 * Reacts to message store events: an OVERFULL event blocks connections for the STORE reason and
 * an UNDERFULL event withdraws that reason; each is reported with the matching operational log
 * message. Any other event is ignored.
 *
 * @param event the event raised
 */
@Override
public void event(final Event event)
{
    switch (event)
    {
        case PERSISTENT_MESSAGE_SIZE_OVERFULL:
            block(BlockingType.STORE);
            _eventLogger.message(getMessageStoreLogSubject(), MessageStoreMessages.OVERFULL());
            break;

        case PERSISTENT_MESSAGE_SIZE_UNDERFULL:
            unblock(BlockingType.STORE);
            _eventLogger.message(getMessageStoreLogSubject(), MessageStoreMessages.UNDERFULL());
            break;

        default:
            // no action for other events
            break;
    }
}
/**
 * Logs the ERRORED operational message for this virtual host when the given state is ERRORED.
 *
 * @param state the state to inspect
 */
private void reportIfError(State state)
{
    if (state != State.ERRORED)
    {
        return;
    }
    _eventLogger.message(VirtualHostMessages.ERRORED(getName()));
}
/**
 * @param port the port the connection arrived on (not used here)
 * @return always {@code null} in this implementation
 */
@Override
public String getRedirectHost(final AmqpPort<?> port)
{
return null;
}
/**
 * @return {@code true} when the in-memory message size is strictly greater than the target size
 */
@Override
public boolean isOverTargetSize()
{
return getInMemoryMessageSize() > _targetSize.get();
}
/**
 * {@link AMQMessageHeader} view of a {@link ManageableMessage}. The user id and the timestamp are
 * fixed at construction time; every other value is read from the wrapped message on demand.
 */
private static class MessageHeaderImpl implements AMQMessageHeader
{
    private final String _userName;
    private final long _timestamp;
    private final ManageableMessage _message;

    /**
     * @param message the message to expose; the currently authenticated user, if any, becomes
     *                the user id and the current time becomes the timestamp
     */
    public MessageHeaderImpl(final ManageableMessage message)
    {
        _message = message;
        final AuthenticatedPrincipal authenticatedUser = AuthenticatedPrincipal.getCurrentUser();
        if (authenticatedUser == null)
        {
            _userName = null;
        }
        else
        {
            _userName = authenticatedUser.getName();
        }
        _timestamp = System.currentTimeMillis();
    }

    @Override
    public String getCorrelationId()
    {
        return _message.getCorrelationId();
    }

    /** @return the expiration in epoch milliseconds, or 0 when the message has none */
    @Override
    public long getExpiration()
    {
        final Date expires = _message.getExpiration();
        if (expires == null)
        {
            return 0;
        }
        return expires.getTime();
    }

    /** @return the name of the user authenticated when this header was created, possibly null */
    @Override
    public String getUserId()
    {
        return _userName;
    }

    @Override
    public String getAppId()
    {
        return null;
    }

    /** @return the string form of the {@code JMSXGroupID} header, or null when it is absent */
    @Override
    public String getGroupId()
    {
        final Object groupId = getHeader("JMSXGroupID");
        if (groupId == null)
        {
            return null;
        }
        return String.valueOf(groupId);
    }

    @Override
    public String getMessageId()
    {
        return _message.getMessageId();
    }

    @Override
    public String getMimeType()
    {
        return _message.getMimeType();
    }

    @Override
    public String getEncoding()
    {
        return _message.getEncoding();
    }

    @Override
    public byte getPriority()
    {
        return (byte) _message.getPriority();
    }

    /** @return the time, in epoch milliseconds, at which this header was created */
    @Override
    public long getTimestamp()
    {
        return _timestamp;
    }

    /** @return the not-valid-before time in epoch milliseconds, or 0 when the message has none */
    @Override
    public long getNotValidBefore()
    {
        final Date validFrom = _message.getNotValidBefore();
        if (validFrom == null)
        {
            return 0;
        }
        return validFrom.getTime();
    }

    @Override
    public String getType()
    {
        return null;
    }

    @Override
    public String getReplyTo()
    {
        return _message.getReplyTo();
    }

    @Override
    public Object getHeader(final String name)
    {
        return getHeaders().get(name);
    }

    @Override
    public boolean containsHeaders(final Set<String> names)
    {
        final Set<String> present = getHeaders().keySet();
        return present.containsAll(names);
    }

    @Override
    public boolean containsHeader(final String name)
    {
        final Set<String> present = getHeaders().keySet();
        return present.contains(name);
    }

    @Override
    public Collection<String> getHeaderNames()
    {
        return Collections.unmodifiableCollection(getHeaders().keySet());
    }

    /** @return the headers of the wrapped message, or an empty map when it has none */
    private Map<String, Object> getHeaders()
    {
        if (_message.getHeaders() == null)
        {
            return Map.of();
        }
        return _message.getHeaders();
    }
}
/**
 * Housekeeping task that asks every ACTIVE queue of the virtual host to check the status of its
 * messages.
 */
private class VirtualHostHouseKeepingTask extends HouseKeepingTask
{
    public VirtualHostHouseKeepingTask()
    {
        super("Housekeeping["+AbstractVirtualHost.this.getName()+"]",
              AbstractVirtualHost.this,
              _housekeepingJobContext);
    }

    @Override
    public void execute()
    {
        for (Queue<?> queue : getChildren(Queue.class))
        {
            if (queue.getState() != State.ACTIVE)
            {
                continue;
            }
            LOGGER.debug("Checking message status for queue: {}", queue.getName());
            queue.checkMessageStatus();
        }
    }
}
/**
 * Housekeeping task that, when the virtual host holds more message data in memory than its target
 * size, walks the queue entries and asks stored messages to flow to disk.
 */
class FlowToDiskCheckingTask extends HouseKeepingTask
{
public FlowToDiskCheckingTask()
{
super("FlowToDiskChecking["+AbstractVirtualHost.this.getName()+"]", AbstractVirtualHost.this, _housekeepingJobContext);
}
/**
 * Does nothing unless {@code isOverTargetSize()} holds. Otherwise visits the entries of all
 * queues, one entry per queue in turn, until every queue has been exhausted. In-memory sizes are
 * accumulated until the running total exceeds the target size captured at the start; from that
 * point on each entry that its queue still considers valid is flowed to disk.
 */
@Override
public void execute()
{
if (isOverTargetSize())
{
// Captured once so the whole pass works against the same target.
long currentTargetSize = _targetSize.get();
reportDirectMemoryAboveTargetIfExceeded(currentTargetSize,
AbstractVirtualHost.this.getInMemoryMessageSize());
List<QueueEntryIterator> queueIterators = new ArrayList<>();
for (Queue<?> q : getChildren(Queue.class))
{
queueIterators.add(q.queueEntryIterator());
}
// Random starting order; presumably so that no queue is systematically favoured.
Collections.shuffle(queueIterators);
long cumulativeSize = 0;
// Round-robin over the queues: each pass of the cycle advances every iterator by one entry.
final Iterator<QueueEntryIterator> cyclicIterators = cycle(queueIterators);
while (cyclicIterators.hasNext())
{
final QueueEntryIterator queueIterator = cyclicIterators.next();
if (queueIterator.advance())
{
QueueEntry node = queueIterator.getNode();
if (node != null && !node.isDeleted())
{
// The reference is released by try-with-resources when the block exits.
try (MessageReference messageReference = node.getMessage().newReference())
{
final StoredMessage storedMessage = messageReference.getMessage().getStoredMessage();
final long inMemorySize = storedMessage.getInMemorySize();
if (inMemorySize > 0)
{
// Stop accumulating once the total is over the target; it then stays over it.
if (cumulativeSize <= currentTargetSize)
{
cumulativeSize += inMemorySize;
}
if (cumulativeSize > currentTargetSize && node.getQueue().checkValid(node))
{
storedMessage.flowToDisk();
}
}
}
catch (MessageDeletedException e)
{
// pass
}
}
}
else
{
// This queue is exhausted: drop its iterator from the cycle so the loop can end.
cyclicIterators.remove();
}
}
reportDirectMemoryBelowTargetIfReached(cumulativeSize,
AbstractVirtualHost.this.getInMemoryMessageSize());
}
}
}
/**
 * Registry of the system nodes of this virtual host, backed by the virtual host's maps of system
 * node destinations and system node sources. A node is recorded in each map whose role it
 * implements.
 */
private class SystemNodeRegistry implements SystemNodeCreator.SystemNodeRegistry
{
    @Override
    public void registerSystemNode(final MessageNode node)
    {
        final String nodeName = node.getName();
        if (node instanceof MessageDestination)
        {
            _systemNodeDestinations.put(nodeName, (MessageDestination) node);
        }
        if (node instanceof MessageSource)
        {
            _systemNodeSources.put(nodeName, (MessageSource) node);
        }
    }

    @Override
    public void removeSystemNode(final MessageNode node)
    {
        if (node instanceof MessageDestination)
        {
            _systemNodeDestinations.remove(node.getName());
        }
        if (node instanceof MessageSource)
        {
            removeMessageSource(node.getName());
        }
    }

    /** Removes the named source from the registry and closes it if it was registered. */
    private void removeMessageSource(final String name)
    {
        final MessageSource removed = _systemNodeSources.remove(name);
        if (removed == null)
        {
            return;
        }
        removed.close();
    }

    @Override
    public void removeSystemNode(final String name)
    {
        _systemNodeDestinations.remove(name);
        removeMessageSource(name);
    }

    @Override
    public VirtualHostNode<?> getVirtualHostNode()
    {
        return (VirtualHostNode) getParent();
    }

    @Override
    public VirtualHost<?> getVirtualHost()
    {
        return AbstractVirtualHost.this;
    }

    @Override
    public boolean hasSystemNode(final String name)
    {
        if (_systemNodeSources.containsKey(name))
        {
            return true;
        }
        return _systemNodeDestinations.containsKey(name);
    }

    /** Closes every registered system node source. */
    public void close()
    {
        for (MessageSource source : _systemNodeSources.values())
        {
            source.close();
        }
    }
}
/**
 * Runs the given operation against a new {@link LocalTransaction} on the message store and
 * commits it afterwards. The operation is handed a {@link Transaction} through which it can
 * dequeue, copy and move queue entries; in-memory queue changes are made in the post-commit
 * actions.
 * <p>
 * NOTE(review): if {@code op.withinTransaction} throws, {@code commit()} is skipped and nothing
 * here rolls the transaction back - confirm whether callers rely on that.
 *
 * @param op the operation to run within the transaction
 */
@Override
public void executeTransaction(TransactionalOperation op)
{
final MessageStore store = getMessageStore();
final LocalTransaction txn = new LocalTransaction(store);
op.withinTransaction(new Transaction()
{
@Override
public void dequeue(final QueueEntry messageInstance)
{
// Shared by both dequeue paths below: the entry is deleted once the dequeue has committed.
final ServerTransaction.Action deleteAction = new ServerTransaction.Action()
{
@Override
public void postCommit()
{
messageInstance.delete();
}
@Override
public void onRollback()
{
}
};
// The callback passed to acquireOrSteal dequeues through its own auto-commit transaction;
// when acquireOrSteal returns true the dequeue is added to the enclosing transaction instead.
boolean acquired = messageInstance.acquireOrSteal(() ->
{
ServerTransaction txn1 = new AutoCommitTransaction(store);
txn1.dequeue(messageInstance.getEnqueueRecord(), deleteAction);
});
if(acquired)
{
txn.dequeue(messageInstance.getEnqueueRecord(), deleteAction);
}
}
@Override
public void copy(QueueEntry entry, final Queue<?> queue)
{
final ServerMessage message = entry.getMessage();
// Enqueue in the store as part of the transaction; the queue receives the message on commit.
txn.enqueue(queue, message, new ServerTransaction.EnqueueAction()
{
@Override
public void postCommit(MessageEnqueueRecord... records)
{
queue.enqueue(message, null, records[0]);
}
@Override
public void onRollback()
{
}
});
}
@Override
public void move(final QueueEntry entry, final Queue<?> queue)
{
final ServerMessage message = entry.getMessage();
// Nothing is done when the entry cannot be acquired.
if(entry.acquire())
{
txn.enqueue(queue, message,
new ServerTransaction.EnqueueAction()
{
@Override
public void postCommit(MessageEnqueueRecord... records)
{
queue.enqueue(message, null, records[0]);
}
@Override
public void onRollback()
{
// Give the acquired entry back if the move does not commit.
entry.release();
}
});
txn.dequeue(entry.getEnqueueRecord(),
new ServerTransaction.Action()
{
@Override
public void postCommit()
{
entry.delete();
}
@Override
public void onRollback()
{
}
});
}
}
});
txn.commit();
}
/**
 * @return the housekeeping check period held in {@code _housekeepingCheckPeriod};
 *         {@code initialiseHouseKeeping()} uses it as the task period in milliseconds
 */
@Override
public long getHousekeepingCheckPeriod()
{
return _housekeepingCheckPeriod;
}
/**
 * @return the flow-to-disk check period held in {@code _flowToDiskCheckPeriod};
 *         {@code initialiseFlowToDiskChecking()} uses it as the task period in milliseconds
 */
@Override
public long getFlowToDiskCheckPeriod()
{
return _flowToDiskCheckPeriod;
}
/**
 * @return the flag held in {@code _isDiscardGlobalSharedSubscriptionLinksOnDetach}
 */
@Override
public boolean isDiscardGlobalSharedSubscriptionLinksOnDetach()
{
return _isDiscardGlobalSharedSubscriptionLinksOnDetach;
}
/**
 * @return the value held in {@code _storeTransactionIdleTimeoutClose}
 */
@Override
public long getStoreTransactionIdleTimeoutClose()
{
return _storeTransactionIdleTimeoutClose;
}
/**
 * @return the value held in {@code _storeTransactionIdleTimeoutWarn}
 */
@Override
public long getStoreTransactionIdleTimeoutWarn()
{
return _storeTransactionIdleTimeoutWarn;
}
/**
 * @return the value held in {@code _storeTransactionOpenTimeoutClose}
 */
@Override
public long getStoreTransactionOpenTimeoutClose()
{
return _storeTransactionOpenTimeoutClose;
}
/**
 * @return the value held in {@code _storeTransactionOpenTimeoutWarn}
 */
@Override
public long getStoreTransactionOpenTimeoutWarn()
{
return _storeTransactionOpenTimeoutWarn;
}
/**
 * @return the number of child queues
 */
@Override
public long getQueueCount()
{
return getChildren(Queue.class).size();
}
/**
 * @return the number of child exchanges
 */
@Override
public long getExchangeCount()
{
return getChildren(Exchange.class).size();
}
/**
 * @return the number of connections currently registered with this virtual host
 */
@Override
public long getConnectionCount()
{
return _connections.size();
}
/**
 * @return the number of connections registered since statistics were last reset
 */
@Override
public long getTotalConnectionCount()
{
return _totalConnectionCount.get();
}
/**
 * @return the value held in {@code _housekeepingThreadCount}
 */
@Override
public int getHousekeepingThreadCount()
{
return _housekeepingThreadCount;
}
/**
 * @return the statistics reporting period held in {@code _statisticsReportingPeriod};
 *         {@code initialiseStatisticsReporting()} multiplies it by 1000 to obtain milliseconds
 */
@Override
public int getStatisticsReportingPeriod()
{
return _statisticsReportingPeriod;
}
/**
 * @return the value held in {@code _connectionThreadPoolSize}, passed to the network connection
 *         scheduler on activation
 */
@Override
public int getConnectionThreadPoolSize()
{
return _connectionThreadPoolSize;
}
/**
 * @return the value held in {@code _numberOfSelectors}, passed to the network connection
 *         scheduler on activation
 */
@Override
public int getNumberOfSelectors()
{
return _numberOfSelectors;
}
/**
 * State transition to STOPPED from UNINITIALIZED, ACTIVE or ERRORED.
 * <p>
 * Connections are closed first, then the children; after that housekeeping, the network
 * connection scheduler, the link registry, the message store, the preference task executor and
 * the preference store are shut down, the state is set to STOPPED, the loggers are stopped and
 * the connection limiter is closed.
 *
 * @return a future for the completion of the whole sequence
 */
@StateTransition( currentState = { State.UNINITIALIZED, State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED )
protected ListenableFuture<Void> doStop()
{
// Snapshot taken before the children are closed, so the loggers can still be stopped at the end.
final List<VirtualHostLogger> loggers = new ArrayList<>(getChildren(VirtualHostLogger.class));
return doAfter(closeConnections(), () -> closeChildren()).then(() ->
{
shutdownHouseKeeping();
closeNetworkConnectionScheduler();
if (_linkRegistry != null)
{
_linkRegistry.close();
}
closeMessageStore();
stopPreferenceTaskExecutor();
closePreferenceStore();
setState(State.STOPPED);
stopLogging(loggers);
closeConnectionLimiter();
});
}
/**
 * Creates the user preferences holder for the given object, backed by this virtual host's
 * preference task executor and preference store.
 *
 * @param object the configured object the preferences belong to
 * @return a new {@link UserPreferencesImpl}
 * @throws IllegalStateException if the preference task executor does not exist or is not running
 */
@Override
public UserPreferences createUserPreferences(ConfiguredObject<?> object)
{
    final boolean executorRunning = _preferenceTaskExecutor != null && _preferenceTaskExecutor.isRunning();
    if (!executorRunning)
    {
        throw new IllegalStateException("Cannot create user preferences in not fully initialized virtual host");
    }
    return new UserPreferencesImpl(_preferenceTaskExecutor, object, _preferenceStore, Set.of());
}
/**
 * Stops the preference task executor, if one exists, and clears the field.
 */
private void stopPreferenceTaskExecutor()
{
    if (_preferenceTaskExecutor == null)
    {
        return;
    }
    _preferenceTaskExecutor.stop();
    _preferenceTaskExecutor = null;
}
/**
 * Closes the preference store, if one exists. The field is left in place.
 */
private void closePreferenceStore()
{
    if (_preferenceStore == null)
    {
        return;
    }
    _preferenceStore.close();
}
/**
 * Stops logging on each of the given virtual host loggers.
 *
 * @param loggers the loggers to stop
 */
private void stopLogging(Collection<VirtualHostLogger> loggers)
{
    loggers.forEach(VirtualHostLogger::stopLogging);
}
/**
 * Deletes the link registry, if one exists, and clears the field.
 */
private void deleteLinkRegistry()
{
    if (_linkRegistry == null)
    {
        return;
    }
    _linkRegistry.delete();
    _linkRegistry = null;
}
/**
 * Deletes the preference store, if one exists. A failure is logged as a warning rather than
 * propagated, and the field is cleared in either case.
 */
private void deletePreferenceStore()
{
    final PreferenceStore store = _preferenceStore;
    if (store == null)
    {
        return;
    }

    try
    {
        store.onDelete();
    }
    catch (Exception e)
    {
        LOGGER.warn("Exception occurred on preference store deletion", e);
    }
    finally
    {
        _preferenceStore = null;
    }
}
/**
 * Deletes the message store, if one exists. A failure is logged as a warning rather than
 * propagated, and the field is cleared in either case.
 */
private void deleteMessageStore()
{
    final MessageStore store = _messageStore;
    if (store == null)
    {
        return;
    }

    try
    {
        store.onDelete(AbstractVirtualHost.this);
    }
    catch (Exception e)
    {
        LOGGER.warn("Exception occurred on message store deletion", e);
    }
    finally
    {
        _messageStore = null;
    }
}
/**
 * @return the model version constant {@code BrokerModel.MODEL_VERSION}
 */
@Override
public String getModelVersion()
{
return BrokerModel.MODEL_VERSION;
}
/**
 * @return the product version reported by the broker
 */
@Override
public String getProductVersion()
{
return _broker.getProductVersion();
}
/**
 * @return the configuration store of the virtual host node
 */
@Override
public DurableConfigurationStore getDurableConfigurationStore()
{
return _virtualHostNode.getConfigurationStore();
}
/**
 * Sets the target size and immediately re-evaluates the in-memory message size against it by
 * calling both the above-target and the below-target reporting methods.
 *
 * @param targetSize the new target size
 */
@Override
public void setTargetSize(final long targetSize)
{
_targetSize.set(targetSize);
// Read once so both reports are made against the same measurement.
final long inMemoryMessageSize = getInMemoryMessageSize();
reportDirectMemoryAboveTargetIfExceeded(targetSize, inMemoryMessageSize);
reportDirectMemoryBelowTargetIfReached(targetSize, inMemoryMessageSize);
}
/**
 * @return the current target size
 */
@Override
public long getTargetSize()
{
return _targetSize.get();
}
/**
 * @return the principal held in {@code _principal}
 */
@Override
public Principal getPrincipal()
{
return _principal;
}
/**
 * Registers a connection with this virtual host: the connection is registered with the
 * connection limiter, added to the connection registry and counted; it is blocked straight away
 * if the virtual host is currently blocked, and the virtual host's network connection scheduler
 * is pushed onto it.
 *
 * @param connection the connection to register
 * @throws VirtualHostUnavailableException if the virtual host is not accepting connections
 */
@Override
public void registerConnection(final AMQPConnection<?> connection)
{
    final boolean accepting = _acceptsConnections.get();
    if (!accepting)
    {
        final String reason = String.format("VirtualHost '%s' not accepting connections", getName());
        throw new VirtualHostUnavailableException(reason);
    }

    _connectionLimiter.register(connection);
    _connections.add(connection);
    _totalConnectionCount.incrementAndGet();

    if (_blocked.get())
    {
        connection.block();
    }

    connection.pushScheduler(_networkConnectionScheduler);
}
/**
 * Deregisters a connection: it is deregistered from the connection limiter, its scheduler is
 * popped and it is removed from the connection registry.
 *
 * @param connection the connection to deregister
 */
@Override
public void deregisterConnection(final AMQPConnection<?> connection)
{
try
{
_connectionLimiter.deregister(connection);
}
finally
{
// Runs even if the limiter throws, so the connection never lingers in the registry.
connection.popScheduler();
_connections.remove(connection);
}
}
/**
 * State transition to ACTIVE from UNINITIALIZED or ERRORED.
 * <p>
 * In order: creates and starts the network connection scheduler, updates access control, starts
 * statistics reporting, opens the message store and upgrades its structure, opens the link
 * registry, has the broker assign target sizes, opens the preference store and recovers the
 * preferences on a newly started preference task executor, and activates the connection limiter.
 * If default exchanges still have to be created they are created first; in either case
 * {@code postCreateDefaultExchangeTasks()} is run last.
 *
 * @return a future for the completion of activation
 */
@StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE)
private ListenableFuture<Void> onActivate()
{
long threadPoolKeepAliveTimeout = getContextValue(Long.class, CONNECTION_THREAD_POOL_KEEP_ALIVE_TIMEOUT);
final SuppressingInheritedAccessControlContextThreadFactory connectionThreadFactory =
new SuppressingInheritedAccessControlContextThreadFactory("virtualhost-" + getName() + "-iopool",
getSystemTaskSubject("IO Pool", getPrincipal()));
_networkConnectionScheduler = new NetworkConnectionScheduler("virtualhost-" + getName() + "-iopool",
getNumberOfSelectors(),
getConnectionThreadPoolSize(),
threadPoolKeepAliveTimeout,
connectionThreadFactory);
_networkConnectionScheduler.start();
updateAccessControl();
initialiseStatisticsReporting();
MessageStore messageStore = getMessageStore();
messageStore.openMessageStore(this);
startFileSystemSpaceChecking();
// The store's own log messages are skipped when the configuration store is also the
// message store provider.
if (!(_virtualHostNode.getConfigurationStore() instanceof MessageStoreProvider))
{
getEventLogger().message(getMessageStoreLogSubject(), MessageStoreMessages.CREATED());
getEventLogger().message(getMessageStoreLogSubject(), MessageStoreMessages.STORE_LOCATION(messageStore.getStoreLocation()));
}
messageStore.upgradeStoreStructure();
if (_linkRegistry != null)
{
_linkRegistry.open();
}
getBroker().assignTargetSizes();
final PreferenceStoreUpdater updater = new PreferenceStoreUpdaterImpl();
Collection<PreferenceRecord> records = _preferenceStore.openAndLoad(updater);
_preferenceTaskExecutor = new TaskExecutorImpl("virtualhost-" + getName() + "-preferences", null);
_preferenceTaskExecutor.start();
PreferencesRecoverer preferencesRecoverer = new PreferencesRecoverer(_preferenceTaskExecutor);
preferencesRecoverer.recoverPreferences(this, records, _preferenceStore);
activateConnectionLimiter();
if (_createDefaultExchanges)
{
return doAfter(createDefaultExchanges(), () ->
{
// Cleared once the exchanges exist, so a later activation does not create them again.
_createDefaultExchanges = false;
postCreateDefaultExchangeTasks();
});
}
else
{
postCreateDefaultExchangeTasks();
return Futures.immediateFuture(null);
}
}
/**
 * Final activation steps, run once the default exchanges exist (or did not
 * need creating).
 * <p>
 * Chooses an asynchronous or synchronous message store recoverer according to
 * the USE_ASYNC_RECOVERY context value, starts recovery, then initialises
 * housekeeping and flow-to-disk checking. The state is set to ACTIVE only if
 * both initialisation calls complete; otherwise it is set to ERRORED. The
 * resulting state is passed to {@code reportIfError}.
 */
private void postCreateDefaultExchangeTasks()
{
if(getContextValue(Boolean.class, USE_ASYNC_RECOVERY))
{
_messageStoreRecoverer = new AsynchronousMessageStoreRecoverer();
}
else
{
_messageStoreRecoverer = new SynchronousMessageStoreRecoverer();
}
// propagate any exception thrown during recovery into HouseKeepingTaskExecutor to handle them accordingly
// TODO if message recovery fails we ought to be transitioning the VH into ERROR and releasing the thread-pools etc.
final ListenableFuture<Void> recoveryResult = _messageStoreRecoverer.recover(this);
recoveryResult.addListener(() -> Futures.getUnchecked(recoveryResult), _houseKeepingTaskExecutor);
// assume failure until both initialisation steps below have completed
State finalState = State.ERRORED;
try
{
initialiseHouseKeeping();
initialiseFlowToDiskChecking();
finalState = State.ACTIVE;
// from this point registerConnection() no longer rejects connections
_acceptsConnections.set(true);
}
finally
{
setState(finalState);
reportIfError(getState());
}
}
/**
 * Emits a virtual host OPERATION message for the given operation through the
 * event logger.
 *
 * @param operation the operation description passed to the log message
 */
@Override
protected void logOperation(final String operation)
{
    final EventLogger eventLogger = getEventLogger();
    eventLogger.message(VirtualHostMessages.OPERATION(operation));
}
/**
 * Schedules the periodic file system space check for the message store
 * location.
 * <p>
 * Nothing is scheduled unless the message store reports a location as a
 * file, the maximum usage percentage is positive and the housekeeping check
 * period is positive.
 */
protected void startFileSystemSpaceChecking()
{
    final long checkPeriod = getHousekeepingCheckPeriod();
    final File storeLocation = _messageStore.getStoreLocationAsFile();

    final boolean checkingEnabled =
            storeLocation != null && _fileSystemMaxUsagePercent > 0 && checkPeriod > 0;
    if (!checkingEnabled)
    {
        return;
    }

    _fileSystemSpaceChecker.setFileSystem(storeLocation);
    scheduleHouseKeepingTask(checkPeriod, _fileSystemSpaceChecker);
}
/**
 * @return the connection meta data provided by the broker
 */
@Override
public SocketConnectionMetaData getConnectionMetaData()
{
    final SocketConnectionMetaData metaData = getBroker().getConnectionMetaData();
    return metaData;
}
/**
 * State transition from STOPPED to ACTIVE.
 * <p>
 * Delegates to {@link #doRestart()} and adapts its outcome onto the returned
 * future: success completes the future with {@code null}; failure first runs
 * {@link #onRestartFailure()} and then completes the future exceptionally
 * with the original cause. An {@code IllegalArgumentException} or
 * {@code IllegalConfigurationException} thrown synchronously is handled the
 * same way.
 *
 * @return a future completed when the restart has succeeded or failed
 */
@StateTransition( currentState = { State.STOPPED }, desiredState = State.ACTIVE )
private ListenableFuture<Void> onRestart()
{
final SettableFuture<Void> returnVal = SettableFuture.create();
try
{
addFutureCallback(doRestart(), new FutureCallback<>()
{
@Override
public void onSuccess(final Void result)
{
returnVal.set(null);
}
@Override
public void onFailure(final Throwable t)
{
// clean up first, then report the original failure to the caller
doAfterAlways(onRestartFailure(), () -> returnVal.setException(t));
}
}, getTaskExecutor()
);
}
catch (IllegalArgumentException | IllegalConfigurationException e)
{
// failure raised synchronously (before a future was obtained)
doAfterAlways(onRestartFailure(), ()-> returnVal.setException(e));
}
return returnVal;
}
/**
 * Performs the restart work: recreates the housekeeping executor, reloads and
 * recovers the virtual host from the durable configuration store, re-attaches
 * the access control provider listener, opens all children asynchronously and,
 * once the combined child-open future completes successfully, runs
 * {@link #onActivate()}.
 *
 * @return a future representing completion of the child opens followed by activation
 */
private ListenableFuture<Void> doRestart()
{
createHousekeepingExecutor();
final VirtualHostStoreUpgraderAndRecoverer virtualHostStoreUpgraderAndRecoverer =
new VirtualHostStoreUpgraderAndRecoverer((VirtualHostNode<?>) getParent());
virtualHostStoreUpgraderAndRecoverer.reloadAndRecoverVirtualHost(getDurableConfigurationStore());
final Collection<VirtualHostAccessControlProvider> accessControlProviders = getChildren(VirtualHostAccessControlProvider.class);
if (!accessControlProviders.isEmpty())
{
accessControlProviders.forEach(child -> child.addChangeListener(_accessControlProviderListener));
}
// futures for every child open, collected so that activation can wait on all of them
final List<ListenableFuture<Void>> childOpenFutures = new ArrayList<>();
// children are opened under a subject with added system rights
Subject.doAs(getSubjectWithAddedSystemRights(), (PrivilegedAction<Object>) () ->
{
applyToChildren(child ->
{
final ListenableFuture<Void> childOpenFuture = child.openAsync();
childOpenFutures.add(childOpenFuture);
addFutureCallback(childOpenFuture, new FutureCallback<>()
{
@Override
public void onSuccess(final Void result)
{
}
@Override
public void onFailure(final Throwable t)
{
LOGGER.error("Exception occurred while opening {} : {}",
child.getClass().getSimpleName(), child.getName(), t);
// NOTE(review): onRestart() also invokes onRestartFailure() when the future
// returned from this method fails, so a failing child open appears to trigger
// the cleanup more than once - confirm this is intended/harmless.
onRestartFailure();
}
}, getTaskExecutor());
});
return null;
});
// per Guava, allAsList fails as soon as any input future fails
ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(childOpenFutures);
return Futures.transformAsync(combinedFuture, input -> onActivate(), MoreExecutors.directExecutor());
}
/**
 * Cleans up after a failed restart and moves the virtual host to ERRORED.
 * <p>
 * The virtual host loggers are captured before the children are closed so
 * that they can still be stopped afterwards. Once the children are closed the
 * housekeeping, network connection scheduler, link registry (if present),
 * message store, preference task executor and preference store are shut down,
 * the state is set to ERRORED, logging is stopped and the connection limiter
 * is closed.
 *
 * @return a future chained after the closing of the children
 */
private ChainedListenableFuture<Void> onRestartFailure()
{
// snapshot taken before closeChildren()
final List<VirtualHostLogger> loggers = new ArrayList<>(getChildren(VirtualHostLogger.class));
return doAfter(closeChildren(), () -> {
shutdownHouseKeeping();
closeNetworkConnectionScheduler();
if (_linkRegistry != null)
{
_linkRegistry.close();
}
closeMessageStore();
stopPreferenceTaskExecutor();
closePreferenceStore();
setState(State.ERRORED);
stopLogging(loggers);
closeConnectionLimiter();
});
}
/**
 * Housekeeping task that compares the usage of the configured file system
 * with the virtual host's maximum usage percentage and blocks or unblocks the
 * virtual host (blocking type FILESYSTEM) when the threshold is crossed.
 */
private class FileSystemSpaceChecker extends HouseKeepingTask
{
    /** Whether the last threshold crossing left the file system in the "full" state. */
    private boolean _fileSystemFull;
    /** File whose total/free space is inspected; set through {@link #setFileSystem(File)}. */
    private File _fileSystem;

    public FileSystemSpaceChecker()
    {
        super("FileSystemSpaceChecker["+AbstractVirtualHost.this.getName()+"]",AbstractVirtualHost.this,_fileSystemSpaceCheckerJobContext);
    }

    /**
     * Performs one check. A total space of zero is reported as an invalid
     * store path and the check is skipped. A usage exactly equal to the
     * threshold never changes the current state.
     */
    @Override
    public void execute()
    {
        final long total = _fileSystem.getTotalSpace();
        final long free = _fileSystem.getFreeSpace();
        if (total == 0)
        {
            LOGGER.warn("Cannot check file system for disk space because store path '{}' is not valid", _fileSystem.getPath());
            return;
        }

        final long usedPercent = (100L * (total - free)) / total;
        if (_fileSystemFull)
        {
            if (usedPercent < _fileSystemMaxUsagePercent)
            {
                // dropped back below the threshold
                _fileSystemFull = false;
                getEventLogger().message(getMessageStoreLogSubject(),
                                         VirtualHostMessages.FILESYSTEM_NOTFULL(_fileSystemMaxUsagePercent));
                unblock(BlockingType.FILESYSTEM);
            }
        }
        else if (usedPercent > _fileSystemMaxUsagePercent)
        {
            // rose above the threshold
            _fileSystemFull = true;
            getEventLogger().message(getMessageStoreLogSubject(),
                                     VirtualHostMessages.FILESYSTEM_FULL(_fileSystemMaxUsagePercent));
            block(BlockingType.FILESYSTEM);
        }
    }

    /**
     * @param fileSystem the file whose space is to be monitored
     */
    public void setFileSystem(final File fileSystem)
    {
        _fileSystem = fileSystem;
    }
}
/**
 * Creates a message source child of the requested class.
 * <p>
 * A {@code Queue} subtype is created as that subtype; a supertype of
 * {@code Queue} is satisfied by creating a plain {@code Queue}.
 *
 * @param clazz      the requested message source class
 * @param attributes the attributes for the new child
 * @return the created child
 * @throws IllegalArgumentException if the class is neither a subtype nor a supertype of {@code Queue}
 */
@Override
public <T extends MessageSource> T createMessageSource(final Class<T> clazz, final Map<String, Object> attributes)
{
    if (Queue.class.isAssignableFrom(clazz))
    {
        return (T) createChild((Class<? extends Queue>) clazz, attributes);
    }

    if (clazz.isAssignableFrom(Queue.class))
    {
        return (T) createChild(Queue.class, attributes);
    }

    throw new IllegalArgumentException("Cannot create message source children of class " + clazz.getSimpleName());
}
/**
 * Creates a message destination child of the requested class.
 * <p>
 * {@code Exchange} and {@code Queue} subtypes are created as the requested
 * subtype; a supertype of {@code Queue} is satisfied by creating a plain
 * {@code Queue}.
 *
 * @param clazz      the requested message destination class
 * @param attributes the attributes for the new child
 * @return the created child
 * @throws IllegalArgumentException if the class cannot be mapped to an exchange or a queue
 */
@Override
public <T extends MessageDestination> T createMessageDestination(final Class<T> clazz,
                                                                 final Map<String, Object> attributes)
{
    if (Exchange.class.isAssignableFrom(clazz))
    {
        return (T) createChild((Class<? extends Exchange>) clazz, attributes);
    }

    if (Queue.class.isAssignableFrom(clazz))
    {
        return (T) createChild((Class<? extends Queue>) clazz, attributes);
    }

    if (clazz.isAssignableFrom(Queue.class))
    {
        return (T) createChild(Queue.class, attributes);
    }

    throw new IllegalArgumentException("Cannot create message destination children of class " + clazz.getSimpleName());
}
/**
 * @return {@code true} if there is at least one system node source or at
 *         least one queue child
 */
@Override
public boolean hasMessageSources()
{
    return !_systemNodeSources.isEmpty() || !getChildren(Queue.class).isEmpty();
}
/**
 * Returns the queue backing a subscription, creating and binding it when
 * necessary.
 * <p>
 * A new queue is created from the attributes and bound to the exchange with
 * every requested binding. If a queue of that name already exists it is
 * reused when its exclusivity policy matches and its bindings agree with the
 * requested ones; when the bindings differ and the queue has no consumers, it
 * is deleted and recreated with the requested bindings.
 *
 * @param exchangeName name of the exchange to bind the queue to
 * @param attributes   queue attributes; the exclusivity policy falls back to the
 *                     QUEUE_DEFAULT_EXCLUSIVITY_POLICY context value when absent
 * @param bindings     binding key to binding arguments
 * @return the new or existing queue
 * @throws IllegalArgumentException if no usable exclusivity policy is available
 *                                  or a binding argument is rejected
 * @throws NotFoundException        if the exchange does not exist
 * @throws IllegalStateException    if the existing queue cannot be reused
 */
@Override
@DoOnConfigThread
public Queue<?> getSubscriptionQueue(@Param(name = "exchangeName", mandatory = true) final String exchangeName,
                                     @Param(name = "attributes", mandatory = true) final Map<String, Object> attributes,
                                     @Param(name = "bindings", mandatory = true) final Map<String, Map<String, Object>> bindings)
{
    Object exclusivityPolicy = attributes.get(Queue.EXCLUSIVE);
    if (exclusivityPolicy == null)
    {
        exclusivityPolicy = getContextValue(ExclusivityPolicy.class, Queue.QUEUE_DEFAULT_EXCLUSIVITY_POLICY);
    }
    if (!(exclusivityPolicy instanceof ExclusivityPolicy))
    {
        throw new IllegalArgumentException("Exclusivity policy is required");
    }

    final Exchange<?> exchange = findConfiguredObject(Exchange.class, exchangeName);
    if (exchange == null)
    {
        throw new NotFoundException(String.format("Exchange '%s' was not found", exchangeName));
    }

    Queue queue;
    try
    {
        queue = createMessageDestination(Queue.class, attributes);
        for (final Map.Entry<String, Map<String, Object>> requested : bindings.entrySet())
        {
            exchange.addBinding(requested.getKey(), queue, requested.getValue());
        }
    }
    catch (AbstractConfiguredObject.DuplicateNameException e)
    {
        final Queue<?> existingQueue = (Queue) e.getExisting();
        if (existingQueue.getExclusive() != exclusivityPolicy)
        {
            throw new IllegalStateException("subscription already in use");
        }

        if (!hasDifferentBindings(exchange, existingQueue, bindings))
        {
            // same exclusivity and same bindings: reuse as is
            queue = existingQueue;
        }
        else if (!existingQueue.getConsumers().isEmpty())
        {
            throw new IllegalStateException("subscription already in use");
        }
        else
        {
            // bindings changed and nobody is consuming: recreate the queue
            existingQueue.delete();
            queue = createMessageDestination(Queue.class, attributes);
            for (final Map.Entry<String, Map<String, Object>> requested : bindings.entrySet())
            {
                try
                {
                    exchange.addBinding(requested.getKey(), queue, requested.getValue());
                }
                catch (AMQInvalidArgumentException ia)
                {
                    throw new IllegalArgumentException("Unexpected bind argument : " + ia.getMessage(), ia);
                }
            }
        }
    }
    catch (AMQInvalidArgumentException e)
    {
        throw new IllegalArgumentException("Unexpected bind argument : " + e.getMessage(), e);
    }
    return queue;
}
/**
 * Deletes the named queue provided it has no consumers.
 *
 * @param queueName the name of the queue to delete
 * @throws NotFoundException     if no queue of that name exists
 * @throws IllegalStateException if the queue still has consumers
 */
@Override
@DoOnConfigThread
public void removeSubscriptionQueue(@Param(name = "queueName", mandatory = true) final String queueName) throws NotFoundException
{
    final Queue<?> target = findConfiguredObject(Queue.class, queueName);
    if (target == null)
    {
        throw new NotFoundException(String.format("Queue '%s' was not found", queueName));
    }

    if (!target.getConsumers().isEmpty())
    {
        throw new IllegalStateException("There are consumers on Queue '" + queueName + "'");
    }

    target.delete();
}
/**
 * Produces a dump of the link registry, running on the configuration thread
 * and waiting for the result.
 * <p>
 * For a STOPPED virtual host the message store and the link registry are
 * opened just for the duration of the dump and closed again afterwards. For
 * an ACTIVE virtual host the registry is dumped directly.
 *
 * @return the dump produced by the link registry
 * @throws IllegalStateException if the virtual host is neither active nor stopped
 */
@Override
public Object dumpLinkRegistry()
{
    return doSync(doOnConfigThread(new Task<ListenableFuture<Object>, IOException>()
    {
        @Override
        public ListenableFuture<Object> execute() throws IOException
        {
            if (getState() == State.STOPPED)
            {
                _messageStore.openMessageStore(AbstractVirtualHost.this);
                try
                {
                    _linkRegistry.open();
                    try
                    {
                        final Object stoppedDump = _linkRegistry.dump();
                        return Futures.immediateFuture(stoppedDump);
                    }
                    finally
                    {
                        _linkRegistry.close();
                    }
                }
                finally
                {
                    _messageStore.closeMessageStore();
                }
            }

            if (getState() == State.ACTIVE)
            {
                final Object activeDump = _linkRegistry.dump();
                return Futures.immediateFuture(activeDump);
            }

            throw new IllegalStateException("The dumpLinkRegistry operation can only be called when the virtual host is active or stopped.");
        }

        @Override
        public String getObject()
        {
            return AbstractVirtualHost.this.toString();
        }

        @Override
        public String getAction()
        {
            return "dumpLinkRegistry";
        }

        @Override
        public String getArguments()
        {
            return null;
        }
    }));
}
/**
 * Purges links from the link registry of a stopped virtual host, running on
 * the configuration thread and waiting for completion.
 * <p>
 * The message store and link registry are opened for the duration of the
 * purge and closed again afterwards.
 *
 * @param containerIdPatternString regular expression applied to container ids
 * @param role                     "SENDER", "RECEIVER" or "BOTH"; any other value purges nothing
 * @param linkNamePatternString    regular expression applied to link names
 * @throws IllegalArgumentException if the virtual host is not stopped
 */
@Override
public void purgeLinkRegistry(final String containerIdPatternString, final String role, final String linkNamePatternString)
{
    doSync(doOnConfigThread(new Task<ListenableFuture<Void>, IOException>()
    {
        @Override
        public ListenableFuture<Void> execute() throws IOException
        {
            if (getState() != State.STOPPED)
            {
                throw new IllegalArgumentException(
                        "The purgeLinkRegistry operation can only be called when the virtual host is stopped.");
            }

            final Pattern containerIds = Pattern.compile(containerIdPatternString);
            final Pattern linkNames = Pattern.compile(linkNamePatternString);
            final boolean bothRoles = "BOTH".equals(role);
            final boolean purgeSenders = "SENDER".equals(role) || bothRoles;
            final boolean purgeReceivers = "RECEIVER".equals(role) || bothRoles;

            _messageStore.openMessageStore(AbstractVirtualHost.this);
            try
            {
                _linkRegistry.open();
                try
                {
                    if (purgeSenders)
                    {
                        _linkRegistry.purgeSendingLinks(containerIds, linkNames);
                    }
                    if (purgeReceivers)
                    {
                        _linkRegistry.purgeReceivingLinks(containerIds, linkNames);
                    }
                    return Futures.immediateFuture(null);
                }
                finally
                {
                    _linkRegistry.close();
                }
            }
            finally
            {
                _messageStore.closeMessageStore();
            }
        }

        @Override
        public String getObject()
        {
            return AbstractVirtualHost.this.toString();
        }

        @Override
        public String getAction()
        {
            return "purgeLinkRegistry";
        }

        @Override
        public String getArguments()
        {
            return String.format("containerIdPattern='%s',role='%s',linkNamePattern='%s'",
                                 containerIdPatternString,
                                 role,
                                 linkNamePatternString);
        }
    }));
}
/**
 * Returns the cache registered under the given name, creating it on first use.
 * <p>
 * Maximum size and expiration (milliseconds, applied after access) are read
 * from the cache-specific context variables when those keys are present in
 * the context, otherwise from the generic NAMED_CACHE_MAXIMUM_SIZE and
 * NAMED_CACHE_EXPIRATION variables.
 *
 * @param cacheName the name of the cache
 * @return the existing or newly built cache
 */
@Override
public <K, V> Cache<K, V> getNamedCache(final String cacheName)
{
    final String specificSizeVar = String.format(NAMED_CACHE_MAXIMUM_SIZE_FORMAT, cacheName);
    final String specificExpirationVar = String.format(NAMED_CACHE_EXPIRATION_FORMAT, cacheName);
    final Set<String> knownKeys = getContextKeys(false);

    // choose the variable name first, then resolve its value
    final String sizeVar = knownKeys.contains(specificSizeVar) ? specificSizeVar : NAMED_CACHE_MAXIMUM_SIZE;
    final int maxSize = getContextValue(Integer.class, sizeVar);

    final String expirationVar =
            knownKeys.contains(specificExpirationVar) ? specificExpirationVar : NAMED_CACHE_EXPIRATION;
    final long expiration = getContextValue(Long.class, expirationVar);

    return _caches.computeIfAbsent(cacheName, (key) -> CacheBuilder.<K, V>newBuilder()
            .maximumSize(maxSize)
            .expireAfterAccess(expiration, TimeUnit.MILLISECONDS)
            .build());
}
/**
 * Tells whether any requested binding is missing from the exchange's existing
 * bindings to the queue.
 * <p>
 * A requested binding counts as present when an existing publishing link has
 * the same binding key and equal arguments; {@code null} arguments are treated
 * as an empty map on both sides. Existing bindings that were not requested are
 * not considered.
 *
 * @param exchange the exchange whose publishing links are inspected
 * @param queue    the queue the links point to
 * @param bindings requested binding key to binding arguments
 * @return {@code true} if at least one requested binding has no equal existing binding
 */
private boolean hasDifferentBindings(final Exchange<?> exchange,
                                     final Queue queue,
                                     final Map<String, Map<String,Object>> bindings)
{
    for (final Map.Entry<String, Map<String, Object>> requested : bindings.entrySet())
    {
        final String requestedKey = requested.getKey();
        boolean matched = false;
        for (final Binding existing : exchange.getPublishingLinks(queue))
        {
            if (!existing.getBindingKey().equals(requestedKey))
            {
                continue;
            }
            final Map<String, Object> wanted = requested.getValue();
            final Map<String, Object> present = existing.getArguments();
            final Map<String, Object> wantedCopy = new HashMap<>(wanted == null ? Map.of() : wanted);
            final Map<String, Object> presentCopy = new HashMap<>(present == null ? Map.of() : present);
            if (wantedCopy.equals(presentCopy))
            {
                matched = true;
            }
        }
        if (!matched)
        {
            return true;
        }
    }
    return false;
}
/**
 * Change listener that refreshes the virtual host's access control whenever a
 * virtual host access control provider is added, removed or modified.
 * <p>
 * Attribute changes on a provider that is inside a bulk change are not acted
 * on individually; a single refresh happens when the bulk change ends.
 */
private final class AccessControlProviderListener extends AbstractConfigurationChangeListener
{
    /** Providers for which a bulk change has started and not yet ended. */
    private final Set<ConfiguredObject<?>> _bulkChanges = new HashSet<>();

    @Override
    public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
    {
        if (isProviderChildOfVirtualHost(object, child))
        {
            // also listen to the new provider itself
            child.addChangeListener(this);
            AbstractVirtualHost.this.updateAccessControl();
        }
    }

    @Override
    public void childRemoved(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
    {
        if (isProviderChildOfVirtualHost(object, child))
        {
            AbstractVirtualHost.this.updateAccessControl();
        }
    }

    @Override
    public void attributeSet(final ConfiguredObject<?> object,
                             final String attributeName,
                             final Object oldAttributeValue,
                             final Object newAttributeValue)
    {
        if (isAccessControlProvider(object) && !_bulkChanges.contains(object))
        {
            AbstractVirtualHost.this.updateAccessControl();
        }
    }

    @Override
    public void bulkChangeStart(final ConfiguredObject<?> object)
    {
        if (isAccessControlProvider(object))
        {
            _bulkChanges.add(object);
        }
    }

    @Override
    public void bulkChangeEnd(final ConfiguredObject<?> object)
    {
        if (isAccessControlProvider(object))
        {
            _bulkChanges.remove(object);
            AbstractVirtualHost.this.updateAccessControl();
        }
    }

    /** True when the object's category is VirtualHostAccessControlProvider. */
    private boolean isAccessControlProvider(final ConfiguredObject<?> object)
    {
        return object.getCategoryClass() == VirtualHostAccessControlProvider.class;
    }

    /** True when the parent's category is VirtualHost and the child is an access control provider. */
    private boolean isProviderChildOfVirtualHost(final ConfiguredObject<?> parent, final ConfiguredObject<?> child)
    {
        return parent.getCategoryClass() == VirtualHost.class && isAccessControlProvider(child);
    }
}
/**
 * Handler that records whether it was handed anything at all: a stored
 * message, a message enqueue record or a distributed transaction record.
 * <p>
 * Every {@code handle} method returns {@code false} (presumably telling the
 * visiting store that no further entries are required - confirm against the
 * handler contracts).
 */
private static class StoreEmptyCheckingHandler
        implements MessageHandler, MessageInstanceHandler, DistributedTransactionHandler
{
    /** Stays {@code true} until the first entry is seen. */
    private boolean _empty = true;

    @Override
    public boolean handle(final StoredMessage<?> storedMessage)
    {
        return markNotEmpty();
    }

    @Override
    public boolean handle(final MessageEnqueueRecord record)
    {
        return markNotEmpty();
    }

    @Override
    public boolean handle(final org.apache.qpid.server.store.Transaction.StoredXidRecord storedXid,
                          final org.apache.qpid.server.store.Transaction.EnqueueRecord[] enqueues,
                          final org.apache.qpid.server.store.Transaction.DequeueRecord[] dequeues)
    {
        return markNotEmpty();
    }

    /**
     * @return {@code true} if no {@code handle} method has been invoked
     */
    public boolean isEmpty()
    {
        return _empty;
    }

    /** Records that an entry was seen; always yields {@code false} for the handlers to return. */
    private boolean markNotEmpty()
    {
        _empty = false;
        return false;
    }
}
/**
 * Logs a virtual host CREATE message carrying the host name, the outcome and
 * the attributes rendered as a string.
 *
 * @param attributes the attributes the host was created with
 * @param outcome    the outcome of the creation
 */
@Override
protected void logCreated(final Map<String, Object> attributes,
                          final Outcome outcome)
{
    final String hostName = getName();
    final String outcomeText = String.valueOf(outcome);
    final String attributeText = attributesAsString(attributes);
    _eventLogger.message(VirtualHostMessages.CREATE(hostName, outcomeText, attributeText));
}
/**
 * Logs a virtual host OPEN message carrying the host name and the outcome.
 *
 * @param outcome the outcome of the recovery
 */
@Override
protected void logRecovered(final Outcome outcome)
{
    final String hostName = getName();
    final String outcomeText = String.valueOf(outcome);
    _eventLogger.message(VirtualHostMessages.OPEN(hostName, outcomeText));
}
/**
 * Logs a virtual host DELETE message carrying the host name and the outcome.
 *
 * @param outcome the outcome of the deletion
 */
@Override
protected void logDeleted(final Outcome outcome)
{
    final String hostName = getName();
    final String outcomeText = String.valueOf(outcome);
    _eventLogger.message(VirtualHostMessages.DELETE(hostName, outcomeText));
}
/**
 * Logs a virtual host UPDATE message carrying the host name, the outcome and
 * the attributes rendered as a string.
 *
 * @param attributes the updated attributes
 * @param outcome    the outcome of the update
 */
@Override
protected void logUpdated(final Map<String, Object> attributes, final Outcome outcome)
{
    final String hostName = getName();
    final String outcomeText = String.valueOf(outcome);
    final String attributeText = attributesAsString(attributes);
    _eventLogger.message(VirtualHostMessages.UPDATE(hostName, outcomeText, attributeText));
}
/**
 * Convenience overload that reads the current target size and in-memory
 * message size. The readings are only taken when debug logging is enabled on
 * the direct memory usage logger.
 */
private void reportDirectMemoryAboveTargetIfExceeded()
{
    if (!DIRECT_MEMORY_USAGE_LOGGER.isDebugEnabled())
    {
        return;
    }
    reportDirectMemoryAboveTargetIfExceeded(getTargetSize(), getInMemoryMessageSize());
}
/**
 * Convenience overload that reads the current target size and in-memory
 * message size. The readings are only taken when debug logging is enabled on
 * the direct memory usage logger.
 */
private void reportDirectMemoryBelowTargetIfReached()
{
    if (!DIRECT_MEMORY_USAGE_LOGGER.isDebugEnabled())
    {
        return;
    }
    reportDirectMemoryBelowTargetIfReached(getTargetSize(), getInMemoryMessageSize());
}
/**
 * Logs (at debug level) that memory usage is back within limits.
 * <p>
 * The message is written only when the in-memory message size does not exceed
 * the target, the allocated direct memory does not exceed the broker's
 * flow-to-disk threshold, and the "exceeds target reported" flag could be
 * switched from {@code true} to {@code false}; it is therefore emitted once
 * per preceding "exceeded" report.
 *
 * @param currentTargetSize   the target size to compare against
 * @param inMemoryMessageSize the current in-memory message size
 */
private void reportDirectMemoryBelowTargetIfReached(final long currentTargetSize, final long inMemoryMessageSize)
{
    if (!DIRECT_MEMORY_USAGE_LOGGER.isDebugEnabled())
    {
        return;
    }

    final boolean withinLimits = inMemoryMessageSize <= currentTargetSize
                                 && QpidByteBuffer.getAllocatedDirectMemorySize() <= _broker.getFlowToDiskThreshold();
    if (withinLimits && _directMemoryExceedsTargetReported.compareAndSet(true, false))
    {
        DIRECT_MEMORY_USAGE_LOGGER.debug(
                "VirtualHost '{}' direct memory allocation threshold ({}) maintained : {} bytes. Flow to disk stopped.",
                getName(),
                currentTargetSize,
                inMemoryMessageSize);
    }
}
/**
 * Logs (at debug level) that memory usage has exceeded its limits.
 * <p>
 * The message is written only when the in-memory message size exceeds the
 * target or the allocated direct memory exceeds the broker's flow-to-disk
 * threshold, and the "exceeds target reported" flag could be switched from
 * {@code false} to {@code true}; it is therefore emitted once until the
 * matching "maintained" report resets the flag.
 *
 * @param currentTargetSize   the target size to compare against
 * @param inMemoryMessageSize the current in-memory message size
 */
private void reportDirectMemoryAboveTargetIfExceeded(final long currentTargetSize, final long inMemoryMessageSize)
{
    if (!DIRECT_MEMORY_USAGE_LOGGER.isDebugEnabled())
    {
        return;
    }

    final boolean limitExceeded = inMemoryMessageSize > currentTargetSize
                                  || QpidByteBuffer.getAllocatedDirectMemorySize() > _broker.getFlowToDiskThreshold();
    if (limitExceeded && _directMemoryExceedsTargetReported.compareAndSet(false, true))
    {
        DIRECT_MEMORY_USAGE_LOGGER.debug(
                "VirtualHost '{}' direct memory allocation threshold ({}) exceeded : {} bytes. Flow to disk enforced.",
                getName(),
                currentTargetSize,
                inMemoryMessageSize);
    }
}
/**
 * Clears every queue whose name fully matches the given regular expression.
 *
 * @param queueNamePattern regular expression matched against each queue name
 * @return the sum of the values returned by {@code clearQueue()} on the matching queues
 * @throws IllegalArgumentException if the pattern is not a valid regular expression
 */
@Override
public long clearMatchingQueues(String queueNamePattern)
{
    LOGGER.debug("Clearing the queues with name that matches the pattern: '{}'", queueNamePattern);
    try
    {
        final Pattern namePattern = Pattern.compile(queueNamePattern);
        long cleared = 0;
        for (final Queue<?> candidate : getChildren(Queue.class))
        {
            if (!namePattern.matcher(candidate.getName()).matches())
            {
                continue;
            }
            LOGGER.debug("Clearing the queue with name '{}' and ID '{}'", candidate.getName(), candidate.getId());
            cleared += candidate.clearQueue();
        }
        return cleared;
    }
    catch (PatternSyntaxException e)
    {
        final String message = String.format("Failed to compile queue name pattern: '%s'", queueNamePattern);
        LOGGER.debug(message, e);
        throw new IllegalArgumentException(message, e);
    }
}
/**
 * Clears the queues identified by the given collection of IDs and/or names.
 * <p>
 * Each entry that parses as a UUID is first treated as a queue ID. Entries
 * that do not parse as a UUID, and UUID entries for which no queue with that
 * ID exists, are treated as queue names. {@code null} entries are ignored,
 * and a {@code null} or empty collection clears nothing.
 *
 * @param queues queue IDs (UUID strings) and/or queue names
 * @return the sum of the values returned by {@code clearQueue()} on the cleared queues
 */
@Override
public long clearQueues(Collection<String> queues)
{
    if (queues == null || queues.isEmpty())
    {
        return 0L;
    }

    final Map<UUID, String> requestedIds = new HashMap<>();
    final Set<String> requestedNames = new HashSet<>();
    for (final String identifier : queues)
    {
        if (identifier == null)
        {
            // UUID.fromString(null) throws NullPointerException, which the catch below does not cover
            continue;
        }
        try
        {
            requestedIds.put(UUID.fromString(identifier), identifier);
        }
        catch (IllegalArgumentException e)
        {
            // parameterized logging: no message formatting unless TRACE is enabled
            LOGGER.trace("'{}' is not a valid queue ID", identifier, e);
            requestedNames.add(identifier);
        }
    }

    final Collection<Queue> queueList = getChildren(Queue.class);
    long count = 0;
    if (!requestedIds.isEmpty())
    {
        LOGGER.debug("Clearing the queues with IDs: {}", requestedIds.values());
        for (final Queue<?> queue : queueList)
        {
            if (requestedIds.remove(queue.getId()) != null)
            {
                LOGGER.debug("Clearing the queue with ID '{}'", queue.getId());
                count += queue.clearQueue();
            }
        }
        // UUID-shaped identifiers that matched no queue ID may still be queue names
        requestedNames.addAll(requestedIds.values());
    }
    if (!requestedNames.isEmpty())
    {
        LOGGER.debug("Clearing the queues with names: {}", requestedNames);
        for (final Queue<?> queue : queueList)
        {
            if (requestedNames.contains(queue.getName()))
            {
                LOGGER.debug("Clearing the queue with name '{}'", queue.getName());
                count += queue.clearQueue();
            }
        }
    }
    return count;
}
}