/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package flex.messaging;

import flex.management.ManageableComponent;
import flex.management.runtime.messaging.MessageBrokerControl;
import flex.management.runtime.messaging.log.LogManager;
import flex.messaging.client.FlexClient;
import flex.messaging.client.FlexClientManager;
import flex.messaging.cluster.ClusterManager;
import flex.messaging.config.ChannelSettings;
import flex.messaging.config.ConfigMap;
import flex.messaging.config.ConfigurationConstants;
import flex.messaging.config.ConfigurationException;
import flex.messaging.config.ConfigurationManager;
import flex.messaging.config.FlexClientSettings;
import flex.messaging.config.SecurityConstraint;
import flex.messaging.config.SecuritySettings;
import flex.messaging.config.SystemSettings;
import flex.messaging.endpoints.AbstractEndpoint;
import flex.messaging.endpoints.Endpoint;
import flex.messaging.endpoints.Endpoint2;
import flex.messaging.factories.JavaFactory;
import flex.messaging.io.BeanProxy;
import flex.messaging.io.PropertyProxyRegistry;
import flex.messaging.log.Log;
import flex.messaging.log.LogCategories;
import flex.messaging.messages.AbstractMessage;
import flex.messaging.messages.AcknowledgeMessage;
import flex.messaging.messages.AsyncMessage;
import flex.messaging.messages.CommandMessage;
import flex.messaging.messages.Message;
import flex.messaging.security.LoginManager;
import flex.messaging.security.SecurityException;
import flex.messaging.services.AbstractService;
import flex.messaging.services.Service;
import flex.messaging.services.ServiceException;
import flex.messaging.services.messaging.ThrottleManager;
import flex.messaging.util.Base64;
import flex.messaging.util.ClassUtil;
import flex.messaging.util.ExceptionUtil;
import flex.messaging.util.RedeployManager;
import flex.messaging.util.StringUtils;
import flex.messaging.util.UUIDGenerator;
import flex.messaging.util.UUIDUtils;
import flex.messaging.validators.ClassDeserializationValidator;
import flex.messaging.validators.DeserializationValidator;

import javax.servlet.ServletContext;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.security.Principal;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * The MessageBroker is the hub of message traffic. It has a number of endpoints which send and
 * receive messages over the network, and it has a number of
 * services that are message destinations. The broker routes
 * decoded messages received by endpoints to services based
 * on the service destination specified in each message.
 * The broker also has a means of pushing messages back through
 * endpoints to clients.
 */
public class MessageBroker extends ManageableComponent {
    //--------------------------------------------------------------------------
    //
    // Public Static Constants
    //
    //--------------------------------------------------------------------------

    /**
     * Id that the AuthenticationService uses to register itself with the broker.
     */
    public static final String AUTHENTICATION_SERVICE_ID = "authentication-service";

    /**
     * Localized error messages for <code>MessageBroker</code>.
     */
    public static final int ERR_MSG_NO_SERVICE_FOR_DEST = 10004;
    public static final int ERR_MSG_DESTINATION_UNACCESSIBLE = 10005;
    public static final int ERR_MSG_UNKNOWN_REMOTE_CREDENTIALS_FORMAT = 10020;
    public static final int ERR_MSG_NULL_MESSAGE_ID = 10029;
    public static final int ERR_MSG_CANNOT_SERVICE_STOPPED = 10038;
    public static final int ERR_MSG_NULL_ENDPOINT_URL = 10128;
    public static final int ERR_MSG_SERVICE_CMD_NOT_SUPPORTED = 10451;
    public static final int ERR_MSG_URI_ALREADY_REGISTERED = 11109;

    /**
     * Log category for <code>MessageBroker</code>.
     */
    public static final String LOG_CATEGORY = LogCategories.MESSAGE_GENERAL;

    /**
     * Log category that captures startup information for broker's destinations.
     */
    public static final String LOG_CATEGORY_STARTUP_SERVICE = LogCategories.STARTUP_SERVICE;


    public static final String TYPE = "MessageBroker";

    //--------------------------------------------------------------------------
    //
    // Package Protected Static Constants
    //
    //--------------------------------------------------------------------------
    /**
     * The default message broker id when one is not specified in web.xml.
     */
    public static final String DEFAULT_BROKER_ID = "__default__";

    /**
     * A map of currently available message brokers indexed by message broker id.
     */
    static final Map<String, MessageBroker> messageBrokers = new HashMap<String, MessageBroker>();

    //--------------------------------------------------------------------------
    //
    // Private Static Constants
    //
    //--------------------------------------------------------------------------

    private static final String LOG_MANAGER_ID = "log";
    private static final Integer INTEGER_ONE = 1;
    private static final String MESSAGEBROKER = "MessageBroker";
    private static final String ENDPOINT = "Endpoint";
    private static final String SERVICE = "Service";

    //--------------------------------------------------------------------------
    //
    // Constructors
    //
    //--------------------------------------------------------------------------

    /**
     * Create a MessageBroker. This constructor will
     * establish collections for routers, endpoints,
     * and services.
     */
    public MessageBroker() {
        this(true, null);
    }


    public MessageBroker(boolean enableManagement) {
        this(enableManagement, null);
    }


    public MessageBroker(boolean enableManagement, String mbid) {
        this(enableManagement, mbid, MessageBroker.class.getClassLoader());
    }


    public MessageBroker(boolean enableManagement, String mbid, ClassLoader loader) {
        super(enableManagement);
        classLoader = loader;
        attributes = new ConcurrentHashMap<String, Object>();
        destinationToService = new ConcurrentHashMap<String, String>();
        endpoints = new LinkedHashMap<String, Endpoint>();
        services = new LinkedHashMap<String, Service>();
        servers = new LinkedHashMap<String, Server>();
        factories = new HashMap<String, FlexFactory>();
        registeredEndpoints = new HashMap<String, String>();

        // Initialize the default validator.
        deserializationValidator = new ClassDeserializationValidator();

        // Add the built-in java factory
        addFactory("java", new JavaFactory());

        setId(mbid);

        log = Log.createLog();

        clusterManager = new ClusterManager(this);
        systemSettings = new SystemSettings();

        if (isManaged()) {
            controller = new MessageBrokerControl(this);
            controller.register();
            setControl(controller);

            logManager = new LogManager();
            logManager.setLog(log);
            logManager.setParent(this);
            logManager.setupLogControl();
            logManager.initialize(LOG_MANAGER_ID, null);
        }
    }

    //--------------------------------------------------------------------------
    //
    // Variables
    //
    //--------------------------------------------------------------------------

    private Map<String, Object> attributes;
    /**
     * Map of attribute ids of Application or Session level scoped destination assemblers
     * to the number of active destinations referring to.
     */
    private final Map<String, Integer> attributeIdRefCounts = new HashMap<String, Integer>();
    private Map<String, ChannelSettings> channelSettings;
    private ClassLoader classLoader;
    private ClusterManager clusterManager;
    private MessageBrokerControl controller;
    private List<String> defaultChannels;
    private DeserializationValidator deserializationValidator;
    private Map<String, String> destinationToService; // destiantionId to serviceId map.
    private Map<String, Endpoint> endpoints;
    private boolean enforceEndpointValidation;
    private Map<String, FlexFactory> factories;
    private FlexClientManager flexClientManager;
    private FlexClientSettings flexClientSettings;
    private FlexSessionManager flexSessionManager;
    private PathResolver externalPathResolver;
    private InternalPathResolver internalPathResolver;
    private Log log;
    private LogManager logManager;
    private LoginManager loginManager;
    private RedeployManager redeployManager;
    private Map<String, String> registeredEndpoints;
    private SecuritySettings securitySettings;
    private Map<String, Service> services;
    private Map<String, Server> servers;
    private final ConcurrentHashMap<String, ServiceValidationListener> serviceValidationListeners = new ConcurrentHashMap<String, ServiceValidationListener>();
    private ServletContext servletContext;
    private SystemSettings systemSettings;
    private Class<? extends ThrottleManager> throttleManagerClass = ThrottleManager.class; // The default ThrottleManager class.
    private UUIDGenerator uuidGenerator;

