/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.rest.protocols.tcp;

import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import javax.cache.configuration.Factory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.configuration.ConnectorConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.client.marshaller.GridClientMarshaller;
import org.apache.ignite.internal.client.marshaller.jdk.GridClientJdkMarshaller;
import org.apache.ignite.internal.client.marshaller.optimized.GridClientOptimizedMarshaller;
import org.apache.ignite.internal.client.marshaller.optimized.GridClientZipOptimizedMarshaller;
import org.apache.ignite.internal.client.ssl.GridSslContextFactory;
import org.apache.ignite.internal.processors.rest.GridRestProtocolHandler;
import org.apache.ignite.internal.processors.rest.client.message.GridClientMessage;
import org.apache.ignite.internal.processors.rest.protocols.GridRestProtocolAdapter;
import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
import org.apache.ignite.internal.util.nio.GridNioFilter;
import org.apache.ignite.internal.util.nio.GridNioParser;
import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.nio.GridNioServerListener;
import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.plugin.PluginProvider;
import org.apache.ignite.spi.IgnitePortProtocol;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;

/**
 * TCP binary protocol implementation.
 * <p>
 * On {@link #start(GridRestProtocolHandler)} the protocol resolves the bind host, optionally creates an SSL context
 * and then tries to bind a {@link GridNioServer} to the first port of the configured port range that can be used.
 */
public class GridTcpRestProtocol extends GridRestProtocolAdapter {
    /** The name of the metric registry associated with the REST TCP connector. */
    public static final String REST_CONNECTOR_METRIC_REGISTRY_NAME = metricName("rest", "client");

    /** Server. Assigned by {@link #startTcpServer} once the NIO server has been built. */
    private GridNioServer<GridClientMessage> srv;

    /** NIO server listener. Created in {@link #start(GridRestProtocolHandler)}. */
    private GridTcpRestNioListener lsnr;

    /** @param ctx Context. */
    public GridTcpRestProtocol(GridKernalContext ctx) {
        super(ctx);
    }

    /** {@inheritDoc} */
    @Override public String name() {
        return "TCP binary";
    }

    /**
     * Starts the TCP binary REST server.
     * <p>
     * Ports are tried one by one starting from {@link ConnectorConfiguration#getPort()}. When the port range is
     * {@code 0} only the start port is tried, otherwise ports up to {@code startPort + portRange - 1} are tried.
     * Failures to resolve the host, to create the SSL context or to bind any port are logged as warnings and
     * do not fail the start.
     *
     * @param hnd Protocol handler the listener delegates to.
     * @throws IgniteCheckedException Declared by the overridden method.
     */
    @Override public void start(final GridRestProtocolHandler hnd) throws IgniteCheckedException {
        assert hnd != null;

        ConnectorConfiguration cfg = ctx.config().getConnectorConfiguration();

        assert cfg != null;

        lsnr = new GridTcpRestNioListener(log, this, hnd, ctx);

        GridNioParser parser = new GridTcpRestParser(false, ctx.marshallerContext().jdkMarshaller());

        try {
            host = resolveRestTcpHost(ctx.config());

            SSLContext sslCtx = cfg.isSslEnabled() ? createRestSslContext(cfg) : null;

            int startPort = cfg.getPort();
            int portRange = cfg.getPortRange();
            int lastPort = portRange == 0 ? startPort : startPort + portRange - 1;

            for (int port0 = startPort; port0 <= lastPort; port0++) {
                if (startTcpServer(host, port0, lsnr, parser, sslCtx, cfg)) {
                    port = port0;

                    if (log.isInfoEnabled())
                        log.info(startInfo());

                    return;
                }
            }

            U.warn(log, "Failed to start TCP binary REST server (possibly all ports in range are in use) " +
                "[firstPort=" + startPort + ", lastPort=" + lastPort + ", host=" + host + ']');
        }
        catch (SSLException e) {
            // The 'port' field is assigned only after a successful bind, so the configured port is reported here.
            U.warn(log, "Failed to start " + name() + " protocol on port " + cfg.getPort() +
                ". Check if SSL context factory is properly configured: " + e.getMessage());
        }
        catch (IOException e) {
            // The 'port' field is assigned only after a successful bind, so the configured port is reported here.
            U.warn(log, "Failed to start " + name() + " protocol on port " + cfg.getPort() + ". " +
                "Check restTcpHost configuration property: " + e.getMessage());
        }
    }

    /**
     * Registers the client marshallers (optimized, zip-optimized and JDK) on the NIO listener,
     * keyed by their marshaller IDs.
     */
    @Override public void onProcessorStart() {
        super.onProcessorStart();

        Map<Byte, GridClientMarshaller> marshMap = new HashMap<>();

        ArrayList<PluginProvider> providers = new ArrayList<>(ctx.plugins().allProviders());

        GridClientOptimizedMarshaller optMarsh = new GridClientOptimizedMarshaller(providers);

        marshMap.put(GridClientOptimizedMarshaller.ID, optMarsh);
        marshMap.put(GridClientZipOptimizedMarshaller.ID, new GridClientZipOptimizedMarshaller(optMarsh, providers));

        try {
            IgnitePredicate<String> clsFilter = MarshallerUtils.classNameFilter(getClass().getClassLoader());

            marshMap.put(GridClientJdkMarshaller.ID, new GridClientJdkMarshaller(clsFilter));
        }
        catch (IgniteCheckedException e) {
            throw new IgniteException(e);
        }

        lsnr.marshallers(marshMap);
    }

