/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.spi.discovery.tcp.ipfinder.zk;

import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

import com.fasterxml.jackson.annotation.JsonRootName;
import com.google.common.collect.Sets;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.UriSpec;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.SystemProperty;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinderAdapter;

/**
 * This TCP Discovery IP Finder uses Apache ZooKeeper (ZK) to locate peer nodes when bootstrapping in order to join
 * the cluster. It uses the Apache Curator library to interact with ZooKeeper in a simple manner. Specifically,
 * it uses the {@link ServiceDiscovery} recipe, which makes use of ephemeral nodes in ZK to register services.
 *
 * <p>
 * There are several ways to instantiate the TcpDiscoveryZookeeperIpFinder:
 * <ul>
 *     <li>By providing an instance of {@link CuratorFramework} directly, in which case no ZK Connection String
 *     is required.</li>
 *     <li>By providing a ZK Connection String through {@link #setZkConnectionString(String)}, and optionally
 *     a {@link RetryPolicy} through the setter. If the latter is not provided, a default
 *     {@link ExponentialBackoffRetry} policy is used, with a base sleep time of 1000ms and 10 retries.</li>
 *     <li>By providing a ZK Connection String through system property {@link #PROP_ZK_CONNECTION_STRING}. If this
 *     property is set, it overrides the ZK Connection String passed in as a property, but it does not override
 *     the {@link CuratorFramework} if provided.</li>
 * </ul>
 *
 * You may customise the base path for services, as well as the service name. By default {@link #BASE_PATH} and
 * {@link #SERVICE_NAME} are used respectively. You can also choose to enable or disable duplicate registrations. See
 * {@link #setAllowDuplicateRegistrations(boolean)} for more details.
 *
 * @see <a href="http://zookeeper.apache.org">Apache ZooKeeper</a>
 * @see <a href="http://curator.apache.org">Apache Curator</a>
 */
