/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.server.model.adapter;

import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Protocol.ProtocolType;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.util.MapValueConverter;

public class PortFactory
{
    public static final int DEFAULT_AMQP_SEND_BUFFER_SIZE = 262144;
    public static final int DEFAULT_AMQP_RECEIVE_BUFFER_SIZE = 262144;
    public static final boolean DEFAULT_AMQP_NEED_CLIENT_AUTH = false;
    public static final boolean DEFAULT_AMQP_WANT_CLIENT_AUTH = false;
    public static final boolean DEFAULT_AMQP_TCP_NO_DELAY = true;
    public static final String DEFAULT_AMQP_BINDING = "*";
    public static final Transport DEFAULT_TRANSPORT = Transport.TCP;

    private final Collection<Protocol> _defaultProtocols;

    public PortFactory()
    {
        Set<Protocol> defaultProtocols = EnumSet.of(Protocol.AMQP_0_8, Protocol.AMQP_0_9, Protocol.AMQP_0_9_1,
                Protocol.AMQP_0_10, Protocol.AMQP_1_0);
        String excludedProtocols = System.getProperty(BrokerProperties.PROPERTY_BROKER_DEFAULT_AMQP_PROTOCOL_EXCLUDES);
        if (excludedProtocols != null)
        {
            String[] excludes = excludedProtocols.split(",");
            for (String exclude : excludes)
            {
                Protocol protocol = Protocol.valueOf(exclude);
                defaultProtocols.remove(protocol);
            }
        }
        String includedProtocols = System.getProperty(BrokerProperties.PROPERTY_BROKER_DEFAULT_AMQP_PROTOCOL_INCLUDES);
        if (includedProtocols != null)
        {
            String[] includes = includedProtocols.split(",");
            for (String include : includes)
            {
                Protocol protocol = Protocol.valueOf(include);
                defaultProtocols.add(protocol);
            }
        }
        _defaultProtocols = Collections.unmodifiableCollection(defaultProtocols);
    }

    public Port createPort(UUID id, Broker broker, Map<String, Object> objectAttributes)
    {
        Map<String, Object> attributes = retrieveAttributes(objectAttributes);

        final Port port;
        Map<String, Object> defaults = new HashMap<String, Object>();
        defaults.put(Port.TRANSPORTS, Collections.singleton(DEFAULT_TRANSPORT));
        Object portValue = attributes.get(Port.PORT);
        if (portValue == null)
        {
            throw new IllegalConfigurationException("Port attribute is not specified for port: " + attributes);
        }
        if (isAmqpProtocol(attributes))
        {
            Object binding = attributes.get(Port.BINDING_ADDRESS);
            if (binding == null)
            {
                binding = DEFAULT_AMQP_BINDING;
                defaults.put(Port.BINDING_ADDRESS, DEFAULT_AMQP_BINDING);
            }
            defaults.put(Port.NAME, binding + ":" + portValue);
            defaults.put(Port.PROTOCOLS, _defaultProtocols);
            defaults.put(Port.TCP_NO_DELAY, DEFAULT_AMQP_TCP_NO_DELAY);
            defaults.put(Port.WANT_CLIENT_AUTH, DEFAULT_AMQP_WANT_CLIENT_AUTH);
            defaults.put(Port.NEED_CLIENT_AUTH, DEFAULT_AMQP_NEED_CLIENT_AUTH);
            defaults.put(Port.RECEIVE_BUFFER_SIZE, DEFAULT_AMQP_RECEIVE_BUFFER_SIZE);
            defaults.put(Port.SEND_BUFFER_SIZE, DEFAULT_AMQP_SEND_BUFFER_SIZE);
            port = new AmqpPortAdapter(id, broker, attributes, defaults, broker.getTaskExecutor());
        }
        else
        {
            @SuppressWarnings("unchecked")
            Collection<Protocol> protocols = (Collection<Protocol>)attributes.get(Port.PROTOCOLS);
            if (protocols.size() > 1)
            {
                throw new IllegalConfigurationException("Only one protocol can be used on non AMQP port");
            }
            Protocol protocol = protocols.iterator().next();
            defaults.put(Port.NAME, portValue + "-" + protocol.name());
            port = new PortAdapter(id, broker, attributes, defaults, broker.getTaskExecutor());
        }
        return port;
    }

