/*
 * Copyright 2009-2010 by The Regents of the University of California
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * you may obtain a copy of the License from
 * 
 *     http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package edu.uci.ics.hyracks.net.protocols.muxdemux;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
import edu.uci.ics.hyracks.net.exceptions.NetException;

/**
 * Handle to a channel that represents a logical full-duplex communication end-point.
 * 
 * @author vinayakb
 */
public class ChannelControlBlock {
    private static final Logger LOGGER = Logger.getLogger(ChannelControlBlock.class.getName());

    private final ChannelSet cSet;

    private final int channelId;

    private final ReadInterface ri;

    private final WriteInterface wi;

    private final AtomicBoolean localClose;

    private final AtomicBoolean localCloseAck;

    private final AtomicBoolean remoteClose;

    private final AtomicBoolean remoteCloseAck;

    ChannelControlBlock(ChannelSet cSet, int channelId) {
        this.cSet = cSet;
        this.channelId = channelId;
        this.ri = new ReadInterface();
        this.wi = new WriteInterface();
        localClose = new AtomicBoolean();
        localCloseAck = new AtomicBoolean();
        remoteClose = new AtomicBoolean();
        remoteCloseAck = new AtomicBoolean();
    }

    int getChannelId() {
        return channelId;
    }

    /**
     * Get the read inderface of this channel.
     * 
     * @return the read interface.
     */
    public IChannelReadInterface getReadInterface() {
        return ri;
    }

    /**
     * Get the write interface of this channel.
     * 
     * @return the write interface.
     */
    public IChannelWriteInterface getWriteInterface() {
        return wi;
    }

    private final class ReadInterface implements IChannelReadInterface {
        private final Deque<ByteBuffer> riEmptyStack;

        private final IBufferAcceptor eba = new IBufferAcceptor() {
            @Override
            public void accept(ByteBuffer buffer) {
                int delta = buffer.remaining();
                synchronized (ChannelControlBlock.this) {
                    if (remoteClose.get()) {
                        return;
                    }
                    riEmptyStack.push(buffer);
                }
                cSet.addPendingCredits(channelId, delta);
            }
        };

        private ICloseableBufferAcceptor fba;

        private volatile int credits;

        private ByteBuffer currentReadBuffer;

        ReadInterface() {
            riEmptyStack = new ArrayDeque<ByteBuffer>();
            credits = 0;
        }

        @Override
        public void setFullBufferAcceptor(ICloseableBufferAcceptor fullBufferAcceptor) {
            fba = fullBufferAcceptor;
        }

        @Override
        public IBufferAcceptor getEmptyBufferAcceptor() {
            return eba;
        }

        int read(SocketChannel sc, int size) throws IOException, NetException {
            while (true) {
                if (size <= 0) {
                    return size;
                }
                if (currentReadBuffer == null) {
                    currentReadBuffer = riEmptyStack.poll();
                    assert currentReadBuffer != null;
                }
                int rSize = Math.min(size, currentReadBuffer.remaining());
                if (rSize > 0) {
                    currentReadBuffer.limit(currentReadBuffer.position() + rSize);
                    int len;
                    try {
                        len = sc.read(currentReadBuffer);
                        if (len < 0) {
                            throw new NetException("Socket Closed");
                        }
                    } finally {
                        currentReadBuffer.limit(currentReadBuffer.capacity());
                    }
                    size -= len;
                    if (len < rSize) {
                        return size;
                    }
                } else {
                    return size;
                }
                if (currentReadBuffer.remaining() <= 0) {
                    flush();
                }
            }
        }

        void flush() {
            if (currentReadBuffer != null) {
                currentReadBuffer.flip();
                fba.accept(currentReadBuffer);
                currentReadBuffer = null;
            }
        }
    }

    private final class WriteInterface implements IChannelWriteInterface {
        private final Queue<ByteBuffer> wiFullQueue;

        private boolean channelWritabilityState;

        private final ICloseableBufferAcceptor fba = new ICloseableBufferAcceptor() {
            @Override
            public void accept(ByteBuffer buffer) {
                synchronized (ChannelControlBlock.this) {
                    wiFullQueue.add(buffer);
                    adjustChannelWritability();
                }
            }

            @Override
            public void close() {
                synchronized (ChannelControlBlock.this) {
                    if (eos) {
                        if (LOGGER.isLoggable(Level.WARNING)) {
                            LOGGER.warning("Received duplicate close() on channel: " + channelId);
                        }
                        return;
                    }
                    eos = true;
                    adjustChannelWritability();
                }
            }

            @Override
            public void error(int ecode) {
                synchronized (ChannelControlBlock.this) {
                    WriteInterface.this.ecode = ecode;
                    adjustChannelWritability();
                }
            }
        };

