/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.server.protocol.v0_10;

import static org.apache.qpid.server.protocol.v0_10.ServerConnection.State.CLOSE_RCVD;

import java.nio.charset.StandardCharsets;
import java.security.AccessControlException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.Principal;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.ConnectionPropertyEnricher;
import org.apache.qpid.server.properties.ConnectionStartProperties;
import org.apache.qpid.server.protocol.v0_10.transport.*;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
import org.apache.qpid.server.security.auth.sasl.SaslNegotiator;
import org.apache.qpid.server.security.auth.sasl.SaslSettings;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;

public class ServerConnectionDelegate extends MethodDelegate<ServerConnection> implements ProtocolDelegate<ServerConnection>
{
    private static final Logger LOGGER = LoggerFactory.getLogger(ServerConnectionDelegate.class);
    static final String MESSAGE_DIGEST_SHA1 = "SHA-1";
    static final int BASE64_LIMIT = 64;
    private final AmqpPort<?> _port;

    private List<Object> _locales;
    private List<Object> _mechanisms;

    private final Broker<?> _broker;
    private int _maxNoOfChannels;
    private Map<String,Object> _clientProperties;
    private final SubjectCreator _subjectCreator;
    private int _maximumFrameSize;

    private boolean _compressionSupported;
    private volatile SaslNegotiator _saslNegotiator;

    enum ConnectionState
    {
        INIT,
        AWAIT_START_OK,
        AWAIT_SECURE_OK,
        AWAIT_TUNE_OK,
        AWAIT_OPEN,
        OPEN
    }

    private volatile ConnectionState _state = ConnectionState.INIT;
    private volatile SubjectAuthenticationResult _successfulAuthenticationResult;


    public ServerConnectionDelegate(AmqpPort<?> port, boolean secure, final String selectedHost)
    {
        _port = port;
        _broker = (Broker<?>) port.getParent();
        _mechanisms = new ArrayList<>(port.getAuthenticationProvider().getAvailableMechanisms(secure));

        _maxNoOfChannels = port.getSessionCountLimit();
        _subjectCreator = port.getSubjectCreator(secure, selectedHost);
        _maximumFrameSize = Math.min(0xffff, _broker.getNetworkBufferSize());
    }

    @Override
    public void control(ServerConnection conn, Method method)
    {
        method.dispatch(conn, this);
    }

    @Override
    public void command(ServerConnection conn, Method method)
    {
        method.dispatch(conn, this);
    }

    @Override
    public void error(ServerConnection conn, ProtocolError error)
    {
        conn.exception(new ConnectionException(error.getMessage()));
    }

    @Override
    public void handle(ServerConnection conn, Method method)
    {
        conn.dispatch(method);
    }

    @Override public void connectionHeartbeat(ServerConnection conn, ConnectionHeartbeat hearbeat)
    {
        // do nothing
    }

    protected void sendConnectionCloseOkAndCloseSender(ServerConnection conn)
    {
        conn.connectionCloseOk();
        conn.getSender().close();
    }

    @Override public void connectionCloseOk(ServerConnection conn, ConnectionCloseOk ok)
    {
        conn.getSender().close();
    }

    @Override public void sessionDetached(ServerConnection conn, SessionDetached dtc)
    {
        ServerSession ssn = conn.getSession(dtc.getChannel());
        if (ssn != null)
        {
            ssn.setDetachCode(dtc.getCode());
            conn.unmap(ssn);
            ssn.closed();
        }
    }

    public final ConnectionState getState()
    {
        return _state;
    }

    private void assertState(final ServerConnection conn, final ConnectionState requiredState)
    {
        if(_state != requiredState)
        {
            String replyText = "Command Invalid, expected " + requiredState + " but was " + _state;
            conn.sendConnectionClose(ConnectionCloseCode.FRAMING_ERROR, replyText);
            conn.closeAndIgnoreFutureInput();
            throw new ConnectionScopedRuntimeException(replyText);
        }
    }

    @Override
    public void init(final ServerConnection serverConnection, final ProtocolHeader hdr)
    {
        assertState(serverConnection, ConnectionState.INIT);
        serverConnection.send(new ProtocolHeader(1, 0, 10));
        Map<String,Object> props = Collections.emptyMap();
        for(ConnectionPropertyEnricher enricher : _port.getConnectionPropertyEnrichers())
        {
            props = enricher.addConnectionProperties(serverConnection.getAmqpConnection(), props);
        }
        serverConnection.sendConnectionStart(props, _mechanisms, Collections.singletonList((Object)"en_US"));
        _state = ConnectionState.AWAIT_START_OK;
    }

