/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.twitter.distributedlog.service;

import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.LocalDLMEmulator;
import com.twitter.distributedlog.client.routing.SingleHostRoutingService;
import com.twitter.distributedlog.impl.metadata.BKDLConfig;
import com.twitter.distributedlog.metadata.DLMetadata;
import com.twitter.distributedlog.service.placement.EqualLoadAppraiser;
import com.twitter.distributedlog.service.streamset.IdentityStreamPartitionConverter;
import com.twitter.finagle.builder.Server;
import java.io.File;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim;
import org.apache.bookkeeper.stats.NullStatsProvider;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.LocalBookKeeper;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * DistributedLog Cluster is an emulator to run distributedlog components.
 */
public class DistributedLogCluster {

    // Logger shared by the cluster and its nested classes.
    private static final Logger LOG = LoggerFactory.getLogger(DistributedLogCluster.class);

    /**
     * Create a builder, pre-populated with the default settings, to assemble a cluster.
     *
     * @return a new cluster builder
     */
    public static Builder newBuilder() {
        final Builder builder = new Builder();
        return builder;
    }

    /**
     * Fluent builder that collects the settings of a {@link DistributedLogCluster}.
     *
     * <p>Each setter returns this builder so calls can be chained; {@link #build()} passes
     * the collected settings to the cluster.
     */
    public static class Builder {

        // bookkeeper settings
        int numBookies = 3;
        ServerConfiguration bkConf = new ServerConfiguration();

        // zookeeper settings; a port of 0 lets a locally started zookeeper run on any port
        boolean shouldStartZK = true;
        String zkHost = "127.0.0.1";
        int zkPort = 0;

        // proxy settings
        boolean shouldStartProxy = true;
        int proxyPort = 7000;
        boolean thriftmux = false;

        // default distributedlog settings: lock timeout of 10, no output buffering, immediate flush
        DistributedLogConfiguration dlConf = new DistributedLogConfiguration()
                .setLockTimeout(10)
                .setOutputBufferSize(0)
                .setImmediateFlushEnabled(true);

        private Builder() {
            // instances are obtained via DistributedLogCluster#newBuilder()
        }

        /**
         * Set the number of bookies to run. The default is 3.
         *
         * @param numBookies
         *          number of bookies
         * @return builder
         */
        public Builder numBookies(int numBookies) {
            this.numBookies = numBookies;
            return this;
        }

        /**
         * Set whether the cluster starts its own zookeeper server. The default is true.
         *
         * @param startZK
         *          true to start a zookeeper server
         * @return builder
         */
        public Builder shouldStartZK(boolean startZK) {
            shouldStartZK = startZK;
            return this;
        }

        /**
         * Set the zookeeper host handed to the bookkeeper emulator. The default is '127.0.0.1'.
         *
         * @param zkServers
         *          zk servers
         * @return builder
         */
        public Builder zkServers(String zkServers) {
            zkHost = zkServers;
            return this;
        }

        /**
         * Set the zookeeper server port. The default is 0, in which case a zookeeper server
         * started by the cluster is run on any port.
         *
         * @param zkPort
         *          zookeeper server port.
         * @return builder.
         */
        public Builder zkPort(int zkPort) {
            this.zkPort = zkPort;
            return this;
        }

        /**
         * Set whether a proxy server is started along with the cluster. The default is true.
         *
         * @param startProxy
         *          true to start the proxy server
         * @return builder
         */
        public Builder shouldStartProxy(boolean startProxy) {
            shouldStartProxy = startProxy;
            return this;
        }

        /**
         * Set the port the proxy server first tries to listen on. The default is 7000.
         *
         * @param proxyPort
         *          port for the proxy server to listen on.
         * @return builder
         */
        public Builder proxyPort(int proxyPort) {
            this.proxyPort = proxyPort;
            return this;
        }

        /**
         * Replace the distributedlog configuration used by the cluster.
         *
         * @param dlConf
         *          distributedlog configuration
         * @return builder
         */
        public Builder dlConf(DistributedLogConfiguration dlConf) {
            this.dlConf = dlConf;
            return this;
        }

        /**
         * Replace the bookkeeper server configuration used by the bookies.
         *
         * @param bkConf
         *          bookkeeper server configuration
         * @return builder
         */
        public Builder bkConf(ServerConfiguration bkConf) {
            this.bkConf = bkConf;
            return this;
        }

        /**
         * Turn thriftmux on or off for the proxy server. The default is off.
         *
         * @param enabled flag to enable thriftmux
         * @return builder
         */
        public Builder thriftmux(boolean enabled) {
            thriftmux = enabled;
            return this;
        }