        private IBufferAcceptor eba;

        private int credits;

        private boolean eos;

        private boolean eosSent;

        private int ecode;

        private boolean ecodeSent;

        private ByteBuffer currentWriteBuffer;

        WriteInterface() {
            wiFullQueue = new ArrayDeque<ByteBuffer>();
            credits = 0;
            eos = false;
            eosSent = false;
            ecode = -1;
            ecodeSent = false;
        }

        @Override
        public void setEmptyBufferAcceptor(IBufferAcceptor emptyBufferAcceptor) {
            eba = emptyBufferAcceptor;
        }

        @Override
        public ICloseableBufferAcceptor getFullBufferAcceptor() {
            return fba;
        }

        void write(MultiplexedConnection.WriterState writerState) throws NetException {
            if (currentWriteBuffer == null) {
                currentWriteBuffer = wiFullQueue.poll();
            }
            if (currentWriteBuffer != null) {
                int size = Math.min(currentWriteBuffer.remaining(), credits);
                if (size > 0) {
                    credits -= size;
                    writerState.command.setChannelId(channelId);
                    writerState.command.setCommandType(MuxDemuxCommand.CommandType.DATA);
                    writerState.command.setData(size);
                    writerState.reset(currentWriteBuffer, size, ChannelControlBlock.this);
                } else {
                    adjustChannelWritability();
                }
            } else if (ecode >= 0 && !ecodeSent) {
                writerState.command.setChannelId(channelId);
                writerState.command.setCommandType(MuxDemuxCommand.CommandType.ERROR);
                writerState.command.setData(ecode);
                writerState.reset(null, 0, null);
                ecodeSent = true;
                localClose.set(true);
                adjustChannelWritability();
            } else if (eos && !eosSent) {
                writerState.command.setChannelId(channelId);
                writerState.command.setCommandType(MuxDemuxCommand.CommandType.CLOSE_CHANNEL);
                writerState.command.setData(0);
                writerState.reset(null, 0, null);
                eosSent = true;
                localClose.set(true);
                adjustChannelWritability();
            }
        }

        void writeComplete() {
            if (currentWriteBuffer.remaining() <= 0) {
                currentWriteBuffer.clear();
                eba.accept(currentWriteBuffer);
                currentWriteBuffer = null;
                adjustChannelWritability();
            }
        }

        private boolean computeWritability() {
            boolean writableDataPresent = currentWriteBuffer != null || !wiFullQueue.isEmpty();
            if (writableDataPresent) {
                return credits > 0;
            }
            if (eos && !eosSent) {
                return true;
            }
            if (ecode >= 0 && !ecodeSent) {
                return true;
            }
            return false;
        }

        void adjustChannelWritability() {
            boolean writable = computeWritability();
            if (writable) {
                if (!channelWritabilityState) {
                    cSet.markPendingWrite(channelId);
                }
            } else {
                if (channelWritabilityState) {
                    cSet.unmarkPendingWrite(channelId);
                }
            }
            channelWritabilityState = writable;
        }
    }

    synchronized void write(MultiplexedConnection.WriterState writerState) throws NetException {
        wi.write(writerState);
    }

    synchronized void writeComplete() {
        wi.writeComplete();
    }

    synchronized int read(SocketChannel sc, int size) throws IOException, NetException {
        return ri.read(sc, size);
    }

    int getReadCredits() {
        return ri.credits;
    }

    void setReadCredits(int credits) {
        this.ri.credits = credits;
    }

    synchronized void addWriteCredits(int delta) {
        wi.credits += delta;
        wi.adjustChannelWritability();
    }

    synchronized void reportRemoteEOS() {
        ri.flush();
        ri.fba.close();
        remoteClose.set(true);
    }

    void reportRemoteEOSAck() {
        remoteCloseAck.set(true);
    }

    boolean getRemoteEOS() {
        return remoteClose.get();
    }

    void reportLocalEOSAck() {
        localCloseAck.set(true);
    }

    synchronized void reportRemoteError(int ecode) {
        ri.flush();
        ri.fba.error(ecode);
        remoteClose.set(true);
    }

    boolean completelyClosed() {
        return localCloseAck.get() && remoteCloseAck.get();
    }

    @Override
    public String toString() {
        return "Channel:" + channelId + "[localClose: " + localClose + " localCloseAck: " + localCloseAck
                + " remoteClose: " + remoteClose + " remoteCloseAck:" + remoteCloseAck + " readCredits: " + ri.credits
                + " writeCredits: " + wi.credits + "]";
    }
}