/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.network.internal.netty;

import java.net.SocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.internal.handshake.HandshakeManager;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

/**
 * Class that manages connections both incoming and outgoing.
 *
 * <p>Incoming connections are accepted by a single {@link NettyServer}; outgoing connections are created on demand
 * as {@link NettyClient}s, one per remote address. Every established connection (in either direction) is exposed
 * as a {@link NettySender} and registered in a map keyed by the remote node's consistent id.
 */
public class ConnectionManager {
    /** Logger. */
    private static final IgniteLogger LOG = IgniteLogger.forClass(ConnectionManager.class);

    /** Latest version of the direct marshalling protocol. */
    public static final byte DIRECT_PROTOCOL_VERSION = 1;

    /** Client bootstrap. */
    private final Bootstrap clientBootstrap;

    /** Client socket channel handler event loop group. */
    private final EventLoopGroup clientWorkerGroup = new NioEventLoopGroup();

    /** Server. */
    private final NettyServer server;

    /** Channels map from consistentId to {@link NettySender}. */
    private final Map<String, NettySender> channels = new ConcurrentHashMap<>();

    /** Clients map from remote address to {@link NettyClient}. */
    private final Map<SocketAddress, NettyClient> clients = new ConcurrentHashMap<>();

    /** Serialization registry. */
    private final MessageSerializationRegistry serializationRegistry;

    /** Message listeners. */
    private final List<BiConsumer<SocketAddress, NetworkMessage>> listeners = new CopyOnWriteArrayList<>();

    /** Node consistent id. */
    private final String consistentId;

    /** Client handshake manager factory. */
    private final Supplier<HandshakeManager> clientHandshakeManagerFactory;

    /**
     * Constructor.
     *
     * @param port Server port.
     * @param registry Serialization registry.
     * @param consistentId Consistent id of this node.
     * @param serverHandshakeManagerFactory Server handshake manager factory.
     * @param clientHandshakeManagerFactory Client handshake manager factory.
     */
    public ConnectionManager(
        int port,
        MessageSerializationRegistry registry,
        String consistentId,
        Supplier<HandshakeManager> serverHandshakeManagerFactory,
        Supplier<HandshakeManager> clientHandshakeManagerFactory
    ) {
        this.serializationRegistry = registry;
        this.consistentId = consistentId;
        this.clientHandshakeManagerFactory = clientHandshakeManagerFactory;
        this.server = new NettyServer(
            port,
            serverHandshakeManagerFactory.get(),
            this::onNewIncomingChannel,
            this::onMessage,
            serializationRegistry
        );
        this.clientBootstrap = createClientBootstrap(clientWorkerGroup, serializationRegistry);
    }

    /**
     * Starts the server and blocks until it has been started.
     *
     * @throws IgniteInternalException If failed to start.
     */
    public void start() throws IgniteInternalException {
        try {
            server.start().join();
        }
        catch (CompletionException e) {
            // join() wraps the actual failure; report the cause, falling back to the wrapper if there is none.
            Throwable cause = e.getCause() != null ? e.getCause() : e;

            throw new IgniteInternalException("Failed to start server: " + cause.getMessage(), cause);
        }
    }

    /**
     * @return Server local address.
     */
    public SocketAddress getLocalAddress() {
        return server.address();
    }

    /**
     * Gets a {@link NettySender}, that sends data from this node to another node with the specified address.
     *
     * @param consistentId Another node's consistent id, or {@code null} if it is not known.
     * @param address Another node's address.
     * @return Future that resolves to the sender.
     */
    public CompletableFuture<NettySender> channel(@Nullable String consistentId, SocketAddress address) {
        if (consistentId != null) {
            // If consistent id is known, try looking up a channel by consistent id. There can be an outbound connection
            // or an inbound connection associated with that consistent id.
            // Returning null from the remapping function drops a missing or closed sender from the map.
            NettySender channel = channels.compute(
                consistentId,
                (id, sender) -> (sender == null || !sender.isOpen()) ? null : sender
            );

            if (channel != null)
                return CompletableFuture.completedFuture(channel);
        }

        // Get an existing client or create a new one. NettyClient provides a CompletableFuture that resolves
        // when the client is ready for write operations, so a previously started client that hasn't established
        // a connection or hasn't performed the handshake operation yet can be reused.
        NettyClient client = clients.compute(address, (addr, existingClient) ->
            existingClient != null && !existingClient.failedToConnect() && !existingClient.isDisconnected() ?
                existingClient : connect(addr)
        );

        CompletableFuture<NettySender> sender = client.sender();

        assert sender != null;

        return sender;
    }

