/*
 * Copyright 2009-2010 by The Regents of the University of California
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * you may obtain a copy of the License from
 * 
 *     http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package edu.uci.ics.hyracks.ipc.impl;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

public class IPCConnectionManager {
    private static final Logger LOGGER = Logger.getLogger(IPCConnectionManager.class.getName());

    private final IPCSystem system;

    private final NetworkThread networkThread;

    private final ServerSocketChannel serverSocketChannel;

    private final Map<InetSocketAddress, IPCHandle> ipcHandleMap;

    private final List<IPCHandle> pendingConnections;

    private final List<IPCHandle> workingPendingConnections;

    private final List<Message> sendList;

    private final List<Message> workingSendList;

    private final InetSocketAddress address;

    private volatile boolean stopped;

    IPCConnectionManager(IPCSystem system, InetSocketAddress socketAddress) throws IOException {
        this.system = system;
        this.networkThread = new NetworkThread();
        this.serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().setReuseAddress(true);
        serverSocketChannel.configureBlocking(false);
        ServerSocket socket = serverSocketChannel.socket();
        socket.bind(socketAddress);
        address = new InetSocketAddress(socket.getInetAddress(), socket.getLocalPort());
        ipcHandleMap = new HashMap<InetSocketAddress, IPCHandle>();
        pendingConnections = new ArrayList<IPCHandle>();
        workingPendingConnections = new ArrayList<IPCHandle>();
        sendList = new ArrayList<Message>();
        workingSendList = new ArrayList<Message>();
    }

    InetSocketAddress getAddress() {
        return address;
    }

    void start() {
        stopped = false;
        networkThread.start();
    }

    void stop() throws IOException {
        stopped = true;
        serverSocketChannel.close();
    }

    IPCHandle getIPCHandle(InetSocketAddress remoteAddress) throws IOException, InterruptedException {
        IPCHandle handle;
        synchronized (this) {
            handle = ipcHandleMap.get(remoteAddress);
            if (handle == null) {
                handle = new IPCHandle(system, remoteAddress);
                pendingConnections.add(handle);
                networkThread.selector.wakeup();
            }
        }
        handle.waitTillConnected();
        return handle;
    }

    synchronized void registerHandle(IPCHandle handle) {
        ipcHandleMap.put(handle.getRemoteAddress(), handle);
    }

    synchronized void write(Message msg) {
        if (LOGGER.isLoggable(Level.FINE)) {
            LOGGER.fine("Enqueued message: " + msg);
        }
        sendList.add(msg);
        networkThread.selector.wakeup();
    }

    private synchronized void collectOutstandingWork() {
        if (!pendingConnections.isEmpty()) {
            moveAll(pendingConnections, workingPendingConnections);
        }
        if (!sendList.isEmpty()) {
            moveAll(sendList, workingSendList);
        }
    }

    private Message createInitialReqMessage(IPCHandle handle) {
        Message msg = new Message(handle);
        msg.setMessageId(system.createMessageId());
        msg.setRequestMessageId(-1);
        msg.setFlag(Message.INITIAL_REQ);
        msg.setPayload(address);
        return msg;
    }

    private Message createInitialAckMessage(IPCHandle handle, Message req) {
        Message msg = new Message(handle);
        msg.setMessageId(system.createMessageId());
        msg.setRequestMessageId(req.getMessageId());
        msg.setFlag(Message.INITIAL_ACK);
        msg.setPayload(null);
        return msg;
    }

    void ack(IPCHandle handle, Message req) {
        write(createInitialAckMessage(handle, req));
    }

    private class NetworkThread extends Thread {
        private final Selector selector;

        public NetworkThread() {
            super("IPC Network Listener Thread");
            setDaemon(true);
            try {
                selector = Selector.open();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void run() {
            try {
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            } catch (ClosedChannelException e) {
                throw new RuntimeException(e);
            }
            BitSet unsentMessagesBitmap = new BitSet();
            List<Message> tempUnsentMessages = new ArrayList<Message>();
            while (!stopped) {
                try {
                    if (LOGGER.isLoggable(Level.FINE)) {
                        LOGGER.fine("Starting Select");
                    }
                    int n = selector.select();
                    collectOutstandingWork();
                    if (!workingPendingConnections.isEmpty()) {
                        for (IPCHandle handle : workingPendingConnections) {
                            SocketChannel channel = SocketChannel.open();
                            channel.configureBlocking(false);
                            SelectionKey cKey = null;
                            if (channel.connect(handle.getRemoteAddress())) {
                                cKey = channel.register(selector, SelectionKey.OP_READ);
                                handle.setState(HandleState.CONNECT_SENT);
                                write(createInitialReqMessage(handle));
                            } else {
                                cKey = channel.register(selector, SelectionKey.OP_CONNECT);
                            }
                            handle.setKey(cKey);
                            cKey.attach(handle);
                        }
                        workingPendingConnections.clear();
                    }
                    if (!workingSendList.isEmpty()) {
                        unsentMessagesBitmap.clear();
                        int len = workingSendList.size();
                        for (int i = 0; i < len; ++i) {
                            Message msg = workingSendList.get(i);
                            if (LOGGER.isLoggable(Level.FINE)) {
                                LOGGER.fine("Processing send of message: " + msg);
                            }
                            IPCHandle handle = msg.getIPCHandle();
                            if (handle.getState() != HandleState.CLOSED) {
                                if (!handle.full()) {
                                    while (true) {
                                        ByteBuffer buffer = handle.getOutBuffer();
                                        buffer.compact();
                                        boolean success = msg.write(buffer);
                                        buffer.flip();
                                        if (success) {
                                            system.getPerformanceCounters().addMessageSentCount(1);
                                            SelectionKey key = handle.getKey();
                                            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
                                        } else {
                                            if (!buffer.hasRemaining()) {
                                                handle.resizeOutBuffer();
                                                continue;
                                            }
                                            handle.markFull();
                                            unsentMessagesBitmap.set(i);
                                        }
                                        break;
                                    }
                                } else {
                                    unsentMessagesBitmap.set(i);
                                }
                            }
                        }
                        copyUnsentMessages(unsentMessagesBitmap, tempUnsentMessages);
                    }
                    if (n > 0) {
                        for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) {
                            SelectionKey key = i.next();
                            i.remove();
                            SelectableChannel sc = key.channel();
                            if (key.isReadable()) {
                                SocketChannel channel = (SocketChannel) sc;
                                IPCHandle handle = (IPCHandle) key.attachment();
                                ByteBuffer readBuffer = handle.getInBuffer();
                                int len = channel.read(readBuffer);
                                system.getPerformanceCounters().addMessageBytesReceived(len);
                                if (len < 0) {
                                    key.cancel();
                                    channel.close();
                                    handle.close();
                                } else {
                                    handle.processIncomingMessages();
                                    if (!readBuffer.hasRemaining()) {
                                        handle.resizeInBuffer();
                                    }
                                }
                            } else if (key.isWritable()) {
                                SocketChannel channel = (SocketChannel) sc;
                                IPCHandle handle = (IPCHandle) key.attachment();
                                ByteBuffer writeBuffer = handle.getOutBuffer();
                                int len = channel.write(writeBuffer);
                                system.getPerformanceCounters().addMessageBytesSent(len);
                                if (len < 0) {
                                    key.cancel();
                                    channel.close();
                                    handle.close();
                                } else if (!writeBuffer.hasRemaining()) {
                                    key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                                }
                                if (handle.full()) {
                                    handle.clearFull();
                                    selector.wakeup();
                                }
                            } else if (key.isAcceptable()) {
                                assert sc == serverSocketChannel;
                                SocketChannel channel = serverSocketChannel.accept();
                                channel.configureBlocking(false);
                                IPCHandle handle = new IPCHandle(system, null);
                                SelectionKey cKey = channel.register(selector, SelectionKey.OP_READ);
                                handle.setKey(cKey);
                                cKey.attach(handle);
                                handle.setState(HandleState.CONNECT_RECEIVED);
                            } else if (key.isConnectable()) {
                                SocketChannel channel = (SocketChannel) sc;
                                if (channel.finishConnect()) {
                                    IPCHandle handle = (IPCHandle) key.attachment();
                                    handle.setState(HandleState.CONNECT_SENT);
                                    registerHandle(handle);
                                    key.interestOps(SelectionKey.OP_READ);
                                    write(createInitialReqMessage(handle));
                                }
                            }
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        private void copyUnsentMessages(BitSet unsentMessagesBitmap, List<Message> tempUnsentMessages) {
            assert tempUnsentMessages.isEmpty();
            for (int i = unsentMessagesBitmap.nextSetBit(0); i >= 0; i = unsentMessagesBitmap.nextSetBit(i + 1)) {
                tempUnsentMessages.add(workingSendList.get(i));
            }
            workingSendList.clear();
            moveAll(tempUnsentMessages, workingSendList);
        }
    }

    private <T> void moveAll(List<T> source, List<T> target) {
        int len = source.size();
        for (int i = 0; i < len; ++i) {
            target.add(source.get(i));
        }
        source.clear();
    }
}