        /**
         * Build the cluster from the settings collected so far.
         *
         * @return the distributedlog cluster
         * @throws Exception if the cluster cannot be constructed
         */
        public DistributedLogCluster build() throws Exception {
            final DistributedLogCluster cluster = new DistributedLogCluster(
                dlConf,
                bkConf,
                numBookies,
                shouldStartZK,
                zkHost,
                zkPort,
                shouldStartProxy,
                proxyPort,
                thriftmux);
            return cluster;
        }
    }

    /**
     * Run a distributedlog proxy server.
     *
     * <p>The server is started in the constructor. When the requested port cannot be bound,
     * the next port is tried, up to {@link #MAX_RETRIES} retries.
     */
    public static class DLServer {

        /** Number of times a failed bind is retried on another port before giving up. */
        static final int MAX_RETRIES = 20;
        /** Port to wrap around to once the candidate port exceeds {@link #MAX_PORT}. */
        static final int MIN_PORT = 1025;
        /** Highest port that is tried. */
        static final int MAX_PORT = 65535;

        // port the server is (or is about to be) bound to
        int proxyPort;

        public final InetSocketAddress address;
        public final Pair<DistributedLogServiceImpl, Server> dlServer;
        private final SingleHostRoutingService routingService = SingleHostRoutingService.of(null);

        /**
         * Start a proxy server, beginning at <i>basePort</i> and moving to the next port
         * whenever a {@link BindException} is raised.
         *
         * @param dlConf
         *          distributedlog configuration, also loaded into the server configuration
         * @param uri
         *          distributedlog uri served by the proxy
         * @param basePort
         *          first port to try
         * @param thriftmux
         *          flag to enable thriftmux
         * @throws Exception if the server cannot be started; a {@link BindException} once
         *          the retries are exhausted
         */
        protected DLServer(DistributedLogConfiguration dlConf,
                           URI uri,
                           int basePort,
                           boolean thriftmux) throws Exception {
            proxyPort = basePort;

            int retries = 0;
            Pair<DistributedLogServiceImpl, Server> serverPair = null;
            while (null == serverPair) {
                Pair<DistributedLogServiceImpl, Server> candidate = null;
                try {
                    com.twitter.distributedlog.service.config.ServerConfiguration serverConf =
                            new com.twitter.distributedlog.service.config.ServerConfiguration();
                    serverConf.loadConf(dlConf);
                    // the port doubles as the shard id of the server
                    serverConf.setServerShardId(proxyPort);
                    candidate = DistributedLogServer.runServer(
                            serverConf,
                            dlConf,
                            uri,
                            new IdentityStreamPartitionConverter(),
                            routingService,
                            new NullStatsProvider(),
                            proxyPort,
                            thriftmux,
                            new EqualLoadAppraiser());
                    routingService.setAddress(DLSocketAddress.getSocketAddress(proxyPort));
                    routingService.startService();
                    candidate.getLeft().startPlacementPolicy();
                    // only a fully initialized server is kept
                    serverPair = candidate;
                } catch (BindException be) {
                    retries++;
                    if (retries > MAX_RETRIES) {
                        throw be;
                    }
                    proxyPort++;
                    if (proxyPort > MAX_PORT) {
                        proxyPort = MIN_PORT;
                    }
                } finally {
                    // a server that came up but failed a later initialization step would
                    // otherwise be leaked, since no reference to it leaves the constructor
                    if (null == serverPair && null != candidate) {
                        try {
                            DistributedLogServer.closeServer(candidate, 0, TimeUnit.MILLISECONDS);
                        } catch (RuntimeException re) {
                            LOG.warn("Failed to close partially started DL server", re);
                        }
                    }
                }
            }

            LOG.info("Running DL on port {}", proxyPort);

            dlServer = serverPair;
            address = DLSocketAddress.getSocketAddress(proxyPort);
        }

        /**
         * Get the address of the proxy server.
         *
         * @return socket address built from the port the server is running on
         */
        public InetSocketAddress getAddress() {
            return address;
        }

        /**
         * Close the proxy server and stop its routing service. The routing service is
         * stopped even when closing the server fails.
         */
        public void shutdown() {
            try {
                DistributedLogServer.closeServer(dlServer, 0, TimeUnit.MILLISECONDS);
            } finally {
                routingService.stopService();
            }
        }
    }