    /**
     * Callback that is called upon receiving a new message. Passes the message to every registered listener.
     *
     * @param from Source of the message.
     * @param message New message.
     */
    private void onMessage(SocketAddress from, NetworkMessage message) {
        listeners.forEach(consumer -> consumer.accept(from, message));
    }

    /**
     * Callback that is called upon new client connected to the server.
     *
     * @param channel Channel from client to this {@link #server}.
     */
    private void onNewIncomingChannel(NettySender channel) {
        channels.put(channel.consistentId(), channel);
    }

    /**
     * Create new client from this node to specified address.
     *
     * <p>On success the resulting sender is registered in {@link #channels}; on failure the client is removed
     * from {@link #clients}.
     *
     * @param address Target address.
     * @return New netty client.
     */
    private NettyClient connect(
        SocketAddress address
    ) {
        var client = new NettyClient(
            address,
            serializationRegistry,
            clientHandshakeManagerFactory.get(),
            this::onMessage
        );

        client.start(clientBootstrap).whenComplete((sender, throwable) -> {
            if (throwable == null)
                channels.put(sender.consistentId(), sender);
            else {
                // Remove only this very client: by the time the callback runs, channel() may have already
                // replaced the failed client with a fresh one for the same address, which must stay registered.
                clients.remove(address, client);
            }
        });

        return client;
    }

    /**
     * Add incoming message listener.
     *
     * @param listener Message listener.
     */
    public void addListener(BiConsumer<SocketAddress, NetworkMessage> listener) {
        listeners.add(listener);
    }

    /**
     * Stops the server and all clients, then shuts down the client event loop group.
     *
     * <p>Failures are logged and not propagated. The event loop group is shut down even if stopping
     * the server or one of the clients has failed.
     */
    public void stop() {
        Stream<CompletableFuture<Void>> stream = Stream.concat(
            clients.values().stream().map(NettyClient::stop),
            Stream.of(server.stop())
        );

        CompletableFuture<Void> stopFut = CompletableFuture.allOf(stream.toArray(CompletableFuture<?>[]::new));

        try {
            stopFut.join();
        }
        catch (Exception e) {
            LOG.warn("Failed to stop the ConnectionManager: " + e.getMessage(), e);
        }

        // Kept separate from the block above so that a failed stop doesn't leave the event loop threads running.
        try {
            // TODO: IGNITE-14538 quietPeriod and timeout should be configurable.
            clientWorkerGroup.shutdownGracefully(0L, 15, TimeUnit.SECONDS).sync();
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag for the caller.
            Thread.currentThread().interrupt();

            LOG.warn("Failed to stop the ConnectionManager: " + e.getMessage(), e);
        }
        catch (Exception e) {
            LOG.warn("Failed to stop the ConnectionManager: " + e.getMessage(), e);
        }
    }

    /**
     * @return Connection manager's {@link #server}.
     */
    @TestOnly
    public NettyServer server() {
        return server;
    }

    /**
     * @return This node's consistent id.
     */
    @TestOnly
    public String consistentId() {
        return consistentId;
    }

    /**
     * @return Unmodifiable view of all the clients currently tracked by this connection manager.
     */
    @TestOnly
    public Collection<NettyClient> clients() {
        return Collections.unmodifiableCollection(clients.values());
    }

    /**
     * @return Unmodifiable view of the channels map (consistent id to sender).
     */
    @TestOnly
    public Map<String, NettySender> channels() {
        return Collections.unmodifiableMap(channels);
    }

    /**
     * Creates a {@link Bootstrap} for clients, providing the channel type and socket options.
     *
     * @param eventLoopGroup Event loop group for channel handling.
     * @param serializationRegistry Serialization registry. Currently not used by this method; kept for
     *      compatibility with existing callers.
     * @return Bootstrap for clients.
     */
    public static Bootstrap createClientBootstrap(
        EventLoopGroup eventLoopGroup,
        MessageSerializationRegistry serializationRegistry
    ) {
        Bootstrap clientBootstrap = new Bootstrap();

        clientBootstrap.group(eventLoopGroup)
            .channel(NioSocketChannel.class)
            // See NettyServer#start for netty configuration details.
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.SO_LINGER, 0)
            .option(ChannelOption.TCP_NODELAY, true);

        return clientBootstrap;
    }
}