    /**
     * Deregisters the ports registered by this class and stops the NIO server, if one was created.
     * The stop message is logged regardless of whether a server existed.
     */
    @Override public void stop() {
        if (srv != null) {
            ctx.ports().deregisterPorts(getClass());

            srv.stop();
        }

        if (log.isInfoEnabled())
            log.info(stopInfo());
    }

    /**
     * Resolves host for REST TCP server using grid configuration.
     * The connector host is preferred; when it is {@code null} the grid local host is used instead.
     *
     * @param cfg Grid configuration.
     * @return REST host.
     * @throws IOException If failed to resolve REST host.
     */
    private InetAddress resolveRestTcpHost(IgniteConfiguration cfg) throws IOException {
        String host = cfg.getConnectorConfiguration().getHost();

        if (host == null)
            host = cfg.getLocalHost();

        return U.resolveLocalHost(host);
    }

    /**
     * Creates SSL context for the REST TCP server.
     * <p>
     * Factories are consulted in the following order: connector SSL factory, deprecated connector SSL context
     * factory, node-wide SSL context factory.
     *
     * @param cfg Connector configuration.
     * @return SSL context produced by the first configured factory.
     * @throws SSLException If no SSL context factory is configured or the deprecated factory fails.
     */
    private SSLContext createRestSslContext(ConnectorConfiguration cfg) throws SSLException {
        Factory<SSLContext> igniteFactory = ctx.config().getSslContextFactory();

        Factory<SSLContext> factory = cfg.getSslFactory();

        // This factory deprecated and will be removed.
        GridSslContextFactory depFactory = cfg.getSslContextFactory();

        if (factory != null)
            return factory.create();

        if (depFactory != null)
            return depFactory.createSslContext();

        if (igniteFactory != null)
            return igniteFactory.create();

        // Thrown SSL exception instead of IgniteCheckedException for writing correct warning message into log.
        throw new SSLException("SSL is enabled, but SSL context factory is not specified.");
    }

    /**
     * Tries to start server with given parameters.
     *
     * @param hostAddr Host on which server should be bound.
     * @param port Port on which server should be bound.
     * @param lsnr Server message listener.
     * @param parser Server message parser.
     * @param sslCtx SSL context in case if SSL is enabled.
     * @param cfg Configuration for other parameters.
     * @return {@code True} if server successfully started, {@code false} if port is used and
     *      server was unable to start.
     */
    private boolean startTcpServer(InetAddress hostAddr, int port, GridNioServerListener<GridClientMessage> lsnr,
        GridNioParser parser, @Nullable SSLContext sslCtx, ConnectorConfiguration cfg) {
        try {
            GridNioFilter codec = new GridNioCodecFilter(parser, log, false);

            GridNioFilter[] filters;

            if (sslCtx != null) {
                GridNioSslFilter sslFilter = new GridNioSslFilter(
                    sslCtx,
                    cfg.isDirectBuffer(),
                    ByteOrder.nativeOrder(),
                    log,
                    ctx.metric().registry(REST_CONNECTOR_METRIC_REGISTRY_NAME));

                sslFilter.directMode(false);

                // Both the "want" and the "need" flags follow the single client authentication setting.
                boolean auth = cfg.isSslClientAuth();

                sslFilter.wantClientAuth(auth);

                sslFilter.needClientAuth(auth);

                filters = new GridNioFilter[] {
                    codec,
                    sslFilter
                };
            }
            else
                filters = new GridNioFilter[] { codec };

            srv = GridNioServer.<GridClientMessage>builder()
                .address(hostAddr)
                .port(port)
                .listener(lsnr)
                .logger(log)
                .selectorCount(cfg.getSelectorCount())
                .igniteInstanceName(ctx.igniteInstanceName())
                .serverName("tcp-rest")
                .tcpNoDelay(cfg.isNoDelay())
                .directBuffer(cfg.isDirectBuffer())
                .byteOrder(ByteOrder.nativeOrder())
                .socketSendBufferSize(cfg.getSendBufferSize())
                .socketReceiveBufferSize(cfg.getReceiveBufferSize())
                .sendQueueLimit(cfg.getSendQueueLimit())
                .filters(filters)
                .directMode(false)
                .metricRegistry(ctx.metric().registry(REST_CONNECTOR_METRIC_REGISTRY_NAME))
                .build();

            srv.idleTimeout(cfg.getIdleTimeout());

            srv.start();

            ctx.ports().registerPort(port, IgnitePortProtocol.TCP, getClass());

            return true;
        }
        catch (IgniteCheckedException e) {
            // A failure on this port is not fatal: the caller moves on to the next port in the range.
            if (log.isDebugEnabled())
                log.debug("Failed to start " + name() + " protocol on port " + port + ": " + e.getMessage());

            return false;
        }
    }

    /** {@inheritDoc} */
    @Override protected String getAddressPropertyName() {
        return IgniteNodeAttributes.ATTR_REST_TCP_ADDRS;
    }

    /** {@inheritDoc} */
    @Override protected String getHostNamePropertyName() {
        return IgniteNodeAttributes.ATTR_REST_TCP_HOST_NAMES;
    }

    /** {@inheritDoc} */
    @Override protected String getPortPropertyName() {
        return IgniteNodeAttributes.ATTR_REST_TCP_PORT;
    }
}