    private Map<String, Object> retrieveAttributes(Map<String, Object> objectAttributes)
    {
        Map<String, Object> attributes = new HashMap<String, Object>(objectAttributes);

        if (objectAttributes.containsKey(Port.PROTOCOLS))
        {
            final Set<Protocol> protocolSet = MapValueConverter.getEnumSetAttribute(Port.PROTOCOLS, objectAttributes, Protocol.class);
            attributes.put(Port.PROTOCOLS, protocolSet);
        }

        if (objectAttributes.containsKey(Port.TRANSPORTS))
        {
            final Set<Transport> transportSet = MapValueConverter.getEnumSetAttribute(Port.TRANSPORTS, objectAttributes,
                    Transport.class);
            attributes.put(Port.TRANSPORTS, transportSet);
        }

        if (objectAttributes.containsKey(Port.PORT))
        {
            Integer port = MapValueConverter.getIntegerAttribute(Port.PORT, objectAttributes);
            attributes.put(Port.PORT, port);
        }

        if (objectAttributes.containsKey(Port.TCP_NO_DELAY))
        {
            boolean tcpNoDelay = MapValueConverter.getBooleanAttribute(Port.TCP_NO_DELAY, objectAttributes);
            attributes.put(Port.TCP_NO_DELAY, tcpNoDelay);
        }

        if (objectAttributes.containsKey(Port.RECEIVE_BUFFER_SIZE))
        {
            int receiveBufferSize = MapValueConverter.getIntegerAttribute(Port.RECEIVE_BUFFER_SIZE, objectAttributes);
            attributes.put(Port.RECEIVE_BUFFER_SIZE, receiveBufferSize);
        }

        if (objectAttributes.containsKey(Port.SEND_BUFFER_SIZE))
        {
            int sendBufferSize = MapValueConverter.getIntegerAttribute(Port.SEND_BUFFER_SIZE, objectAttributes);
            attributes.put(Port.SEND_BUFFER_SIZE, sendBufferSize);
        }

        if (objectAttributes.containsKey(Port.NEED_CLIENT_AUTH))
        {
            boolean needClientAuth = MapValueConverter.getBooleanAttribute(Port.NEED_CLIENT_AUTH, objectAttributes);
            attributes.put(Port.NEED_CLIENT_AUTH, needClientAuth);
        }

        if (objectAttributes.containsKey(Port.WANT_CLIENT_AUTH))
        {
            boolean wantClientAuth = MapValueConverter.getBooleanAttribute(Port.WANT_CLIENT_AUTH, objectAttributes);
            attributes.put(Port.WANT_CLIENT_AUTH, wantClientAuth);
        }

        if (objectAttributes.containsKey(Port.BINDING_ADDRESS))
        {
            String binding = MapValueConverter.getStringAttribute(Port.BINDING_ADDRESS, objectAttributes);
            attributes.put(Port.BINDING_ADDRESS, binding);
        }

        if (objectAttributes.containsKey(Port.STATE))
        {
            State state = MapValueConverter.getEnumAttribute(State.class, Port.STATE, objectAttributes);
            attributes.put(Port.STATE, state);
        }
        return attributes;
    }

    private boolean isAmqpProtocol(Map<String, Object> portAttributes)
    {
        @SuppressWarnings("unchecked")
        Set<Protocol> protocols = (Set<Protocol>) portAttributes.get(Port.PROTOCOLS);
        if (protocols == null || protocols.isEmpty())
        {
            // defaulting to AMQP if protocol is not specified
            return true;
        }

        Set<ProtocolType> protocolTypes = new HashSet<ProtocolType>();
        for (Protocol protocolObject : protocols)
        {
            protocolTypes.add(protocolObject.getProtocolType());
        }

        if (protocolTypes.size() > 1)
        {
            throw new IllegalConfigurationException("Found different protocol types '" + protocolTypes
                    + "' on port configuration: " + portAttributes);
        }

        return protocolTypes.contains(ProtocolType.AMQP);
    }

    public Collection<Protocol> getDefaultProtocols()
    {
        return _defaultProtocols;
    }

}
