/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.server.protocol.v0_10;

import java.nio.BufferUnderflowException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.v0_10.transport.ConnectionCloseCode;
import org.apache.qpid.server.session.AMQPSession;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.transport.AbstractAMQPConnection;
import org.apache.qpid.server.transport.AggregateTicker;
import org.apache.qpid.server.transport.ByteBufferSender;
import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.transport.ServerNetworkConnection;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;


public class AMQPConnection_0_10Impl extends AbstractAMQPConnection<AMQPConnection_0_10Impl, ServerConnection>
        implements
        AMQPConnection_0_10<AMQPConnection_0_10Impl>
{
    private static final Logger LOGGER = LoggerFactory.getLogger(AMQPConnection_0_10Impl.class);
    private final ServerInputHandler _inputHandler;

    private final ServerConnection _connection;

    private volatile boolean _transportBlockedForWriting;

    private final AtomicBoolean _stateChanged = new AtomicBoolean();
    private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();
    private ServerDisassembler _disassembler;

    private final Set<AMQPSession<?,?>> _sessionsWithWork =
            Collections.newSetFromMap(new ConcurrentHashMap<AMQPSession<?,?>, Boolean>());

    public AMQPConnection_0_10Impl(final Broker<?> broker,
                                   ServerNetworkConnection network,
                                   final AmqpPort<?> port,
                                   final Transport transport,
                                   final long id,
                                   final AggregateTicker aggregateTicker)
    {
        super(broker, network, port, transport, Protocol.AMQP_0_10, id, aggregateTicker);

        _connection = new ServerConnection(id, broker, port, transport, this);

        ServerConnectionDelegate connDelegate = new ServerConnectionDelegate(port, transport.isSecure(), network.getSelectedHost());

        _connection.setConnectionDelegate(connDelegate);
        _connection.setRemoteAddress(network.getRemoteAddress());

        _inputHandler = new ServerInputHandler(new ServerAssembler(_connection));
        _connection.addFrameSizeObserver(_inputHandler);

        AccessController.doPrivileged((PrivilegedAction<Object>) () ->
        {
            _connection.setNetworkConnection(getNetwork());
            _disassembler = new ServerDisassembler(wrapSender(getNetwork().getSender()), Constant.MIN_MAX_FRAME_SIZE);
            _connection.setSender(_disassembler);
            _connection.addFrameSizeObserver(_disassembler);
            return null;
        }, getAccessControllerContext());
    }

    private ByteBufferSender wrapSender(final ByteBufferSender sender)
    {
        return new ByteBufferSender()
        {
            @Override
            public boolean isDirectBufferPreferred()
            {
                return sender.isDirectBufferPreferred();
            }

            @Override
            public void send(final QpidByteBuffer msg)
            {
                updateLastWriteTime();
                sender.send(msg);
            }

            @Override
            public void flush()
            {
                sender.flush();

            }

            @Override
            public void close()
            {
                sender.close();
            }
        };
    }

    @Override
    public void received(final QpidByteBuffer buf)
    {
        AccessController.doPrivileged((PrivilegedAction<Object>) () ->
        {
            updateLastReadTime();
            try
            {
                _inputHandler.received(buf);
                _connection.receivedComplete();
            }
            catch (IllegalArgumentException | IllegalStateException | BufferUnderflowException e)
            {
                throw new ConnectionScopedRuntimeException(e);
            }
            catch (StoreException e)
            {
                if (getAddressSpace().isActive())
                {
                    throw new ServerScopedRuntimeException(e);
                }
                else
                {
                    throw new ConnectionScopedRuntimeException(e);
                }
            }
            return null;
        }, getAccessControllerContext());
    }

    @Override
    public void encryptedTransport()
    {
    }

    @Override
    public void writerIdle()
    {
        _connection.doHeartBeat();
    }

    @Override
    public void readerIdle()
    {
        AccessController.doPrivileged((PrivilegedAction<Object>) () ->
        {
            _connection.getEventLogger().message(ConnectionMessages.IDLE_CLOSE("Current connection state: " + _connection.getConnectionDelegate().getState(), true));
            getNetwork().close();
            return null;
        }, getAccessControllerContext());

    }

    public String getAddress()
    {
        return getNetwork().getRemoteAddress().toString();
    }

    @Override
    public void closed()
    {
        try
        {
            AccessController.doPrivileged((PrivilegedAction<Void>) () ->
            {
                _inputHandler.closed();
                if(_disassembler != null)
                {
                    _disassembler.closed();
                }
                return null;
            }, getAccessControllerContext());
        }
        finally
        {
            markTransportClosed();
        }
    }

    @Override
    public boolean isTransportBlockedForWriting()
    {
        return _transportBlockedForWriting;
    }

    @Override
    public boolean isClosing()
    {
        return _connection.isClosing() || _connection.isConnectionLost();
    }

    @Override
    public int getHeartbeatDelay()
    {
        return _connection.getHeartBeatDelay();
    }

    @Override
    public void setTransportBlockedForWriting(final boolean blocked)
    {
        if(_transportBlockedForWriting != blocked)
        {
            _transportBlockedForWriting = blocked;
            _connection.transportStateChanged();
        }
    }

    @Override
    public Iterator<Runnable> processPendingIterator()
    {
        if (isIOThread())
        {
            return _connection.processPendingIterator(_sessionsWithWork);
        }
        else
        {
            return Collections.emptyIterator();
        }
    }

    @Override
    public boolean hasWork()
    {
        return _stateChanged.get();
    }

    @Override
    public void notifyWork()
    {
        _stateChanged.set(true);

        final Action<ProtocolEngine> listener = _workListener.get();
        if(listener != null)
        {
            listener.performAction(this);
        }
    }

    @Override
    public void notifyWork(final AMQPSession<?,?> sessionModel)
    {
        _sessionsWithWork.add(sessionModel);
        notifyWork();
    }

    @Override
    public void clearWork()
    {
        _stateChanged.set(false);
    }

    @Override
    public void setWorkListener(final Action<ProtocolEngine> listener)
    {
        _workListener.set(listener);
    }

    @Override
    public boolean hasSessionWithName(final byte[] name)
    {
        return _connection.hasSessionWithName(name);
    }

    @Override
    public void sendConnectionCloseAsync(final CloseReason reason, final String description)
    {
        _connection.setConnectionCloseCause(reason, description);
        stopConnection();
        // Best mapping for all reasons is "forced"
        _connection.sendConnectionCloseAsync(ConnectionCloseCode.CONNECTION_FORCED, description);

    }

    @Override
    public void closeSessionAsync(final AMQPSession<?,?> session,
                                  final CloseReason reason, final String message)
    {
        ServerSession s = ((Session_0_10)session).getServerSession();
        _connection.closeSessionAsync(s, reason, message);
    }

    @Override
    protected void addAsyncTask(final Action<? super ServerConnection> action)
    {
        _connection.addAsyncTask(action);
    }

    @Override
    public void block()
    {
        _connection.block();
    }

    @Override
    public String getRemoteContainerName()
    {
        return getClientId();
    }

    @Override
    public Collection<? extends Session_0_10> getSessionModels()
    {
        final Collection<org.apache.qpid.server.model.Session> sessions =
                getChildren(org.apache.qpid.server.model.Session.class);
        final Collection<? extends Session_0_10> session_0_10s = new ArrayList<>((Collection)sessions);
        return session_0_10s;
    }

    @Override
    public void unblock()
    {
        _connection.unblock();
    }

    @Override
    public int getSessionCountLimit()
    {
        return _connection.getSessionCountLimit();
    }

    @Override
    protected boolean isOrderlyClose()
    {
        return !_connection.isConnectionLost();
    }

    @Override
    protected String getCloseCause()
    {
        String connectionCloseMessage = _connection.getConnectionCloseMessage();
        if (connectionCloseMessage == null)
        {
            return null;
        }
        return _connection.getConnectionCloseCode() + " - " + connectionCloseMessage;
    }

    @Override
    public void initialiseHeartbeating(final long writerDelay, final long readerDelay)
    {
        super.initialiseHeartbeating(writerDelay, readerDelay);
    }

    @Override
    protected boolean isOpeningInProgress()
    {
        ServerConnectionDelegate.ConnectionState state = _connection.getConnectionDelegate().getState();
        switch (state)
        {
            case INIT:
            case AWAIT_START_OK:
            case AWAIT_SECURE_OK:
            case AWAIT_TUNE_OK:
            case AWAIT_OPEN:
                return true;
            case OPEN:
                return false;
            default:
                throw new IllegalStateException("Unsupported state " + state);
        }
    }

    @Override
    public Iterator<ServerTransaction> getOpenTransactions()
    {
        return getSessionModels().stream()
                                 .filter(sessionModel -> sessionModel.getServerSession()
                                                                     .getTransaction() instanceof LocalTransaction)
                                 .map(sessionModel -> sessionModel.getServerSession().getTransaction())
                                 .iterator();
    }

}
