/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */

package org.apache.hc.core5.reactor;

import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;

import javax.net.ssl.SSLContext;

import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.function.Decorator;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.net.NamedEndpoint;
import org.apache.hc.core5.reactor.ssl.SSLBufferMode;
import org.apache.hc.core5.reactor.ssl.SSLIOSession;
import org.apache.hc.core5.reactor.ssl.SSLMode;
import org.apache.hc.core5.reactor.ssl.SSLSessionInitializer;
import org.apache.hc.core5.reactor.ssl.SSLSessionVerifier;
import org.apache.hc.core5.reactor.ssl.TlsDetails;
import org.apache.hc.core5.util.Asserts;
import org.apache.hc.core5.util.Timeout;

final class InternalDataChannel extends InternalChannel implements ProtocolIOSession {

    private final IOSession ioSession;
    private final NamedEndpoint initialEndpoint;
    private final Decorator<IOSession> ioSessionDecorator;
    private final IOSessionListener sessionListener;
    private final Queue<InternalDataChannel> closedSessions;
    private final AtomicReference<SSLIOSession> tlsSessionRef;
    private final AtomicReference<IOSession> currentSessionRef;
    private final AtomicBoolean closed;

    InternalDataChannel(
            final IOSession ioSession,
            final NamedEndpoint initialEndpoint,
            final Decorator<IOSession> ioSessionDecorator,
            final IOSessionListener sessionListener,
            final Queue<InternalDataChannel> closedSessions) {
        this.ioSession = ioSession;
        this.initialEndpoint = initialEndpoint;
        this.closedSessions = closedSessions;
        this.ioSessionDecorator = ioSessionDecorator;
        this.sessionListener = sessionListener;
        this.tlsSessionRef = new AtomicReference<>(null);
        this.currentSessionRef = new AtomicReference<>(
                ioSessionDecorator != null ? ioSessionDecorator.decorate(ioSession) : ioSession);
        this.closed = new AtomicBoolean(false);
    }

    @Override
    public String getId() {
        return ioSession.getId();
    }

    @Override
    public NamedEndpoint getInitialEndpoint() {
        return initialEndpoint;
    }

    @Override
    public IOEventHandler getHandler() {
        final IOSession currentSession = currentSessionRef.get();
        return currentSession.getHandler();
    }

    @Override
    public void upgrade(final IOEventHandler handler) {
        final IOSession currentSession = currentSessionRef.get();
        currentSession.upgrade(handler);
    }

    private IOEventHandler ensureHandler(final IOSession session) {
        final IOEventHandler handler = session.getHandler();
        Asserts.notNull(handler, "IO event handler");
        return handler;
    }

