/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.network.scalecube;

import static io.scalecube.cluster.membership.MembershipEvent.createAdded;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;

import io.scalecube.cluster.ClusterConfig;
import io.scalecube.cluster.ClusterImpl;
import io.scalecube.cluster.ClusterMessageHandler;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.metadata.MetadataCodec;
import io.scalecube.net.Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.AbstractClusterService;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.DefaultMessagingService;
import org.apache.ignite.internal.network.NettyBootstrapFactory;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.NodeFinder;
import org.apache.ignite.internal.network.NodeFinderFactory;
import org.apache.ignite.internal.network.configuration.ClusterMembershipView;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
import org.apache.ignite.internal.network.configuration.NetworkView;
import org.apache.ignite.internal.network.configuration.ScaleCubeView;
import org.apache.ignite.internal.network.netty.ConnectionManager;
import org.apache.ignite.internal.network.recovery.StaleIds;
import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
import org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.internal.network.serialization.SerializationService;
import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
import org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller;
import org.apache.ignite.internal.worker.CriticalWorkerRegistry;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NodeMetadata;
import org.apache.ignite.network.TopologyEventHandler;

/**
 * Cluster service factory that uses ScaleCube for messaging and topology services.
 */
public class ScaleCubeClusterServiceFactory {
    /** Logger shared by this factory and the cluster services it creates. */
    private static final IgniteLogger LOG = Loggers.forClass(ScaleCubeClusterServiceFactory.class);

    /** Metadata codec that is passed to the ScaleCube cluster configuration when a cluster service is started. */
    private static final MetadataCodec METADATA_CODEC = MetadataCodec.INSTANCE;