public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
    /** System property name to provide the ZK Connection String. */
    @SystemProperty(value = "Zookeeper connection string", type = String.class)
    public static final String PROP_ZK_CONNECTION_STRING = "IGNITE_ZK_CONNECTION_STRING";

    /** Default base path for service registrations. */
    private static final String BASE_PATH = "/services";

    /** Default service name for service registrations. */
    private static final String SERVICE_NAME = "ignite";

    /** Default URI Spec to use with the {@link ServiceDiscoveryBuilder}. */
    private static final UriSpec URI_SPEC = new UriSpec("{address}:{port}");

    /** Init guard: set once {@link #init()} has been entered, so that initialization runs only once. */
    @GridToStringExclude
    private final AtomicBoolean initGuard = new AtomicBoolean();

    /** Close guard: set once {@link #onSpiContextDestroyed()} has run, so that the finder is closed only once. */
    @GridToStringExclude
    private final AtomicBoolean closeGuard = new AtomicBoolean();

    /** Logger. */
    @LoggerResource
    private IgniteLogger log;

    /** The Curator framework in use, either injected or constructed by this component. */
    private CuratorFramework curator;

    /** The ZK Connection String if provided by the user. */
    private String zkConnectionString;

    /** Retry policy to use. */
    private RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);

    /** Base path to use, by default {@link #BASE_PATH}. */
    private String basePath = BASE_PATH;

    /** Service name to use, by default {@link #SERVICE_NAME}. */
    private String serviceName = SERVICE_NAME;

    /** Whether to allow or not duplicate registrations. See setter doc. */
    private boolean allowDuplicateRegistrations;

    /** The Service Discovery recipe. */
    private ServiceDiscovery<IgniteInstanceDetails> discovery;

    /** Map of the {@link ServiceInstance}s we have registered, keyed by the address they were registered for. */
    private final Map<InetSocketAddress, ServiceInstance<IgniteInstanceDetails>> ourInstances =
        new ConcurrentHashMap<>();

    /** Constructor. Marks this IP finder as shared. */
    public TcpDiscoveryZookeeperIpFinder() {
        setShared(true);
    }

    /**
     * Initializes this IP Finder by creating the appropriate Curator objects. Only the first call does any work;
     * later calls return immediately.
     *
     * <p>
     * If initialization fails, the init guard is released before the failure is propagated, so that a subsequent
     * call reports the same root cause (or succeeds once the configuration problem is fixed) instead of failing
     * later on an uninitialized {@link #discovery}.
     */
    private void init() {
        if (!initGuard.compareAndSet(false, true))
            return;

        try {
            String sysPropZkConnString = IgniteSystemProperties.getString(PROP_ZK_CONNECTION_STRING);

            // The system property, when set to a non-blank value, takes precedence over the configured string.
            if (sysPropZkConnString != null && !sysPropZkConnString.trim().isEmpty())
                zkConnectionString = sysPropZkConnString;

            if (log.isInfoEnabled())
                log.info("Initializing ZooKeeper IP Finder.");

            if (curator == null) {
                A.notNullOrEmpty(zkConnectionString, String.format("ZooKeeper URL (or system property %s) cannot be " +
                    "null or empty if a CuratorFramework object is not provided explicitly",
                    PROP_ZK_CONNECTION_STRING));

                curator = CuratorFrameworkFactory.newClient(zkConnectionString, retryPolicy);
            }

            // An injected CuratorFramework may already be started; only start it if it is still latent.
            if (curator.getState() == CuratorFrameworkState.LATENT)
                curator.start();

            A.ensure(curator.getState() == CuratorFrameworkState.STARTED, "CuratorFramework can't be started.");

            discovery = ServiceDiscoveryBuilder.builder(IgniteInstanceDetails.class)
                .client(curator)
                .basePath(basePath)
                .serializer(new JsonInstanceSerializer<>(IgniteInstanceDetails.class))
                .build();
        }
        catch (RuntimeException | Error e) {
            // Do not leave the finder marked as initialized when it is not.
            initGuard.set(false);

            throw e;
        }
    }

    /** {@inheritDoc} */
    @Override public void onSpiContextDestroyed() {
        if (!closeGuard.compareAndSet(false, true)) {
            U.warn(log, "ZooKeeper IP Finder can't be closed more than once.");

            return;
        }

        if (log.isInfoEnabled())
            log.info("Destroying ZooKeeper IP Finder.");

        super.onSpiContextDestroyed();

        if (curator != null)
            curator.close();
    }

    /**
     * {@inheritDoc}
     *
     * <p>
     * If the ZooKeeper query fails, a warning is logged and an empty collection is returned.
     */
    @Override public Collection<InetSocketAddress> getRegisteredAddresses() throws IgniteSpiException {
        init();

        if (log.isDebugEnabled())
            log.debug("Getting registered addresses from ZooKeeper IP Finder.");

        Collection<ServiceInstance<IgniteInstanceDetails>> serviceInstances;

        try {
            serviceInstances = discovery.queryForInstances(serviceName);
        }
        catch (Exception e) {
            log.warning("Error while getting registered addresses from ZooKeeper IP Finder.", e);

            return Collections.emptyList();
        }

        Set<InetSocketAddress> answer = new HashSet<>();

        for (ServiceInstance<IgniteInstanceDetails> si : serviceInstances)
            answer.add(new InetSocketAddress(si.getAddress(), si.getPort()));

        if (log.isInfoEnabled())
            log.info("ZooKeeper IP Finder resolved addresses: " + answer);

        return answer;
    }

    /**
     * {@inheritDoc}
     *
     * <p>
     * Unless duplicate registrations are allowed, addresses that are already registered under the service name
     * are skipped. A failure to register an individual address is logged as a warning and does not prevent the
     * remaining addresses from being registered.
     *
     * @throws IgniteSpiException If the currently registered services cannot be queried while duplicate
     *      registrations are disallowed.
     */
    @Override public void registerAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException {
        init();

        if (log.isInfoEnabled())
            log.info("Registering addresses with ZooKeeper IP Finder: " + addrs);

        Set<InetSocketAddress> registrationsToIgnore = Sets.newHashSet();

        if (!allowDuplicateRegistrations) {
            try {
                for (ServiceInstance<IgniteInstanceDetails> sd : discovery.queryForInstances(serviceName))
                    registrationsToIgnore.add(new InetSocketAddress(sd.getAddress(), sd.getPort()));
            }
            catch (Exception e) {
                log.warning("Error while finding currently registered services to avoid duplicate registrations", e);

                throw new IgniteSpiException(e);
            }
        }

        for (InetSocketAddress addr : addrs) {
            if (registrationsToIgnore.contains(addr))
                continue;

            try {
                ServiceInstance<IgniteInstanceDetails> si = ServiceInstance.<IgniteInstanceDetails>builder()
                    .name(serviceName)
                    .uriSpec(URI_SPEC)
                    .address(addr.getAddress().getHostAddress())
                    .port(addr.getPort())
                    .build();

                // Remember the instance before registering it, so that it can be unregistered later on.
                ourInstances.put(addr, si);

                discovery.registerService(si);
            }
            catch (Exception e) {
                log.warning(String.format("Error while registering an address from ZooKeeper IP Finder " +
                    "[message=%s,addresses=%s]", e.getMessage(), addr), e);
            }
        }
    }

    /**
     * {@inheritDoc}
     *
     * <p>
     * Only addresses previously registered through this instance are unregistered; for any other address a warning
     * is logged. Nothing is done if this IP finder has not been initialized or if Curator is not started.
     */
    @Override public void unregisterAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException {
        // Never initialized: nothing has been registered through this instance, hence nothing to unregister.
        if (curator == null || discovery == null)
            return;

        // if curator is not STARTED, we have nothing to unregister, because we are using ephemeral nodes,
        // which means that our addresses will only be registered in ZK as long as our connection is alive
        if (curator.getState() != CuratorFrameworkState.STARTED)
            return;

        if (log.isInfoEnabled())
            log.info("Unregistering addresses with ZooKeeper IP Finder: " + addrs);

        for (InetSocketAddress addr : addrs) {
            ServiceInstance<IgniteInstanceDetails> si = ourInstances.get(addr);

            if (si == null) {
                log.warning("Asked to unregister address from ZooKeeper IP Finder, but no match was found in local " +
                    "instance map for: " + addr);

                continue;
            }

            try {
                discovery.unregisterService(si);

                // Forget the instance only once it has actually been unregistered.
                ourInstances.remove(addr, si);
            }
            catch (Exception e) {
                log.warning("Error while unregistering an address from ZooKeeper IP Finder: " + addr, e);
            }
        }
    }

    /**
     * @param curator A {@link CuratorFramework} instance to use. It can already be in <tt>STARTED</tt> state.
     * @return {@code this} for chaining.
     */
    public TcpDiscoveryZookeeperIpFinder setCurator(CuratorFramework curator) {
        this.curator = curator;

        return this;
    }

    /**
     * @return The ZooKeeper connection string set explicitly or, once this IP finder has been initialized, the
     * value of system property {@link #PROP_ZK_CONNECTION_STRING} if that was provided. Else, it returns null.
     */
    public String getZkConnectionString() {
        return zkConnectionString;
    }

    /**
     * @param zkConnectionString ZooKeeper connection string in case a {@link CuratorFramework} is not being set
     * explicitly.
     * @return {@code this} for chaining.
     */
    public TcpDiscoveryZookeeperIpFinder setZkConnectionString(String zkConnectionString) {
        this.zkConnectionString = zkConnectionString;

        return this;
    }

    /**
     * @return Retry policy in use. Unless one was set explicitly, this is the default
     * {@link ExponentialBackoffRetry} policy with a base sleep time of 1000ms and 10 retries.
     */
    public RetryPolicy getRetryPolicy() {
        return retryPolicy;
    }

    /**
     * @param retryPolicy {@link RetryPolicy} to use in case a ZK Connection String is being injected, or if using a
     * system property.
     * @return {@code this} for chaining.
     */
    public TcpDiscoveryZookeeperIpFinder setRetryPolicy(RetryPolicy retryPolicy) {
        this.retryPolicy = retryPolicy;

        return this;
    }

    /**
     * @return Base path for service registration in ZK. Default value: {@link #BASE_PATH}.
     */
    public String getBasePath() {
        return basePath;
    }

    /**
     * @param basePath Base path for service registration in ZK. If not passed, {@link #BASE_PATH} will be used.
     * @return {@code this} for chaining.
     */
    public TcpDiscoveryZookeeperIpFinder setBasePath(String basePath) {
        this.basePath = basePath;

        return this;
    }

    /**
     * @return Service name being used, in Curator terms. See {@link #setServiceName(String)} for more information.
     */
    public String getServiceName() {
        return serviceName;
    }

    /**
     * @param serviceName Service name to use, as defined by Curator's {@link ServiceDiscovery} recipe. In physical ZK
     * terms, it represents the node under {@link #basePath}, under which services will be registered.
     * @return {@code this} for chaining.
     */
    public TcpDiscoveryZookeeperIpFinder setServiceName(String serviceName) {
        this.serviceName = serviceName;

        return this;
    }

    /**
     * @return The value of this flag. See {@link #setAllowDuplicateRegistrations(boolean)} for more details.
     */
    public boolean isAllowDuplicateRegistrations() {
        return allowDuplicateRegistrations;
    }

    /**
     * @param allowDuplicateRegistrations Whether to register each node only once, or if duplicate registrations are
     * allowed. Nodes will attempt to register themselves, plus those they know about. By default, duplicate
     * registrations are not allowed, but you might want to set this property to <tt>true</tt> if you have multiple
     * network interfaces or if you are facing troubles.
     * @return {@code this} for chaining.
     */
    public TcpDiscoveryZookeeperIpFinder setAllowDuplicateRegistrations(boolean allowDuplicateRegistrations) {
        this.allowDuplicateRegistrations = allowDuplicateRegistrations;

        return this;
    }

    /** {@inheritDoc} */
    @Override public TcpDiscoveryZookeeperIpFinder setShared(boolean shared) {
        super.setShared(shared);

        return this;
    }

    /**
     * Empty DTO for storing service instances details. Currently acting as a placeholder because Curator requires a
     * payload type when registering and discovering nodes. May be enhanced in the future with further information to
     * assist discovery.
     *
     * <p>
     * Declared {@code static} so that it holds no implicit reference to the enclosing IP finder and can be
     * instantiated without one.
     *
     * @author Raul Kripalani
     */
    @JsonRootName("ignite_instance_details")
    private static class IgniteInstanceDetails {

    }

}