    //--------------------------------------------------------------------------
    //
    // Public Methods
    //
    //--------------------------------------------------------------------------

    /**
     * Sets the id of the <code>MessageBroker</code>. If id is null, uses the
     * default broker id.
     */
    @Override
    public void setId(String id) {
        if (id == null)
            id = DEFAULT_BROKER_ID;

        super.setId(id);
    }

    /**
     * Retrieves a message broker with the supplied id.  This is defined via
     * the servlet init parameter messageBrokerId.  If no messageBrokerId is supplied, pass
     * in a null value for the id parameter.
     * <p>
     * In case null is passed, the method tries to lookup the broker from current FlexContext.
     * If not available, it uses default ID to lookup the message broker.
     *
     * @param id The id of the message broker to retrieve.
     * @return The <code>MessageBroker</code> for the supplied id.
     */
    public static MessageBroker getMessageBroker(String id) {
        if (id == null) {
            // If available, return the broker from FlexContext
            MessageBroker broker = FlexContext.getMessageBroker();
            if (broker != null) {
                return broker;
            }

            id = DEFAULT_BROKER_ID;
        }

        return messageBrokers.get(id);
    }

    /**
     * Start the message broker's endpoints and services.
     */
    @Override
    public void start() {
        if (isStarted())
            return;

        /*
         * J2EE can be a real pain in terms of getting the right class loader so dump out
         * some detailed info about what is going on.
         */
        if (Log.isDebug()) {
            StringBuffer sb = new StringBuffer(100);
            if (classLoader == MessageBroker.class.getClassLoader())
                sb.append(" the MessageBroker's class loader");
            if (classLoader == Thread.currentThread().getContextClassLoader()) {
                if (sb.length() > 0) sb.append(" and");
                sb.append(" the context class loader");
            }
            if (sb.length() == 0)
                sb.append(" not the context or the message broker's class loader");
            Log.getLogger(LogCategories.CONFIGURATION).debug(
                    "MessageBroker id: " + getId() + " classLoader is:" +
                            sb.toString() + " (" + "classLoader " + ClassUtil.classLoaderToString(classLoader));
        }

        // Catch any startup errors and log using our log machinery, then rethrow to trigger shutdown.
        try {
            // MessageBroker doesn't call super.start() because it doesn't need the
            // usual validation that other components need
            setStarted(true);

            registerMessageBroker();
            if (flexClientManager == null) {
                flexClientManager = new FlexClientManager(isManaged(), this);
            }
            flexClientManager.start();
            flexSessionManager = new FlexSessionManager(isManaged(), this);
            flexSessionManager.start();
            if (systemSettings == null) {
                systemSettings = new SystemSettings();
            }
            startServices();
            loginManager.start();
            startEndpoints();
            startServers();
            redeployManager.start();
        } catch (Exception e) {
            if (Log.isError())
                Log.getLogger(LogCategories.CONFIGURATION).error("MessageBroker failed to start: " + ExceptionUtil.exceptionFollowedByRootCausesToString(e));

            // Rethrow.
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    /**
     * Stop the broker's endpoints, clusters, and services.
     */
    @Override
    public void stop() {
        if (!isStarted())
            return;

        if (Log.isDebug())
            Log.getLogger(LogCategories.CONFIGURATION).debug("MessageBroker stopping: " + getId());

        serviceValidationListeners.clear();

        flexSessionManager.stop();
        flexClientManager.stop();
        stopServers();
        stopEndpoints();

        // set this MB in FlexContext as it is needed for reference counts in destination stopping
        FlexContext.setThreadLocalMessageBroker(this);
        stopServices();
        FlexContext.setThreadLocalMessageBroker(null);

        if (loginManager != null)
            loginManager.stop();
        try {
            if (redeployManager != null)
                redeployManager.stop();
        } catch (Throwable t) {
            t.printStackTrace();
        }
        clusterManager.destroyClusters();

        super.stop();
        unRegisterMessageBroker();

        // clear static proxy caches
        BeanProxy.clear();
        PropertyProxyRegistry.release();

        // clear system settings
        systemSettings.clear();
        systemSettings = null;

        if (Log.isDebug())
            Log.getLogger(LogCategories.CONFIGURATION).debug("MessageBroker stopped: " + getId());
    }

    /**
     * Returns an <tt>Iterator</tt> containing the current names that attributes have been bound
     * to the <tt>MessageBroker</tt> under.
     * Use {@link #getAttribute(String)} to retrieve an attribute value.
     *
     * @return An iterator containing the current names of the attributes.
     */
    public Iterator<String> getAttributeNames() {
        return attributes.keySet().iterator();
    }

    /**
     * Returns the attribute value bound to the <tt>MessageBroker</tt> under the provided name.
     *
     * @param name The attribute name.
     * @return Object the attribute object
     */
    public Object getAttribute(String name) {
        return attributes.get(name);
    }

    /**
     * Binds an attribute value to the <tt>MessageBroker</tt> under the provided name.
     *
     * @param name  The attribute name.
     * @param value The attribute value.
     */
    public void setAttribute(String name, Object value) {
        if (value == null)
            removeAttribute(name);
        else
            attributes.put(name, value);
    }

    /**
     * Removes the attribute with the given name from the <tt>MessageBroker</tt>.
     *
     * @param name The attribute name.
     */
    public void removeAttribute(String name) {
        attributes.remove(name);
    }

    /**
     * Returns the deserialization validator of the <tt>MessageBroker</tt> or null
     * if none exists.
     *
     * @return The deserialization validator of the <tt>MessageBroker</tt> or null
     * if none exists.
     */
    public DeserializationValidator getDeserializationValidator() {
        return deserializationValidator;
    }

    /**
     * Sets the deserialization validator of the <tt>MessageBroker</tt>.
     *
     * @param deserializationValidator The deserialization validator.
     */
    public void setDeserializationValidator(DeserializationValidator deserializationValidator) {
        this.deserializationValidator = deserializationValidator;
    }

    public void setExternalPathResolver(PathResolver externalPathResolver) {
        this.externalPathResolver = externalPathResolver;
    }


    public void setInternalPathResolver(InternalPathResolver internalPathResolver) {
        this.internalPathResolver = internalPathResolver;
    }


    public InputStream resolveExternalPath(String filename) throws IOException {
        return externalPathResolver.resolve(filename);
    }


    public InputStream resolveInternalPath(String filename) throws IOException {
        return internalPathResolver.resolve(filename);
    }


    public interface PathResolver {
        InputStream resolve(String filename) throws IOException;
    }

    /**
     * This interface is being kept for backwards compatibility.
     */
    public interface InternalPathResolver extends PathResolver {
        // No-op.
    }


    public ClusterManager getClusterManager() {
        return clusterManager;
    }

    /**
     * Add a <code>Server</code> to the broker's collection.
     *
     * @param server <code>Server</code> to be added.
     */
    public void addServer(Server server) {
        if (server == null) {
            // Cannot add null ''{0}'' to the ''{1}''
            ConfigurationException ex = new ConfigurationException();
            ex.setMessage(ConfigurationConstants.NULL_COMPONENT, new Object[]{"Server", MESSAGEBROKER});
            throw ex;
        }

        String id = server.getId();

        if (id == null) {
            // Cannot add ''{0}'' with null id to the ''{1}''
            ConfigurationException ex = new ConfigurationException();
            ex.setMessage(ConfigurationConstants.NULL_COMPONENT_ID, new Object[]{"Server", MESSAGEBROKER});
            throw ex;
        }

        // No need to add if server is already added
        Server currentServer = getServer(id);
        if (currentServer == server)
            return;

        // Do not allow servers with the same id
        if (currentServer != null) {
            // Cannot add a ''{0}'' with the id ''{1}'' that is already registered with the ''{2}''
            ConfigurationException ex = new ConfigurationException();
            ex.setMessage(ConfigurationConstants.DUPLICATE_COMPONENT_ID, new Object[]{"Server", id, MESSAGEBROKER});
            throw ex;
        }

        servers.put(id, server);
    }

    /**
     * Returns the <code>Server</code> with the specified id.
     *
     * @param id The id of the <code>Server</code>/
     * @return The <code>Server</code> with the specified id or null if no
     * <code>Server</code> with the id exists.
     */
    public Server getServer(String id) {
        return servers.get(id);
    }

    /**
     * Stops and removes the <code>Server</code> from the set of shared servers managed by the <code>MessageBroker</code>.
     *
     * @param id The id of the <code>Server</code> to remove.
     * @return <code>Server</code> that has been removed or <code>null</code> if it doesn't exist.
     */
    public Server removeServer(String id) {
        Server server = servers.get(id);
        if (server != null) {
            server.stop();
            servers.remove(id);
        }
        return server;
    }

    /**
     * Creates an <code>Endpoint</code> instance, sets its id and url.
     * It further sets the endpoint manageable if the <code>MessageBroker</code>
     * is manageable, and assigns its <code>MessageBroker</code> to the
     * <code>MessageBroker</code> that created it.
     *
     * @param id        The id of the endpoint.
     * @param url       The url of the endpoint.
     * @param className The class name of the endpoint.
     * @return The created <code>Endpoint</code> instance.
     */
    public Endpoint createEndpoint(String id, String url, String className) {
        Class endpointClass = ClassUtil.createClass(className, getClassLoader());

        Endpoint endpoint = (Endpoint) ClassUtil.createDefaultInstance(endpointClass, Endpoint.class);
        endpoint.setId(id);
        endpoint.setUrl(url);
        endpoint.setManaged(isManaged());
        endpoint.setMessageBroker(this);

        return endpoint;
    }

    /**
     * Add an endpoint to the broker's collection. Broker will accept the endpoint
     * to be added only if the endpoint is not null, it does not have null id or
     * url, and it does not have the same id or url as another endpoint.
     *
     * @param endpoint Endpoint to be added.
     */
    public void addEndpoint(Endpoint endpoint) {
        if (endpoint == null) {
            // Cannot add null ''{0}'' to the ''{1}''
            ConfigurationException ex = new ConfigurationException();
            ex.setMessage(ConfigurationConstants.NULL_COMPONENT, new Object[]{ENDPOINT, MESSAGEBROKER});
            throw ex;
        }

        String id = endpoint.getId();

        if (id == null) {
            // Cannot add ''{0}'' with null id to the ''{1}''
            ConfigurationException ex = new ConfigurationException();
            ex.setMessage(ConfigurationConstants.NULL_COMPONENT_ID, new Object[]{ENDPOINT, MESSAGEBROKER});
            throw ex;
        }

        // No need to add if endpoint is already added
        if (getEndpoint(id) == endpoint)
            return;

        // Do not allow endpoints with the same id
        if (getEndpoint(id) != null) {
            // Cannot add a ''{0}'' with the id ''{1}'' that is already registered with the ''{2}''
            ConfigurationException ex = new ConfigurationException();
            ex.setMessage(ConfigurationConstants.DUPLICATE_COMPONENT_ID, new Object[]{ENDPOINT, id, MESSAGEBROKER});
            throw ex;
        }

        // Add the endpoint only if its url is not null and it is not registered
        // by another channel
        checkEndpointUrl(id, endpoint.getUrl());

        // Finally add the endpoint to endpoints map
        endpoints.put(id, endpoint);
    }

    /**
     * Returns the <code>Endpoint</code> with the specified id.
     *
     * @param id The id of the <code>Endpoint</code>/
     * @return The <code>Endpoint</code> with the specified id or null if no <code>Endpoint</code> with the id exists.
     */
    public Endpoint getEndpoint(String id) {
        return endpoints.get(id);
    }

    /**
     * Retrieve the map of all endpoints in this broker.
     *
     * @return the map of all endpoints in this broker
     */
    public Map<String, Endpoint> getEndpoints() {
        return endpoints;
    }

    /**
     * Retrieve an endpoint based on a requested URL path. Two endpoints should not be
     * registered to the same path.
     *
     * @param path        the URL path
     * @param contextPath the context path
     * @return endpoint based on a requested URL path
     */
    public Endpoint getEndpoint(String path, String contextPath) {
        for (String id : endpoints.keySet()) {
            Endpoint e = endpoints.get(id);

            if (matchEndpoint(path, contextPath, e)) {
                return e;
            }
        }
        MessageException lme = new MessageException();
        lme.setMessage(10003, new Object[]{path});
        throw lme;
    }

    /**
     * Removes an endpoint from the <code>MessageBroker</code>.
     *
     * @param id The id of the endpoint.
     * @return The removed endpoint.
     */
    public Endpoint removeEndpoint(String id) {
        Endpoint endpoint = getEndpoint(id);
        if (endpoint != null) {
            endpoint.stop();
            endpoints.remove(id);
        }
        return endpoint;
    }

    /**
     * Returns whether the endpoint validation is enforced on the server, regardless
     * of whether client requested endpoint validation or not.
     *
     * @return True if the endpoint validation is enforced on the server, regardless
     * of whether client requested endpoint validation or not.
     */
    public boolean isEnforceEndpointValidation() {
        return enforceEndpointValidation;
    }

    /**
     * Sets whether the endpoint validation is enforced on the server, regardless
     * of whether client requested endpoint validation or not.
     *
     * @param enforceEndpointValidation The endpoint validation flag.
     */
    public void setEnforceEndpointValidation(boolean enforceEndpointValidation) {
        this.enforceEndpointValidation = enforceEndpointValidation;
    }

    /**
     * Returns the <code>FlexFactory</code> with the specified id.
     *
     * @param id The id of the <code>FlexFactory</code>.
     * @return The <code>FlexFactory</code> with the specified id or null if no
     * factory with the id exists.
     */
    public FlexFactory getFactory(String id) {
        return factories.get(id);
    }

    /**
     * Returns the map of <code>FlexFactory</code> instances.
     *
     * @return The map of <code>FlexFactory</code> instances.
     */
    public Map<String, FlexFactory> getFactories() {
        return factories;
    }

    /**
     * Registers a factory with the <code>MessageBroker</code>.
     *
     * @param id      The id of the factory.
     * @param factory <code>FlexFactory</code> instance.
     */
    public void addFactory(String id, FlexFactory factory) {
        if (id == null) {
            // Cannot add ''{0}'' with null id to the ''{1}''
            ConfigurationException ex = new ConfigurationException();
            ex.setMessage(ConfigurationConstants.NULL_COMPONENT_ID, new Object[]{"FlexFactory", MESSAGEBROKER});
            throw ex;
        }
        // No need to add if factory is already added
        if (getFactory(id) == factory) {
            return;
        }
        // Do not allow multiple factories with the same id
        if (getFactory(id) != null) {
            // Cannot add a ''{0}'' with the id ''{1}'' that is already registered with the ''{2}''
            ConfigurationException ex = new ConfigurationException();
            ex.setMessage(ConfigurationConstants.DUPLICATE_COMPONENT_ID, new Object[]{"FlexFactory", id, MESSAGEBROKER});
            throw ex;
        }
        factories.put(id, factory);
    }

    /**
     * Removes the <code>FlexFactory</code> from the list of factories known
     * by the <code>MessageBroker</code>.
     *
     * @param id The id of the <code>FlexFactory</code>.
     * @return <code>FlexFactory</code> that has been removed.
     */
    public FlexFactory removeFactory(String id) {
        FlexFactory factory = getFactory(id);
        if (factory != null) {
            factories.remove(id);
        }
        return factory;
    }

    /**
     * Returns the <code>Service</code> with the specified id.
     *
     * @param id The id of the <code>Service</code>/
     * @return The <code>Service</code> with the specified id or null if no
     * <code>Service</code> with the id exists.
     */
    public Service getService(String id) {
        return services.get(id);
    }

    /**
     * Return a service of the specific type.
     * The current list of services is searched for the specified class name.
     * If there is no service of the specified type, null is  returned.
     * If there is more than one service of the same type it is undefined which instance is returned.
     * If more than one service of a specific type is configured,
     * callers should use {@link #getService(String)} and provide the service id of the specific service,
     * or {@link #getServices()} to get access to the map of all registered services.
     *
     * @param type the fully qualified class name of the service implementation.
     * @return a service or null if not found.
     */
    public Service getServiceByType(String type) {
        for (Service svc : services.values()) {
            if (svc.getClass().getName().equals(type)) {
                return svc;
            }
        }
        return null;
    }

    /**
     * Returns the Map of <code>Service</code> instances.
     *
     * @return The Map of <code>Service</code> instances.
     */
    public Map<String, Service> getServices() {
        return services;
    }

    /**
     * Returns a <tt>ConfigMap</tt> of service and channel properties that the client
     * needs.
     *
     * @param endpoint Endpoint used to filter the destinations of the service;
     *                 no filtering is done if the endpoint is null.
     * @return ConfigMap of server properties.
     */
    public ConfigMap describeServices(Endpoint endpoint) {
        return describeServices(endpoint, true);
    }

    /**
     * Returns a <tt>ConfigMap</tt> of service and channel properties that the client
     * needs.
     * The <tt>allDestinations</tt> flag controls whether configuration for all
     * destinations or only reliable client destinations is returned.
     *
     * @param endpoint     Endpoint used to filter the destinations of the service.
     *                     No filtering is done if the endpoint is null.
     * @param onlyReliable When false, configuration for all destinations is
     *                     returned instead of only reliable destinations.
     * @return ConfigMap of service properties.
     */
    public ConfigMap describeServices(Endpoint endpoint, boolean onlyReliable) {
        // Let the service validation listeners know about the configuration change.
        if (!serviceValidationListeners.isEmpty()) {
            for (Enumeration<ServiceValidationListener> iter = serviceValidationListeners.elements(); iter.hasMoreElements(); )
                iter.nextElement().validateServices();
        }

        ConfigMap servicesConfig = new ConfigMap();

        // Keep track of channel ids as we encounter them so we can generate
        // the channel properties that might be needed by the client.
        ArrayList<String> channelIds = new ArrayList<String>();
        if (endpoint == null) {
            for (Endpoint endpointToAdd : getEndpoints().values())
                channelIds.add(endpointToAdd.getId());
        } else {
            channelIds.add(endpoint.getId());
        }

        if (defaultChannels != null) {
            ConfigMap defaultChannelsMap = new ConfigMap();
            for (Object defaultChannel : defaultChannels) {
                String id = (String) defaultChannel;
                ConfigMap channelConfig = new ConfigMap();
                channelConfig.addProperty(ConfigurationConstants.REF_ATTR, id);
                defaultChannelsMap.addProperty(ConfigurationConstants.CHANNEL_ELEMENT, channelConfig);
                if (!channelIds.contains(id))
                    channelIds.add(id);
            }
            if (defaultChannelsMap.size() > 0)
                servicesConfig.addProperty(ConfigurationConstants.DEFAULT_CHANNELS_ELEMENT, defaultChannelsMap);
        }

        for (Service service : services.values()) {
            ConfigMap serviceConfig = service instanceof AbstractService ?
                    ((AbstractService) service).describeService(endpoint, onlyReliable) : service.describeService(endpoint);
            if (serviceConfig != null && serviceConfig.size() > 0)
                servicesConfig.addProperty(ConfigurationConstants.SERVICE_ELEMENT, serviceConfig);
        }

        // Need to send channel properties again in case the client didn't
        // compile in services-config.xml and hence doesn't have channels section
        // of the configuration - but only if channel/endpoint is not tagged as "remote"!
        ConfigMap channels = new ConfigMap();
        for (String id : channelIds) {
            Endpoint currentEndpoint = getEndpoint(id);
            if (currentEndpoint instanceof AbstractEndpoint && ((AbstractEndpoint) currentEndpoint).isRemote()) {
                continue; // Client already has configuration for "remote" endpoint by other means.
            }

            ConfigMap channel = currentEndpoint.describeEndpoint();
            if (channel != null && channel.size() > 0)
                channels.addProperty(ConfigurationConstants.CHANNEL_ELEMENT, channel);
        }
        if (channels.size() > 0)
            servicesConfig.addProperty(ConfigurationConstants.CHANNELS_ELEMENT, channels);

        if (Log.isDebug())
            Log.getLogger(ConfigurationManager.LOG_CATEGORY).debug(
                    "Returning service description for endpoint: " +
                            (endpoint == null ? "all" : endpoint.getId()) + " config: " + servicesConfig);

        return servicesConfig;
    }

    /**
     * Add a listener for the describeServices callback.  The describeServices listener
     * is called before any execution of the describeServices method.
     *
     * @param id       Identifier of the listener to add
     * @param listener The listener callback
     */
    public void addServiceValidationListener(String id, ServiceValidationListener listener) {
        if (listener != null) {
            serviceValidationListeners.putIfAbsent(id, listener);
        }
    }

    /**
     * Returns an <tt>Iterator</tt> for all <tt>ServiceValidationListeners</tt> currently
     * registered with the broker.
     *
     * @return An <tt>Iterator</tt> for all registered <tt>ServiceValidationListeners</tt>.
     */
    public Iterator<ServiceValidationListener> getServiceValidationListenerIterator() {
        return serviceValidationListeners.values().iterator();
    }

    /**
     * Remove a listener from the describeServices callback.
     *
     * @param id Identifier of the listener to remove
     */
    public void removeServiceValidationListener(String id) {
        if (serviceValidationListeners.containsKey(id)) {
            serviceValidationListeners.remove(id);
        }
    }

    /**
     * Creates a <code>Service</code> instance, sets its id, sets it manageable
     * if the <code>MessageBroker</code> that created it is manageable,
     * and sets its <code>MessageBroker</code> to the <code>MessageBroker</code> that
     * created it.
     *
     * @param id        The id of the <code>Service</code>.
     * @param className The class name of the <code>Service</code>.
     * @return The <code>Service</code> instanced created.
     */
    public Service createService(String id, String className) {
        Class svcClass = ClassUtil.createClass(className, getClassLoader());

        Service service = (Service) ClassUtil.createDefaultInstance(svcClass, Service.class);
        service.setId(id);
        service.setManaged(isManaged());
        service.setMessageBroker(this);

        return service;
    }

    /**
     * Add a message type -to- service mapping to the broker's collection.
     * When the broker attempts to route a message to a service, it finds the first
     * service capable of handling the message type.
     * <p>
     * Note that <code>Service</code> cannot be null, it cannot have a null
     * id, and it cannot have the same id or type of a <code>Service</code>
     * already registered with the <code>MessageBroker</code>.
     *
     * <code>Service</code> needs to be started if the <code>MessageBroker</code>
     * is already running.
     *
     * @param service The service instance used to handle the messages
     */
    public void addService(Service service) {
        if (service == null) {
            // Cannot add null ''{0}'' to the ''{1}''
            ConfigurationException ex = new ConfigurationException();
            ex.setMessage(ConfigurationConstants.NULL_COMPONENT, new Object[]{SERVICE, MESSAGEBROKER});
            throw ex;
        }

        String id = service.getId();

        if (id == null) {
            // Cannot add ''{0}'' with null id to the ''{1}''
            ConfigurationException ex = new ConfigurationException();
            ex.setMessage(ConfigurationConstants.NULL_COMPONENT_ID, new Object[]{SERVICE, MESSAGEBROKER});
            throw ex;
        }
        // No need to add if service is already added
        if (getService(id) == service) {
            return;
        }
        // Do not allow multiple services with the same id
        if (getService(id) != null) {
            // Cannot add a ''{0}'' with the id ''{1}'' that is already registered with the ''{2}''
            ConfigurationException ex = new ConfigurationException();
            ex.setMessage(ConfigurationConstants.DUPLICATE_COMPONENT_ID, new Object[]{SERVICE, id, MESSAGEBROKER});
            throw ex;
        }
        // Not supposed to have multiple services of the same type; warn about it
        // but still allow it.
        String type = service.getClass().getName();
        if (getServiceByType(type) != null && Log.isWarn())
            Log.getLogger(LOG_CATEGORY).warn("Adding a service type '{0}' that is already registered with the MessageBroker",
                    new Object[]{type});


        services.put(id, service);

        if (service.getMessageBroker() == null || service.getMessageBroker() != this) {
            service.setMessageBroker(this);
        }
    }

    /**
     * Removes the <code>Service</code> from the list of services known
     * by the <code>MessageBroker</code>.
     *
     * @param id The id of the <code>Service</code>.
     * @return Previous <code>Service</code> associated with the id.
     */
    public Service removeService(String id) {
        Service service = getService(id);
        if (service != null) {
            service.stop();
            services.remove(id);
        }
        return service;
    }

    /**
     * Returns the logger of the <code>MessageBroker</code>.
     *
     * @return Logger of the <code>MessageBroker</code>.
     */
    public Log getLog() {
        return log;
    }


    public LogManager getLogManager() {
        return logManager;
    }


    public LoginManager getLoginManager() {
        return loginManager;
    }


    public void setLoginManager(LoginManager loginManager) {
        if (this.loginManager != null && this.loginManager.isStarted())
            this.loginManager.stop();

        this.loginManager = loginManager;

        if (isStarted())
            loginManager.start();
    }


    public FlexClientManager getFlexClientManager() {
        return flexClientManager;
    }


    public void setFlexClientManager(FlexClientManager value) {
        flexClientManager = value;
    }


    public FlexSessionManager getFlexSessionManager() {
        return flexSessionManager;
    }


    public void setFlexSessionManager(FlexSessionManager value) {
        flexSessionManager = value;
    }


    public RedeployManager getRedeployManager() {
        return redeployManager;
    }


    public void setRedeployManager(RedeployManager redeployManager) {
        if (this.redeployManager != null && this.redeployManager.isStarted())
            this.redeployManager.stop();

        this.redeployManager = redeployManager;

        if (isStarted())
            redeployManager.start();
    }


    public Class<? extends ThrottleManager> getThrottleManagerClass() {
        return throttleManagerClass;
    }


    public void setThrottleManagerClass(Class<? extends ThrottleManager> throttleManagerClass) {
        this.throttleManagerClass = throttleManagerClass;
    }

    /**
     * Returns a UUID either from the UUID generator assigned to <tt>MessageBroker</tt>,
     * or from the <tt>UUIDUtils#createUUID</tt> if there is no assigned UUID generator.
     *
     * @return String the UUID.
     */
    public String createUUID() {
        return uuidGenerator != null ? uuidGenerator.createUUID() : UUIDUtils.createUUID();
    }

    /**
     * Returns the custom <tt>UUIDGenerator</tt> used by the <tt>MessageBroker</tt>
     * for NIO-HTTP session cookie value and <tt>FlexClient</tt> id generation or null if
     * the default UUID generator, <tt>UUIDUtils</tt>, is being used.
     *
     * @return The custom <tt>UUIDGenerator</tt> used by <tt>MessageBroker</tt> or null.
     */
    public UUIDGenerator getUUIDGenerator() {
        return uuidGenerator;
    }


    /**
     * Sets the custom <tt>UUIDGenerator</tt> used by the <tt>MessageBroker</tt>
     * for NIO-HTTP session cookie value and <tt>FlexClient</tt> id generation.
     *
     * @param value The custom <tt>UUIDGenerator</tt>.
     */
    public void setUUIDGenerator(UUIDGenerator value) {
        uuidGenerator = value;
    }

    /**
     * Returns the list of channel ids known to the <code>MessageBroker</code>.
     *
     * @return The list of channel ids.
     */
    public List<String> getChannelIds() {
        return (endpoints != null && endpoints.size() != 0) ? new ArrayList<String>(endpoints.keySet()) : null;
    }


    public ChannelSettings getChannelSettings(String ref) {
        return channelSettings.get(ref);
    }


    public Map<String, ChannelSettings> getAllChannelSettings() {
        return channelSettings;
    }


    public void setChannelSettings(Map<String, ChannelSettings> channelSettings) {
        this.channelSettings = channelSettings;
    }

    /**
     * Returns the default channel ids of the MessageBroker. If a service
     * specifies its own list of channels it overrides these defaults.
     *
     * @return Default channel ids of the MessageBroker.
     */
    public List<String> getDefaultChannels() {
        return defaultChannels;
    }

    /**
     * Adds the channel id to the list of default channel ids.
     *
     * @param id The id of the channel to add to the list of default channel ids.
     */
    public void addDefaultChannel(String id) {
        if (defaultChannels == null)
            defaultChannels = new ArrayList<String>();
        else if (defaultChannels.contains(id))
            return;

        List<String> channelIds = getChannelIds();
        if (channelIds == null || !channelIds.contains(id)) {
            // No channel with id ''{0}'' is known by the MessageBroker.
            if (Log.isWarn()) {
                Log.getLogger(LOG_CATEGORY).warn("No channel with id '{0}' is known by the MessageBroker." +
                                " Not adding the channel.",
                        new Object[]{id});
            }
            return;
        }
        defaultChannels.add(id);
    }

    /**
     * Sets the default channel ids of the MessageBroker.
     *
     * @param ids Default channel ids of the MessageBroker.
     */
    public void setDefaultChannels(List<String> ids) {
        if (ids != null) {
            List<String> channelIds = getChannelIds();
            for (Iterator<String> iter = ids.iterator(); iter.hasNext(); ) {
                String id = iter.next();
                if (channelIds == null || !channelIds.contains(id)) {
                    iter.remove();
                    if (Log.isWarn()) {
                        Log.getLogger(LOG_CATEGORY).warn("No channel with id '{0}' is known by the MessageBroker." +
                                        " Not adding the channel.",
                                new Object[]{id});
                    }
                }
            }
        }
        defaultChannels = ids;
    }

    /**
     * Removes the channel id from the list of default channel ids.
     *
     * @param id The id of the channel to remove from the list of default channel ids.
     * @return <code>true</code> if the list contained the channel id.
     */
    public boolean removeDefaultChannel(String id) {
        return defaultChannels != null && defaultChannels.remove(id);
    }

    /**
     * Returns the <code>SecurityConstraint</code> with the indicated
     * reference id.
     *
     * @param ref The reference of the <code>SecurityConstraint</code>
     * @return The <code>SecurityConstraint</code> with the indicated reference id.
     */
    public SecurityConstraint getSecurityConstraint(String ref) {
        return getSecuritySettings().getConstraint(ref);
    }


    public ServletContext getServletContext() {
        return servletContext;
    }


    public SecuritySettings getSecuritySettings() {
        return securitySettings;
    }


    public void setSecuritySettings(SecuritySettings securitySettings) {
        this.securitySettings = securitySettings;
    }


    public SystemSettings getSystemSettings() {
        return systemSettings;
    }


    public void setSystemSettings(SystemSettings l) {
        systemSettings = l;
    }


    public FlexClientSettings getFlexClientSettings() {
        return flexClientSettings;
    }


    public void setFlexClientSettings(FlexClientSettings value) {
        flexClientSettings = value;
    }


    public void initThreadLocals() {
        // No thread-locals anymore, so no-op.
    }

    /**
     * You can call this method in order to send a message from your code into
     * the message routing system.  The message is routed to a service that
     * is defined to handle messages of this type.  Once the service is identified,
     * the destination property of the message is used to find a destination
     * configured for that service.  The adapter defined for that destination
     * is used to handle the message.
     *
     * @param message  The message to be routed to a service
     * @param endpoint This can identify the endpoint that is sending the message
     *                 but it is currently not used so you may pass in null.
     * @return <code>AcknowledgeMessage</code> with result.
     */
    public AcknowledgeMessage routeMessageToService(Message message, Endpoint endpoint) {
        // Make sure message has a messageId
        checkMessageId(message);

        Object serviceResult = null;
        boolean serviced = false;
        Service service = null;
        String destId = message.getDestination();
        try {
            String serviceId = destId != null ? destinationToService.get(destId) : null;

            if ((serviceId == null) && (destId != null) && (!serviceValidationListeners.isEmpty())) {
                for (Enumeration<ServiceValidationListener> iter = serviceValidationListeners.elements(); iter.hasMoreElements(); ) {
                    iter.nextElement().validateDestination(destId);
                }
                serviceId = destinationToService.get(destId);
            }

            if (serviceId != null) {
                service = services.get(serviceId);
                serviced = true;
                Destination destination = service.getDestination(destId);
                inspectOperation(message, destination);
                // Remove the validate endpoint header if it was set.
                if (message.headerExists(Message.VALIDATE_ENDPOINT_HEADER))
                    message.getHeaders().remove(Message.VALIDATE_ENDPOINT_HEADER);

                if (Log.isDebug())
                    Log.getLogger(getLogCategory(message)).debug(
                            "Before invoke service: " + service.getId() + StringUtils.NEWLINE +
                                    "  incomingMessage: " + message + StringUtils.NEWLINE);

                extractRemoteCredentials(service, message);
                serviceResult = service.serviceMessage(message);
            }

            if (!serviced) {
                MessageException lme = new MessageException();
                // The supplied destination id is not registered with any service.
                lme.setMessage(ERR_MSG_NO_SERVICE_FOR_DEST);
                throw lme;
            }

            if (Log.isDebug()) {
                String debugServiceResult = Log.getPrettyPrinter().prettify(serviceResult);
                Log.getLogger(getLogCategory(message)).debug(
                        "After invoke service: " + service.getId() + StringUtils.NEWLINE +
                                "  reply: " + debugServiceResult + StringUtils.NEWLINE);
            }

            AcknowledgeMessage ack;
            if (serviceResult instanceof AcknowledgeMessage) {
                // service will return an ack if they need to transform it in some
                // service-specific way (paging is an example)
                ack = (AcknowledgeMessage) serviceResult;
            } else {
                // most services will return a result of some sort, possibly null,
                // and expect the broker to compose a message to deliver it
                ack = new AcknowledgeMessage();
                ack.setBody(serviceResult);
            }
            ack.setCorrelationId(message.getMessageId());
            ack.setClientId(message.getClientId());
            return ack;
        } catch (MessageException exc) {
            exc.logAtHingePoint(message,
                    null, /* No outbound error message at this point. */
                    "Exception when invoking service '" + (service == null ? "(none)" : service.getId()) + "': ");

            throw exc;
        } catch (RuntimeException exc) {
            Log.getLogger(LogCategories.MESSAGE_GENERAL).error(
                    "Exception when invoking service: " +
                            (service == null ? "(none)" : service.getId()) +
                            StringUtils.NEWLINE +
                            "  with message: " + message + StringUtils.NEWLINE +
                            ExceptionUtil.exceptionFollowedByRootCausesToString(exc) + StringUtils.NEWLINE);

            throw exc;
        } catch (Error exc) {
            Log.getLogger(LogCategories.MESSAGE_GENERAL).error(
                    "Error when invoking service: " +
                            (service == null ? "(none)" : service.getId()) +
                            StringUtils.NEWLINE +
                            "  with message: " + message + StringUtils.NEWLINE +
                            ExceptionUtil.exceptionFollowedByRootCausesToString(exc) + StringUtils.NEWLINE);

            throw exc;
        }

    }


    public AsyncMessage routeCommandToService(CommandMessage command, Endpoint endpoint) {
        // Make sure command has a messageId
        checkMessageId(command);

        String destId = command.getDestination();

        AsyncMessage replyMessage;
        Service service;
        String serviceId;
        Object commandResult = null;
        boolean serviced = false;
        boolean recreateHttpFlexSessionAfterLogin = false;

        // Forward login and logout commands to AuthenticationService
        int operation = command.getOperation();
        if (operation == CommandMessage.LOGIN_OPERATION || operation == CommandMessage.LOGOUT_OPERATION) {
            serviceId = AUTHENTICATION_SERVICE_ID;
            recreateHttpFlexSessionAfterLogin = securitySettings.isRecreateHttpSessionAfterLogin()
                    && operation == CommandMessage.LOGIN_OPERATION && FlexContext.getFlexSession() instanceof HttpFlexSession;
        } else {
            serviceId = destId != null ? destinationToService.get(destId) : null;
        }

        service = serviceId != null ? services.get(serviceId) : null;
        if (service != null) {
            // Before passing the message to the service, need to check
            // the security constraints.
            Destination destination = service.getDestination(destId);
            if (destination != null)
                inspectOperation(command, destination);

            try {
                extractRemoteCredentials(service, command);
                commandResult = service.serviceCommand(command);
                serviced = true;
            } catch (UnsupportedOperationException e) {
                ServiceException se = new ServiceException();
                se.setMessage(ERR_MSG_SERVICE_CMD_NOT_SUPPORTED, new Object[]{service.getClass().getName()});
                throw se;
            } catch (SecurityException se) {
                // when a LOGIN message causes a security exception, we want to continue processing here
                // to allow metadata to be sent to clients communicating with runtime destinations.
                // The result will be an error message with a login fault message as well as the metadata
                if (AUTHENTICATION_SERVICE_ID.equals(serviceId)) {
                    commandResult = se.createErrorMessage();
                    if (Log.isDebug())
                        Log.getLogger(LOG_CATEGORY).debug("Security error for message: " +
                                se.toString() + StringUtils.NEWLINE +
                                "  incomingMessage: " + command + StringUtils.NEWLINE +
                                "  errorReply: " + commandResult);
                    serviced = true;
                } else {
                    throw se;
                }
            }
        }

        if (recreateHttpFlexSessionAfterLogin)
            recreateHttpFlexSessionAfterLogin();

        if (commandResult == null) {
            replyMessage = new AcknowledgeMessage();
        } else if (commandResult instanceof AsyncMessage) {
            replyMessage = (AsyncMessage) commandResult;
        } else {
            replyMessage = new AcknowledgeMessage();
            replyMessage.setBody(commandResult);
        }

        // Update the replyMessage body with server configuration if the
        // operation is ping or login and make sure to return the FlexClient Id value.
        if (command.getOperation() == CommandMessage.CLIENT_PING_OPERATION
                || command.getOperation() == CommandMessage.LOGIN_OPERATION) {
            boolean needsConfig = false;
            if (command.getHeader(CommandMessage.NEEDS_CONFIG_HEADER) != null)
                needsConfig = ((Boolean) (command.getHeader(CommandMessage.NEEDS_CONFIG_HEADER)));

            // Send configuration information only if the client requested.
            if (needsConfig) {
                ConfigMap serverConfig = describeServices(endpoint);
                if (serverConfig.size() > 0)
                    replyMessage.setBody(serverConfig);
            }

            // Record the features available over this endpoint
            double msgVersion = endpoint.getMessagingVersion();
            if (msgVersion > 0)
                replyMessage.setHeader(CommandMessage.MESSAGING_VERSION, new Double(msgVersion));

            // Record the flex client ID
            FlexClient flexClient = FlexContext.getFlexClient();
            if (flexClient != null)
                replyMessage.setHeader(Message.FLEX_CLIENT_ID_HEADER, flexClient.getId());
        } else if (!serviced) {
            MessageException lme = new MessageException();
            // The supplied destination id is not registered with any service..
            lme.setMessage(ERR_MSG_NO_SERVICE_FOR_DEST);
            throw lme;
        }

        replyMessage.setCorrelationId(command.getMessageId());
        replyMessage.setClientId(command.getClientId());
        if (replyMessage.getBody() instanceof java.util.List) {
            replyMessage.setBody(((List) replyMessage.getBody()).toArray());
        }

        if (Log.isDebug())
            Log.getLogger(getLogCategory(command)).debug(
                    "Executed command: " +
                            (service == null ? "(default service)" : "service=" +
                                    service.getId()) + StringUtils.NEWLINE +
                            "  commandMessage: " + command + StringUtils.NEWLINE +
                            "  replyMessage: " + replyMessage + StringUtils.NEWLINE);

        return replyMessage;
    }

    /**
     * Services call this method in order to send a message
     * to a FlexClient.
     *
     * @param message       the message
     * @param messageClient the message client the message should be sent to
     */
    public void routeMessageToMessageClient(Message message, MessageClient messageClient) {
        // Make sure message has a messageId
        checkMessageId(message);

        // Route the message and the MessageClient (subscription) to the FlexClient to
        // queue the message for delivery to the remote client.
        // Reset the thread local FlexClient and FlexSession to be specific to the client
        // we're pushing to, and then reset the context back to its original request handling state.
        FlexClient requestFlexClient = FlexContext.getFlexClient();
        FlexSession requestFlexSession = FlexContext.getFlexSession();

        FlexClient pushFlexClient = messageClient.getFlexClient();
        FlexContext.setThreadLocalFlexClient(pushFlexClient);
        FlexContext.setThreadLocalSession(null); // Null because we don't have a currently active endpoint for the push client.
        try {
            pushFlexClient.push(message, messageClient);
        } finally // Reset thread locals.
        {
            FlexContext.setThreadLocalFlexClient(requestFlexClient);
            FlexContext.setThreadLocalSession(requestFlexSession);
        }
    }

    /**
     * Check that the destination permits access over the endpoint, the security
     * constraint of the destination permits the operation, and the service and
     * the destination the message is targeting are running,
     *
     * @param message     The incoming message.
     * @param destination The destination to check against.
     */
    public void inspectOperation(Message message, Destination destination) {
        inspectChannel(message, destination);
        loginManager.checkConstraint(destination.getSecurityConstraint());

        Service service = destination.getService();
        if (!service.isStarted()) {
            // {0} ''{1}'' cannot service message ''{2}'' in stopped state.
            MessageException me = new MessageException();
            me.setMessage(ERR_MSG_CANNOT_SERVICE_STOPPED, new Object[]{SERVICE, service.getId(), message.getMessageId()});
            throw me;
        }

        if (!destination.isStarted()) {
            // {0} ''{1}'' cannot service message ''{2}'' in stopped state.
            MessageException me = new MessageException();
            me.setMessage(ERR_MSG_CANNOT_SERVICE_STOPPED, new Object[]{"Destination", destination.getId(), message.getMessageId()});
            throw me;
        }
    }

    /**
     * Verify that this destination permits access over this endpoint.
     *
     * @param message     The incoming message.
     * @param destination The destination to check against.
     */
    public void inspectChannel(Message message, Destination destination) {
        if (!enforceEndpointValidation && message.getHeader(Message.VALIDATE_ENDPOINT_HEADER) == null)
            return;

        String messageChannel = (String) message.getHeader(Message.ENDPOINT_HEADER);
        for (String channelId : destination.getChannels()) {
            if (channelId.equals(messageChannel))
                return;
        }
        MessageException lme = new MessageException();
        lme.setMessage(ERR_MSG_DESTINATION_UNACCESSIBLE, new Object[]{destination.getId(), messageChannel});
        throw lme;
    }

    /**
     * Returns the logging category to use for a given message.
     *
     * @param message the message
     * @return the logging category to use for a given message
     */
    public String getLogCategory(Message message) {
        if (message instanceof AbstractMessage)
            return ((AbstractMessage) message).logCategory();
        return LogCategories.MESSAGE_GENERAL;
    }

    /**
     * This is the class loader used by the system to load user defined classes.
     *
     * @return <code>ClassLoader</code> the system should use to load user defined classes.
     */
    public ClassLoader getClassLoader() {
        return classLoader;
    }

    /**
     * Sets the class loader used by the system to load user defined classes.
     *
     * @param classLoader The class loader used by the system to loader user defiend classes.
     */
    public void setClassLoader(ClassLoader classLoader) {
        this.classLoader = classLoader;
    }

    /**
     * Used internally by AbstractService to check existence of destination and service id
     * mapping in the destinationToService map.
     *
     * @param destId         the destination id
     * @param svcId          the service id
     * @param throwException true if an exception should be thrown if something goes wrong
     * @return True if the destination is already registered.
     */
    public boolean isDestinationRegistered(String destId, String svcId, boolean throwException) {
        // Do not allow multiple destinations with the same id across services
        if (destinationToService.containsKey(destId)) {
            if (throwException) {
                // Cannot add destination with id ''{0}'' to service with id ''{1}'' because another service with id ''{2}'' already has a destination with the same id.
                ConfigurationException ex = new ConfigurationException();
                ex.setMessage(ConfigurationConstants.DUPLICATE_DEST_ID, new Object[]{destId, svcId, destinationToService.get(destId)});
                throw ex;
            }
            return true;
        }
        return false;
    }

    /**
     * Used internally by AbstractService to add destination and service id
     * mapping to destinationToService map.
     *
     * @param destId Destination id.
     * @param svcId  Service id.
     */
    public void registerDestination(String destId, String svcId) {
        // Do not allow multiple destinations with the same id across services
        if (destinationToService.containsKey(destId)) {
            // Cannot add destination with id ''{0}'' to service with id ''{1}'' because another service with id ''{2}'' already has a destination with the same id.
            ConfigurationException ex = new ConfigurationException();
            ex.setMessage(ConfigurationConstants.DUPLICATE_DEST_ID, new Object[]{destId, svcId, destinationToService.get(destId)});
            throw ex;
        }
        destinationToService.put(destId, svcId);
    }

    /**
     * Used internally by AbstractService to remove destination and service id
     * mapping from destinationToService map.
     *
     * @param destId Destination id.
     */
    public void unregisterDestination(String destId) {
        destinationToService.remove(destId);
    }

    /**
     * Looks up and returns a destination by id; removing the need to know which service
     * a destination is registered for.
     *
     * @param destId Destination id.
     * @return the Destination oblect for the given destination id
     */
    public Destination getRegisteredDestination(String destId) {
        String serviceId = destId != null ? destinationToService.get(destId) : null;
        return serviceId != null ? getService(serviceId).getDestination(destId) : null;
    }

    /**
     * Increments the count of destinations actively using an Application or Session
     * level scoped assembler identified by the passed in attributeId.
     *
     * @param attributeId Attribute id for the session or application-scoped object.
     */
    public void incrementAttributeIdRefCount(String attributeId) {
        synchronized (attributeIdRefCounts) {
            Integer currentCount = attributeIdRefCounts.get(attributeId);
            if (currentCount == null)
                attributeIdRefCounts.put(attributeId, INTEGER_ONE);
            else
                attributeIdRefCounts.put(attributeId, currentCount + 1);
        }
    }

    /**
     * Decrements the count of destinations actively using an Application or Session
     * level scoped assembler identified by the passed in attributeId.
     *
     * @param attributeId Attribute id for the session or application-scoped object.
     * @return in the attribute ID ref count after decrement
     */
    public int decrementAttributeIdRefCount(String attributeId) {
        synchronized (attributeIdRefCounts) {
            Integer currentCount = attributeIdRefCounts.get(attributeId);
            if (currentCount == null)
                return 0;

            int newValue = currentCount - 1;
            attributeIdRefCounts.put(attributeId, newValue);
            return newValue;
        }
    }


    //--------------------------------------------------------------------------
    //
    // Protected Methods
    //
    //--------------------------------------------------------------------------

    /**
     * Utility method to make sure that message has an assigned messageId.
     *
     * @param message the message that should be checked
     */
    protected void checkMessageId(Message message) {
        if (message.getMessageId() == null) {
            MessageException lme = new MessageException();
            lme.setMessage(ERR_MSG_NULL_MESSAGE_ID);
            throw lme;
        }
    }

    /**
     * Check the headers for the message for the RemoteCredentials.
     *
     * @param service the service
     * @param message the message
     */
    protected void extractRemoteCredentials(Service service, Message message) {
        if (!message.headerExists(Message.REMOTE_CREDENTIALS_HEADER))
            return;

        boolean setting = false;
        String username = null;
        String credentials = null;
        if (message.getHeader(Message.REMOTE_CREDENTIALS_HEADER) instanceof String) {
            String encoded = (String) message.getHeader(Message.REMOTE_CREDENTIALS_HEADER);
            if (encoded.length() > 0) //empty string is clearing the credentials
            {
                setting = true;
                Base64.Decoder decoder = new Base64.Decoder();
                decoder.decode(encoded);
                byte[] decodedBytes = decoder.drain();
                String decoded;

                String charset = (String) message.getHeader(Message.REMOTE_CREDENTIALS_CHARSET_HEADER);
                if (charset != null) {
                    try {
                        decoded = new String(decodedBytes, charset);
                    } catch (UnsupportedEncodingException ex) {
                        MessageException lme = new MessageException();
                        lme.setMessage(ERR_MSG_UNKNOWN_REMOTE_CREDENTIALS_FORMAT);
                        throw lme;
                    }
                } else {
                    decoded = new String(decodedBytes);
                }

                int colon = decoded.indexOf(':');
                if (colon > 0 && colon < decoded.length() - 1) {
                    username = decoded.substring(0, colon);
                    credentials = decoded.substring(colon + 1);
                }
            }
        } else {
            MessageException lme = new MessageException();
            lme.setMessage(ERR_MSG_UNKNOWN_REMOTE_CREDENTIALS_FORMAT);
            throw lme;
        }

        if (setting) {
            FlexContext.getFlexSession().putRemoteCredentials(
                    new FlexRemoteCredentials(service.getId(),
                            message.getDestination(), username, credentials));
        } else {
            FlexContext.getFlexSession().clearRemoteCredentials(service.getId(),
                    message.getDestination());
        }
    }

    @Override
    protected String getLogCategory() {
        return LOG_CATEGORY;
    }


    public void setServletContext(ServletContext servletContext) {
        this.servletContext = servletContext;
    }

    /**
     * This method was added so that Spring-BlazeDS Integration 1.0.2 works with latest BlazeDS binaries
     * Internally, this method simply invokes the setServletContext(...) method
     *
     * @param servletContext ServletContext that should be set.
     */
    protected void setInitServletContext(ServletContext servletContext) {
        setServletContext(servletContext);
    }

    protected void recreateHttpFlexSessionAfterLogin() {
        FlexSession currentHttpFlexSession = FlexContext.getFlexSession();
        Principal principal = currentHttpFlexSession.getUserPrincipal();
        currentHttpFlexSession.invalidate(); // This will recreate a new session.

        FlexSession newHttpFlexSession = FlexContext.getFlexSession();
        newHttpFlexSession.setUserPrincipal(principal);
    }

    /**
     * Start all of the broker's endpoints.
     */
    protected void startEndpoints() {
        for (Endpoint endpoint : endpoints.values()) {
            if (endpoint instanceof AbstractEndpoint && ((AbstractEndpoint) endpoint).isRemote())
                continue; // Local representation of remote endpoints are not started.
            endpoint.start();
        }
    }

    /**
     * Stop all of the broker's endpoints.
     */
    protected void stopEndpoints() {
        for (Endpoint endpoint : endpoints.values()) {
            if (endpoint instanceof AbstractEndpoint && ((AbstractEndpoint) endpoint).isRemote())
                continue; // Local representation of remote endpoints are not stopped.
            endpoint.stop();
        }
    }

    //--------------------------------------------------------------------------
    //
    // Private Methods
    //
    //--------------------------------------------------------------------------

    /**
     *
     */
    private void checkEndpointUrl(String id, String endpointUrl) {
        // Do not allow endpoints with null url property.
        if (endpointUrl == null) {
            // Cannot add ''{0}'' with null url to the ''{1}''
            ConfigurationException ex = new ConfigurationException();
            ex.setMessage(ERR_MSG_NULL_ENDPOINT_URL, new Object[]{ENDPOINT, MESSAGEBROKER});
            throw ex;
        }

        String parsedEndpointURI = ChannelSettings.removeTokens(endpointUrl);

        // first check the original URI
        if (registeredEndpoints.containsKey(parsedEndpointURI)) {
            ConfigurationException ce = new ConfigurationException();
            ce.setMessage(ERR_MSG_URI_ALREADY_REGISTERED, new Object[]{id, parsedEndpointURI,
                    registeredEndpoints.get(parsedEndpointURI)});
            throw ce;
        }

        // add the original URI to the registered endpoints map
        registeredEndpoints.put(parsedEndpointURI, id);

        // also need to check the URI without the context root
        int nextSlash = parsedEndpointURI.indexOf('/', 1);
        if (nextSlash > 0) {
            String parsedEndpointURI2 = parsedEndpointURI.substring(nextSlash);
            if (registeredEndpoints.containsKey(parsedEndpointURI2)) {
                ConfigurationException ce = new ConfigurationException();
                ce.setMessage(ERR_MSG_URI_ALREADY_REGISTERED, new Object[]{
                        parsedEndpointURI2, id,
                        registeredEndpoints.get(parsedEndpointURI2)});
                throw ce;
            }
            registeredEndpoints.put(parsedEndpointURI2, id);
        }
    }

    /**
     * Matches the current &quot;servlet + pathinfo&quot; to a list of channels registered
     * in the services configuration file, independent of context root.
     *
     * @param path        The Servlet mapping and PathInfo of the current request
     * @param contextPath The web application context root (or empty string for default root)
     * @param endpoint    The endpoint to be matched
     * @return whether the current request matches a registered endpoint URI
     */
    private boolean matchEndpoint(String path, String contextPath, Endpoint endpoint) {
        boolean match = false;
        String channelEndpoint = endpoint.getParsedUrl(contextPath);

        if (path.endsWith("/")) {
            path = path.substring(0, path.length() - 1);
        }

        if (path.equalsIgnoreCase(channelEndpoint)) {
            match = true;
        }

        return match;
    }

    private void registerMessageBroker() {
        String mbid = getId();

        synchronized (messageBrokers) {
            if (messageBrokers.get(mbid) != null) {
                ConfigurationException ce = new ConfigurationException();
                ce.setMessage(10137, new Object[]{getId() == null ? "(no value supplied)" : mbid});
                throw ce;
            }
            messageBrokers.put(mbid, this);
        }
    }

    private void unRegisterMessageBroker() {
        String mbid = getId();

        synchronized (messageBrokers) {
            messageBrokers.remove(mbid);
        }
    }

    /**
     * Start all of the broker's shared servers.
     */
    private void startServers() {
        for (Server server : servers.values()) {
            // Validate that the server is actually referenced by an endpoint; if not, warn.
            boolean serverIsReferenced = false;
            for (Endpoint endpoint : endpoints.values()) {
                if (endpoint instanceof Endpoint2 && server.equals(((Endpoint2) endpoint).getServer())) {
                    serverIsReferenced = true;
                    break;
                }
            }

            if (!serverIsReferenced && Log.isWarn())
                Log.getLogger(LogCategories.CONFIGURATION).warn("Server '" + server.getId() + "' is not referenced by any endpoints.");

            server.start();
        }
    }

    /**
     * Stop all the broker's shared servers.
     */
    private void stopServers() {
        for (Server server : servers.values())
            server.stop();
    }

    /**
     * Start all of the broker's services.
     */
    private void startServices() {
        for (Service svc : services.values()) {
            long timeBeforeStartup = 0;
            if (Log.isDebug()) {
                timeBeforeStartup = System.currentTimeMillis();
                Log.getLogger(LOG_CATEGORY_STARTUP_SERVICE).debug("Service with id '{0}' is starting.",
                        new Object[]{svc.getId()});
            }

            svc.start();

            if (Log.isDebug()) {
                long timeAfterStartup = System.currentTimeMillis();
                Long diffMillis = timeAfterStartup - timeBeforeStartup;
                Log.getLogger(LOG_CATEGORY_STARTUP_SERVICE).debug("Service with id '{0}' is ready (startup time: '{1}' ms)",
                        new Object[]{svc.getId(), diffMillis});
            }
        }
    }

    /**
     * Stop all of the broker's services.
     */
    private void stopServices() {
        for (Service svc : services.values())
            svc.stop();
    }
}
