/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.server.virtualhost;

import static com.google.common.collect.Iterators.cycle;
import static java.util.Collections.newSetFromMap;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.security.AccessControlContext;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

import javax.security.auth.Subject;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.updater.Task;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutorImpl;
import org.apache.qpid.server.exchange.DefaultDestination;
import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.Outcome;
import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDeletedException;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageNode;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfigurationExtractor;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.Content;
import org.apache.qpid.server.model.CustomRestHeaders;
import org.apache.qpid.server.model.DoOnConfigThread;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.ManageableMessage;
import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.NoFactoryForTypeException;
import org.apache.qpid.server.model.NotFoundException;
import org.apache.qpid.server.model.Param;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.RestContentHeader;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.StateTransition;
import org.apache.qpid.server.model.SystemConfig;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostAccessControlProvider;
import org.apache.qpid.server.model.VirtualHostLogger;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.model.preferences.UserPreferences;
import org.apache.qpid.server.model.preferences.UserPreferencesImpl;
import org.apache.qpid.server.plugin.ConnectionValidator;
import org.apache.qpid.server.plugin.QpidServiceLoader;
import org.apache.qpid.server.plugin.SystemNodeCreator;
import org.apache.qpid.server.pool.SuppressingInheritedAccessControlContextThreadFactory;
import org.apache.qpid.server.protocol.LinkModel;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueEntryIterator;
import org.apache.qpid.server.security.AccessControl;
import org.apache.qpid.server.security.CompoundAccessControl;
import org.apache.qpid.server.security.Result;
import org.apache.qpid.server.security.SubjectFixedResultAccessControl;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.SocketConnectionMetaData;
import org.apache.qpid.server.stats.StatisticsReportingTask;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreProvider;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.VirtualHostStoreUpgraderAndRecoverer;
import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
import org.apache.qpid.server.store.handler.MessageHandler;
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.apache.qpid.server.store.preferences.PreferenceRecord;
import org.apache.qpid.server.store.preferences.PreferenceStore;
import org.apache.qpid.server.store.preferences.PreferenceStoreUpdater;
import org.apache.qpid.server.store.preferences.PreferenceStoreUpdaterImpl;
import org.apache.qpid.server.store.preferences.PreferencesRecoverer;
import org.apache.qpid.server.store.preferences.PreferencesRoot;
import org.apache.qpid.server.store.serializer.MessageStoreSerializer;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.transport.NetworkConnectionScheduler;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.HousekeepingExecutor;
import org.apache.qpid.server.util.Strings;