    @Override
    public void connectionSecureOk(final ServerConnection serverConnection, final ConnectionSecureOk ok)
    {
        assertState(serverConnection, ConnectionState.AWAIT_SECURE_OK);
        secure(serverConnection, ok.getResponse());
    }

    protected void secure(final ServerConnection sconn, final byte[] response)
    {
        SubjectAuthenticationResult authResult = _successfulAuthenticationResult;
        byte[] challenge = null;
        if (authResult == null)
        {
            authResult = _subjectCreator.authenticate(_saslNegotiator, response);
            challenge = authResult.getChallenge();
        }

        if (AuthenticationStatus.SUCCESS.equals(authResult.getStatus()))
        {
            _successfulAuthenticationResult = authResult;
            if (challenge == null || challenge.length == 0)
            {
                sconn.sendConnectionTune(getChannelMax(), getFrameMax(), 0, getHeartbeatMax());
                sconn.setAuthorizedSubject(authResult.getSubject());
                _state = ConnectionState.AWAIT_TUNE_OK;
                disposeSaslNegotiator();
            }
            else
            {
                sconn.sendConnectionSecure(authResult.getChallenge());
                _state = ConnectionState.AWAIT_SECURE_OK;
            }
        }
        else if (AuthenticationStatus.CONTINUE.equals(authResult.getStatus()))
        {
            sconn.sendConnectionSecure(authResult.getChallenge());
            _state = ConnectionState.AWAIT_SECURE_OK;
        }
        else
        {
            connectionAuthFailed(sconn, authResult.getCause());
        }
    }

    @Override
    public void connectionClose(ServerConnection sconn, ConnectionClose close)
    {
        sconn.closeCode(close);
        sconn.setState(CLOSE_RCVD);
        sendConnectionCloseOkAndCloseSender(sconn);
    }

    @Override
    public void connectionOpen(ServerConnection sconn, ConnectionOpen open)
    {
        assertState(sconn, ConnectionState.AWAIT_OPEN);
        NamedAddressSpace addressSpace;
        String vhostName;
        if(open.hasVirtualHost())
        {
            vhostName = open.getVirtualHost();
        }
        else
        {
            vhostName = "";
        }

        AmqpPort port = sconn.getPort();
        addressSpace = port.getAddressSpace(vhostName);

        if(addressSpace != null)
        {
            if (!addressSpace.isActive())
            {
                sconn.setState(ServerConnection.State.CLOSING);
                final String redirectHost = addressSpace.getRedirectHost(port);
                if(redirectHost == null)
                {
                    sconn.sendConnectionClose(ConnectionCloseCode.CONNECTION_FORCED,
                                                     "Virtual host '" + vhostName + "' is not active");
                }
                else
                {
                    sconn.invoke(new ConnectionRedirect(redirectHost, new ArrayList<Object>()));
                }
                return;
            }

            try
            {
                sconn.setVirtualHost(addressSpace);
                if(!addressSpace.authoriseCreateConnection(sconn.getAmqpConnection()))
                {
                    sconn.setState(ServerConnection.State.CLOSING);
                    sconn.sendConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Connection not authorized");
                    return;
                }
            }
            catch (AccessControlException | VirtualHostUnavailableException e)
            {
                sconn.setState(ServerConnection.State.CLOSING);
                sconn.sendConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
                return;
            }

            sconn.setState(ServerConnection.State.OPEN);
            _state = ConnectionState.OPEN;
            sconn.invoke(new ConnectionOpenOk(Collections.emptyList()));
        }
        else
        {
            sconn.setState(ServerConnection.State.CLOSING);
            sconn.sendConnectionClose(ConnectionCloseCode.INVALID_PATH,
                                             "Unknown virtualhost '" + vhostName + "'");
        }

    }