    @Override
    void onIOEvent(final int readyOps) throws IOException {
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            final IOSession currentSession = currentSessionRef.get();
            currentSession.clearEvent(SelectionKey.OP_CONNECT);
            if (tlsSessionRef.get() == null) {
                if (sessionListener != null) {
                    sessionListener.connected(currentSession);
                }
                final IOEventHandler handler = ensureHandler(currentSession);
                handler.connected(currentSession);
            }
        }
        if ((readyOps & SelectionKey.OP_READ) != 0) {
            final IOSession currentSession = currentSessionRef.get();
            currentSession.updateReadTime();
            if (sessionListener != null) {
                sessionListener.inputReady(currentSession);
            }
            final IOEventHandler handler = ensureHandler(currentSession);
            handler.inputReady(currentSession, null);
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0
                || (ioSession.getEventMask() & SelectionKey.OP_WRITE) != 0) {
            final IOSession currentSession = currentSessionRef.get();
            currentSession.updateWriteTime();
            if (sessionListener != null) {
                sessionListener.outputReady(currentSession);
            }
            final IOEventHandler handler = ensureHandler(currentSession);
            handler.outputReady(currentSession);
        }
    }

    @Override
    Timeout getTimeout() {
        final IOSession currentSession = currentSessionRef.get();
        return currentSession.getSocketTimeout();
    }

    @Override
    void onTimeout(final Timeout timeout) throws IOException {
        final IOSession currentSession = currentSessionRef.get();
        if (sessionListener != null) {
            sessionListener.timeout(currentSession);
        }
        final IOEventHandler handler = ensureHandler(currentSession);
        handler.timeout(currentSession, timeout);
    }

    @Override
    void onException(final Exception cause) {
        final IOSession currentSession = currentSessionRef.get();
        if (sessionListener != null) {
            sessionListener.exception(currentSession, cause);
        }
        final IOEventHandler handler = currentSession.getHandler();
        if (handler != null) {
            handler.exception(currentSession, cause);
        }
    }

    void onTLSSessionStart(final SSLIOSession sslSession) {
        final IOSession currentSession = currentSessionRef.get();
        if (sessionListener != null) {
            sessionListener.connected(currentSession);
        }
    }

    void onTLSSessionEnd() {
        if (closed.compareAndSet(false, true)) {
            closedSessions.add(this);
        }
    }

    void disconnected() {
        final IOSession currentSession = currentSessionRef.get();
        if (sessionListener != null) {
            sessionListener.disconnected(currentSession);
        }
        final IOEventHandler handler = currentSession.getHandler();
        if (handler != null) {
            handler.disconnected(currentSession);
        }
    }

    @Override
    public void startTls(
            final SSLContext sslContext,
            final NamedEndpoint endpoint,
            final SSLBufferMode sslBufferMode,
            final SSLSessionInitializer initializer,
            final SSLSessionVerifier verifier,
            final Timeout handshakeTimeout) {
        final SSLIOSession sslioSession = new SSLIOSession(
                endpoint != null ? endpoint : initialEndpoint,
                ioSession,
                initialEndpoint != null ? SSLMode.CLIENT : SSLMode.SERVER,
                sslContext,
                sslBufferMode,
                initializer,
                verifier,
                new Callback<SSLIOSession>() {

                    @Override
                    public void execute(final SSLIOSession sslSession) {
                        onTLSSessionStart(sslSession);
                    }

                },
                new Callback<SSLIOSession>() {

                    @Override
                    public void execute(final SSLIOSession sslSession) {
                        onTLSSessionEnd();
                    }

                },
                handshakeTimeout);
        if (tlsSessionRef.compareAndSet(null, sslioSession)) {
            currentSessionRef.set(ioSessionDecorator != null ? ioSessionDecorator.decorate(sslioSession) : sslioSession);
            if (sessionListener != null) {
                sessionListener.startTls(sslioSession);
            }
        } else {
            throw new IllegalStateException("TLS already activated");
        }
    }

    @SuppressWarnings("resource")
    @Override
    public TlsDetails getTlsDetails() {
        final SSLIOSession sslIoSession = tlsSessionRef.get();
        return sslIoSession != null ? sslIoSession.getTlsDetails() : null;
    }

    @Override
    public Lock getLock() {
        return ioSession.getLock();
    }

    @Override
    public void close() {
        close(CloseMode.GRACEFUL);
    }

    @Override
    public void close(final CloseMode closeMode) {
        final IOSession currentSession = currentSessionRef.get();
        if (closeMode == CloseMode.IMMEDIATE) {
            closed.set(true);
            currentSession.close(closeMode);
        } else {
            if (closed.compareAndSet(false, true)) {
                try {
                    currentSession.close(closeMode);
                } finally {
                    closedSessions.add(this);
                }
            }
        }
    }

    @Override
    public IOSession.Status getStatus() {
        final IOSession currentSession = currentSessionRef.get();
        return currentSession.getStatus();
    }

    @Override
    public boolean isOpen() {
        final IOSession currentSession = currentSessionRef.get();
        return currentSession.isOpen();
    }

    @Override
    public void enqueue(final Command command, final Command.Priority priority) {
        final IOSession currentSession = currentSessionRef.get();
        currentSession.enqueue(command, priority);
    }

    @Override
    public boolean hasCommands() {
        final IOSession currentSession = currentSessionRef.get();
        return currentSession.hasCommands();
    }

    @Override
    public Command poll() {
        final IOSession currentSession = currentSessionRef.get();
        return currentSession.poll();
    }

    @Override
    public ByteChannel channel() {
        final IOSession currentSession = currentSessionRef.get();
        return currentSession.channel();
    }

    @Override
    public SocketAddress getRemoteAddress() {
        return ioSession.getRemoteAddress();
    }

    @Override
    public SocketAddress getLocalAddress() {
        return ioSession.getLocalAddress();
    }

    @Override
    public int getEventMask() {
        final IOSession currentSession = currentSessionRef.get();
        return currentSession.getEventMask();
    }

    @Override
    public void setEventMask(final int ops) {
        final IOSession currentSession = currentSessionRef.get();
        currentSession.setEventMask(ops);
    }

    @Override
    public void setEvent(final int op) {
        final IOSession currentSession = currentSessionRef.get();
        currentSession.setEvent(op);
    }

    @Override
    public void clearEvent(final int op) {
        final IOSession currentSession = currentSessionRef.get();
        currentSession.clearEvent(op);
    }

    @Override
    public Timeout getSocketTimeout() {
        return ioSession.getSocketTimeout();
    }

    @Override
    public void setSocketTimeout(final Timeout timeout) {
        ioSession.setSocketTimeout(timeout);
    }

    @Override
    public int read(final ByteBuffer dst) throws IOException {
        final IOSession currentSession = currentSessionRef.get();
        return currentSession.read(dst);
    }

    @Override
    public int write(final ByteBuffer src) throws IOException {
        final IOSession currentSession = currentSessionRef.get();
        return currentSession.write(src);
    }

    @Override
    public void updateReadTime() {
        ioSession.updateReadTime();
    }

    @Override
    public void updateWriteTime() {
        ioSession.updateWriteTime();
    }

    @Override
    public long getLastReadTime() {
        return ioSession.getLastReadTime();
    }

    @Override
    public long getLastWriteTime() {
        return ioSession.getLastWriteTime();
    }

    @Override
    public long getLastEventTime() {
        return ioSession.getLastEventTime();
    }

    @Override
    public String toString() {
        final IOSession currentSession = currentSessionRef.get();
        if (currentSession != null) {
            return currentSession.toString();
        } else {
            return ioSession.toString();
        }
    }

}
