/*
 *  Licensed to the Apache Software Foundation (ASF) under one
 *  or more contributor license agreements.  See the NOTICE file
 *  distributed with this work for additional information
 *  regarding copyright ownership.  The ASF licenses this file
 *  to you under the Apache License, Version 2.0 (the
 *  "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing,
 *  software distributed under the License is distributed on an
 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *  KIND, either express or implied.  See the License for the
 *  specific language governing permissions and limitations
 *  under the License.
 *
 */
package org.apache.mina.transport.socket.apr;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;

import org.apache.mina.core.RuntimeIoException;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.file.FileRegion;
import org.apache.mina.core.polling.AbstractPollingIoProcessor;
import org.apache.mina.core.session.SessionState;
import org.apache.tomcat.jni.File;
import org.apache.tomcat.jni.Poll;
import org.apache.tomcat.jni.Pool;
import org.apache.tomcat.jni.Socket;
import org.apache.tomcat.jni.Status;

/**
 * The class in charge of processing socket level IO events for the
 * {@link AprSocketConnector}
 *
 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
 */
public final class AprIoProcessor extends AbstractPollingIoProcessor<AprSession> {
    private static final int POLLSET_SIZE = 1024;

    private final Map<Long, AprSession> allSessions = new HashMap<Long, AprSession>(POLLSET_SIZE);

    private final Object wakeupLock = new Object();

    private final long wakeupSocket;

    private volatile boolean toBeWakenUp;

    private final long pool;

    private final long bufferPool; // memory pool

    private final long pollset; // socket poller

    private final long[] polledSockets = new long[POLLSET_SIZE << 1];

    private final Queue<AprSession> polledSessions = new ConcurrentLinkedQueue<AprSession>();