    @Override
    public void connectionTuneOk(final ServerConnection sconn, final ConnectionTuneOk ok)
    {
        assertState(sconn, ConnectionState.AWAIT_TUNE_OK);
        int okChannelMax = ok.getChannelMax();
        int okMaxFrameSize = ok.getMaxFrameSize();

        if (okChannelMax > getChannelMax())
        {
            LOGGER.error("Connection '" + sconn.getConnectionId() + "' being severed, " +
                    "client connectionTuneOk returned a channelMax (" + okChannelMax +
                    ") above the server's offered limit (" + getChannelMax() +")");

            //Due to the error we must forcefully close the connection without negotiation
            sconn.closeAndIgnoreFutureInput();
            return;
        }

        if(okMaxFrameSize > getFrameMax())
        {
            LOGGER.error("Connection '" + sconn.getConnectionId() + "' being severed, " +
                         "client connectionTuneOk returned a frameMax (" + okMaxFrameSize +
                         ") above the server's offered limit (" + getFrameMax() +")");

            //Due to the error we must forcefully close the connection without negotiation
            sconn.closeAndIgnoreFutureInput();

            return;
        }
        else if(okMaxFrameSize > 0 && okMaxFrameSize < Constant.MIN_MAX_FRAME_SIZE)
        {
            LOGGER.error("Connection '" + sconn.getConnectionId() + "' being severed, " +
                         "client connectionTuneOk returned a frameMax (" + okMaxFrameSize +
                         ") below the minimum permitted size (" + Constant.MIN_MAX_FRAME_SIZE +")");

            //Due to the error we must forcefully close the connection without negotiation
            sconn.closeAndIgnoreFutureInput();
            return;
        }
        else if(okMaxFrameSize == 0)
        {
            okMaxFrameSize = getFrameMax();
        }

        if(ok.hasHeartbeat() && ok.getHeartbeat() > 0)
        {
            int heartbeat = ok.getHeartbeat();
            sconn.setHeartBeatDelay(heartbeat);
            long readerIdle = 2000L * heartbeat;
            long writerIdle = 1000L * heartbeat;
            sconn.getAmqpConnection().initialiseHeartbeating(writerIdle, readerIdle);
        }

        //0 means no implied limit, except available server resources
        //(or that forced by protocol limitations [0xFFFF])
        sconn.setChannelMax(okChannelMax == 0 ? getChannelMax() : okChannelMax);
        sconn.setMaxFrameSize(okMaxFrameSize);
        _state = ConnectionState.AWAIT_OPEN;
    }

    private int getChannelMax()
    {
        return _maxNoOfChannels;
    }

    private int getFrameMax()
    {
        return _maximumFrameSize;
    }

    @Override
    public void sessionDetach(ServerConnection conn, SessionDetach dtc)
    {
        int channel = dtc.getChannel();
        ServerSession ssn = conn.getSession(channel);
        if (ssn != null)
        {
            stopAllSubscriptions(ssn);
            ssn.setClose(true);
            ssn.sessionDetached(dtc.getName(),
                                ssn.getDetachCode() == null ? SessionDetachCode.NORMAL : ssn.getDetachCode());
            conn.unmap(ssn);
            ssn.closed();
        }
        else
        {
            if(LOGGER.isDebugEnabled())
            {
                LOGGER.debug("SessionDetach received on unattached channel : {}", channel);
            }
            SessionDetached sessionDetached = new SessionDetached(dtc.getName(), SessionDetachCode.NOT_ATTACHED);
            sessionDetached.setChannel(channel);
            conn.invoke(sessionDetached);
        }
    }

    private void stopAllSubscriptions(final ServerSession ssn)
    {
        final Collection<ConsumerTarget_0_10> subs = ssn.getSubscriptions();
        for (ConsumerTarget_0_10 subscription_0_10 : subs)
        {
            subscription_0_10.stop();
        }
    }


    @Override
    public void sessionAttach(final ServerConnection serverConnection, final SessionAttach atc)
    {
        assertState(serverConnection, ConnectionState.OPEN);

        // We ignore the force flag

        if(isSessionNameUnique(atc.getName(), serverConnection))
        {
            ServerSessionDelegate serverSessionDelegate = new ServerSessionDelegate();

            final ServerSession serverSession =
                    new ServerSession(serverConnection, serverSessionDelegate, new Binary(atc.getName()), 0);
            final Session_0_10 session = new Session_0_10(serverConnection.getAmqpConnection(), atc.getChannel(),
                                                          serverSession, getPeerSessionName(atc.getName()));
            session.create();
            serverSession.setModelObject(session);

            serverConnection.map(serverSession, atc.getChannel());
            serverConnection.registerSession(serverSession);
            serverSession.sendSessionAttached(atc.getName());
            serverSession.setState(ServerSession.State.OPEN);
        }
        else
        {
            final SessionDetached detached = new SessionDetached(atc.getName(), SessionDetachCode.SESSION_BUSY);
            detached.setChannel(atc.getChannel());
            serverConnection.invoke(detached);
        }
    }