public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> extends AbstractConfiguredObject<X>
        implements QueueManagingVirtualHost<X>
{
    private static final String USE_ASYNC_RECOVERY = "use_async_message_store_recovery";
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractVirtualHost.class);
    private static final Logger DIRECT_MEMORY_USAGE_LOGGER = LoggerFactory.getLogger("org.apache.qpid.server.directMemory.virtualhost");
    private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5;

    private final Collection<ConnectionValidator> _connectionValidators = new ArrayList<>();
    private final Set<AMQPConnection<?>> _connections = newSetFromMap(new ConcurrentHashMap<>());
    private final AccessControlContext _housekeepingJobContext;
    private final AccessControlContext _fileSystemSpaceCheckerJobContext;
    private final AtomicBoolean _acceptsConnections = new AtomicBoolean(false);
    private final ConcurrentMap<String, Cache> _caches = new ConcurrentHashMap<>();
    private final Broker<?> _broker;
    private final DtxRegistry _dtxRegistry;
    private final SystemNodeRegistry _systemNodeRegistry = new SystemNodeRegistry();
    private final AtomicLong _messagesIn = new AtomicLong();
    private final AtomicLong _messagesOut = new AtomicLong();
    private final AtomicLong _transactedMessagesIn = new AtomicLong();
    private final AtomicLong _transactedMessagesOut = new AtomicLong();
    private final AtomicLong _bytesIn = new AtomicLong();
    private final AtomicLong _bytesOut = new AtomicLong();
    private final AtomicLong _totalConnectionCount = new AtomicLong();
    private final AtomicLong _maximumMessageSize = new AtomicLong();
    private final AtomicBoolean _blocked = new AtomicBoolean();
    private final Map<String, MessageDestination> _systemNodeDestinations =
            Collections.synchronizedMap(new HashMap<>());
    private final Map<String, MessageSource> _systemNodeSources =
            Collections.synchronizedMap(new HashMap<>());
    private final EventLogger _eventLogger;
    private final VirtualHostNode<?> _virtualHostNode;
    private final AtomicLong _targetSize = new AtomicLong(100 * 1024 * 1024);
    private final Set<BlockingType> _blockingReasons = Collections.synchronizedSet(EnumSet.noneOf(BlockingType.class));
    private final VirtualHostPrincipal _principal;
    private final ConfigurationChangeListener _accessControlProviderListener = new AccessControlProviderListener();
    private final AccessControl _accessControl;
    private final AtomicBoolean _directMemoryExceedsTargetReported = new AtomicBoolean();
    private final AccessControl _systemUserAllowed =
            new SubjectFixedResultAccessControl(subject -> isSystemSubject(subject) ? Result.ALLOWED : Result.DEFER, Result.DEFER);
    private final VirtualHostConnectionLimiter _connectionLimiter;
    private final MessageDestination _defaultDestination;
    private final FileSystemSpaceChecker _fileSystemSpaceChecker;

    private volatile TaskExecutor _preferenceTaskExecutor;
    private volatile boolean _deleteRequested;
    private enum BlockingType { STORE, FILESYSTEM };
    private volatile ScheduledThreadPoolExecutor _houseKeepingTaskExecutor;
    private volatile ScheduledFuture<?> _statisticsReportingFuture;
    private volatile LinkRegistryModel _linkRegistry;
    private MessageStoreLogSubject _messageStoreLogSubject;
    private NetworkConnectionScheduler _networkConnectionScheduler;
    private volatile boolean _createDefaultExchanges;

    @ManagedAttributeField
    private boolean _queue_deadLetterQueueEnabled;

    @ManagedAttributeField
    private long _housekeepingCheckPeriod;

    @ManagedAttributeField
    private long _storeTransactionIdleTimeoutClose;

    @ManagedAttributeField
    private long _storeTransactionIdleTimeoutWarn;

    @ManagedAttributeField
    private long _storeTransactionOpenTimeoutClose;

    @ManagedAttributeField
    private long _storeTransactionOpenTimeoutWarn;

    @ManagedAttributeField
    private int _housekeepingThreadCount;

    @ManagedAttributeField
    private int _connectionThreadPoolSize;

    @ManagedAttributeField
    private int _numberOfSelectors;

    @ManagedAttributeField
    private List<String> _enabledConnectionValidators;

    @ManagedAttributeField
    private List<String> _disabledConnectionValidators;

    @ManagedAttributeField
    private List<String> _globalAddressDomains;

    @ManagedAttributeField
    private List<NodeAutoCreationPolicy> _nodeAutoCreationPolicies;

    @ManagedAttributeField
    private volatile int _statisticsReportingPeriod;

    private boolean _useAsyncRecoverer;

    private MessageStore _messageStore;
    private MessageStoreRecoverer _messageStoreRecoverer;
    private int _fileSystemMaxUsagePercent;
    private Collection<VirtualHostLogger> _virtualHostLoggersToClose;
    private PreferenceStore _preferenceStore;
    private long _flowToDiskCheckPeriod;
    private volatile boolean _isDiscardGlobalSharedSubscriptionLinksOnDetach;

    public AbstractVirtualHost(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode)
    {
        super(virtualHostNode, attributes);
        _broker = (Broker<?>) virtualHostNode.getParent();
        _virtualHostNode = virtualHostNode;

        _dtxRegistry = new DtxRegistry(this);

        final SystemConfig systemConfig = (SystemConfig) _broker.getParent();
        _eventLogger = systemConfig.getEventLogger();

        _principal = new VirtualHostPrincipal(this);

        if (systemConfig.isManagementMode())
        {
            _accessControl = AccessControl.ALWAYS_ALLOWED;
        }
        else
        {
            _accessControl =  new CompoundAccessControl(List.of(), Result.DEFER);
        }

        _defaultDestination = new DefaultDestination(this, _accessControl);


        _housekeepingJobContext = getSystemTaskControllerContext("Housekeeping["+getName()+"]", _principal);
        _fileSystemSpaceCheckerJobContext = getSystemTaskControllerContext("FileSystemSpaceChecker["+getName()+"]", _principal);

        _fileSystemSpaceChecker = new FileSystemSpaceChecker();
        _connectionLimiter = new VirtualHostConnectionLimiter(this, _broker);
    }

    private void updateAccessControl()
    {
        if(!((SystemConfig)_broker.getParent()).isManagementMode())
        {
            List<VirtualHostAccessControlProvider> children = new ArrayList<>(getChildren(VirtualHostAccessControlProvider.class));
            LOGGER.debug("Updating access control list with {} provider children", children.size());
            Collections.sort(children, VirtualHostAccessControlProvider.ACCESS_CONTROL_PROVIDER_COMPARATOR);

            List<AccessControl<?>> accessControls = new ArrayList<>(children.size()+2);
            accessControls.add(_systemUserAllowed);
            for(VirtualHostAccessControlProvider prov : children)
            {
                if(prov.getState() == State.ERRORED)
                {
                    accessControls.clear();
                    accessControls.add(AccessControl.ALWAYS_DENIED);
                    break;
                }
                else if(prov.getState() == State.ACTIVE)
                {
                    accessControls.add(prov.getController());
                }

            }
            accessControls.add(getParentAccessControl());
            ((CompoundAccessControl)_accessControl).setAccessControls(accessControls);

        }
    }

    private void openConnectionLimiter()
    {
        _connectionLimiter.open();
    }

    private void activateConnectionLimiter()
    {
        _connectionLimiter.activate();
    }

    private void closeConnectionLimiter()
    {
        _connectionLimiter.close();
    }

    @Override
    protected void onCreate()
    {
        super.onCreate();
        _createDefaultExchanges = true;
    }

    @Override
    public void setFirstOpening(boolean firstOpening)
    {
        _createDefaultExchanges = firstOpening;
    }

    @Override
    public void onValidate()
    {
        super.onValidate();
        String name = getName();
        if (name == null || "".equals(name.trim()))
        {
            throw new IllegalConfigurationException("Virtual host name must be specified");
        }
        String type = getType();
        if (type == null || "".equals(type.trim()))
        {
            throw new IllegalConfigurationException("Virtual host type must be specified");
        }
        if(!isDurable())
        {
            throw new IllegalArgumentException(getClass().getSimpleName() + " must be durable");
        }
        if(getGlobalAddressDomains() != null)
        {
            for(String domain : getGlobalAddressDomains())
            {
                validateGlobalAddressDomain(domain);
            }
        }
        if(getNodeAutoCreationPolicies() != null)
        {
            for(NodeAutoCreationPolicy policy : getNodeAutoCreationPolicies())
            {
                validateNodeAutoCreationPolicy(policy);
            }
        }

        validateConnectionThreadPoolSettings(this);
        validateMessageStoreCreation();
    }

    @Override
    protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes)
    {
        super.validateChange(proxyForValidation, changedAttributes);
        QueueManagingVirtualHost<?> virtualHost = (QueueManagingVirtualHost<?>) proxyForValidation;

        if(changedAttributes.contains(GLOBAL_ADDRESS_DOMAINS))
        {

            if(virtualHost.getGlobalAddressDomains() != null)
            {
                for(String name : virtualHost.getGlobalAddressDomains())
                {
                    validateGlobalAddressDomain(name);
                }
            }
        }
        if(changedAttributes.contains(NODE_AUTO_CREATION_POLICIES))
        {
            if(getNodeAutoCreationPolicies() != null)
            {
                for(NodeAutoCreationPolicy policy : virtualHost.getNodeAutoCreationPolicies())
                {
                    validateNodeAutoCreationPolicy(policy);
                }
            }

        }

        if (changedAttributes.contains(CONNECTION_THREAD_POOL_SIZE) || changedAttributes.contains(NUMBER_OF_SELECTORS))
        {
            validateConnectionThreadPoolSettings(virtualHost);
        }
    }

    @Override
    protected void changeAttributes(final Map<String, Object> attributes)
    {
        super.changeAttributes(attributes);
        if (attributes.containsKey(STATISTICS_REPORTING_PERIOD))
        {
            initialiseStatisticsReporting();
        }
    }

    @Override
    protected AccessControl getAccessControl()
    {
        return _accessControl;
    }

    private AccessControl getParentAccessControl()
    {
        return super.getAccessControl();
    }

    @Override
    protected void postResolveChildren()
    {
        super.postResolveChildren();
        addChangeListener(_accessControlProviderListener);
        Collection<VirtualHostAccessControlProvider> accessControlProviders = getChildren(VirtualHostAccessControlProvider.class);
        if (!accessControlProviders.isEmpty())
        {
            accessControlProviders.forEach(child -> child.addChangeListener(_accessControlProviderListener));
        }
    }

    private void validateNodeAutoCreationPolicy(final NodeAutoCreationPolicy policy)
    {
        String pattern = policy.getPattern();
        if(pattern == null)
        {
            throw new IllegalArgumentException("The 'pattern' attribute of a NodeAutoCreationPolicy MUST be supplied: " + policy);
        }

        try
        {
            Pattern.compile(pattern);
        }
        catch (PatternSyntaxException e)
        {
            throw new IllegalArgumentException("The 'pattern' attribute of a NodeAutoCreationPolicy MUST be a valid "
                                               + "Java Regular Expression Pattern, the value '" + pattern + "' is not: " + policy);

        }

        String nodeType = policy.getNodeType();
        Class<? extends ConfiguredObject> sourceClass = null;
        for (Class<? extends ConfiguredObject> childClass : getModel().getChildTypes(getCategoryClass()))
        {
            if (childClass.getSimpleName().equalsIgnoreCase(nodeType.trim()))
            {
                sourceClass = childClass;
                break;
            }
        }
        if(sourceClass == null)
        {
            throw new IllegalArgumentException("The node type of a NodeAutoCreationPolicy must be a valid child type "
                                               + "of a VirtualHost, '" + nodeType + "' is not.");
        }
        if(policy.isCreatedOnConsume() && !MessageSource.class.isAssignableFrom(sourceClass))
        {
            throw new IllegalArgumentException("A NodeAutoCreationPolicy which creates nodes on consume must have a "
                                               + "nodeType which implements MessageSource, '" + nodeType + "' does not.");
        }

        if(policy.isCreatedOnPublish() && !MessageDestination.class.isAssignableFrom(sourceClass))
        {
            throw new IllegalArgumentException("A NodeAutoCreationPolicy which creates nodes on publish must have a "
                                               + "nodeType which implements MessageDestination, '" + nodeType + "' does not.");
        }
        if(!(policy.isCreatedOnConsume() || policy.isCreatedOnPublish()))
        {
            throw new IllegalArgumentException("A NodeAutoCreationPolicy must create on consume, create on publish or both.");
        }

    }

    private void validateGlobalAddressDomain(final String name)
    {
        String regex = "/(/?)([\\w_\\-:.\\$]+/)*[\\w_\\-:.\\$]+";
        if(!name.matches(regex))
        {
            throw new IllegalArgumentException("'"+name+"' is not a valid global address domain");
        }
    }

    @Override
    public MessageStore getMessageStore()
    {
        return _messageStore;
    }

    private void validateConnectionThreadPoolSettings(QueueManagingVirtualHost<?> virtualHost)
    {
        if (virtualHost.getConnectionThreadPoolSize() < 1)
        {
            throw new IllegalConfigurationException(String.format("Thread pool size %d on VirtualHost %s must be greater than zero.", virtualHost.getConnectionThreadPoolSize(), getName()));
        }
        if (virtualHost.getNumberOfSelectors() < 1)
        {
            throw new IllegalConfigurationException(String.format("Number of Selectors %d on VirtualHost %s must be greater than zero.", virtualHost.getNumberOfSelectors(), getName()));
        }
        if (virtualHost.getConnectionThreadPoolSize() <= virtualHost.getNumberOfSelectors())
        {
            throw new IllegalConfigurationException(String.format("Number of Selectors %d on VirtualHost %s must be less than the connection pool size %d.", virtualHost.getNumberOfSelectors(), getName(), virtualHost.getConnectionThreadPoolSize()));
        }
    }

    protected void validateMessageStoreCreation()
    {
        MessageStore store = createMessageStore();
        if (store != null)
        {
            try
            {
                store.openMessageStore(this);
            }
            catch (Exception e)
            {
                throw new IllegalConfigurationException("Cannot open virtual host message store:" + e.getMessage(), e);
            }
            finally
            {
                try
                {
                    store.closeMessageStore();
                }
                catch(Exception e)
                {
                    LOGGER.warn("Failed to close database", e);
                }
            }
        }
    }

    @Override
    protected void onExceptionInOpen(RuntimeException e)
    {
        super.onExceptionInOpen(e);
        shutdownHouseKeeping();
        closeNetworkConnectionScheduler();
        closeMessageStore();
        stopPreferenceTaskExecutor();
        closePreferenceStore();
        closeConnectionLimiter();
        stopLogging(new ArrayList<>(getChildren(VirtualHostLogger.class)));
    }

    @Override
    protected void onOpen()
    {
        super.onOpen();

        registerSystemNodes();

        _messageStore = createMessageStore();

        _messageStoreLogSubject = new MessageStoreLogSubject(getName(), _messageStore.getClass().getSimpleName());

        _messageStore.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
        _messageStore.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);

        _fileSystemMaxUsagePercent = getContextValue(Integer.class, Broker.STORE_FILESYSTEM_MAX_USAGE_PERCENT);
        _flowToDiskCheckPeriod = getContextValue(Long.class, FLOW_TO_DISK_CHECK_PERIOD);
        _isDiscardGlobalSharedSubscriptionLinksOnDetach = getContextValue(Boolean.class, DISCARD_GLOBAL_SHARED_SUBSCRIPTION_LINKS_ON_DETACH);

        QpidServiceLoader serviceLoader = new QpidServiceLoader();
        for(ConnectionValidator validator : serviceLoader.instancesOf(ConnectionValidator.class))
        {
            if((_enabledConnectionValidators.isEmpty()
                && (_disabledConnectionValidators.isEmpty()) || !_disabledConnectionValidators.contains(validator.getType()))
               || _enabledConnectionValidators.contains(validator.getType()))
            {
                _connectionValidators.add(validator);
            }

        }

        PreferencesRoot preferencesRoot = (VirtualHostNode) getParent();
        _preferenceStore = preferencesRoot.createPreferenceStore();

        _linkRegistry = createLinkRegistry();

        createHousekeepingExecutor();
        openConnectionLimiter();
    }

    LinkRegistryModel createLinkRegistry()
    {
        LinkRegistryModel linkRegistry;
        Iterator<LinkRegistryFactory>
                linkRegistryFactories = (new QpidServiceLoader()).instancesOf(LinkRegistryFactory.class).iterator();
        if (linkRegistryFactories.hasNext())
        {
            final LinkRegistryFactory linkRegistryFactory = linkRegistryFactories.next();
            if (linkRegistryFactories.hasNext())
            {
                throw new RuntimeException("Found multiple implementations of LinkRegistry");
            }
            linkRegistry = linkRegistryFactory.create(this);
        }
        else
        {
            linkRegistry = null;
        }
        return linkRegistry;
    }

    private void createHousekeepingExecutor()
    {
        if(_houseKeepingTaskExecutor == null || _houseKeepingTaskExecutor.isTerminated())
        {
            _houseKeepingTaskExecutor = new HousekeepingExecutor("virtualhost-" + getName() + "-pool",
                                                                 getHousekeepingThreadCount(),
                                                                 getSystemTaskSubject("Housekeeping", getPrincipal()));
        }
    }

    private void checkVHostStateIsActive()
    {
        if (getState() != State.ACTIVE)
        {
            throw new IllegalStateException("The virtual host state of " + getState()
                                            + " does not permit this operation.");
        }
    }

    @Override
    public boolean isActive()
    {
        return getState() == State.ACTIVE;
    }

    private void registerSystemNodes()
    {
        QpidServiceLoader qpidServiceLoader = new QpidServiceLoader();
        Iterable<SystemNodeCreator> factories = qpidServiceLoader.instancesOf(SystemNodeCreator.class);
        for(SystemNodeCreator creator : factories)
        {
            creator.register(_systemNodeRegistry);
        }
    }

    protected abstract MessageStore createMessageStore();

    private ListenableFuture<List<Void>> createDefaultExchanges()
    {
        return Subject.doAs(getSubjectWithAddedSystemRights(), new PrivilegedAction<ListenableFuture<List<Void>>>()
        {

            @Override
            public ListenableFuture<List<Void>> run()
            {
                List<ListenableFuture<Void>> standardExchangeFutures = new ArrayList<>();
                standardExchangeFutures.add(addStandardExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS));
                standardExchangeFutures.add(addStandardExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS));
                standardExchangeFutures.add(addStandardExchange(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS));
                standardExchangeFutures.add(addStandardExchange(ExchangeDefaults.FANOUT_EXCHANGE_NAME, ExchangeDefaults.FANOUT_EXCHANGE_CLASS));
                return Futures.allAsList(standardExchangeFutures);
            }

            ListenableFuture<Void> addStandardExchange(String name, String type)
            {

                Map<String, Object> attributes = new HashMap<>();
                attributes.put(Exchange.NAME, name);
                attributes.put(Exchange.TYPE, type);
                attributes.put(Exchange.ID, UUIDGenerator.generateExchangeUUID(name, getName()));
                final ListenableFuture<Exchange<?>> future = addExchangeAsync(attributes);
                final SettableFuture<Void> returnVal = SettableFuture.create();
                addFutureCallback(future, new FutureCallback<>()
                {
                    @Override
                    public void onSuccess(final Exchange<?> result)
                    {
                        try
                        {
                            childAdded(result);
                            returnVal.set(null);
                        }
                        catch (Throwable t)
                        {
                            returnVal.setException(t);
                        }
                    }

                    @Override
                    public void onFailure(final Throwable t)
                    {
                        returnVal.setException(t);
                    }
                }, getTaskExecutor());

                return returnVal;
            }
        });
    }

    protected MessageStoreLogSubject getMessageStoreLogSubject()
    {
        return _messageStoreLogSubject;
    }

    @Override
    public Collection<? extends Connection<?>> getConnections()
    {
        return _connections;
    }

    @Override
    public Connection<?> getConnection(String name)
    {
        for (Connection<?> connection : _connections)
        {
            if (connection.getName().equals(name))
            {
                return connection;
            }
        }
        return null;
    }

    @Override
    public int publishMessage(@Param(name = "message") final ManageableMessage message)
    {
        final String address = message.getAddress();
        MessageDestination destination = address == null ? getDefaultDestination() : getAttainedMessageDestination(address, true);
        if(destination == null)
        {
            destination = getDefaultDestination();
        }

        final AMQMessageHeader header = new MessageHeaderImpl(message);

        Serializable body = null;
        Object messageContent = message.getContent();
        if(messageContent != null)
        {
            if(messageContent instanceof Map || messageContent instanceof List)
            {
                if(message.getMimeType() != null || message.getEncoding() != null)
                {
                    throw new IllegalArgumentException("If the message content is provided as map or list, the mime type and encoding must be left unset");
                }
                body = (Serializable)messageContent;
            }
            else if(messageContent instanceof String)
            {
                String contentTransferEncoding = message.getContentTransferEncoding();
                if("base64".equalsIgnoreCase(contentTransferEncoding))
                {
                    body = Strings.decodeBase64((String) messageContent);
                }
                else if(contentTransferEncoding == null || contentTransferEncoding.trim().equals("") || contentTransferEncoding.trim().equalsIgnoreCase("identity"))
                {
                    String mimeType = message.getMimeType();
                    if(mimeType != null && !(mimeType = mimeType.trim().toLowerCase()).equals(""))
                    {
                        if (!(mimeType.startsWith("text/") || Arrays.asList("application/json", "application/xml")
                                                                    .contains(mimeType)))
                        {
                            throw new IllegalArgumentException(message.getMimeType()
                                                               + " is invalid as a MIME type for this message. "
                                                               + "Only MIME types of the text type can be used if a string is supplied as the content");
                        }
                        else if (mimeType.matches(".*;\\s*charset\\s*=.*"))
                        {
                            throw new IllegalArgumentException(message.getMimeType()
                                                               + " is invalid as a MIME type for this message. "
                                                               + "If a string is supplied as the content, the MIME type must not include a charset parameter");
                        }
                    }
                    body = (String) messageContent;
                }
                else
                {
                    throw new IllegalArgumentException("contentTransferEncoding value '" + contentTransferEncoding + "' is invalid.  The only valid values are base64 and identity");
                }
            }
            else
            {
                throw new IllegalArgumentException("The message content (if present) can only be a string, map or list");
            }
        }

        InternalMessage internalMessage = InternalMessage.createMessage(getMessageStore(), header, body, message.isPersistent(), address);
        AutoCommitTransaction txn = new AutoCommitTransaction(getMessageStore());
        final InstanceProperties instanceProperties = prop ->
        {
            switch (prop)
            {
                case EXPIRATION:
                    Date expiration = message.getExpiration();
                    return expiration == null ? 0 : expiration.getTime();
                case IMMEDIATE:
                case MANDATORY:
                case REDELIVERED:
                    return false;
                case PERSISTENT:
                    return message.isPersistent();
                default:
                    return null;
            }
        };
        final RoutingResult<InternalMessage> result =
                destination.route(internalMessage, address, instanceProperties);
        return result.send(txn, null);

    }

    @Override
    protected <C extends ConfiguredObject> ListenableFuture<C> addChildAsync(Class<C> childClass,
                                                                             Map<String, Object> attributes)
    {
        checkVHostStateIsActive();
        return super.addChildAsync(childClass, attributes);
    }


    @Override
    public EventLogger getEventLogger()
    {
        return _eventLogger;
    }

    @Override
    public Map<String, Object> extractConfig(final boolean includeSecureAttributes)
    {
        return doSync(doOnConfigThread(new Task<ListenableFuture<Map<String,Object>>, RuntimeException>()
        {
            @Override
            public ListenableFuture<Map<String, Object>> execute() throws RuntimeException
            {
                ConfigurationExtractor configExtractor = new ConfigurationExtractor();
                Map<String, Object> config = configExtractor.extractConfig(AbstractVirtualHost.this,
                                                                           includeSecureAttributes);
                return Futures.immediateFuture(config);
            }

            @Override
            public String getObject()
            {
                return AbstractVirtualHost.this.toString();
            }

            @Override
            public String getAction()
            {
                return "extractConfig";
            }

            @Override
            public String getArguments()
            {
                return "includeSecureAttributes=" + String.valueOf(includeSecureAttributes);
            }
        }));
    }

    @Override
    public Content exportMessageStore()
    {
        return new MessageStoreContent();
    }

    private class MessageStoreContent implements Content, CustomRestHeaders
    {

        @Override
        public void write(final OutputStream outputStream) throws IOException
        {
            doSync(doOnConfigThread(new Task<ListenableFuture<Void>, IOException>()
            {
                @Override
                public ListenableFuture<Void> execute() throws IOException
                {
                    if (getState() != State.STOPPED)
                    {
                        throw new IllegalArgumentException(
                                "The exportMessageStore operation can only be called when the virtual host is stopped");
                    }

                    _messageStore.openMessageStore(AbstractVirtualHost.this);
                    try
                    {
                        final Map<UUID, String> queueMap = new HashMap<>();
                        getDurableConfigurationStore().reload(record ->
                        {
                            if(record.getType().equals(Queue.class.getSimpleName()))
                            {
                                queueMap.put(record.getId(), (String) record.getAttributes().get(ConfiguredObject.NAME));
                            }
                        });
                        MessageStoreSerializer serializer = new QpidServiceLoader().getInstancesByType(MessageStoreSerializer.class).get(MessageStoreSerializer.LATEST);
                        MessageStore.MessageStoreReader reader = _messageStore.newMessageStoreReader();
                        serializer.serialize(queueMap, reader, outputStream);
                    }
                    finally
                    {
                        _messageStore.closeMessageStore();
                    }
                    return Futures.immediateFuture(null);
                }

                @Override
                public String getObject()
                {
                    return AbstractVirtualHost.this.toString();
                }

                @Override
                public String getAction()
                {
                    return "exportMessageStore";
                }

                @Override
                public String getArguments()
                {
                    return null;
                }
            }));
        }

        @Override
        public void release()
        {
        }

        @RestContentHeader("Content-Type")
        public String getContentType()
        {
            return "application/octet-stream";
        }

        @SuppressWarnings("unused")
        @RestContentHeader("Content-Disposition")
        public String getContentDisposition()
        {
            try
            {

                String vhostName = getName();
                // replace all non-ascii and non-printable characters and all backslashes and percent encoded characters
                // as suggested by rfc6266 Appendix D
                String asciiName = vhostName.replaceAll("[^\\x20-\\x7E]", "?")
                                                 .replace('\\', '?')
                                                 .replaceAll("%[0-9a-fA-F]{2}", "?");
                String disposition = String.format("attachment; filename=\"%s_messages.bin\"; filename*=\"UTF-8''%s_messages.bin\"",
                                                   asciiName,
                                                   URLEncoder.encode(vhostName, StandardCharsets.UTF_8.name())
                                                   );
                return disposition;
            }
            catch (UnsupportedEncodingException e)
            {
                throw new RuntimeException("JVM does not support UTF8", e);
            }
        }

    }

    @Override
    public void importMessageStore(final String source)
    {
        try
        {
            final URL url = convertStringToURL(source);

            try (InputStream input = url.openStream();
                 BufferedInputStream bufferedInputStream = new BufferedInputStream(input);
                 DataInputStream data = new DataInputStream(bufferedInputStream))
            {

                final MessageStoreSerializer serializer = MessageStoreSerializer.FACTORY.newInstance(data);

                doSync(doOnConfigThread(new Task<ListenableFuture<Void>, IOException>()
                {
                    @Override
                    public ListenableFuture<Void> execute() throws IOException
                    {
                        if (getState() != State.STOPPED)
                        {
                            throw new IllegalArgumentException(
                                    "The importMessageStore operation can only be called when the virtual host is stopped");
                        }

                        try
                        {
                            _messageStore.openMessageStore(AbstractVirtualHost.this);
                            checkMessageStoreEmpty();
                            final Map<String, UUID> queueMap = new HashMap<>();
                            getDurableConfigurationStore().reload(record ->
                            {
                                if (record.getType().equals(Queue.class.getSimpleName()))
                                {
                                    queueMap.put((String) record.getAttributes().get(ConfiguredObject.NAME),
                                                 record.getId());
                                }
                            });

                            serializer.deserialize(queueMap, _messageStore, data);
                        }
                        finally
                        {
                            _messageStore.closeMessageStore();
                        }
                        return Futures.immediateFuture(null);
                    }

                    @Override
                    public String getObject()
                    {
                        return AbstractVirtualHost.this.toString();
                    }

                    @Override
                    public String getAction()
                    {
                        return "importMessageStore";
                    }

                    @Override
                    public String getArguments()
                    {
                        if (url.getProtocol().equalsIgnoreCase("http") || url.getProtocol().equalsIgnoreCase("https") || url.getProtocol().equalsIgnoreCase("file"))
                        {
                            return "source=" + source;
                        }
                        else if (url.getProtocol().equalsIgnoreCase("data"))
                        {
                            return "source=<data stream>";
                        }
                        else
                        {
                            return "source=<unknown source type>";
                        }
                    }
                }));
            }
        }
        catch (IOException e)
        {
            throw new IllegalConfigurationException("Cannot convert '" + source + "' to a readable resource", e);
        }
    }

    private void checkMessageStoreEmpty()
    {
        final MessageStore.MessageStoreReader reader = _messageStore.newMessageStoreReader();
        final StoreEmptyCheckingHandler handler = new StoreEmptyCheckingHandler();
        reader.visitMessages(handler);
        if(handler.isEmpty())
        {
            reader.visitMessageInstances(handler);

            if(handler.isEmpty())
            {
                reader.visitDistributedTransactions(handler);
            }
        }

        if(!handler.isEmpty())
        {
            throw new IllegalArgumentException("The message store is not empty");
        }
    }

    private static URL convertStringToURL(final String source)
    {
        URL url;
        try
        {
            url = new URL(source);
        }
        catch (MalformedURLException e)
        {
            File file = new File(source);
            try
            {
                url = file.toURI().toURL();
            }
            catch (MalformedURLException notAFile)
            {
                throw new IllegalConfigurationException("Cannot convert " + source + " to a readable resource",
                                                        notAFile);
            }
        }
        return url;
    }

    @Override
    public boolean authoriseCreateConnection(final AMQPConnection<?> connection)
    {
        authorise(Operation.PERFORM_ACTION("connect"));
        for(ConnectionValidator validator : _connectionValidators)
        {
            if(!validator.validateConnectionCreation(connection, this))
            {
                return false;
            }
        }
        return true;
    }

    private void initialiseStatisticsReporting()
    {
        long report = getStatisticsReportingPeriod() * 1000L;

        ScheduledFuture<?> previousStatisticsReportingFuture = _statisticsReportingFuture;
        if (previousStatisticsReportingFuture != null)
        {
            previousStatisticsReportingFuture.cancel(false);
        }
        if (report > 0L)
        {
            _statisticsReportingFuture = _houseKeepingTaskExecutor.scheduleAtFixedRate(new StatisticsReportingTask(this,
                                                                                                                   getSystemTaskSubject(
                                                                                                                           "Statistics", _principal)),
                                                                                       report,
                                                                                       report,
                                                                                       TimeUnit.MILLISECONDS);
        }
    }

    private void initialiseHouseKeeping()
    {
        final long period = getHousekeepingCheckPeriod();
        if (period > 0L)
        {
            scheduleHouseKeepingTask(period, new VirtualHostHouseKeepingTask());
        }
    }

    private void initialiseFlowToDiskChecking()
    {
        final long period = getFlowToDiskCheckPeriod();
        if (period > 0L)
        {
            scheduleHouseKeepingTask(period, new FlowToDiskCheckingTask());
        }
    }

    private void shutdownHouseKeeping()
    {
        if(_houseKeepingTaskExecutor != null)
        {
            _houseKeepingTaskExecutor.shutdown();

            try
            {
                if (!_houseKeepingTaskExecutor.awaitTermination(HOUSEKEEPING_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS))
                {
                    _houseKeepingTaskExecutor.shutdownNow();
                }
            }
            catch (InterruptedException e)
            {
                LOGGER.warn("Interrupted during Housekeeping shutdown:", e);
                Thread.currentThread().interrupt();
            }
            finally
            {
                _houseKeepingTaskExecutor = null;
            }
        }
    }

    private void closeNetworkConnectionScheduler()
    {
        if(_networkConnectionScheduler != null)
        {
            _networkConnectionScheduler.close();
            _networkConnectionScheduler = null;
        }
    }

    /**
     * Allow other broker components to register a HouseKeepingTask
     *
     * @param period How often this task should run, in ms.
     * @param task The task to run.
     */
    @Override
    public void scheduleHouseKeepingTask(long period, HouseKeepingTask task)
    {
        task.setFuture(_houseKeepingTaskExecutor.scheduleAtFixedRate(task, period / 2, period, TimeUnit.MILLISECONDS));
    }


    @Override
    public ScheduledFuture<?> scheduleTask(long delay, Runnable task)
    {
        return _houseKeepingTaskExecutor.schedule(task, delay, TimeUnit.MILLISECONDS);
    }

    @Override
    public void executeTask(final String name, final Runnable task, AccessControlContext context)
    {
        _houseKeepingTaskExecutor.execute(new HouseKeepingTask(name, this, context)
        {
            @Override
            public void execute()
            {
                task.run();
            }
        });
    }


    @Override
    public List<String> getEnabledConnectionValidators()
    {
        return _enabledConnectionValidators;
    }

    @Override
    public List<String> getDisabledConnectionValidators()
    {
        return _disabledConnectionValidators;
    }

    @Override
    public List<String> getGlobalAddressDomains()
    {
        return _globalAddressDomains;
    }

    @Override
    public List<NodeAutoCreationPolicy> getNodeAutoCreationPolicies()
    {
        return _nodeAutoCreationPolicies;
    }

    @Override
    public MessageSource getAttainedMessageSource(final String name)
    {
        MessageSource messageSource = _systemNodeSources.get(name);
        if(messageSource == null)
        {
            messageSource = getAttainedChildFromAddress(Queue.class, name);
        }
        if(messageSource == null)
        {
            messageSource = autoCreateNode(name, MessageSource.class, false);
        }
        return messageSource;
    }


    private <T> T autoCreateNode(final String name, final Class<T> clazz, boolean publish)
    {
        for (NodeAutoCreationPolicy policy : getNodeAutoCreationPolicies())
        {
            String pattern = policy.getPattern();
            if (name.matches(pattern) &&
                ((publish && policy.isCreatedOnPublish()) || (!publish && policy.isCreatedOnConsume())))
            {
                String nodeType = policy.getNodeType();
                Class<? extends ConfiguredObject> sourceClass = null;
                for (Class<? extends ConfiguredObject> childClass : getModel().getChildTypes(getCategoryClass()))
                {
                    if (childClass.getSimpleName().equalsIgnoreCase(nodeType.trim())
                        && clazz.isAssignableFrom(childClass))
                    {
                        sourceClass = childClass;
                    }
                }
                if (sourceClass != null)
                {
                    final Map<String, Object> attributes = policy.getAttributes() == null
                        ? new HashMap<>()
                        : new HashMap<>(policy.getAttributes());
                    attributes.remove(ConfiguredObject.ID);
                    attributes.put(ConfiguredObject.NAME, name);
                    final Class<? extends ConfiguredObject> childClass = sourceClass;
                    try
                    {

                        final T node =  Subject.doAs(getSubjectWithAddedSystemRights(), (PrivilegedAction<T>) () ->
                                (T) doSync(createChildAsync(childClass, attributes)));

                        if (node != null)
                        {
                            return node;
                        }

                    }
                    catch (AbstractConfiguredObject.DuplicateNameException e)
                    {
                        return (T)e.getExisting();
                    }
                    catch (RuntimeException e)
                    {
                        LOGGER.info("Unable to auto create a node named '{}' due to exception", name, e);
                    }
                }
            }

        }
        return null;

    }


    @Override
    public Queue<?> getAttainedQueue(UUID id)
    {
        return (Queue<?>) awaitChildClassToAttainState(Queue.class, id);
    }

    @Override
    public Queue<?> getAttainedQueue(final String name)
    {
        return (Queue<?>) awaitChildClassToAttainState(Queue.class, name);
    }

    @Override
    public Broker<?> getBroker()
    {
        return _broker;
    }

    @Override
    public MessageDestination getAttainedMessageDestination(final String name, final boolean mayCreate)
    {
        MessageDestination destination = _systemNodeDestinations.get(name);
        if(destination == null)
        {
            destination = getAttainedChildFromAddress(Exchange.class, name);
        }
        if(destination == null)
        {
            destination = getAttainedChildFromAddress(Queue.class, name);
        }
        if(destination == null && mayCreate)
        {
            destination = autoCreateNode(name, MessageDestination.class, true);
        }
        return destination;
    }

    @Override
    public MessageDestination getSystemDestination(final String name)
    {
        return _systemNodeDestinations.get(name);
    }

    @Override
    public ListenableFuture<Void> reallocateMessages()
    {
        final ScheduledThreadPoolExecutor houseKeepingTaskExecutor = _houseKeepingTaskExecutor;
        if (houseKeepingTaskExecutor != null)
        {
            try
            {
                final Future<Void> future = houseKeepingTaskExecutor.submit(() ->
                                                                            {
                                                                                final Collection<Queue> queues =
                                                                                        getChildren(Queue.class);
                                                                                for (Queue q : queues)
                                                                                {
                                                                                    if (q.getState() == State.ACTIVE)
                                                                                    {
                                                                                        q.reallocateMessages();
                                                                                    }
                                                                                }
                                                                                return null;
                                                                            });
                return JdkFutureAdapters.listenInPoolThread(future);
            }
            catch (RejectedExecutionException e)
            {
                if (!houseKeepingTaskExecutor.isShutdown())
                {
                    LOGGER.warn("Failed to schedule reallocation of messages", e);
                }
            }
        }
        return Futures.immediateFuture(null);
    }

    @Override
    public long getTotalDepthOfQueuesBytes()
    {
        long total = 0;
        final Collection<Queue> queues = getChildren(Queue.class);
        for(Queue q : queues)
        {
            total += q.getQueueDepthBytes();
        }
        return total;
    }

    @Override
    public long getTotalDepthOfQueuesMessages()
    {
        long total = 0;
        final Collection<Queue> queues = getChildren(Queue.class);
        for(Queue q : queues)
        {
            total += q.getQueueDepthMessages();
        }
        return total;
    }

    @Override
    public long getInMemoryMessageSize()
    {
        return _messageStore == null ? -1 : _messageStore.getInMemorySize();
    }

    @Override
    public long getBytesEvacuatedFromMemory()
    {
        return _messageStore == null ? -1 : _messageStore.getBytesEvacuatedFromMemory();
    }

    @Override
    public long getInMemoryMessageThreshold()
    {
        return getTargetSize();
    }

    @Override
    public <T extends ConfiguredObject<?>> T getAttainedChildFromAddress(final Class<T> childClass,
                                                                         final String address)
    {
        T child = awaitChildClassToAttainState(childClass, address);
        if(child == null && getGlobalAddressDomains() != null)
        {
            for(String domain : getGlobalAddressDomains())
            {
                if(address.startsWith(domain + "/"))
                {
                    child = awaitChildClassToAttainState(childClass, address.substring(domain.length()+1));
                    if(child != null)
                    {
                        break;
                    }
                }
            }
        }
        return child;
    }

    @Override
    public long getInboundMessageSizeHighWatermark()
    {
        return _maximumMessageSize.get();
    }

    @Override
    public MessageDestination getDefaultDestination()
    {
        return _defaultDestination;
    }

    @Override
    public void resetStatistics()
    {
        _totalConnectionCount.set(0L);
        _maximumMessageSize.set(0L);

        _bytesIn.set(0L);
        _bytesOut.set(0L);
        _messagesIn.set(0L);
        _messagesOut.set(0L);
        _transactedMessagesIn.set(0L);
        _transactedMessagesOut.set(0L);

        _messageStore.resetStatistics();

        getChildren(VirtualHostLogger.class).forEach(VirtualHostLogger::resetStatistics);
        getChildren(Queue.class).forEach(Queue::resetStatistics);
        getChildren(Exchange.class).forEach(Exchange::resetStatistics);
    }

    private ListenableFuture<Exchange<?>> addExchangeAsync(Map<String,Object> attributes)
            throws ReservedExchangeNameException,
                   NoFactoryForTypeException
    {
        final SettableFuture<Exchange<?>> returnVal = SettableFuture.create();
        addFutureCallback(getObjectFactory().createAsync(Exchange.class, attributes, this),
                            new FutureCallback<Exchange>()
                            {
                                @Override
                                public void onSuccess(final Exchange result)
                                {
                                    returnVal.set(result);
                                }

                              @Override
                              public void onFailure(final Throwable t)
                              {
                                  returnVal.setException(t);
                              }
                          }, getTaskExecutor());
        return returnVal;

    }

    @Override
    public String getLocalAddress(final String routingAddress)
    {
        if(getGlobalAddressDomains() != null)
        {
            for(String domain : getGlobalAddressDomains())
            {
                if(routingAddress.startsWith(domain + "/"))
                {
                    return routingAddress.substring(domain.length() + 1);
                }
            }
        }
        return routingAddress;
    }

    @Override
    protected ListenableFuture<Void> beforeClose()
    {
        return beforeDeleteOrClose();
    }

    @Override
    protected ListenableFuture<Void> onClose()
    {
        return onCloseOrDelete();
    }

    @Override
    protected ListenableFuture<Void> beforeDelete()
    {
        return beforeDeleteOrClose();
    }

    @Override
    protected ListenableFuture<Void> onDelete()
    {
        _deleteRequested  = true;
        return onCloseOrDelete();
    }

    private ListenableFuture<Void> beforeDeleteOrClose()
    {
        setState(State.UNAVAILABLE);
        _virtualHostLoggersToClose = new ArrayList<>(getChildren(VirtualHostLogger.class));
        //Stop Connections
        return closeConnections();
    }

    private ListenableFuture<Void> onCloseOrDelete()
    {
        _dtxRegistry.close();
        shutdownHouseKeeping();

        if (_deleteRequested)
        {
            deleteLinkRegistry();
        }

        closeMessageStore();
        stopPreferenceTaskExecutor();
        closePreferenceStore();

        if (_deleteRequested)
        {
            deleteMessageStore();
            deletePreferenceStore();
        }

        closeNetworkConnectionScheduler();
        _eventLogger.message(VirtualHostMessages.CLOSE(getName()));

        stopLogging(_virtualHostLoggersToClose);
        _systemNodeRegistry.close();

        closeConnectionLimiter();
        return Futures.immediateFuture(null);
    }


    private ListenableFuture<Void> closeConnections()
    {
        if (LOGGER.isDebugEnabled())
        {
            LOGGER.debug("Closing connection registry : {} connection(s).", _connections.size());
        }
        _acceptsConnections.set(false);
        for(AMQPConnection<?> conn : _connections)
        {
            conn.stopConnection();
        }

        List<ListenableFuture<Void>> connectionCloseFutures = new ArrayList<>();
        while (!_connections.isEmpty())
        {
            Iterator<AMQPConnection<?>> itr = _connections.iterator();
            while(itr.hasNext())
            {
                Connection<?> connection = itr.next();
                try
                {
                    connectionCloseFutures.add(connection.closeAsync());
                }
                catch (Exception e)
                {
                    LOGGER.warn("Exception closing connection " + connection.getName() + " from " + connection.getRemoteAddress(), e);
                }
                finally
                {
                    itr.remove();
                }
            }
        }
        ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(connectionCloseFutures);
        return Futures.transform(combinedFuture, voids -> null, MoreExecutors.directExecutor());
    }

    private void closeMessageStore()
    {
        if (getMessageStore() != null)
        {
            try
            {
                if (_messageStoreRecoverer != null)
                {
                    _messageStoreRecoverer.cancel();
                }

                getMessageStore().closeMessageStore();

            }
            catch (StoreException e)
            {
                LOGGER.error("Failed to close message store", e);
            }

            if (!(_virtualHostNode.getConfigurationStore() instanceof MessageStoreProvider))
            {
                getEventLogger().message(getMessageStoreLogSubject(), MessageStoreMessages.CLOSED());
            }
        }
    }

    @Override
    public void registerMessageDelivered(long messageSize)
    {
        _messagesOut.incrementAndGet();
        _bytesOut.addAndGet(messageSize);
        _broker.registerMessageDelivered(messageSize);
        reportDirectMemoryBelowTargetIfReached();
    }

    @Override
    public void registerMessageReceived(long messageSize)
    {
        _messagesIn.incrementAndGet();
        _bytesIn.addAndGet(messageSize);
        _broker.registerMessageReceived(messageSize);
        long hwm;
        while((hwm = _maximumMessageSize.get()) < messageSize)
        {
            _maximumMessageSize.compareAndSet(hwm, messageSize);
        }
        reportDirectMemoryAboveTargetIfExceeded();
    }

    @Override
    public void registerTransactedMessageReceived()
    {
        _transactedMessagesIn.incrementAndGet();
        _broker.registerTransactedMessageReceived();
    }

    @Override
    public void registerTransactedMessageDelivered()
    {
        _transactedMessagesOut.incrementAndGet();
        _broker.registerTransactedMessageDelivered();
    }

    @Override
    public long getMessagesIn()
    {
        return _messagesIn.get();
    }

    @Override
    public long getBytesIn()
    {
        return _bytesIn.get();
    }

    @Override
    public long getMessagesOut()
    {
        return _messagesOut.get();
    }

    @Override
    public long getBytesOut()
    {
        return _bytesOut.get();
    }

    @Override
    public long getTransactedMessagesIn()
    {
        return _transactedMessagesIn.get();
    }

    @Override
    public long getTransactedMessagesOut()
    {
        return _transactedMessagesOut.get();
    }

    @Override
    public <T extends LinkModel> T getSendingLink( String remoteContainerId, String linkName)
    {
        return doSync(doOnConfigThread(new Task<ListenableFuture<T>, RuntimeException>()
        {
            @Override
            public ListenableFuture<T> execute()
            {
                return Futures.immediateFuture((T)_linkRegistry.getSendingLink(remoteContainerId, linkName));
            }

            @Override
            public String getObject()
            {
                return AbstractVirtualHost.this.toString();
            }

            @Override
            public String getAction()
            {
                return "getSendingLink";
            }

            @Override
            public String getArguments()
            {
                return String.format("remoteContainerId='%s', linkName='%s'", remoteContainerId, linkName);
            }
        }));
    }

    @Override
    public <T extends LinkModel> T getReceivingLink(String remoteContainerId, String linkName)
    {
        return doSync(doOnConfigThread(new Task<ListenableFuture<T>, RuntimeException>()
        {
            @Override
            public ListenableFuture<T> execute()
            {
                return Futures.immediateFuture((T)_linkRegistry.getReceivingLink(remoteContainerId, linkName));
            }

            @Override
            public String getObject()
            {
                return AbstractVirtualHost.this.toString();
            }

            @Override
            public String getAction()
            {
                return "getReceivingLink";
            }

            @Override
            public String getArguments()
            {
                return String.format("remoteContainerId='%s', linkName='%s'", remoteContainerId, linkName);
            }
        }));
    }

    @Override
    public <T extends LinkModel> Collection<T> findSendingLinks(final Pattern containerIdPattern,
                                                                final Pattern linkNamePattern)
    {
        return doSync(doOnConfigThread(new Task<ListenableFuture<Collection<T>>, RuntimeException>()
        {
            @Override
            public ListenableFuture<Collection<T>> execute()
            {
                return Futures.immediateFuture(_linkRegistry.findSendingLinks(containerIdPattern, linkNamePattern));
            }

            @Override
            public String getObject()
            {
                return AbstractVirtualHost.this.toString();
            }

            @Override
            public String getAction()
            {
                return "findSendingLinks";
            }

            @Override
            public String getArguments()
            {
                return String.format("containerIdPattern='%s', linkNamePattern='%s'", containerIdPattern, linkNamePattern);
            }
        }));
    }

    @Override
    public <T extends LinkModel> void visitSendingLinks(final LinkRegistryModel.LinkVisitor<T> visitor)
    {
        doSync(doOnConfigThread(new Task<ListenableFuture<Void>, RuntimeException>()
        {
            @Override
            public ListenableFuture<Void> execute()
            {
                _linkRegistry.visitSendingLinks(visitor);
                return Futures.immediateFuture(null);
            }

            @Override
            public String getObject()
            {
                return AbstractVirtualHost.this.toString();
            }

            @Override
            public String getAction()
            {
                return "visitSendingLinks";
            }

            @Override
            public String getArguments()
            {
                return String.format("visitor='%s'", visitor);
            }
        }));
    }

    @Override
    public DtxRegistry getDtxRegistry()
    {
        return _dtxRegistry;
    }

    private void block(BlockingType blockingType)
    {
        synchronized (_connections)
        {
            _blockingReasons.add(blockingType);
            if(_blocked.compareAndSet(false,true))
            {
                for(AMQPConnection<?> conn : _connections)
                {
                    conn.block();
                }
            }
        }
    }


    private void unblock(BlockingType blockingType)
    {

        synchronized (_connections)
        {
            _blockingReasons.remove(blockingType);
            if(_blockingReasons.isEmpty() && _blocked.compareAndSet(true,false))
            {
                for(AMQPConnection<?> conn : _connections)
                {
                    conn.unblock();
                }
            }
        }
    }

    @Override
    public void event(final Event event)
    {
        switch(event)
        {
            case PERSISTENT_MESSAGE_SIZE_OVERFULL:
                block(BlockingType.STORE);
                _eventLogger.message(getMessageStoreLogSubject(), MessageStoreMessages.OVERFULL());
                break;
            case PERSISTENT_MESSAGE_SIZE_UNDERFULL:
                unblock(BlockingType.STORE);
                _eventLogger.message(getMessageStoreLogSubject(), MessageStoreMessages.UNDERFULL());
                break;
        }
    }

    private void reportIfError(State state)
    {
        if (state == State.ERRORED)
        {
            _eventLogger.message(VirtualHostMessages.ERRORED(getName()));
        }
    }

    @Override
    public String getRedirectHost(final AmqpPort<?> port)
    {
        return null;
    }

    @Override
    public boolean isOverTargetSize()
    {
        return getInMemoryMessageSize() > _targetSize.get();
    }

    private static class MessageHeaderImpl implements AMQMessageHeader
    {
        private final String _userName;
        private final long _timestamp;
        private final ManageableMessage _message;

        public MessageHeaderImpl(final ManageableMessage message)
        {
            _message = message;
            final AuthenticatedPrincipal currentUser = AuthenticatedPrincipal.getCurrentUser();
            _userName = (currentUser == null ? null : currentUser.getName());
            _timestamp = System.currentTimeMillis();
        }

        @Override
        public String getCorrelationId()
        {
            return _message.getCorrelationId();
        }

        @Override
        public long getExpiration()
        {
            Date expiration = _message.getExpiration();
            return expiration == null ? 0 : expiration.getTime();
        }

        @Override
        public String getUserId()
        {
            return _userName;
        }

        @Override
        public String getAppId()
        {
            return null;
        }

        @Override
        public String getGroupId()
        {
            Object jmsXGroupId = getHeader("JMSXGroupID");
            return jmsXGroupId == null ? null : String.valueOf(jmsXGroupId);
        }

        @Override
        public String getMessageId()
        {
            return _message.getMessageId();
        }

        @Override
        public String getMimeType()
        {
            return _message.getMimeType();
        }

        @Override
        public String getEncoding()
        {
            return _message.getEncoding();
        }

        @Override
        public byte getPriority()
        {
            return (byte) _message.getPriority();
        }

        @Override
        public long getTimestamp()
        {
            return _timestamp;
        }

        @Override
        public long getNotValidBefore()
        {
            final Date notValidBefore = _message.getNotValidBefore();
            return notValidBefore == null ? 0 : notValidBefore.getTime();
        }

        @Override
        public String getType()
        {
            return null;
        }

        @Override
        public String getReplyTo()
        {
            return _message.getReplyTo();
        }

        @Override
        public Object getHeader(final String name)
        {
            return getHeaders().get(name);
        }

        @Override
        public boolean containsHeaders(final Set<String> names)
        {
            return getHeaders().keySet().containsAll(names);
        }

        @Override
        public boolean containsHeader(final String name)
        {
            return getHeaders().keySet().contains(name);
        }

        @Override
        public Collection<String> getHeaderNames()
        {
            return Collections.unmodifiableCollection(getHeaders().keySet());
        }

        private Map<String, Object> getHeaders()
        {
            return _message.getHeaders() == null ? Map.of() : _message.getHeaders();
        }
    }

    private class VirtualHostHouseKeepingTask extends HouseKeepingTask
    {
        public VirtualHostHouseKeepingTask()
        {
            super("Housekeeping["+AbstractVirtualHost.this.getName()+"]",AbstractVirtualHost.this,_housekeepingJobContext);
        }

        @Override
        public void execute()
        {
            for (Queue<?> q : getChildren(Queue.class))
            {
                if (q.getState() == State.ACTIVE)
                {
                    LOGGER.debug("Checking message status for queue: {}", q.getName());
                    q.checkMessageStatus();
                }
            }
        }
    }

    class FlowToDiskCheckingTask extends HouseKeepingTask
    {
        public FlowToDiskCheckingTask()
        {
            super("FlowToDiskChecking["+AbstractVirtualHost.this.getName()+"]", AbstractVirtualHost.this, _housekeepingJobContext);
        }

        @Override
        public void execute()
        {
            if (isOverTargetSize())
            {
                long currentTargetSize = _targetSize.get();
                reportDirectMemoryAboveTargetIfExceeded(currentTargetSize,
                                                        AbstractVirtualHost.this.getInMemoryMessageSize());
                List<QueueEntryIterator> queueIterators = new ArrayList<>();
                for (Queue<?> q : getChildren(Queue.class))
                {
                    queueIterators.add(q.queueEntryIterator());
                }
                Collections.shuffle(queueIterators);

                long cumulativeSize = 0;
                final Iterator<QueueEntryIterator> cyclicIterators = cycle(queueIterators);
                while (cyclicIterators.hasNext())
                {
                    final QueueEntryIterator queueIterator = cyclicIterators.next();
                    if (queueIterator.advance())
                    {
                        QueueEntry node = queueIterator.getNode();
                        if (node != null && !node.isDeleted())
                        {
                            try (MessageReference messageReference = node.getMessage().newReference())
                            {
                                final StoredMessage storedMessage = messageReference.getMessage().getStoredMessage();
                                final long inMemorySize = storedMessage.getInMemorySize();
                                if (inMemorySize > 0)
                                {
                                    if (cumulativeSize <= currentTargetSize)
                                    {
                                        cumulativeSize += inMemorySize;
                                    }

                                    if (cumulativeSize > currentTargetSize && node.getQueue().checkValid(node))
                                    {
                                        storedMessage.flowToDisk();
                                    }
                                }
                            }
                            catch (MessageDeletedException e)
                            {
                                // pass
                            }
                        }
                    }
                    else
                    {
                        cyclicIterators.remove();
                    }
                }
                reportDirectMemoryBelowTargetIfReached(cumulativeSize,
                                                       AbstractVirtualHost.this.getInMemoryMessageSize());
            }
        }
    }

    private class SystemNodeRegistry implements SystemNodeCreator.SystemNodeRegistry
    {
        @Override
        public void registerSystemNode(final MessageNode node)
        {
            if(node instanceof MessageDestination)
            {
                _systemNodeDestinations.put(node.getName(), (MessageDestination) node);
            }
            if(node instanceof MessageSource)
            {
                _systemNodeSources.put(node.getName(), (MessageSource)node);
            }
        }

        @Override
        public void removeSystemNode(final MessageNode node)
        {
            if(node instanceof MessageDestination)
            {
                _systemNodeDestinations.remove(node.getName());
            }
            if(node instanceof MessageSource)
            {
                removeMessageSource(node.getName());
            }
        }

        private void removeMessageSource(final String name)
        {
            MessageSource messageSource = _systemNodeSources.remove(name);
            if (messageSource != null)
            {
                messageSource.close();
            }
        }

        @Override
        public void removeSystemNode(final String name)
        {
            _systemNodeDestinations.remove(name);
            removeMessageSource(name);
        }

        @Override
        public VirtualHostNode<?> getVirtualHostNode()
        {
            return (VirtualHostNode) getParent();
        }

        @Override
        public VirtualHost<?> getVirtualHost()
        {
            return AbstractVirtualHost.this;
        }

        @Override
        public boolean hasSystemNode(final String name)
        {
            return _systemNodeSources.containsKey(name) || _systemNodeDestinations.containsKey(name);
        }

        public void close()
        {
            _systemNodeSources.values().forEach(MessageSource::close);
        }
    }


    @Override
    public void executeTransaction(TransactionalOperation op)
    {
        final MessageStore store = getMessageStore();
        final LocalTransaction txn = new LocalTransaction(store);

        op.withinTransaction(new Transaction()
        {
            @Override
            public void dequeue(final QueueEntry messageInstance)
            {
                final ServerTransaction.Action deleteAction = new ServerTransaction.Action()
                {
                    @Override
                    public void postCommit()
                    {
                        messageInstance.delete();
                    }

                    @Override
                    public void onRollback()
                    {
                    }
                };

                boolean acquired = messageInstance.acquireOrSteal(() ->
                {
                    ServerTransaction txn1 = new AutoCommitTransaction(store);
                    txn1.dequeue(messageInstance.getEnqueueRecord(), deleteAction);
                });
                if(acquired)
                {
                    txn.dequeue(messageInstance.getEnqueueRecord(), deleteAction);
                }
            }

            @Override
            public void copy(QueueEntry entry, final Queue<?> queue)
            {
                final ServerMessage message = entry.getMessage();

                txn.enqueue(queue, message, new ServerTransaction.EnqueueAction()
                {
                    @Override
                    public void postCommit(MessageEnqueueRecord... records)
                    {
                        queue.enqueue(message, null, records[0]);
                    }

                    @Override
                    public void onRollback()
                    {
                    }
                });

            }

            @Override
            public void move(final QueueEntry entry, final Queue<?> queue)
            {
                final ServerMessage message = entry.getMessage();
                if(entry.acquire())
                {
                    txn.enqueue(queue, message,
                                new ServerTransaction.EnqueueAction()
                                {

                                    @Override
                                    public void postCommit(MessageEnqueueRecord... records)
                                    {
                                        queue.enqueue(message, null, records[0]);
                                    }

                                    @Override
                                    public void onRollback()
                                    {
                                        entry.release();
                                    }
                                });
                    txn.dequeue(entry.getEnqueueRecord(),
                                new ServerTransaction.Action()
                                {

                                    @Override
                                    public void postCommit()
                                    {
                                        entry.delete();
                                    }

                                    @Override
                                    public void onRollback()
                                    {

                                    }
                                });

                }
            }

        });
        txn.commit();
    }

    @Override
    public long getHousekeepingCheckPeriod()
    {
        return _housekeepingCheckPeriod;
    }

    @Override
    public long getFlowToDiskCheckPeriod()
    {
        return _flowToDiskCheckPeriod;
    }

    @Override
    public boolean isDiscardGlobalSharedSubscriptionLinksOnDetach()
    {
        return _isDiscardGlobalSharedSubscriptionLinksOnDetach;
    }

    @Override
    public long getStoreTransactionIdleTimeoutClose()
    {
        return _storeTransactionIdleTimeoutClose;
    }

    @Override
    public long getStoreTransactionIdleTimeoutWarn()
    {
        return _storeTransactionIdleTimeoutWarn;
    }

    @Override
    public long getStoreTransactionOpenTimeoutClose()
    {
        return _storeTransactionOpenTimeoutClose;
    }

    @Override
    public long getStoreTransactionOpenTimeoutWarn()
    {
        return _storeTransactionOpenTimeoutWarn;
    }

    @Override
    public long getQueueCount()
    {
        return getChildren(Queue.class).size();
    }

    @Override
    public long getExchangeCount()
    {
        return getChildren(Exchange.class).size();
    }

    @Override
    public long getConnectionCount()
    {
        return _connections.size();
    }

    @Override
    public long getTotalConnectionCount()
    {
        return _totalConnectionCount.get();
    }

    @Override
    public int getHousekeepingThreadCount()
    {
        return _housekeepingThreadCount;
    }

    @Override
    public int getStatisticsReportingPeriod()
    {
        return _statisticsReportingPeriod;
    }

    @Override
    public int getConnectionThreadPoolSize()
    {
        return _connectionThreadPoolSize;
    }

    @Override
    public int getNumberOfSelectors()
    {
        return _numberOfSelectors;
    }

    @StateTransition( currentState = { State.UNINITIALIZED, State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED )
    protected ListenableFuture<Void> doStop()
    {
        final List<VirtualHostLogger> loggers = new ArrayList<>(getChildren(VirtualHostLogger.class));
        return doAfter(closeConnections(), () -> closeChildren()).then(() ->
        {
            shutdownHouseKeeping();
            closeNetworkConnectionScheduler();
            if (_linkRegistry != null)
            {
                _linkRegistry.close();
            }
            closeMessageStore();
            stopPreferenceTaskExecutor();
            closePreferenceStore();
            setState(State.STOPPED);
            stopLogging(loggers);
            closeConnectionLimiter();
        });
    }

    @Override
    public UserPreferences createUserPreferences(ConfiguredObject<?> object)
    {
        if (_preferenceTaskExecutor == null || !_preferenceTaskExecutor.isRunning())
        {
            throw new IllegalStateException("Cannot create user preferences in not fully initialized virtual host");
        }
        return new UserPreferencesImpl(_preferenceTaskExecutor, object, _preferenceStore, Set.of());
    }

    private void stopPreferenceTaskExecutor()
    {
        if (_preferenceTaskExecutor != null)
        {
            _preferenceTaskExecutor.stop();
            _preferenceTaskExecutor = null;
        }
    }

    private void closePreferenceStore()
    {
        if (_preferenceStore != null)
        {
            _preferenceStore.close();
        }
    }

    private void stopLogging(Collection<VirtualHostLogger> loggers)
    {
        for (VirtualHostLogger logger : loggers)
        {
            logger.stopLogging();
        }
    }

    private void deleteLinkRegistry()
    {
        if (_linkRegistry != null)
        {
            _linkRegistry.delete();
            _linkRegistry = null;
        }
    }

    private void deletePreferenceStore()
    {
        final PreferenceStore ps = _preferenceStore;
        if (ps != null)
        {
            try
            {
                ps.onDelete();
            }
            catch (Exception e)
            {
                LOGGER.warn("Exception occurred on preference store deletion", e);
            }
            finally
            {
                _preferenceStore = null;

            }
        }
    }

    private void deleteMessageStore()
    {
        MessageStore ms = _messageStore;
        if (ms != null)
        {
            try
            {
                ms.onDelete(AbstractVirtualHost.this);
            }
            catch (Exception e)
            {
                LOGGER.warn( "Exception occurred on message store deletion", e);
            }
            finally
            {
                _messageStore = null;
            }
        }
    }

    @Override
    public String getModelVersion()
    {
        return BrokerModel.MODEL_VERSION;
    }

    @Override
    public String getProductVersion()
    {
        return _broker.getProductVersion();
    }

    @Override
    public DurableConfigurationStore getDurableConfigurationStore()
    {
        return _virtualHostNode.getConfigurationStore();
    }

    @Override
    public void setTargetSize(final long targetSize)
    {
        _targetSize.set(targetSize);
        final long inMemoryMessageSize = getInMemoryMessageSize();
        reportDirectMemoryAboveTargetIfExceeded(targetSize, inMemoryMessageSize);
        reportDirectMemoryBelowTargetIfReached(targetSize, inMemoryMessageSize);
    }

    @Override
    public long getTargetSize()
    {
        return _targetSize.get();
    }

    @Override
    public Principal getPrincipal()
    {
        return _principal;
    }

    @Override
    public void registerConnection(final AMQPConnection<?> connection)
    {
        if (!_acceptsConnections.get())
        {
            throw new VirtualHostUnavailableException(String.format(
                    "VirtualHost '%s' not accepting connections",
                    getName()));
        }
        _connectionLimiter.register(connection);
        _connections.add(connection);
        _totalConnectionCount.incrementAndGet();
        if (_blocked.get())
        {
            connection.block();
        }
        connection.pushScheduler(_networkConnectionScheduler);
    }

    @Override
    public void deregisterConnection(final AMQPConnection<?> connection)
    {
        try
        {
            _connectionLimiter.deregister(connection);
        }
        finally
        {
            connection.popScheduler();
            _connections.remove(connection);
        }
    }

    @StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE)
    private ListenableFuture<Void> onActivate()
    {

        long threadPoolKeepAliveTimeout = getContextValue(Long.class, CONNECTION_THREAD_POOL_KEEP_ALIVE_TIMEOUT);

        final SuppressingInheritedAccessControlContextThreadFactory connectionThreadFactory =
                new SuppressingInheritedAccessControlContextThreadFactory("virtualhost-" + getName() + "-iopool",
                                                                          getSystemTaskSubject("IO Pool", getPrincipal()));

        _networkConnectionScheduler = new NetworkConnectionScheduler("virtualhost-" + getName() + "-iopool",
                                                                     getNumberOfSelectors(),
                                                                     getConnectionThreadPoolSize(),
                                                                     threadPoolKeepAliveTimeout,
                                                                     connectionThreadFactory);
        _networkConnectionScheduler.start();

        updateAccessControl();
        initialiseStatisticsReporting();

        MessageStore messageStore = getMessageStore();
        messageStore.openMessageStore(this);

        startFileSystemSpaceChecking();


        if (!(_virtualHostNode.getConfigurationStore() instanceof MessageStoreProvider))
        {
            getEventLogger().message(getMessageStoreLogSubject(), MessageStoreMessages.CREATED());
            getEventLogger().message(getMessageStoreLogSubject(), MessageStoreMessages.STORE_LOCATION(messageStore.getStoreLocation()));
        }

        messageStore.upgradeStoreStructure();

        if (_linkRegistry != null)
        {
            _linkRegistry.open();
        }

        getBroker().assignTargetSizes();

        final PreferenceStoreUpdater updater = new PreferenceStoreUpdaterImpl();
        Collection<PreferenceRecord> records = _preferenceStore.openAndLoad(updater);
        _preferenceTaskExecutor = new TaskExecutorImpl("virtualhost-" + getName() + "-preferences", null);
        _preferenceTaskExecutor.start();
        PreferencesRecoverer preferencesRecoverer = new PreferencesRecoverer(_preferenceTaskExecutor);
        preferencesRecoverer.recoverPreferences(this, records, _preferenceStore);

        activateConnectionLimiter();

        if (_createDefaultExchanges)
        {
            return doAfter(createDefaultExchanges(), () ->
            {
                _createDefaultExchanges = false;
                postCreateDefaultExchangeTasks();
            });
        }
        else
        {
            postCreateDefaultExchangeTasks();
            return Futures.immediateFuture(null);
        }
    }

    private void postCreateDefaultExchangeTasks()
    {
        if(getContextValue(Boolean.class, USE_ASYNC_RECOVERY))
        {
            _messageStoreRecoverer = new AsynchronousMessageStoreRecoverer();
        }
        else
        {
           _messageStoreRecoverer = new SynchronousMessageStoreRecoverer();
        }

        // propagate any exception thrown during recovery into HouseKeepingTaskExecutor to handle them accordingly
        // TODO if message recovery fails we ought to be transitioning the VH into ERROR and releasing the thread-pools etc.
        final ListenableFuture<Void> recoveryResult = _messageStoreRecoverer.recover(this);
        recoveryResult.addListener(() -> Futures.getUnchecked(recoveryResult), _houseKeepingTaskExecutor);

        State finalState = State.ERRORED;
        try
        {
            initialiseHouseKeeping();
            initialiseFlowToDiskChecking();
            finalState = State.ACTIVE;
            _acceptsConnections.set(true);
        }
        finally
        {
            setState(finalState);
            reportIfError(getState());
        }
    }

    @Override
    protected void logOperation(final String operation)
    {
        getEventLogger().message(VirtualHostMessages.OPERATION(operation));
    }

    protected void startFileSystemSpaceChecking()
    {
        long housekeepingCheckPeriod = getHousekeepingCheckPeriod();
        File storeLocationAsFile = _messageStore.getStoreLocationAsFile();
        if (storeLocationAsFile != null && _fileSystemMaxUsagePercent > 0 && housekeepingCheckPeriod > 0)
        {
            _fileSystemSpaceChecker.setFileSystem(storeLocationAsFile);

            scheduleHouseKeepingTask(housekeepingCheckPeriod, _fileSystemSpaceChecker);
        }
    }

    @Override
    public SocketConnectionMetaData getConnectionMetaData()
    {
        return getBroker().getConnectionMetaData();
    }

    @StateTransition( currentState = { State.STOPPED }, desiredState = State.ACTIVE )
    private ListenableFuture<Void> onRestart()
    {
        final SettableFuture<Void> returnVal = SettableFuture.create();
        try
        {
            addFutureCallback(doRestart(), new FutureCallback<>()
                              {
                                  @Override
                                  public void onSuccess(final Void result)
                                  {
                                      returnVal.set(null);
                                  }

                                  @Override
                                  public void onFailure(final Throwable t)
                                  {
                                      doAfterAlways(onRestartFailure(), () -> returnVal.setException(t));
                                  }
                              }, getTaskExecutor()
                             );
        }
        catch (IllegalArgumentException | IllegalConfigurationException e)
        {
            doAfterAlways(onRestartFailure(), ()-> returnVal.setException(e));
        }
        return returnVal;
    }

    private ListenableFuture<Void> doRestart()
    {
        createHousekeepingExecutor();

        final VirtualHostStoreUpgraderAndRecoverer virtualHostStoreUpgraderAndRecoverer =
                new VirtualHostStoreUpgraderAndRecoverer((VirtualHostNode<?>) getParent());
        virtualHostStoreUpgraderAndRecoverer.reloadAndRecoverVirtualHost(getDurableConfigurationStore());

        final Collection<VirtualHostAccessControlProvider> accessControlProviders = getChildren(VirtualHostAccessControlProvider.class);
        if (!accessControlProviders.isEmpty())
        {
            accessControlProviders.forEach(child -> child.addChangeListener(_accessControlProviderListener));
        }

        final List<ListenableFuture<Void>> childOpenFutures = new ArrayList<>();

        Subject.doAs(getSubjectWithAddedSystemRights(), (PrivilegedAction<Object>) () ->
        {
            applyToChildren(child ->
                            {
                                final ListenableFuture<Void> childOpenFuture = child.openAsync();
                                childOpenFutures.add(childOpenFuture);

                                addFutureCallback(childOpenFuture, new FutureCallback<>()
                                {
                                    @Override
                                    public void onSuccess(final Void result)
                                    {
                                    }

                                    @Override
                                    public void onFailure(final Throwable t)
                                    {
                                        LOGGER.error("Exception occurred while opening {} : {}",
                                                     child.getClass().getSimpleName(), child.getName(), t);
                                        onRestartFailure();
                                    }
                                }, getTaskExecutor());
                            });
            return null;
        });

        ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(childOpenFutures);
        return Futures.transformAsync(combinedFuture, input -> onActivate(), MoreExecutors.directExecutor());
    }

    private ChainedListenableFuture<Void> onRestartFailure()
    {
        final List<VirtualHostLogger> loggers = new ArrayList<>(getChildren(VirtualHostLogger.class));
        return doAfter(closeChildren(), () -> {
            shutdownHouseKeeping();
            closeNetworkConnectionScheduler();
            if (_linkRegistry != null)
            {
                _linkRegistry.close();
            }
            closeMessageStore();
            stopPreferenceTaskExecutor();
            closePreferenceStore();
            setState(State.ERRORED);
            stopLogging(loggers);
            closeConnectionLimiter();
        });
    }

    private class FileSystemSpaceChecker extends HouseKeepingTask
    {
        private boolean _fileSystemFull;
        private File _fileSystem;

        public FileSystemSpaceChecker()
        {
            super("FileSystemSpaceChecker["+AbstractVirtualHost.this.getName()+"]",AbstractVirtualHost.this,_fileSystemSpaceCheckerJobContext);
        }

        @Override
        public void execute()
        {
            long totalSpace = _fileSystem.getTotalSpace();
            long freeSpace = _fileSystem.getFreeSpace();

            if (totalSpace == 0)
            {
                LOGGER.warn("Cannot check file system for disk space because store path '{}' is not valid", _fileSystem.getPath());
                return;
            }

            long usagePercent = (100L * (totalSpace - freeSpace)) / totalSpace;

            if (_fileSystemFull && (usagePercent < _fileSystemMaxUsagePercent))
            {
                _fileSystemFull = false;
                getEventLogger().message(getMessageStoreLogSubject(), VirtualHostMessages.FILESYSTEM_NOTFULL(
                        _fileSystemMaxUsagePercent));
                unblock(BlockingType.FILESYSTEM);
            }
            else if(!_fileSystemFull && usagePercent > _fileSystemMaxUsagePercent)
            {
                _fileSystemFull = true;
                getEventLogger().message(getMessageStoreLogSubject(), VirtualHostMessages.FILESYSTEM_FULL(
                        _fileSystemMaxUsagePercent));
                block(BlockingType.FILESYSTEM);
            }

        }

        public void setFileSystem(final File fileSystem)
        {
            _fileSystem = fileSystem;
        }
    }

    @Override
    public <T extends MessageSource> T createMessageSource(final Class<T> clazz, final Map<String, Object> attributes)
    {
        if(Queue.class.isAssignableFrom(clazz))
        {
            return (T) createChild((Class<? extends Queue>)clazz, attributes);
        }
        else if(clazz.isAssignableFrom(Queue.class))
        {
            return (T) createChild(Queue.class, attributes);
        }
        else
        {
            throw new IllegalArgumentException("Cannot create message source children of class " + clazz.getSimpleName());
        }
    }

    @Override
    public <T extends MessageDestination> T createMessageDestination(final Class<T> clazz,
                                                                     final Map<String, Object> attributes)
    {
        if(Exchange.class.isAssignableFrom(clazz))
        {
            return (T) createChild((Class<? extends Exchange>)clazz, attributes);
        }
        else if(Queue.class.isAssignableFrom(clazz))
        {
            return (T) createChild((Class<? extends Queue>)clazz, attributes);
        }
        else if(clazz.isAssignableFrom(Queue.class))
        {
            return (T) createChild(Queue.class, attributes);
        }
        else
        {
            throw new IllegalArgumentException("Cannot create message destination children of class " + clazz.getSimpleName());
        }
    }

    @Override
    public boolean hasMessageSources()
    {
        return !(_systemNodeSources.isEmpty() && getChildren(Queue.class).isEmpty());
    }

    @Override
    @DoOnConfigThread
    public Queue<?> getSubscriptionQueue(@Param(name = "exchangeName", mandatory = true) final String exchangeName,
                                         @Param(name = "attributes", mandatory = true) final Map<String, Object> attributes,
                                         @Param(name = "bindings", mandatory = true) final Map<String, Map<String, Object>> bindings)
    {
        Queue queue;
        Object exclusivityPolicy = attributes.get(Queue.EXCLUSIVE);
        if (exclusivityPolicy == null)
        {
            exclusivityPolicy = getContextValue(ExclusivityPolicy.class, Queue.QUEUE_DEFAULT_EXCLUSIVITY_POLICY);
        }
        if (!(exclusivityPolicy instanceof ExclusivityPolicy))
        {
            throw new IllegalArgumentException("Exclusivity policy is required");
        }
        Exchange<?> exchange = findConfiguredObject(Exchange.class, exchangeName);
        if (exchange == null)
        {
            throw new NotFoundException(String.format("Exchange '%s' was not found", exchangeName));
        }
        try
        {
            queue = createMessageDestination(Queue.class, attributes);

            for (String binding : bindings.keySet())
            {
                exchange.addBinding(binding, queue, bindings.get(binding));
            }
        }
        catch (AbstractConfiguredObject.DuplicateNameException e)
        {
            Queue<?> existingQueue = (Queue) e.getExisting();

            if (existingQueue.getExclusive() == exclusivityPolicy)
            {
                if (hasDifferentBindings(exchange, existingQueue, bindings))
                {
                    if (existingQueue.getConsumers().isEmpty())
                    {
                        existingQueue.delete();

                        queue = createMessageDestination(Queue.class, attributes);

                        for (String binding : bindings.keySet())
                        {
                            try
                            {
                                exchange.addBinding(binding, queue, bindings.get(binding));
                            }
                            catch (AMQInvalidArgumentException ia)
                            {
                                throw new IllegalArgumentException("Unexpected bind argument : " + ia.getMessage(), ia);
                            }
                        }
                    }
                    else
                    {
                        throw new IllegalStateException("subscription already in use");
                    }
                }
                else
                {
                    queue = existingQueue;
                }
            }
            else
            {
                throw new IllegalStateException("subscription already in use");
            }
        }
        catch (AMQInvalidArgumentException e)
        {
            throw new IllegalArgumentException("Unexpected bind argument : " + e.getMessage(), e);
        }
        return queue;
    }

    @Override
    @DoOnConfigThread
    public void removeSubscriptionQueue(@Param(name = "queueName", mandatory = true) final String queueName) throws NotFoundException
    {
        Queue<?> queue = findConfiguredObject(Queue.class, queueName);
        if (queue == null)
        {
            throw new NotFoundException(String.format("Queue '%s' was not found", queueName));
        }

        if (queue.getConsumers().isEmpty())
        {
            queue.delete();
        }
        else
        {
            throw new IllegalStateException("There are consumers on Queue '" + queueName + "'");
        }
    }

    @Override
    public Object dumpLinkRegistry()
    {
        return doSync(doOnConfigThread(new Task<ListenableFuture<Object>, IOException>()
        {
            @Override
            public ListenableFuture<Object> execute() throws IOException
            {
                Object dump;
                if (getState() == State.STOPPED)
                {
                    _messageStore.openMessageStore(AbstractVirtualHost.this);
                    try
                    {
                        _linkRegistry.open();
                        try
                        {
                            dump = _linkRegistry.dump();
                        }
                        finally
                        {
                            _linkRegistry.close();
                        }
                    }
                    finally
                    {
                        _messageStore.closeMessageStore();
                    }
                }
                else if (getState() == State.ACTIVE)
                {
                    dump = _linkRegistry.dump();
                }
                else
                {
                    throw new IllegalStateException("The dumpLinkRegistry operation can only be called when the virtual host is active or stopped.");
                }
                return Futures.immediateFuture(dump);
            }

            @Override
            public String getObject()
            {
                return AbstractVirtualHost.this.toString();
            }

            @Override
            public String getAction()
            {
                return "dumpLinkRegistry";
            }

            @Override
            public String getArguments()
            {
                return null;
            }
        }));
    }

    @Override
    public void purgeLinkRegistry(final String containerIdPatternString, final String role, final String linkNamePatternString)
    {
        doSync(doOnConfigThread(new Task<ListenableFuture<Void>, IOException>()
        {
            @Override
            public ListenableFuture<Void> execute() throws IOException
            {
                if (getState() != State.STOPPED)
                {
                    throw new IllegalArgumentException(
                            "The purgeLinkRegistry operation can only be called when the virtual host is stopped.");
                }
                Pattern containerIdPattern = Pattern.compile(containerIdPatternString);
                Pattern linkNamePattern = Pattern.compile(linkNamePatternString);

                _messageStore.openMessageStore(AbstractVirtualHost.this);
                try
                {
                    _linkRegistry.open();
                    try
                    {
                        if ("SENDER".equals(role) || "BOTH".equals(role))
                        {
                            _linkRegistry.purgeSendingLinks(containerIdPattern, linkNamePattern);
                        }
                        if ("RECEIVER".equals(role) || "BOTH".equals(role))
                        {
                            _linkRegistry.purgeReceivingLinks(containerIdPattern, linkNamePattern);
                        }
                        return Futures.immediateFuture(null);
                    }
                    finally
                    {
                        _linkRegistry.close();
                    }
                }
                finally
                {
                    _messageStore.closeMessageStore();
                }
            }

            @Override
            public String getObject()
            {
                return AbstractVirtualHost.this.toString();
            }

            @Override
            public String getAction()
            {
                return "purgeLinkRegistry";
            }

            @Override
            public String getArguments()
            {
                return String.format("containerIdPattern='%s',role='%s',linkNamePattern='%s'",
                                     containerIdPatternString,
                                     role,
                                     linkNamePatternString);
            }
        }));
    }

    @Override
    public <K, V> Cache<K, V> getNamedCache(final String cacheName)
    {
        final String maxSizeContextVarName = String.format(NAMED_CACHE_MAXIMUM_SIZE_FORMAT, cacheName);
        final String expirationContextVarName = String.format(NAMED_CACHE_EXPIRATION_FORMAT, cacheName);
        Set<String> contextKeys = getContextKeys(false);
        int maxSize = contextKeys.contains(maxSizeContextVarName) ? getContextValue(Integer.class, maxSizeContextVarName) : getContextValue(Integer.class, NAMED_CACHE_MAXIMUM_SIZE);
        long expiration = contextKeys.contains(expirationContextVarName) ? getContextValue(Long.class, expirationContextVarName) : getContextValue(Long.class, NAMED_CACHE_EXPIRATION);

        return _caches.computeIfAbsent(cacheName, (k) -> CacheBuilder.<K, V>newBuilder()
                .maximumSize(maxSize)
                .expireAfterAccess(expiration, TimeUnit.MILLISECONDS)
                .build());
    }

    private boolean hasDifferentBindings(final Exchange<?> exchange,
                                         final Queue queue,
                                         final Map<String, Map<String,Object>> bindings)
    {
        for(String binding: bindings.keySet())
        {
            boolean theSameBindingFound = false;
            for (Binding publishingLink : exchange.getPublishingLinks(queue))
            {
                if (publishingLink.getBindingKey().equals(binding))
                {
                    Map<String, Object> expectedArguments = bindings.get(binding);
                    Map<String, Object> actualArguments = publishingLink.getArguments();


                    if (new HashMap<>(expectedArguments == null ? Map.of() : expectedArguments).equals(new HashMap<>(actualArguments == null ? Map.of() : actualArguments)))
                    {
                        theSameBindingFound = true;
                    }

                }
            }
            if (!theSameBindingFound)
            {
                return true;
            }
        }
        return false;
    }

    private final class AccessControlProviderListener extends AbstractConfigurationChangeListener
    {
        private final Set<ConfiguredObject<?>> _bulkChanges = new HashSet<>();

        @Override
        public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
        {
            if(object.getCategoryClass() == VirtualHost.class && child.getCategoryClass() == VirtualHostAccessControlProvider.class)
            {
                child.addChangeListener(this);
                AbstractVirtualHost.this.updateAccessControl();
            }
        }

        @Override
        public void childRemoved(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
        {
            if(object.getCategoryClass() == VirtualHost.class && child.getCategoryClass() == VirtualHostAccessControlProvider.class)
            {
                AbstractVirtualHost.this.updateAccessControl();
            }
        }

        @Override
        public void attributeSet(final ConfiguredObject<?> object,
                                 final String attributeName,
                                 final Object oldAttributeValue,
                                 final Object newAttributeValue)
        {
            if(object.getCategoryClass() == VirtualHostAccessControlProvider.class && !_bulkChanges.contains(object))
            {
                AbstractVirtualHost.this.updateAccessControl();
            }
        }

        @Override
        public void bulkChangeStart(final ConfiguredObject<?> object)
        {
            if(object.getCategoryClass() == VirtualHostAccessControlProvider.class)
            {
                _bulkChanges.add(object);
            }
        }

        @Override
        public void bulkChangeEnd(final ConfiguredObject<?> object)
        {
            if(object.getCategoryClass() == VirtualHostAccessControlProvider.class)
            {
                _bulkChanges.remove(object);
                AbstractVirtualHost.this.updateAccessControl();
            }
        }
    }

    private static class StoreEmptyCheckingHandler
            implements MessageHandler, MessageInstanceHandler, DistributedTransactionHandler
    {
        private boolean _empty = true;

        @Override
        public boolean handle(final StoredMessage<?> storedMessage)
        {
            _empty = false;
            return false;
        }

        @Override
        public boolean handle(final MessageEnqueueRecord record)
        {
            _empty = false;
            return false;
        }

        @Override
        public boolean handle(final org.apache.qpid.server.store.Transaction.StoredXidRecord storedXid,
                              final org.apache.qpid.server.store.Transaction.EnqueueRecord[] enqueues,
                              final org.apache.qpid.server.store.Transaction.DequeueRecord[] dequeues)
        {
            _empty = false;
            return false;
        }


        public boolean isEmpty()
        {
            return _empty;
        }

    }

    @Override
    protected void logCreated(final Map<String, Object> attributes,
                              final Outcome outcome)
    {
        _eventLogger.message(VirtualHostMessages.CREATE(getName(),
                                                        String.valueOf(outcome),
                                                        attributesAsString(attributes)));
    }

    @Override
    protected void logRecovered(final Outcome outcome)
    {
        _eventLogger.message(VirtualHostMessages.OPEN(getName(), String.valueOf(outcome)));
    }

    @Override
    protected void logDeleted(final Outcome outcome)
    {
        _eventLogger.message(VirtualHostMessages.DELETE(getName(), String.valueOf(outcome)));
    }

    @Override
    protected void logUpdated(final Map<String, Object> attributes, final Outcome outcome)
    {
        _eventLogger.message(VirtualHostMessages.UPDATE(getName(),
                                                        String.valueOf(outcome),
                                                        attributesAsString(attributes)));
    }

    private void reportDirectMemoryAboveTargetIfExceeded()
    {
        if (DIRECT_MEMORY_USAGE_LOGGER.isDebugEnabled())
        {
            reportDirectMemoryAboveTargetIfExceeded(getTargetSize(), getInMemoryMessageSize());
        }
    }

    private void reportDirectMemoryBelowTargetIfReached()
    {
        if (DIRECT_MEMORY_USAGE_LOGGER.isDebugEnabled())
        {
            reportDirectMemoryBelowTargetIfReached(getTargetSize(), getInMemoryMessageSize());
        }
    }

    private void reportDirectMemoryBelowTargetIfReached(final long currentTargetSize, final long inMemoryMessageSize)
    {
        if (DIRECT_MEMORY_USAGE_LOGGER.isDebugEnabled()
            && inMemoryMessageSize <= currentTargetSize
            && QpidByteBuffer.getAllocatedDirectMemorySize() <= _broker.getFlowToDiskThreshold()
            && _directMemoryExceedsTargetReported.compareAndSet(true, false))
        {
            DIRECT_MEMORY_USAGE_LOGGER.debug(
                    "VirtualHost '{}' direct memory allocation threshold ({}) maintained : {} bytes. Flow to disk stopped.",
                    getName(),
                    currentTargetSize,
                    inMemoryMessageSize);
        }
    }

    private void reportDirectMemoryAboveTargetIfExceeded(final long currentTargetSize, final long inMemoryMessageSize)
    {
        if (DIRECT_MEMORY_USAGE_LOGGER.isDebugEnabled()
            && (inMemoryMessageSize > currentTargetSize
                || QpidByteBuffer.getAllocatedDirectMemorySize() > _broker.getFlowToDiskThreshold())
            && _directMemoryExceedsTargetReported.compareAndSet(false, true))
        {
            DIRECT_MEMORY_USAGE_LOGGER.debug(
                    "VirtualHost '{}' direct memory allocation threshold ({}) exceeded : {} bytes. Flow to disk enforced.",
                    getName(),
                    currentTargetSize,
                    inMemoryMessageSize);
        }
    }

    @Override
    public long clearMatchingQueues(String queueNamePattern)
    {
        LOGGER.debug("Clearing the queues with name that matches the pattern: '{}'", queueNamePattern);
        try
        {
            final Pattern pattern = Pattern.compile(queueNamePattern);

            long count = 0;
            for (final Queue<?> queue : getChildren(Queue.class))
            {
                if (pattern.matcher(queue.getName()).matches())
                {
                    LOGGER.debug("Clearing the queue with name '{}' and ID '{}'", queue.getName(), queue.getId());
                    count += queue.clearQueue();
                }
            }
            return count;
        }
        catch (PatternSyntaxException e)
        {
            final String message = String.format("Failed to compile queue name pattern: '%s'", queueNamePattern);
            LOGGER.debug(message, e);
            throw new IllegalArgumentException(message, e);
        }
    }

    @Override
    public long clearQueues(Collection<String> queues)
    {
        final Map<UUID, String> uuid = new HashMap<>();
        final Set<String> names = new HashSet<>();
        for (final String id : queues)
        {
            try
            {
                uuid.put(UUID.fromString(id), id);
            }
            catch (IllegalArgumentException e)
            {
                LOGGER.trace(String.format("'%s' is not a valid queue ID", id), e);
                names.add(id);
            }
        }

        final Collection<Queue> queueList = getChildren(Queue.class);
        long count = 0;

        if (!uuid.isEmpty())
        {
            LOGGER.debug("Clearing the queues with IDs: {}", uuid.values());
            for (final Queue<?> queue : queueList)
            {
                if (uuid.remove(queue.getId()) != null)
                {
                    LOGGER.debug("Clearing the queue with ID '{}'", queue.getId());
                    count += queue.clearQueue();
                }
            }
            names.addAll(uuid.values());
        }

        if (!names.isEmpty())
        {
            LOGGER.debug("Clearing the queues with names: {}", names);
            for (final Queue<?> queue : queueList)
            {
                if (names.contains(queue.getName()))
                {
                    LOGGER.debug("Clearing the queue with name '{}'", queue.getName());
                    count += queue.clearQueue();
                }
            }
        }

        return count;
    }
}
