/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.distributedlog.util;

import com.google.common.base.Objects;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.distributedlog.stats.BroadCastStatsLogger;
import com.twitter.util.ExecutorServiceFuturePool;
import com.twitter.util.FuturePool;
import com.twitter.util.Time;
import com.twitter.util.Timer;
import com.twitter.util.TimerTask;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.MathUtils;
import scala.Function0;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Ordered Scheduler. It is thread pool based {@link ScheduledExecutorService}, additionally providing
 * the ability to execute/schedule tasks by <code>key</code>. Hence the tasks submitted by same <i>key</i>
 * will be executed in order.
 * <p>
 * The scheduler is comprised of multiple {@link MonitoredScheduledThreadPoolExecutor}s. Each
 * {@link MonitoredScheduledThreadPoolExecutor} is a single thread executor. Normal task submissions will
 * be submitted to executors in a random manner to guarantee load balancing. Keyed task submissions (e.g.
 * {@link OrderedScheduler#apply(Object, Function0)}) will be submitted to a dedicated executor based on
 * the hash value of submit <i>key</i>.
 *
 * <h3>Threads</h3>
 *
 * The thread name format used for the executor at index `id` is <i>`name`-executor-`id`-%d</i>, where
 * `name` is the scheduler name provided by {@link Builder#name}.
 *
 * <h3>Metrics</h3>
 *
 * <h4>Per Executor Metrics</h4>
 *
 * Metrics about individual executors are exposed via {@link Builder#perExecutorStatsLogger}
 * under <i>`scope`/executor-`id`</i>. `id` is the index of this executor in the pool. And corresponding
 * stats of future pool of that executor are exposed under <i>`scope`/executor-`id`/futurepool</i>.
 * <p>
 * See {@link MonitoredScheduledThreadPoolExecutor} and {@link MonitoredFuturePool} for per executor metrics
 * exposed.
 *
 * <h4>Aggregated Metrics</h4>
 * <ul>
 * <li>task_pending_time: opstats. measuring the characteristics about the time that tasks spent on
 * waiting being executed.
 * <li>task_execution_time: opstats. measuring the characteristics about the time that tasks spent on
 * executing.
 * <li>futurepool/task_pending_time: opstats. measuring the characteristics about the time that tasks spent
 * on waiting in future pool being executed.
 * <li>futurepool/task_execution_time: opstats. measuring the characteristics about the time that tasks spent
 * on executing.
 * <li>futurepool/task_enqueue_time: opstats. measuring the characteristics about the time that tasks spent on
 * submitting to future pool.
 * <li>futurepool/tasks_pending: gauge. how many tasks are pending in this future pool.
 * </ul>
 */
public class OrderedScheduler implements ScheduledExecutorService {

    /**
     * Create a builder to build scheduler.
     *
     * @return scheduler builder
     */
    public static Builder newBuilder() {
        return new Builder();
    }

    /**
     * Builder for {@link OrderedScheduler}.
     */
    public static class Builder {

        private String name = "OrderedScheduler";
        private int corePoolSize = -1;
        private ThreadFactory threadFactory = null;
        private boolean traceTaskExecution = false;
        private long traceTaskExecutionWarnTimeUs = Long.MAX_VALUE;
        private StatsLogger statsLogger = NullStatsLogger.INSTANCE;
        private StatsLogger perExecutorStatsLogger = NullStatsLogger.INSTANCE;

        /**
         * Set the name of this scheduler. It would be used as part of the thread name.
         *
         * @param name
         *          name of the scheduler.
         * @return scheduler builder
         */
        public Builder name(String name) {
            this.name = name;
            return this;
        }

        /**
         * Set the number of threads to be used in this scheduler. A value that is not positive
         * makes {@link #build()} fall back to the number of available processors.
         *
         * @param corePoolSize the number of threads to keep in the pool, even
         *        if they are idle
         * @return scheduler builder
         */
        public Builder corePoolSize(int corePoolSize) {
            this.corePoolSize = corePoolSize;
            return this;
        }

        /**
         * Set the thread factory that the scheduler uses to create a new thread. When it is
         * <code>null</code>, {@link #build()} falls back to {@link Executors#defaultThreadFactory()}.
         *
         * @param threadFactory the factory to use when the executor
         *        creates a new thread
         * @return scheduler builder
         */
        public Builder threadFactory(ThreadFactory threadFactory) {
            this.threadFactory = threadFactory;
            return this;
        }

        /**
         * Enable/Disable exposing task execution stats.
         *
         * @param trace
         *          flag to enable/disable exposing task execution stats.
         * @return scheduler builder
         */
        public Builder traceTaskExecution(boolean trace) {
            this.traceTaskExecution = trace;
            return this;
        }

        /**
         * Enable/Disable logging slow tasks whose execution time is above <code>timeUs</code>.
         *
         * @param timeUs
         *          slow task execution time threshold in us.
         * @return scheduler builder.
         */
        public Builder traceTaskExecutionWarnTimeUs(long timeUs) {
            this.traceTaskExecutionWarnTimeUs = timeUs;
            return this;
        }

        /**
         * Expose the aggregated stats over <code>statsLogger</code>.
         *
         * @param statsLogger
         *          stats logger to receive aggregated stats.
         * @return scheduler builder
         */
        public Builder statsLogger(StatsLogger statsLogger) {
            this.statsLogger = statsLogger;
            return this;
        }

        /**
         * Expose stats of individual executors over <code>perExecutorStatsLogger</code>.
         * Each executor's stats will be exposed under a sub-scope executor-`id`, where
         * `id` is the index of the executor in the pool.
         *
         * @param perExecutorStatsLogger
         *          stats logger to receive per executor stats.
         * @return scheduler builder
         */
        public Builder perExecutorStatsLogger(StatsLogger perExecutorStatsLogger) {
            this.perExecutorStatsLogger = perExecutorStatsLogger;
            return this;
        }

        /**
         * Build the ordered scheduler.
         * <p>
         * Defaults are resolved into locals rather than written back to the builder, so
         * calling this method leaves the builder's configured values untouched.
         *
         * @return ordered scheduler
         */
        public OrderedScheduler build() {
            int resolvedPoolSize = corePoolSize;
            if (resolvedPoolSize <= 0) {
                resolvedPoolSize = Runtime.getRuntime().availableProcessors();
            }
            ThreadFactory resolvedThreadFactory = threadFactory;
            if (null == resolvedThreadFactory) {
                resolvedThreadFactory = Executors.defaultThreadFactory();
            }

            return new OrderedScheduler(
                    name,
                    resolvedPoolSize,
                    resolvedThreadFactory,
                    traceTaskExecution,
                    traceTaskExecutionWarnTimeUs,
                    statsLogger,
                    perExecutorStatsLogger);
        }

    }

    protected final String name;
    protected final int corePoolSize;
    // executors[i] and futurePools[i] are backed by the same underlying executor.
    protected final MonitoredScheduledThreadPoolExecutor[] executors;
    protected final MonitoredFuturePool[] futurePools;
    protected final Random random;

    private OrderedScheduler(String name,
                             int corePoolSize,
                             ThreadFactory threadFactory,
                             boolean traceTaskExecution,
                             long traceTaskExecutionWarnTimeUs,
                             StatsLogger statsLogger,
                             StatsLogger perExecutorStatsLogger) {
        this.name = name;
        this.corePoolSize = corePoolSize;
        this.executors = new MonitoredScheduledThreadPoolExecutor[corePoolSize];
        this.futurePools = new MonitoredFuturePool[corePoolSize];
        for (int i = 0; i < corePoolSize; i++) {
            ThreadFactory tf = new ThreadFactoryBuilder()
                    .setNameFormat(name + "-executor-" + i + "-%d")
                    .setThreadFactory(threadFactory)
                    .build();
            // Per executor stats go to the "executor-<i>" sub-scope, aggregated stats to statsLogger.
            StatsLogger broadcastStatsLogger =
                    BroadCastStatsLogger.masterslave(perExecutorStatsLogger.scope("executor-" + i), statsLogger);
            executors[i] = new MonitoredScheduledThreadPoolExecutor(
                    1, tf, broadcastStatsLogger, traceTaskExecution);
            futurePools[i] = new MonitoredFuturePool(
                    new ExecutorServiceFuturePool(executors[i]),
                    broadcastStatsLogger.scope("futurepool"),
                    traceTaskExecution,
                    traceTaskExecutionWarnTimeUs);
        }
        this.random = new Random(System.currentTimeMillis());
    }

    /**
     * Pick a slot at random. Used for submissions that carry no ordering key.
     *
     * @return index into {@link #executors} / {@link #futurePools}
     */
    private int randomIndex() {
        return corePoolSize == 1 ? 0 : random.nextInt(corePoolSize);
    }

    /**
     * Map <code>key</code> to a slot. The same computation backs both the keyed executor and the
     * keyed future pool lookup, so a given key always resolves to the same index in both arrays.
     *
     * @param key ordering key
     * @return index into {@link #executors} / {@link #futurePools}
     */
    private int indexFor(Object key) {
        return corePoolSize == 1 ? 0 : MathUtils.signSafeMod(Objects.hashCode(key), corePoolSize);
    }

    /**
     * Choose a random executor.
     *
     * @return the chosen executor
     */
    protected MonitoredScheduledThreadPoolExecutor chooseExecutor() {
        return executors[randomIndex()];
    }

    /**
     * Choose the executor assigned to <code>key</code>.
     *
     * @param key ordering key
     * @return the executor that <code>key</code> hashes to
     */
    protected MonitoredScheduledThreadPoolExecutor chooseExecutor(Object key) {
        return executors[indexFor(key)];
    }

    /**
     * Choose the future pool assigned to <code>key</code>.
     *
     * @param key ordering key
     * @return the future pool that <code>key</code> hashes to
     */
    protected FuturePool chooseFuturePool(Object key) {
        return futurePools[indexFor(key)];
    }

    /**
     * Choose a random future pool.
     *
     * @return the chosen future pool
     */
    protected FuturePool chooseFuturePool() {
        return futurePools[randomIndex()];
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        return chooseExecutor().schedule(command, delay, unit);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        return chooseExecutor().schedule(callable, delay, unit);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay, long period, TimeUnit unit) {
        return chooseExecutor().scheduleAtFixedRate(command, initialDelay, period, unit);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay, long delay, TimeUnit unit) {
        return chooseExecutor().scheduleWithFixedDelay(command, initialDelay, delay, unit);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void shutdown() {
        for (MonitoredScheduledThreadPoolExecutor executor : executors) {
            // Unregister gauges
            executor.unregisterGauges();
            executor.shutdown();
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * The returned list is the concatenation of the lists returned by every underlying executor.
     */
    @Override
    public List<Runnable> shutdownNow() {
        // NOTE(review): unlike shutdown(), gauges are not unregistered here. Confirm whether
        // unregisterGauges() is safe to call more than once before adding it to this path.
        List<Runnable> runnables = new ArrayList<Runnable>();
        for (MonitoredScheduledThreadPoolExecutor executor : executors) {
            runnables.addAll(executor.shutdownNow());
        }
        return runnables;
    }

    /**
     * {@inheritDoc}
     * <p>
     * The scheduler is reported as shut down only when every underlying executor is.
     */
    @Override
    public boolean isShutdown() {
        for (MonitoredScheduledThreadPoolExecutor executor : executors) {
            if (!executor.isShutdown()) {
                return false;
            }
        }
        return true;
    }

    /**
     * {@inheritDoc}
     * <p>
     * The scheduler is reported as terminated only when every underlying executor is.
     */
    @Override
    public boolean isTerminated() {
        for (MonitoredScheduledThreadPoolExecutor executor : executors) {
            if (!executor.isTerminated()) {
                return false;
            }
        }
        return true;
    }

    /**
     * {@inheritDoc}
     * <p>
     * <code>timeout</code> bounds the whole call: the executors are awaited one after another and
     * each of them only gets whatever is left of the overall budget.
     */
    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        // A single deadline is shared by all executors. Handing the full timeout to each executor
        // would let this call block for up to (number of executors * timeout).
        final long deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
        for (MonitoredScheduledThreadPoolExecutor executor : executors) {
            // Subtracting nanoTime values stays correct even if the deadline computation wrapped.
            long remainingNanos = Math.max(0L, deadlineNanos - System.nanoTime());
            if (!executor.awaitTermination(remainingNanos, TimeUnit.NANOSECONDS)) {
                return false;
            }
        }
        return true;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return chooseExecutor().submit(task);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return chooseExecutor().submit(task, result);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public Future<?> submit(Runnable task) {
        return chooseExecutor().submit(task);
    }

    /**
     * {@inheritDoc}
     * <p>
     * All <code>tasks</code> are handed to one randomly chosen executor.
     */
    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
        return chooseExecutor().invokeAll(tasks);
    }

    /**
     * {@inheritDoc}
     * <p>
     * All <code>tasks</code> are handed to one randomly chosen executor.
     */
    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
            throws InterruptedException {
        return chooseExecutor().invokeAll(tasks, timeout, unit);
    }

    /**
     * {@inheritDoc}
     * <p>
     * All <code>tasks</code> are handed to one randomly chosen executor.
     */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        return chooseExecutor().invokeAny(tasks);
    }

    /**
     * {@inheritDoc}
     * <p>
     * All <code>tasks</code> are handed to one randomly chosen executor.
     */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        return chooseExecutor().invokeAny(tasks, timeout, unit);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void execute(Runnable command) {
        chooseExecutor().execute(command);
    }

    // Ordered Functions

    /**
     * Return a future pool used by <code>key</code>.
     *
     * @param key
     *          key to order in the future pool
     * @return future pool
     */
    public FuturePool getFuturePool(Object key) {
        return chooseFuturePool(key);
    }

    /**
     * Execute the <code>function</code> in the executor that assigned by <code>key</code>.
     *
     * @see com.twitter.util.Future
     * @param key key of the <i>function</i> to run
     * @param function function to run
     * @return future representing the result of the <i>function</i>
     */
    public <T> com.twitter.util.Future<T> apply(Object key, Function0<T> function) {
        return chooseFuturePool(key).apply(function);
    }

    /**
     * Execute the <code>function</code> by the scheduler. It would be submitted to any executor randomly.
     *
     * @param function function to run
     * @return future representing the result of the <i>function</i>
     */
    public <T> com.twitter.util.Future<T> apply(Function0<T> function) {
        return chooseFuturePool().apply(function);
    }

    /**
     * Schedule the <code>command</code> to run once after <code>delay</code> in the executor
     * that is assigned by <code>key</code>.
     *
     * @param key key of the <i>command</i> to run
     * @param command command to run
     * @param delay the time from now to delay execution
     * @param unit the time unit of <code>delay</code>
     * @return scheduled future representing the pending <i>command</i>
     */
    public ScheduledFuture<?> schedule(Object key, Runnable command, long delay, TimeUnit unit) {
        return chooseExecutor(key).schedule(command, delay, unit);
    }

    /**
     * Schedule the <code>command</code> to run periodically in the executor that is assigned
     * by <code>key</code>.
     *
     * @param key key of the <i>command</i> to run
     * @param command command to run
     * @param initialDelay the time to delay first execution
     * @param period the period between successive executions
     * @param unit the time unit of <code>initialDelay</code> and <code>period</code>
     * @return scheduled future representing the periodic <i>command</i>
     */
    public ScheduledFuture<?> scheduleAtFixedRate(Object key,
                                                  Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        return chooseExecutor(key).scheduleAtFixedRate(command, initialDelay, period, unit);
    }

    /**
     * Submit the <code>command</code> to the executor that is assigned by <code>key</code>.
     *
     * @param key key of the <i>command</i> to run
     * @param command command to run
     * @return future representing the pending <i>command</i>
     */
    public Future<?> submit(Object key, Runnable command) {
        return chooseExecutor(key).submit(command);
    }

}