    // distributedlog configuration passed to the proxy server
    private final DistributedLogConfiguration dlConf;
    // zookeeper server started by this cluster; null when the cluster did not start one
    private final ZooKeeperServerShim zks;
    // bookkeeper emulator backing the cluster
    private final LocalDLMEmulator dlmEmulator;
    // proxy server; assigned in start(), null when no proxy is started
    private DLServer dlServer;
    // whether start() should bring up a proxy server
    private final boolean shouldStartProxy;
    // first port the proxy server tries to listen on
    private final int proxyPort;
    // whether the proxy server is run with thriftmux
    private final boolean thriftmux;
    // temporary directories created by the cluster, deleted in stop()
    private final List<File> tmpDirs = new ArrayList<File>();

    private DistributedLogCluster(DistributedLogConfiguration dlConf,
                                  ServerConfiguration bkConf,
                                  int numBookies,
                                  boolean shouldStartZK,
                                  String zkServers,
                                  int zkPort,
                                  boolean shouldStartProxy,
                                  int proxyPort,
                                  boolean thriftmux) throws Exception {
        this.dlConf = dlConf;
        if (shouldStartZK) {
            File zkTmpDir = IOUtils.createTempDir("zookeeper", "distrlog");
            tmpDirs.add(zkTmpDir);
            if (0 == zkPort) {
                Pair<ZooKeeperServerShim, Integer> serverAndPort = LocalDLMEmulator.runZookeeperOnAnyPort(zkTmpDir);
                this.zks = serverAndPort.getLeft();
                zkPort = serverAndPort.getRight();
            } else {
                this.zks = LocalBookKeeper.runZookeeper(1000, zkPort, zkTmpDir);
            }
        } else {
            this.zks = null;
        }
        this.dlmEmulator = LocalDLMEmulator.newBuilder()
                .numBookies(numBookies)
                .zkHost(zkServers)
                .zkPort(zkPort)
                .serverConf(bkConf)
                .shouldStartZK(false)
                .build();
        this.shouldStartProxy = shouldStartProxy;
        this.proxyPort = proxyPort;
        this.thriftmux = thriftmux;
    }

    /**
     * Start the cluster.
     *
     * <p>Starts the bookkeeper emulator, creates the distributedlog metadata (ledgers path
     * "/ledgers", ACL root path ".acl") and updates it at the emulator uri, then starts the
     * proxy server if the cluster was configured to run one.
     *
     * @throws Exception if any of the components fails to start
     */
    public void start() throws Exception {
        dlmEmulator.start();

        final BKDLConfig ledgersConfig = new BKDLConfig(dlmEmulator.getZkServers(), "/ledgers");
        final BKDLConfig bkdlConfig = ledgersConfig.setACLRootPath(".acl");
        final DLMetadata metadata = DLMetadata.create(bkdlConfig);
        metadata.update(dlmEmulator.getUri());

        dlServer = shouldStartProxy
                ? new DLServer(dlConf, dlmEmulator.getUri(), proxyPort, thriftmux)
                : null;
    }

    /**
     * Stop the cluster: shut down the proxy server (if any), tear down the bookkeeper
     * emulator, stop the zookeeper server (if one was started) and delete the temporary
     * directories.
     *
     * <p>Every step is attempted even when an earlier one fails, so that a failing component
     * does not leave the remaining servers running or the temporary directories behind.
     *
     * @throws Exception if any of the steps fails; when several fail, the failure of the
     *          last failing step is the one propagated
     */
    public void stop() throws Exception {
        try {
            if (null != dlServer) {
                this.dlServer.shutdown();
            }
        } finally {
            try {
                this.dlmEmulator.teardown();
            } finally {
                try {
                    if (null != this.zks) {
                        this.zks.stop();
                    }
                } finally {
                    for (File dir : tmpDirs) {
                        FileUtils.deleteDirectory(dir);
                    }
                }
            }
        }
    }

    /**
     * Get the distributedlog uri of the cluster, as reported by the bookkeeper emulator.
     *
     * @return distributedlog uri
     */
    public URI getUri() {
        final URI uri = dlmEmulator.getUri();
        return uri;
    }

    /**
     * Get the zookeeper servers of the cluster, as reported by the bookkeeper emulator.
     *
     * @return zookeeper servers string
     */
    public String getZkServers() {
        final String zkServers = dlmEmulator.getZkServers();
        return zkServers;
    }

    /**
     * Get the finagle name string of the proxy server.
     *
     * <p>The result is "inet!" followed by the string form of the running proxy server's
     * address, or by "127.0.0.1:" plus the configured proxy port when no proxy server is
     * held by this cluster.
     *
     * @return finagle name string of the proxy
     */
    public String getProxyFinagleStr() {
        final String hostAndPort;
        if (null == dlServer) {
            hostAndPort = "127.0.0.1:" + proxyPort;
        } else {
            hostAndPort = dlServer.getAddress().toString();
        }
        return "inet!" + hostAndPort;
    }

}
