/*
*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.server.transport;

import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import java.security.Principal;
import java.security.cert.Certificate;
import java.util.Collection;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.Ticker;
import org.apache.qpid.transport.network.TransportEncryption;
import org.apache.qpid.util.SystemUtils;

public class NonBlockingConnection implements ServerNetworkConnection, ByteBufferSender
{
    private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class);

    private final SocketChannel _socketChannel;
    private NonBlockingConnectionDelegate _delegate;
    private final Deque<NetworkConnectionScheduler> _schedulerDeque = new ConcurrentLinkedDeque<>();
    private final ConcurrentLinkedQueue<QpidByteBuffer> _buffers = new ConcurrentLinkedQueue<>();

    private final String _remoteSocketAddress;
    private final AtomicBoolean _closed = new AtomicBoolean(false);
    private final ProtocolEngine _protocolEngine;
    private final Runnable _onTransportEncryptionAction;
    private final AtomicLong _usedOutboundMessageSpace = new AtomicLong();
    private final long _outboundMessageBufferLimit;

    private volatile boolean _fullyWritten = true;

    private boolean _partialRead = false;

    private final AmqpPort _port;
    private final AtomicBoolean _scheduled = new AtomicBoolean();
    private volatile long _scheduledTime;
    private volatile boolean _unexpectedByteBufferSizeReported;
    private final String _threadName;
    private volatile SelectorThread.SelectionTask _selectionTask;
    private Iterator<Runnable> _pendingIterator;
    private final AtomicLong _maxWriteIdleMillis = new AtomicLong();
    private final AtomicLong _maxReadIdleMillis = new AtomicLong();
    private final List<SchedulingDelayNotificationListener> _schedulingDelayNotificationListeners = new CopyOnWriteArrayList<>();
    private final AtomicBoolean _hasShutdown = new AtomicBoolean();

    public NonBlockingConnection(SocketChannel socketChannel,
                                 ProtocolEngine protocolEngine,
                                 final Set<TransportEncryption> encryptionSet,
                                 final Runnable onTransportEncryptionAction,
                                 final NetworkConnectionScheduler scheduler,
                                 final AmqpPort port)
    {
        _socketChannel = socketChannel;
        pushScheduler(scheduler);

        _protocolEngine = protocolEngine;
        _onTransportEncryptionAction = onTransportEncryptionAction;

        _remoteSocketAddress = _socketChannel.socket().getRemoteSocketAddress().toString();
        _port = port;
        _threadName = SelectorThread.IO_THREAD_NAME_PREFIX + _remoteSocketAddress.toString();

        _outboundMessageBufferLimit = (long) _port.getContextValue(Long.class,
                                                                   AmqpPort.PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE);

        protocolEngine.setWorkListener(new Action<ProtocolEngine>()
        {
            @Override
            public void performAction(final ProtocolEngine object)
            {
                if(!_scheduled.get())
                {
                    getScheduler().schedule(NonBlockingConnection.this);
                }
            }
        });

        if(encryptionSet.size() == 1)
        {
            setTransportEncryption(encryptionSet.iterator().next());
        }
        else
        {
            _delegate = new NonBlockingConnectionUndecidedDelegate(this);
        }

    }

    String getThreadName()
    {
        return _threadName;
    }

    public boolean isPartialRead()
    {
        return _partialRead;
    }

    Ticker getTicker()
    {
        return _protocolEngine.getAggregateTicker();
    }

    SocketChannel getSocketChannel()
    {
        return _socketChannel;
    }

    @Override
    public void start()
    {
    }

    @Override
    public ByteBufferSender getSender()
    {
        return this;
    }

    @Override
    public void close()
    {
        LOGGER.debug("Closing " + _remoteSocketAddress);
        if(_closed.compareAndSet(false,true))
        {
            _protocolEngine.notifyWork();
            _selectionTask.wakeup();
        }
    }

    @Override
    public SocketAddress getRemoteAddress()
    {
        return _socketChannel.socket().getRemoteSocketAddress();
    }

    @Override
    public SocketAddress getLocalAddress()
    {
        return _socketChannel.socket().getLocalSocketAddress();
    }

    @Override
    public void setMaxWriteIdleMillis(final long millis)
    {
        _maxWriteIdleMillis.set(millis);
    }

    @Override
    public void setMaxReadIdleMillis(final long millis)
    {
        _maxReadIdleMillis.set(millis);
    }

    @Override
    public Principal getPeerPrincipal()
    {
        return _delegate.getPeerPrincipal();
    }

    @Override
    public Certificate getPeerCertificate()
    {
        return _delegate.getPeerCertificate();
    }

    @Override
    public long getMaxReadIdleMillis()
    {
        return _maxReadIdleMillis.get();
    }

    @Override
    public long getMaxWriteIdleMillis()
    {
        return _maxWriteIdleMillis.get();
    }

    @Override
    public void reserveOutboundMessageSpace(long size)
    {
        if (_usedOutboundMessageSpace.addAndGet(size) > _outboundMessageBufferLimit)
        {
            _protocolEngine.setMessageAssignmentSuspended(true, false);
        }
    }

    @Override
    public String getTransportInfo()
    {
        return _delegate.getTransportInfo();
    }

    boolean wantsRead()
    {
        return _fullyWritten;
    }

    boolean wantsWrite()
    {
        return !_fullyWritten;
    }

    public boolean isStateChanged()
    {
        return _protocolEngine.hasWork();
    }

    public void doPreWork()
    {
        if (!_closed.get())
        {
            long currentTime = System.currentTimeMillis();
            long schedulingDelay = currentTime - getScheduledTime();
            if (!_schedulingDelayNotificationListeners.isEmpty())
            {
                for (SchedulingDelayNotificationListener listener : _schedulingDelayNotificationListeners)
                {
                    listener.notifySchedulingDelay(schedulingDelay);
                }
            }
        }
    }

    public boolean doWork()
    {
        _protocolEngine.clearWork();
        if (!_closed.get())
        {
            try
            {
                long currentTime = System.currentTimeMillis();
                int tick = getTicker().getTimeToNextTick(currentTime);
                if (tick <= 0)
                {
                    getTicker().tick(currentTime);
                }

                _protocolEngine.setIOThread(Thread.currentThread());
                _protocolEngine.setMessageAssignmentSuspended(true, true);

                boolean processPendingComplete = processPending();

                if(processPendingComplete)
                {
                    _pendingIterator = null;
                    _protocolEngine.setTransportBlockedForWriting(false);
                    boolean dataRead = doRead();
                    _protocolEngine.setTransportBlockedForWriting(!doWrite());

                    if (!_fullyWritten || dataRead || (_delegate.needsWork() && _delegate.getNetInputBuffer().position() != 0))
                    {
                        _protocolEngine.notifyWork();
                    }

                    if (_fullyWritten)
                    {
                        _protocolEngine.setMessageAssignmentSuspended(false, true);
                    }
                }
                else
                {
                    _protocolEngine.notifyWork();
                }

            }
            catch (IOException |
                    ConnectionScopedRuntimeException e)
            {
                if (LOGGER.isDebugEnabled())
                {
                    LOGGER.debug("Exception performing I/O for connection '{}'",
                                 _remoteSocketAddress, e);
                }
                else
                {
                    LOGGER.info("Exception performing I/O for connection '{}' : {}",
                                _remoteSocketAddress, e.getMessage());
                }

                if(_closed.compareAndSet(false,true))
                {
                    _protocolEngine.notifyWork();
                }
            }
            finally
            {
                _protocolEngine.setIOThread(null);
            }
        }

        final boolean closed = _closed.get();
        if (closed)
        {
            shutdown();
        }

        return closed;

    }

    @Override
    public void addSchedulingDelayNotificationListeners(final SchedulingDelayNotificationListener listener)
    {
        _schedulingDelayNotificationListeners.add(listener);
    }

    @Override
    public void removeSchedulingDelayNotificationListeners(final SchedulingDelayNotificationListener listener)
    {
        _schedulingDelayNotificationListeners.remove(listener);
    }

    private boolean processPending() throws IOException
    {
        if(_pendingIterator == null)
        {
            _pendingIterator = _protocolEngine.processPendingIterator();
        }

        final int networkBufferSize = _port.getNetworkBufferSize();

        while(_pendingIterator.hasNext())
        {
            long size = getBufferedSize();
            if(size >= networkBufferSize)
            {
                doWrite();
                long bytesWritten = size - getBufferedSize();
                if(bytesWritten < (networkBufferSize / 2))
                {
                    break;
                }
            }
            else
            {
                final Runnable task = _pendingIterator.next();
                task.run();
            }
        }

        boolean complete = !_pendingIterator.hasNext();
        if (getBufferedSize() >= networkBufferSize)
        {
            doWrite();
            complete &= getBufferedSize() < networkBufferSize /2;
        }
        return complete;
    }

    private long getBufferedSize()
    {
        // Avoids iterator garbage if empty
        if (_buffers.isEmpty())
        {
            return 0L;
        }
        long totalSize = 0L;
        for(QpidByteBuffer buf : _buffers)
        {
            totalSize += buf.remaining();
        }
        return totalSize;
    }

    private void shutdown()
    {
        if (_hasShutdown.getAndSet(true))
        {
            return;
        }

        try
        {
            shutdownInput();
            shutdownFinalWrite();
            _protocolEngine.closed();
            shutdownOutput();
        }
        finally
        {
            try
            {
                try
                {
                    NetworkConnectionScheduler scheduler = getScheduler();
                    if (scheduler != null)
                    {
                        scheduler.removeConnection(this);
                    }
                }
                finally
                {
                    _socketChannel.close();
                }
            }
            catch (IOException e)
            {
                LOGGER.info("Exception closing socket '{}': {}", _remoteSocketAddress, e.getMessage());
            }

            if (SystemUtils.isWindows())
            {
                _delegate.shutdownInput();
                _delegate.shutdownOutput();
            }
        }
    }

    private void shutdownFinalWrite()
    {
        try
        {
            while(!doWrite())
            {
            }
        }
        catch (IOException e)
        {
            LOGGER.info("Exception performing final write/close for '{}': {}", _remoteSocketAddress, e.getMessage());
        }
    }

    private void shutdownOutput()
    {
        if(!SystemUtils.isWindows())
        {
            try
            {
                _socketChannel.shutdownOutput();
            }
            catch (IOException e)
            {
                LOGGER.info("Exception closing socket '{}': {}", _remoteSocketAddress, e.getMessage());
            }
            finally
            {
                _delegate.shutdownOutput();
            }
        }
    }

    private void shutdownInput()
    {

        if(!SystemUtils.isWindows())
        {
            try
            {
                _socketChannel.shutdownInput();
            }
            catch (IOException e)
            {
                LOGGER.info("Exception shutting down input for '{}': {}", _remoteSocketAddress, e.getMessage());
            }
            finally
            {
                _delegate.shutdownInput();
            }
        }
    }

    /**
     * doRead is not reentrant.
     */
    boolean doRead() throws IOException
    {
        _partialRead = false;
        if(!_closed.get() && _delegate.readyForRead())
        {
            int readData = readFromNetwork();

            if (readData > 0)
            {
                return _delegate.processData();
            }
            else
            {
                return false;
            }
        }
        else
        {
            return false;
        }
    }

    long writeToTransport(Collection<QpidByteBuffer> buffers) throws IOException
    {
        long written  = QpidByteBuffer.write(_socketChannel, buffers);
        if (LOGGER.isDebugEnabled())
        {
            LOGGER.debug("Written " + written + " bytes");
        }
        return written;
    }

    private boolean doWrite() throws IOException
    {
        _fullyWritten = _delegate.doWrite(_buffers);
        while(!_buffers.isEmpty())
        {
            QpidByteBuffer buf = _buffers.peek();
            if(buf.hasRemaining())
            {
                break;
            }
            _buffers.poll();
            buf.dispose();
        }
        if (_fullyWritten)
        {
            _usedOutboundMessageSpace.set(0);
        }
        return _fullyWritten;

    }

    protected int readFromNetwork() throws IOException
    {
        QpidByteBuffer buffer = _delegate.getNetInputBuffer();

        int read = buffer.read(_socketChannel);
        if (read == -1)
        {
            _closed.set(true);
        }

        _partialRead = read != 0;

        if (LOGGER.isDebugEnabled())
        {
            LOGGER.debug("Read " + read + " byte(s)");
        }
        return read;
    }

    @Override
    public boolean isDirectBufferPreferred()
    {
        return true;
    }

    @Override
    public void send(final QpidByteBuffer msg)
    {

        if (_closed.get())
        {
            LOGGER.warn("Send ignored as the connection is already closed");
        }
        else if (msg.remaining() > 0)
        {
            _buffers.add(msg.duplicate());
        }
        msg.position(msg.limit());
    }

    @Override
    public void flush()
    {
    }

    public final void pushScheduler(NetworkConnectionScheduler scheduler)
    {
        _schedulerDeque.addFirst(scheduler);
    }

    public final NetworkConnectionScheduler popScheduler()
    {
        return _schedulerDeque.removeFirst();
    }

    public final NetworkConnectionScheduler getScheduler()
    {
        return _schedulerDeque.peekFirst();
    }

    @Override
    public String toString()
    {
        return "[NonBlockingConnection " + _remoteSocketAddress + "]";
    }

    public void processAmqpData(QpidByteBuffer applicationData)
    {
        _protocolEngine.received(applicationData);
    }

    public void setTransportEncryption(TransportEncryption transportEncryption)
    {
        NonBlockingConnectionDelegate oldDelegate = _delegate;
        switch (transportEncryption)
        {
            case TLS:
                _onTransportEncryptionAction.run();
                _delegate = new NonBlockingConnectionTLSDelegate(this, _port);
                break;
            case NONE:
                _delegate = new NonBlockingConnectionPlainDelegate(this, _port);
                break;
            default:
                throw new IllegalArgumentException("unknown TransportEncryption " + transportEncryption);
        }
        if(oldDelegate != null)
        {
            QpidByteBuffer src = oldDelegate.getNetInputBuffer().duplicate();
            src.flip();
            _delegate.getNetInputBuffer().put(src);
            src.dispose();
        }
        LOGGER.debug("Identified transport encryption as " + transportEncryption);
    }

    public boolean setScheduled()
    {
        final boolean scheduled = _scheduled.compareAndSet(false, true);
        if (scheduled)
        {
            _scheduledTime = System.currentTimeMillis();
        }
        return scheduled;
    }

    public void clearScheduled()
    {
        _scheduled.set(false);
        _scheduledTime = 0;
    }

    @Override
    public long getScheduledTime()
    {
        return _scheduledTime;
    }

    void reportUnexpectedByteBufferSizeUsage()
    {
        if (!_unexpectedByteBufferSizeReported)
        {
            LOGGER.info("At least one frame unexpectedly does not fit into default byte buffer size ({}B) on a connection {}.",
                    _port.getNetworkBufferSize(), this.toString());
            _unexpectedByteBufferSizeReported = true;
        }
    }

    public SelectorThread.SelectionTask getSelectionTask()
    {
        return _selectionTask;
    }

    public void setSelectionTask(final SelectorThread.SelectionTask selectionTask)
    {
        _selectionTask = selectionTask;
    }
}