    /**
     * Create a new instance of {@link AprIoProcessor} with a given Exector for
     * handling I/Os events.
     *
     * @param executor
     *            the {@link Executor} for handling I/O events
     */
    public AprIoProcessor(Executor executor) {
        super(executor);

        // initialize a memory pool for APR functions
        pool = Pool.create(AprLibrary.getInstance().getRootPool());
        bufferPool = Pool.create(AprLibrary.getInstance().getRootPool());

        try {
            wakeupSocket = Socket.create(Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool);
        } catch (RuntimeException e) {
            throw e;
        } catch (Error e) {
            throw e;
        } catch (Exception e) {
            throw new RuntimeIoException("Failed to create a wakeup socket.", e);
        }

        boolean success = false;
        long newPollset;
        try {
            newPollset = Poll.create(POLLSET_SIZE, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);

            if (newPollset == 0) {
                newPollset = Poll.create(62, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
            }

            pollset = newPollset;
            if (pollset < 0) {
                if (Status.APR_STATUS_IS_ENOTIMPL(-(int) pollset)) {
                    throw new RuntimeIoException("Thread-safe pollset is not supported in this platform.");
                }
            }
            success = true;
        } catch (RuntimeException e) {
            throw e;
        } catch (Error e) {
            throw e;
        } catch (Exception e) {
            throw new RuntimeIoException("Failed to create a pollset.", e);
        } finally {
            if (!success) {
                dispose();
            }
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected void doDispose() {
        Poll.destroy(pollset);
        Socket.close(wakeupSocket);
        Pool.destroy(bufferPool);
        Pool.destroy(pool);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected int select() throws Exception {
        return select(Integer.MAX_VALUE);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected int select(long timeout) throws Exception {
        int rv = Poll.poll(pollset, 1000 * timeout, polledSockets, false);
        if (rv <= 0) {
            if (rv != -120001) {
                throwException(rv);
            }

            rv = Poll.maintain(pollset, polledSockets, true);
            if (rv > 0) {
                for (int i = 0; i < rv; i++) {
                    long socket = polledSockets[i];
                    AprSession session = allSessions.get(socket);
                    if (session == null) {
                        continue;
                    }

                    int flag = (session.isInterestedInRead() ? Poll.APR_POLLIN : 0)
                            | (session.isInterestedInWrite() ? Poll.APR_POLLOUT : 0);

                    Poll.add(pollset, socket, flag);
                }
            } else if (rv < 0) {
                throwException(rv);
            }

            return 0;
        } else {
            rv <<= 1;
            if (!polledSessions.isEmpty()) {
                polledSessions.clear();
            }
            for (int i = 0; i < rv; i++) {
                long flag = polledSockets[i];
                long socket = polledSockets[++i];
                if (socket == wakeupSocket) {
                    synchronized (wakeupLock) {
                        Poll.remove(pollset, wakeupSocket);
                        toBeWakenUp = false;
                        wakeupCalled.set(true);
                    }
                    continue;
                }
                AprSession session = allSessions.get(socket);
                if (session == null) {
                    continue;
                }

                session.setReadable((flag & Poll.APR_POLLIN) != 0);
                session.setWritable((flag & Poll.APR_POLLOUT) != 0);

                polledSessions.add(session);
            }

            return polledSessions.size();
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected boolean isSelectorEmpty() {
        return allSessions.isEmpty();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected void wakeup() {
        if (toBeWakenUp) {
            return;
        }

        // Add a dummy socket to the pollset.
        synchronized (wakeupLock) {
            toBeWakenUp = true;
            Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected Iterator<AprSession> allSessions() {
        return allSessions.values().iterator();
    }
    
    /**
     * {@inheritDoc}
     */
    @Override
    protected int allSessionsCount() {
    	return allSessions.size();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected Iterator<AprSession> selectedSessions() {
        return polledSessions.iterator();
    }

    @Override
    protected void init(AprSession session) throws Exception {
        long s = session.getDescriptor();
        Socket.optSet(s, Socket.APR_SO_NONBLOCK, 1);
        Socket.timeoutSet(s, 0);

        int rv = Poll.add(pollset, s, Poll.APR_POLLIN);
        if (rv != Status.APR_SUCCESS) {
            throwException(rv);
        }

        session.setInterestedInRead(true);
        allSessions.put(s, session);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected void destroy(AprSession session) throws Exception {
        if (allSessions.remove(session.getDescriptor()) == null) {
            // Already destroyed.
            return;
        }

        int ret = Poll.remove(pollset, session.getDescriptor());
        try {
            if (ret != Status.APR_SUCCESS) {
                throwException(ret);
            }
        } finally {
            ret = Socket.close(session.getDescriptor());

            // destroying the session because it won't be reused
            // after this point
            Socket.destroy(session.getDescriptor());
            session.setDescriptor(0);

            if (ret != Status.APR_SUCCESS) {
                throwException(ret);
            }
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected SessionState getState(AprSession session) {
        long socket = session.getDescriptor();

        if (socket != 0) {
            return SessionState.OPENED;
        } else if (allSessions.get(socket) != null) {
            return SessionState.OPENING; // will occur ?
        } else {
            return SessionState.CLOSING;
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected boolean isReadable(AprSession session) {
        return session.isReadable();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected boolean isWritable(AprSession session) {
        return session.isWritable();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected boolean isInterestedInRead(AprSession session) {
        return session.isInterestedInRead();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected boolean isInterestedInWrite(AprSession session) {
        return session.isInterestedInWrite();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected void setInterestedInRead(AprSession session, boolean isInterested) throws Exception {
        if (session.isInterestedInRead() == isInterested) {
            return;
        }

        int rv = Poll.remove(pollset, session.getDescriptor());

        if (rv != Status.APR_SUCCESS) {
            throwException(rv);
        }

        int flags = (isInterested ? Poll.APR_POLLIN : 0) | (session.isInterestedInWrite() ? Poll.APR_POLLOUT : 0);

        rv = Poll.add(pollset, session.getDescriptor(), flags);

        if (rv == Status.APR_SUCCESS) {
            session.setInterestedInRead(isInterested);
        } else {
            throwException(rv);
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected void setInterestedInWrite(AprSession session, boolean isInterested) throws Exception {
        if (session.isInterestedInWrite() == isInterested) {
            return;
        }

        int rv = Poll.remove(pollset, session.getDescriptor());

        if (rv != Status.APR_SUCCESS) {
            throwException(rv);
        }

        int flags = (session.isInterestedInRead() ? Poll.APR_POLLIN : 0) | (isInterested ? Poll.APR_POLLOUT : 0);

        rv = Poll.add(pollset, session.getDescriptor(), flags);

        if (rv == Status.APR_SUCCESS) {
            session.setInterestedInWrite(isInterested);
        } else {
            throwException(rv);
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected int read(AprSession session, IoBuffer buffer) throws Exception {
        int bytes;
        int capacity = buffer.remaining();
        // Using Socket.recv() directly causes memory leak. :-(
        ByteBuffer b = Pool.alloc(bufferPool, capacity);

        try {
            bytes = Socket.recvb(session.getDescriptor(), b, 0, capacity);

            if (bytes > 0) {
                b.position(0);
                b.limit(bytes);
                buffer.put(b);
            } else if (bytes < 0) {
                if (Status.APR_STATUS_IS_EOF(-bytes)) {
                    bytes = -1;
                } else if (Status.APR_STATUS_IS_EAGAIN(-bytes)) {
                    bytes = 0;
                } else {
                    throwException(bytes);
                }
            }
        } finally {
            Pool.clear(bufferPool);
        }

        return bytes;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected int write(AprSession session, IoBuffer buf, int length) throws IOException {
        int writtenBytes;
        if (buf.isDirect()) {
            writtenBytes = Socket.sendb(session.getDescriptor(), buf.buf(), buf.position(), length);
        } else {
            writtenBytes = Socket.send(session.getDescriptor(), buf.array(), buf.position(), length);
            if (writtenBytes > 0) {
                buf.skip(writtenBytes);
            }
        }

        if (writtenBytes < 0) {
            if (Status.APR_STATUS_IS_EAGAIN(-writtenBytes)) {
                writtenBytes = 0;
            } else if (Status.APR_STATUS_IS_EOF(-writtenBytes)) {
                writtenBytes = 0;
            } else {
                throwException(writtenBytes);
            }
        }
        return writtenBytes;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected int transferFile(AprSession session, FileRegion region, int length) throws Exception {
        if (region.getFilename() == null) {
            throw new UnsupportedOperationException();
        }

        long fd = File.open(region.getFilename(), File.APR_FOPEN_READ | File.APR_FOPEN_SENDFILE_ENABLED
                | File.APR_FOPEN_BINARY, 0, Socket.pool(session.getDescriptor()));
        long numWritten = Socket.sendfilen(session.getDescriptor(), fd, region.getPosition(), length, 0);
        File.close(fd);

        if (numWritten < 0) {
            if (numWritten == -Status.EAGAIN) {
                return 0;
            }
            throw new IOException(org.apache.tomcat.jni.Error.strerror((int) -numWritten) + " (code: " + numWritten
                    + ")");
        }
        return (int) numWritten;
    }

    private void throwException(int code) throws IOException {
        throw new IOException(org.apache.tomcat.jni.Error.strerror(-code) + " (code: " + code + ")");
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected void registerNewSelector() {
        // Do nothing
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected boolean isBrokenConnection() throws IOException {
        // Here, we assume that this is the case.
        return true;
    }
}