    /**
     * Creates a new {@link ClusterService} using the provided context. The created network will not be in the "started" state.
     *
     * <p>The topology and messaging services are created eagerly; the connection manager and the ScaleCube cluster are created
     * only when the returned service is started.
     *
     * @param consistentId Consistent ID (aka name) of the local node associated with the service to create.
     * @param networkConfiguration Network configuration.
     * @param nettyBootstrapFactory Bootstrap factory.
     * @param serializationRegistry Registry used for serialization.
     * @param staleIds Used to update/detect whether a node has left the physical topology.
     * @param criticalWorkerRegistry Used to register critical threads managed by the new service and its components.
     * @param failureProcessor Failure processor that is used to handle critical errors.
     * @return New cluster service.
     */
    public ClusterService createClusterService(
            String consistentId,
            NetworkConfiguration networkConfiguration,
            NettyBootstrapFactory nettyBootstrapFactory,
            MessageSerializationRegistry serializationRegistry,
            StaleIds staleIds,
            CriticalWorkerRegistry criticalWorkerRegistry,
            FailureProcessor failureProcessor
    ) {
        var topologyService = new ScaleCubeTopologyService();

        // Adding this handler as the first handler to make sure that StaleIds is at least as up-to-date as any
        // other component that watches topology events.
        topologyService.addEventHandler(new TopologyEventHandler() {
            @Override
            public void onDisappeared(ClusterNode member) {
                staleIds.markAsStale(member.id());
            }
        });

        var messageFactory = new NetworkMessagesFactory();

        UserObjectSerializationContext userObjectSerialization = createUserObjectSerializationContext();

        var messagingService = new DefaultMessagingService(
                consistentId,
                messageFactory,
                topologyService,
                staleIds,
                userObjectSerialization.descriptorRegistry(),
                userObjectSerialization.marshaller(),
                criticalWorkerRegistry
        );

        return new AbstractClusterService(consistentId, topologyService, messagingService, serializationRegistry) {

            /** ScaleCube cluster. Assigned at the very end of {@link #startAsync()}, i.e. only after the cluster has started. */
            private volatile ClusterImpl cluster;

            /** Connection manager. Assigned in {@link #startAsync()} before it is started. */
            private volatile ConnectionManager connectionMgr;

            /** Future of the ScaleCube cluster shutdown. Assigned in {@link #startAsync()}; {@code null} before that. */
            private volatile CompletableFuture<Void> shutdownFuture;

            /**
             * Starts the connection manager, the messaging service and the ScaleCube cluster.
             *
             * <p>The start is performed synchronously ({@code cluster.startAwait()}); the returned future is already completed.
             */
            @Override
            public CompletableFuture<Void> startAsync() {
                var serializationService = new SerializationService(serializationRegistry, userObjectSerialization);

                // Used as the ScaleCube member ID of this node for this particular start.
                UUID launchId = UUID.randomUUID();

                NetworkView configView = networkConfiguration.value();

                ConnectionManager connectionMgr = new ConnectionManager(
                        configView,
                        serializationService,
                        consistentId,
                        nettyBootstrapFactory,
                        staleIds,
                        failureProcessor
                );
                // Published before start() so that stopAsync() can release it even if the rest of the start fails.
                this.connectionMgr = connectionMgr;

                connectionMgr.start();
                messagingService.start();

                Address scalecubeLocalAddress = prepareAddress(connectionMgr.localAddress());

                topologyService.addEventHandler(new TopologyEventHandler() {
                    @Override
                    public void onDisappeared(ClusterNode member) {
                        connectionMgr.handleNodeLeft(member.id());
                    }
                });

                var transport = new ScaleCubeDirectMarshallerTransport(
                        scalecubeLocalAddress,
                        messagingService,
                        topologyService,
                        messageFactory
                );

                ClusterConfig clusterConfig = clusterConfig(configView.membership());
                NodeFinder finder = NodeFinderFactory.createNodeFinder(configView.nodeFinder());
                ClusterImpl cluster = new ClusterImpl(clusterConfig)
                        .handler(cl -> new ClusterMessageHandler() {
                            @Override
                            public void onMembershipEvent(MembershipEvent event) {
                                topologyService.onMembershipEvent(event);
                            }
                        })
                        .config(opts -> opts
                                .memberId(launchId.toString())
                                .memberAlias(consistentId)
                                .metadataCodec(METADATA_CODEC)
                        )
                        .transport(opts -> opts.transportFactory(transportConfig -> transport))
                        .membership(opts -> opts.seedMembers(parseAddresses(finder.findNodes())));

                // The local member is precomputed so that the connection manager knows the local node before the cluster starts.
                Member localMember = createLocalMember(scalecubeLocalAddress, launchId, clusterConfig);
                ClusterNode localNode = new ClusterNodeImpl(
                        localMember.id(),
                        consistentId,
                        new NetworkAddress(localMember.address().host(), localMember.address().port())
                );
                connectionMgr.setLocalNode(localNode);

                this.shutdownFuture = cluster.onShutdown().toFuture();

                // resolve cyclic dependencies
                topologyService.setCluster(cluster);
                messagingService.setConnectionManager(connectionMgr);

                cluster.startAwait();

                assert cluster.member().equals(localMember) : "Expected local member from cluster " + cluster.member()
                        + " to be equal to the precomputed one " + localMember;

                // emit an artificial event as if the local member has joined the topology (ScaleCube doesn't do that)
                var localMembershipEvent = createAdded(cluster.member(), null, System.currentTimeMillis());
                topologyService.onMembershipEvent(localMembershipEvent);

                this.cluster = cluster;

                return nullCompletedFuture();
            }

            /**
             * Stops the ScaleCube cluster (waiting up to 10 seconds for its shutdown), then the connection manager and the
             * messaging service.
             *
             * <p>The connection manager and the messaging service are stopped even if waiting for the cluster shutdown fails
             * or is interrupted, so that network resources are not leaked.
             *
             * @return Already completed future.
             * @throws IgniteInternalException If the cluster shutdown fails or the waiting thread is interrupted.
             */
            @Override
            public CompletableFuture<Void> stopAsync() {
                ConnectionManager localConnectionMgr = connectionMgr;

                if (localConnectionMgr != null) {
                    localConnectionMgr.initiateStopping();
                }

                try {
                    // Read each volatile field once so that the checks and the calls below observe the same instances.
                    // 'cluster' is read first: it is assigned after 'shutdownFuture', so a non-null cluster implies a non-null future.
                    ClusterImpl localCluster = cluster;
                    CompletableFuture<Void> localShutdownFuture = shutdownFuture;

                    // Local member will be null, if cluster has not been started.
                    if (localCluster != null && localCluster.member() != null) {
                        localCluster.shutdown();

                        try {
                            localShutdownFuture.get(10, TimeUnit.SECONDS);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();

                            throw new IgniteInternalException("Interrupted while waiting for the ClusterService to stop", e);
                        } catch (ExecutionException e) {
                            throw new IgniteInternalException("Unable to stop the ClusterService", e.getCause());
                        } catch (TimeoutException e) {
                            // Failed to leave gracefully.
                            LOG.warn("Failed to wait for ScaleCube cluster shutdown [reason={}]", e, e.getMessage());
                        }
                    }
                } finally {
                    // Executed on both the normal and the exceptional path: a failed cluster shutdown must not leave
                    // the connection manager and the messaging service running.
                    if (localConnectionMgr != null) {
                        localConnectionMgr.stop();
                    }

                    // Messaging service checks connection manager's status before sending a message, so connection manager should be
                    // stopped before messaging service
                    messagingService.stop();
                }

                return nullCompletedFuture();
            }

            /** Stops the service synchronously by waiting for {@link #stopAsync()} to complete. */
            @Override
            public void beforeNodeStop() {
                this.stopAsync().join();
            }

            /**
             * Tells whether the ScaleCube cluster shutdown future has completed.
             *
             * @return {@code true} if the cluster has been shut down; {@code false} if it is still running or the service
             *      has never been started.
             */
            @Override
            public boolean isStopped() {
                CompletableFuture<Void> localShutdownFuture = shutdownFuture;

                // The future is created in startAsync(); before that there is nothing that could have been stopped.
                return localShutdownFuture != null && localShutdownFuture.isDone();
            }

            /**
             * Pushes the new metadata to the ScaleCube cluster and to the local topology service.
             *
             * <p>NOTE(review): dereferences {@code cluster}, which is assigned only at the end of {@link #startAsync()};
             * presumably callers invoke this after a successful start - confirm.
             */
            @Override
            public void updateMetadata(NodeMetadata metadata) {
                cluster.updateMetadata(metadata).subscribe();
                topologyService.updateLocalMetadata(metadata);
            }

        };
    }

