/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.twitter.distributedlog.client.routing;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.twitter.distributedlog.service.DLSocketAddress;
import com.twitter.finagle.NoBrokersAvailableException;
import com.twitter.finagle.stats.StatsReceiver;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Routing Service based on a given {@link com.twitter.common.zookeeper.ServerSet}.
 */
class ServerSetRoutingService extends Thread implements RoutingService {

    private static final Logger logger = LoggerFactory.getLogger(ServerSetRoutingService.class);

    static ServerSetRoutingServiceBuilder newServerSetRoutingServiceBuilder() {
        return new ServerSetRoutingServiceBuilder();
    }

    /**
     * Builder to build {@link com.twitter.common.zookeeper.ServerSet} based routing service.
     */
    static class ServerSetRoutingServiceBuilder implements RoutingService.Builder {

        private ServerSetWatcher serverSetWatcher;

        private ServerSetRoutingServiceBuilder() {}

        public ServerSetRoutingServiceBuilder serverSetWatcher(ServerSetWatcher serverSetWatcher) {
            this.serverSetWatcher = serverSetWatcher;
            return this;
        }

        @Override
        public Builder statsReceiver(StatsReceiver statsReceiver) {
            return this;
        }

        @Override
        public RoutingService build() {
            checkNotNull(serverSetWatcher, "No serverset watcher provided.");
            return new ServerSetRoutingService(this.serverSetWatcher);
        }
    }

    private static class HostComparator implements Comparator<SocketAddress> {

        private static final HostComparator INSTANCE = new HostComparator();

        @Override
        public int compare(SocketAddress o1, SocketAddress o2) {
            return o1.toString().compareTo(o2.toString());
        }
    }

    private final ServerSetWatcher serverSetWatcher;

    private final Set<SocketAddress> hostSet = new HashSet<SocketAddress>();
    private List<SocketAddress> hostList = new ArrayList<SocketAddress>();
    private final HashFunction hasher = Hashing.md5();

    // Server Set Changes
    private final AtomicReference<ImmutableSet<DLSocketAddress>> serverSetChange =
            new AtomicReference<ImmutableSet<DLSocketAddress>>(null);
    private final CountDownLatch changeLatch = new CountDownLatch(1);

    // Listeners
    protected final CopyOnWriteArraySet<RoutingListener> listeners =
            new CopyOnWriteArraySet<RoutingListener>();

    ServerSetRoutingService(ServerSetWatcher serverSetWatcher) {
        super("ServerSetRoutingService");
        this.serverSetWatcher = serverSetWatcher;
    }

    @Override
    public Set<SocketAddress> getHosts() {
        synchronized (hostSet) {
            return ImmutableSet.copyOf(hostSet);
        }
    }

    @Override
    public void startService() {
        start();
        try {
            if (!changeLatch.await(1, TimeUnit.MINUTES)) {
                logger.warn("No serverset change received in 1 minute.");
            }
        } catch (InterruptedException e) {
            logger.warn("Interrupted waiting first serverset change : ", e);
        }
        logger.info("{} Routing Service Started.", getClass().getSimpleName());
    }

    @Override
    public void stopService() {
        Thread.currentThread().interrupt();
        try {
            join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("Interrupted on waiting serverset routing service to finish : ", e);
        }
        logger.info("{} Routing Service Stopped.", getClass().getSimpleName());
    }

    @Override
    public RoutingService registerListener(RoutingListener listener) {
        listeners.add(listener);
        return this;
    }

    @Override
    public RoutingService unregisterListener(RoutingListener listener) {
        listeners.remove(listener);
        return this;
    }

    @Override
    public SocketAddress getHost(String key, RoutingContext rContext)
            throws NoBrokersAvailableException {
        SocketAddress address = null;
        synchronized (hostSet) {
            if (0 != hostList.size()) {
                int hashCode = hasher.hashUnencodedChars(key).asInt();
                int hostId = signSafeMod(hashCode, hostList.size());
                address = hostList.get(hostId);
                if (rContext.isTriedHost(address)) {
                    ArrayList<SocketAddress> newList = new ArrayList<SocketAddress>(hostList);
                    newList.remove(hostId);
                    // pickup a new host by rehashing it.
                    hostId = signSafeMod(hashCode, newList.size());
                    address = newList.get(hostId);
                    int i = hostId;
                    while (rContext.isTriedHost(address)) {
                        i = (i + 1) % newList.size();
                        if (i == hostId) {
                            address = null;
                            break;
                        }
                        address = newList.get(i);
                    }
                }
            }
        }
        if (null == address) {
            throw new NoBrokersAvailableException("No host is available.");
        }
        return address;
    }

    @Override
    public void removeHost(SocketAddress host, Throwable reason) {
        synchronized (hostSet) {
            if (hostSet.remove(host)) {
                logger.info("Node {} left due to : ", host, reason);
            }
            hostList = new ArrayList<SocketAddress>(hostSet);
            Collections.sort(hostList, HostComparator.INSTANCE);
            logger.info("Host list becomes : {}.", hostList);
        }
    }

    @Override
    public void run() {
        try {
            serverSetWatcher.watch(new ServerSetWatcher.ServerSetMonitor() {
                @Override
                public void onChange(ImmutableSet<DLSocketAddress> serviceInstances) {
                    ImmutableSet<DLSocketAddress> lastValue = serverSetChange.getAndSet(serviceInstances);
                    if (null == lastValue) {
                        ImmutableSet<DLSocketAddress> mostRecentValue;
                        do {
                            mostRecentValue = serverSetChange.get();
                            performServerSetChange(mostRecentValue);
                            changeLatch.countDown();
                        } while (!serverSetChange.compareAndSet(mostRecentValue, null));
                    }
                }
            });
        } catch (Exception e) {
            logger.error("Fail to monitor server set : ", e);
            Runtime.getRuntime().exit(-1);
        }
    }

    protected synchronized void performServerSetChange(ImmutableSet<DLSocketAddress> serverSet) {
        Set<SocketAddress> newSet = new HashSet<SocketAddress>();
        for (DLSocketAddress serviceInstance : serverSet) {
            newSet.add(serviceInstance.getSocketAddress());
        }

        Set<SocketAddress> removed;
        Set<SocketAddress> added;
        synchronized (hostSet) {
            removed = Sets.difference(hostSet, newSet).immutableCopy();
            added = Sets.difference(newSet, hostSet).immutableCopy();
            for (SocketAddress node: removed) {
                if (hostSet.remove(node)) {
                    logger.info("Node {} left.", node);
                }
            }
            for (SocketAddress node: added) {
                if (hostSet.add(node)) {
                    logger.info("Node {} joined.", node);
                }
            }
        }

        for (SocketAddress addr : removed) {
            for (RoutingListener listener : listeners) {
                listener.onServerLeft(addr);
            }
        }

        for (SocketAddress addr : added) {
            for (RoutingListener listener : listeners) {
                listener.onServerJoin(addr);
            }
        }

        synchronized (hostSet) {
            hostList = new ArrayList<SocketAddress>(hostSet);
            Collections.sort(hostList, HostComparator.INSTANCE);
            logger.info("Host list becomes : {}.", hostList);
        }

    }

    static int signSafeMod(long dividend, int divisor) {
        int mod = (int) (dividend % divisor);

        if (mod < 0) {
            mod += divisor;
        }

        return mod;
    }
}