    private String getPeerSessionName(final byte[] attachName)
    {
        try
        {
            return UUID.fromString(new String(attachName, StandardCharsets.UTF_8)).toString();
        }
        catch (RuntimeException e)
        {
            return createBase64OrSha1(attachName);
        }
    }

    private String createBase64OrSha1(final byte[] attachName)
    {
        if (attachName.length <= BASE64_LIMIT)
        {
            return Base64.getEncoder().encodeToString(attachName);
        }
        else
        {
            return createSha1(attachName);
        }
    }

    private String createSha1(final byte[] attachName)
    {
        try
        {
            final MessageDigest digest = MessageDigest.getInstance(MESSAGE_DIGEST_SHA1);
            return Base64.getEncoder().encodeToString(digest.digest(attachName));
        }
        catch (NoSuchAlgorithmException e)
        {
            return Base64.getEncoder().encodeToString(attachName);
        }
    }

    // for test purposes only
    void setState(ConnectionState state)
    {
        _state = state;
    }

    private boolean isSessionNameUnique(final byte[] name, final ServerConnection conn)
    {
        final Principal authorizedPrincipal = conn.getAuthorizedPrincipal();
        final String userId = authorizedPrincipal == null ? "" : authorizedPrincipal.getName();

        final Iterator<? extends org.apache.qpid.server.model.Connection<?>> connections =
                        conn.getAddressSpace().getConnections().iterator();
        while(connections.hasNext())
        {
            final AMQPConnection<?> amqConnectionModel = (AMQPConnection<?>) connections.next();

            final String userName = amqConnectionModel.getAuthorizedPrincipal() == null
                    ? ""
                    : amqConnectionModel.getAuthorizedPrincipal().getName();
            if (userId.equals(userName) && amqConnectionModel.hasSessionWithName(name))
            {
                return false;
            }
        }
        return true;
    }

    @Override
    public void connectionStartOk(ServerConnection serverConnection, ConnectionStartOk ok)
    {
        assertState(serverConnection, ConnectionState.AWAIT_START_OK);
        _clientProperties = ok.getClientProperties();
        if(_clientProperties != null)
        {
            Object compressionSupported =
                    _clientProperties.get(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED);
            if (compressionSupported != null)
            {
                _compressionSupported = Boolean.parseBoolean(String.valueOf(compressionSupported));

            }
            final AMQPConnection_0_10 protocolEngine = serverConnection.getAmqpConnection();
            protocolEngine.setClientId(getStringClientProperty(ConnectionStartProperties.CLIENT_ID_0_10));
            protocolEngine.setClientProduct(getStringClientProperty(ConnectionStartProperties.PRODUCT));
            protocolEngine.setClientVersion(getStringClientProperty(ConnectionStartProperties.VERSION_0_10));
            protocolEngine.setRemoteProcessPid(getStringClientProperty(ConnectionStartProperties.PID));
        }


        serverConnection.setLocale(ok.getLocale());
        String mechanism = ok.getMechanism();

        if (mechanism == null || mechanism.length() == 0)
        {
            serverConnection.sendConnectionClose(ConnectionCloseCode.CONNECTION_FORCED,
                                                 "No Sasl mechanism was specified");
            return;
        }

        _saslNegotiator = _subjectCreator.createSaslNegotiator(mechanism,
                                                               (SaslSettings) serverConnection.getAmqpConnection());
        if (_saslNegotiator == null)
        {
            serverConnection.sendConnectionClose(ConnectionCloseCode.CONNECTION_FORCED,
                                                 "No SaslServer could be created for mechanism: " + mechanism);
        }
        else
        {
            secure(serverConnection, ok.getResponse());
        }
    }

    private String getStringClientProperty(final String name)
    {
        return (_clientProperties == null || _clientProperties.get(name) == null) ? null : String.valueOf(_clientProperties.get(name));
    }


    protected int getHeartbeatMax()
    {
        int delay = _port.getHeartbeatDelay();
        return delay == 0 ? 0xFFFF : delay;
    }

    public boolean isCompressionSupported()
    {
        return _compressionSupported && _broker.isMessageCompressionEnabled();
    }

    private void connectionAuthFailed(final ServerConnection serverConnection, Exception e)
    {
        if (e != null)
        {
            serverConnection.exception(e);
        }
        serverConnection.sendConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, e == null ? "Authentication failed" : e.getMessage());
        disposeSaslNegotiator();
    }

    private void disposeSaslNegotiator()
    {
        _saslNegotiator.dispose();
        _saslNegotiator = null;
    }
}