    /**
     * Translates a Java socket address into a ScaleCube {@link Address}.
     *
     * <p>When the socket is bound to the wildcard ("any local") address, the host is taken from
     * {@link Address#getLocalIpAddress()} instead; otherwise the host address of the socket itself is used.
     *
     * @param addr Socket address to convert.
     * @return ScaleCube address with the resolved host and the port of {@code addr}.
     */
    private static Address prepareAddress(InetSocketAddress addr) {
        InetAddress inetAddress = addr.getAddress();

        String hostAddress;

        if (inetAddress.isAnyLocalAddress()) {
            hostAddress = Address.getLocalIpAddress().getHostAddress();
        } else {
            hostAddress = inetAddress.getHostAddress();
        }

        int port = addr.getPort();

        return Address.create(hostAddress, port);
    }

    /**
     * Builds the ScaleCube {@link Member} that represents the local node.
     *
     * <p>This is copied from {@code ClusterImpl#createLocalMember()} and adjusted to always use the launch ID as the member ID.
     * The external host and port from the configuration take precedence over the ones of the given address.
     *
     * @param address Local ScaleCube address.
     * @param launchId Launch ID; its string form becomes the member ID.
     * @param config Cluster configuration providing the external host/port, the member alias and the namespace.
     * @return Local member.
     */
    private Member createLocalMember(Address address, UUID launchId, ClusterConfig config) {
        Integer externalPort = config.externalPort();
        int memberPort = externalPort != null ? externalPort : address.port();

        String externalHost = config.externalHost();
        String memberHost = externalHost != null ? externalHost : address.host();

        // calculate local member cluster address
        Address memberAddress = Address.create(memberHost, memberPort);

        String memberId = launchId.toString();
        String namespace = config.membershipConfig().namespace();

        return new Member(memberId, config.memberAlias(), memberAddress, namespace);
    }

    /**
     * Builds the ScaleCube cluster configuration from the membership configuration, starting from ScaleCube's default local
     * config. Subclasses may override this method to get finer control over the created {@link ClusterService} instances.
     *
     * @param cfg Membership configuration.
     * @return Cluster configuration.
     */
    protected ClusterConfig clusterConfig(ClusterMembershipView cfg) {
        ScaleCubeView scaleCubeCfg = cfg.scaleCube();

        // Membership protocol settings.
        ClusterConfig withMembership = ClusterConfig.defaultLocalConfig().membership(membership -> membership
                .syncInterval(cfg.membershipSyncInterval())
                .suspicionMult(scaleCubeCfg.membershipSuspicionMultiplier()));

        // Failure detector settings.
        ClusterConfig withFailureDetector = withMembership.failureDetector(detector -> detector
                .pingInterval(cfg.failurePingInterval())
                .pingReqMembers(scaleCubeCfg.failurePingRequestMembers()));

        // Gossip protocol settings.
        ClusterConfig withGossip = withFailureDetector.gossip(gossip -> gossip
                .gossipInterval(scaleCubeCfg.gossipInterval())
                .gossipRepeatMult(scaleCubeCfg.gossipRepeatMult()));

        return withGossip.metadataTimeout(scaleCubeCfg.metadataTimeout());
    }

    /**
     * Assembles the descriptor registry, the descriptor factory and the marshaller used for user object serialization.
     * The factory and the marshaller share the same, newly created registry.
     *
     * @return User object serialization context.
     */
    private UserObjectSerializationContext createUserObjectSerializationContext() {
        ClassDescriptorRegistry registry = new ClassDescriptorRegistry();
        ClassDescriptorFactory factory = new ClassDescriptorFactory(registry);
        DefaultUserObjectMarshaller marshaller = new DefaultUserObjectMarshaller(registry, factory);

        return new UserObjectSerializationContext(registry, factory, marshaller);
    }

    /**
     * Maps every {@link NetworkAddress} of the given list to its ScaleCube {@link Address} counterpart, preserving the order.
     *
     * @param addresses Network addresses.
     * @return List of ScaleCube's {@link Address}.
     */
    private static List<Address> parseAddresses(List<NetworkAddress> addresses) {
        return addresses.stream()
                .map(ScaleCubeClusterServiceFactory::toScaleCubeAddress)
                .collect(Collectors.toList());
    }

    /** Creates a ScaleCube {@link Address} with the host and the port of the given network address. */
    private static Address toScaleCubeAddress(NetworkAddress networkAddress) {
        return Address.create(networkAddress.host(), networkAddress.port());
    }
}
