/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.utils;

import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.util.FileUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.RandomAccessFile;
import java.lang.reflect.Constructor;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel.MapMode;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.Locale;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;

public class CoalescingStrategies
{
    static protected final Logger logger = LoggerFactory.getLogger(CoalescingStrategies.class);

    /*
     * Log debug information at info level about what the average is and when coalescing is enabled/disabled
     */
    private static final String DEBUG_COALESCING_PROPERTY = Config.PROPERTY_PREFIX + "coalescing_debug";
    private static final boolean DEBUG_COALESCING = Boolean.getBoolean(DEBUG_COALESCING_PROPERTY);

    private static final String DEBUG_COALESCING_PATH_PROPERTY = Config.PROPERTY_PREFIX + "coalescing_debug_path";
    private static final String DEBUG_COALESCING_PATH = System.getProperty(DEBUG_COALESCING_PATH_PROPERTY, "/tmp/coleascing_debug");

    static
    {
        if (DEBUG_COALESCING)
        {
            File directory = new File(DEBUG_COALESCING_PATH);

            if (directory.exists())
                FileUtils.deleteRecursive(directory);

            if (!directory.mkdirs())
                throw new ExceptionInInitializerError("Couldn't create log dir");
        }
    }

    @VisibleForTesting
    interface Clock
    {
        long nanoTime();
    }

    @VisibleForTesting
    static Clock CLOCK = new Clock()
    {
        public long nanoTime()
        {
            return System.nanoTime();
        }
    };

    public static interface Coalescable
    {
        long timestampNanos();
    }

    @VisibleForTesting
    static void parkLoop(long nanos)
    {
        long now = System.nanoTime();
        final long timer = now + nanos;
        // We shouldn't loop if it's within a few % of the target sleep time if on a second iteration.
        // See CASSANDRA-8692.
        final long limit = timer - nanos / 16;
        do
        {
            LockSupport.parkNanos(timer - now);
            now = System.nanoTime();
        }
        while (now < limit);
    }

    private static boolean maybeSleep(int messages, long averageGap, long maxCoalesceWindow, Parker parker)
    {
        // Do not sleep if there are still items in the backlog (CASSANDRA-13090).
        if (messages >= DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages())
            return false;

        // only sleep if we can expect to double the number of messages we're sending in the time interval
        long sleep = messages * averageGap;
        if (sleep <= 0 || sleep > maxCoalesceWindow)
            return false;

        // assume we receive as many messages as we expect; apply the same logic to the future batch:
        // expect twice as many messages to consider sleeping for "another" interval; this basically translates
        // to doubling our sleep period until we exceed our max sleep window
        while (sleep * 2 < maxCoalesceWindow)
            sleep *= 2;
        parker.park(sleep);
        return true;
    }

    public static abstract class CoalescingStrategy
    {
        protected final Parker parker;
        protected final Logger logger;
        protected volatile boolean shouldLogAverage = false;
        protected final ByteBuffer logBuffer;
        private RandomAccessFile ras;
        private final String displayName;

        protected CoalescingStrategy(Parker parker, Logger logger, String displayName)
        {
            this.parker = parker;
            this.logger = logger;
            this.displayName = displayName;
            if (DEBUG_COALESCING)
            {
                NamedThreadFactory.createThread(() ->
                {
                    while (true)
                    {
                        try
                        {
                            Thread.sleep(5000);
                        }
                        catch (InterruptedException e)
                        {
                            throw new AssertionError();
                        }
                        shouldLogAverage = true;
                    }
                }, displayName + " debug thread").start();
            }
            RandomAccessFile rasTemp = null;
            ByteBuffer logBufferTemp = null;
            if (DEBUG_COALESCING)
            {
                try
                {
                    File outFile = File.createTempFile("coalescing_" + this.displayName + "_", ".log", new File(DEBUG_COALESCING_PATH));
                    rasTemp = new RandomAccessFile(outFile, "rw");
                    logBufferTemp = ras.getChannel().map(MapMode.READ_WRITE, 0, Integer.MAX_VALUE);
                    logBufferTemp.putLong(0);
                }
                catch (Exception e)
                {
                    logger.error("Unable to create output file for debugging coalescing", e);
                }
            }
            ras = rasTemp;
            logBuffer = logBufferTemp;
        }

        /*
         * If debugging is enabled log to the logger the current average gap calculation result.
         */
        final protected void debugGap(long averageGap)
        {
            if (DEBUG_COALESCING && shouldLogAverage)
            {
                shouldLogAverage = false;
                logger.info("{} gap {}μs", this, TimeUnit.NANOSECONDS.toMicros(averageGap));
            }
        }

        /*
         * If debugging is enabled log the provided nanotime timestamp to a file.
         */
        final protected void debugTimestamp(long timestamp)
        {
            if(DEBUG_COALESCING && logBuffer != null)
            {
                logBuffer.putLong(0, logBuffer.getLong(0) + 1);
                logBuffer.putLong(timestamp);
            }
        }

        /*
         * If debugging is enabled log the timestamps of all the items in the provided collection
         * to a file.
         */
        final protected <C extends Coalescable> void debugTimestamps(Collection<C> coalescables)
        {
            if (DEBUG_COALESCING)
            {
                for (C coalescable : coalescables)
                {
                    debugTimestamp(coalescable.timestampNanos());
                }
            }
        }

        /**
         * Drain from the input blocking queue to the output list up to maxItems elements.
         *
         * The coalescing strategy may choose to park the current thread if it thinks it will
         * be able to produce an output list with more elements.
         *
         * @param input Blocking queue to retrieve elements from
         * @param out Output list to place retrieved elements in. Must be empty.
         * @param maxItems Maximum number of elements to place in the output list
         */
        public <C extends Coalescable> void coalesce(BlockingQueue<C> input, List<C> out, int maxItems) throws InterruptedException
        {
            Preconditions.checkArgument(out.isEmpty(), "out list should be empty");
            coalesceInternal(input, out, maxItems);
        }

        protected abstract <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out, int maxItems) throws InterruptedException;

    }

    @VisibleForTesting
    interface Parker
    {
        void park(long nanos);
    }

    private static final Parker PARKER = new Parker()
    {
        @Override
        public void park(long nanos)
        {
            parkLoop(nanos);
        }
    };

    @VisibleForTesting
    static class TimeHorizonMovingAverageCoalescingStrategy extends CoalescingStrategy
    {
        // for now we'll just use 64ms per bucket; this can be made configurable, but results in ~1s for 16 samples
        private static final int INDEX_SHIFT = 26;
        private static final long BUCKET_INTERVAL = 1L << 26;
        private static final int BUCKET_COUNT = 16;
        private static final long INTERVAL = BUCKET_INTERVAL * BUCKET_COUNT;
        private static final long MEASURED_INTERVAL = BUCKET_INTERVAL * (BUCKET_COUNT - 1);

        // the minimum timestamp we will now accept updates for; only moves forwards, never backwards
        private long epoch = CLOCK.nanoTime();
        // the buckets, each following on from epoch; the measurements run from ix(epoch) to ix(epoch - 1)
        // ix(epoch-1) is a partial result, that is never actually part of the calculation, and most updates
        // are expected to hit this bucket
        private final int samples[] = new int[BUCKET_COUNT];
        private long sum = 0;
        private final long maxCoalesceWindow;

        public TimeHorizonMovingAverageCoalescingStrategy(int maxCoalesceWindow, Parker parker, Logger logger, String displayName)
        {
            super(parker, logger, displayName);
            this.maxCoalesceWindow = TimeUnit.MICROSECONDS.toNanos(maxCoalesceWindow);
            sum = 0;
        }

        private void logSample(long nanos)
        {
            debugTimestamp(nanos);
            long epoch = this.epoch;
            long delta = nanos - epoch;
            if (delta < 0)
                // have to simply ignore, but would be a bit crazy to get such reordering
                return;

            if (delta > INTERVAL)
                epoch = rollepoch(delta, epoch, nanos);

            int ix = ix(nanos);
            samples[ix]++;

            // if we've updated an old bucket, we need to update the sum to match
            if (ix != ix(epoch - 1))
                sum++;
        }

        private long averageGap()
        {
            if (sum == 0)
                return Integer.MAX_VALUE;
            return MEASURED_INTERVAL / sum;
        }

        // this sample extends past the end of the range we cover, so rollover
        private long rollepoch(long delta, long epoch, long nanos)
        {
            if (delta > 2 * INTERVAL)
            {
                // this sample is more than twice our interval ahead, so just clear our counters completely
                epoch = epoch(nanos);
                sum = 0;
                Arrays.fill(samples, 0);
            }
            else
            {
                // ix(epoch - 1) => last index; this is our partial result bucket, so we add this to the sum
                sum += samples[ix(epoch - 1)];
                // then we roll forwards, clearing buckets, until our interval covers the new sample time
                while (epoch + INTERVAL < nanos)
                {
                    int index = ix(epoch);
                    sum -= samples[index];
                    samples[index] = 0;
                    epoch += BUCKET_INTERVAL;
                }
            }
            // store the new epoch
            this.epoch = epoch;
            return epoch;
        }

        private long epoch(long latestNanos)
        {
            return (latestNanos - MEASURED_INTERVAL) & ~(BUCKET_INTERVAL - 1);
        }

        private int ix(long nanos)
        {
            return (int) ((nanos >>> INDEX_SHIFT) & 15);
        }

        @Override
        protected <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out,  int maxItems) throws InterruptedException
        {
            if (input.drainTo(out, maxItems) == 0)
            {
                out.add(input.take());
                input.drainTo(out, maxItems - out.size());
            }

            for (Coalescable qm : out)
                logSample(qm.timestampNanos());

            long averageGap = averageGap();
            debugGap(averageGap);

            int count = out.size();
            if (maybeSleep(count, averageGap, maxCoalesceWindow, parker))
            {
                input.drainTo(out, maxItems - out.size());
                int prevCount = count;
                count = out.size();
                for (int  i = prevCount; i < count; i++)
                    logSample(out.get(i).timestampNanos());
            }
        }

        @Override
        public String toString()
        {
            return "Time horizon moving average";
        }
    }

    /*
     * Start coalescing by sleeping if the moving average is < the requested window.
     * The actual time spent waiting to coalesce will be the min( window, moving average * 2)
     * The actual amount of time spent waiting can be greater then the window. For instance
     * observed time spent coalescing was 400 microseconds with the window set to 200 in one benchmark.
     */
    @VisibleForTesting
    static class MovingAverageCoalescingStrategy extends CoalescingStrategy
    {
        private final int samples[] = new int[16];
        private long lastSample = 0;
        private int index = 0;
        private long sum = 0;

        private final long maxCoalesceWindow;

        public MovingAverageCoalescingStrategy(int maxCoalesceWindow, Parker parker, Logger logger, String displayName)
        {
            super(parker, logger, displayName);
            this.maxCoalesceWindow = TimeUnit.MICROSECONDS.toNanos(maxCoalesceWindow);
            for (int ii = 0; ii < samples.length; ii++)
                samples[ii] = Integer.MAX_VALUE;
            sum = Integer.MAX_VALUE * (long)samples.length;
        }

        private long logSample(int value)
        {
            sum -= samples[index];
            sum += value;
            samples[index] = value;
            index++;
            index = index & ((1 << 4) - 1);
            return sum / 16;
        }

        private long notifyOfSample(long sample)
        {
            debugTimestamp(sample);
            if (sample > lastSample)
            {
                final int delta = (int)(Math.min(Integer.MAX_VALUE, sample - lastSample));
                lastSample = sample;
                return logSample(delta);
            }
            else
            {
                return logSample(1);
            }
        }

        @Override
        protected <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out,  int maxItems) throws InterruptedException
        {
            if (input.drainTo(out, maxItems) == 0)
            {
                out.add(input.take());
                input.drainTo(out, maxItems - out.size());
            }

            long average = notifyOfSample(out.get(0).timestampNanos());
            debugGap(average);

            if (maybeSleep(out.size(), average, maxCoalesceWindow, parker)) {
                input.drainTo(out, maxItems - out.size());
            }

            for (int ii = 1; ii < out.size(); ii++)
                notifyOfSample(out.get(ii).timestampNanos());
        }

        @Override
        public String toString()
        {
            return "Moving average";
        }
    }

    /*
     * A fixed strategy as a backup in case MovingAverage or TimeHorizongMovingAverage fails in some scenario
     */
    @VisibleForTesting
    static class FixedCoalescingStrategy extends CoalescingStrategy
    {
        private final long coalesceWindow;

        public FixedCoalescingStrategy(int coalesceWindowMicros, Parker parker, Logger logger, String displayName)
        {
            super(parker, logger, displayName);
            coalesceWindow = TimeUnit.MICROSECONDS.toNanos(coalesceWindowMicros);
        }

        @Override
        protected <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out,  int maxItems) throws InterruptedException
        {
            int enough = DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages();

            if (input.drainTo(out, maxItems) == 0)
            {
                out.add(input.take());
                input.drainTo(out, maxItems - out.size());
                if (out.size() < enough) {
                    parker.park(coalesceWindow);
                    input.drainTo(out, maxItems - out.size());
                }
            }
            debugTimestamps(out);
        }

        @Override
        public String toString()
        {
            return "Fixed";
        }
    }

    /*
     * A coalesscing strategy that just returns all currently available elements
     */
    @VisibleForTesting
    static class DisabledCoalescingStrategy extends CoalescingStrategy
    {

        public DisabledCoalescingStrategy(int coalesceWindowMicros, Parker parker, Logger logger, String displayName)
        {
            super(parker, logger, displayName);
        }

        @Override
        protected <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out,  int maxItems) throws InterruptedException
        {
            if (input.drainTo(out, maxItems) == 0)
            {
                out.add(input.take());
                input.drainTo(out, maxItems - 1);
            }
            debugTimestamps(out);
        }

        @Override
        public String toString()
        {
            return "Disabled";
        }
    }

    @VisibleForTesting
    static CoalescingStrategy newCoalescingStrategy(String strategy,
                                                    int coalesceWindow,
                                                    Parker parker,
                                                    Logger logger,
                                                    String displayName)
    {
        String classname = null;
        String strategyCleaned = strategy.trim().toUpperCase(Locale.ENGLISH);
        switch(strategyCleaned)
        {
        case "MOVINGAVERAGE":
            classname = MovingAverageCoalescingStrategy.class.getName();
            break;
        case "FIXED":
            classname = FixedCoalescingStrategy.class.getName();
            break;
        case "TIMEHORIZON":
            classname = TimeHorizonMovingAverageCoalescingStrategy.class.getName();
            break;
        case "DISABLED":
            classname = DisabledCoalescingStrategy.class.getName();
            break;
        default:
            classname = strategy;
        }

        try
        {
            Class<?> clazz = Class.forName(classname);

            if (!CoalescingStrategy.class.isAssignableFrom(clazz))
            {
                throw new RuntimeException(classname + " is not an instance of CoalescingStrategy");
            }

            Constructor<?> constructor = clazz.getConstructor(int.class, Parker.class, Logger.class, String.class);

            return (CoalescingStrategy)constructor.newInstance(coalesceWindow, parker, logger, displayName);
        }
        catch (Exception e)
        {
            throw new RuntimeException(e);
        }
    }

    public static CoalescingStrategy newCoalescingStrategy(String strategy, int coalesceWindow, Logger logger, String displayName)
    {
        return newCoalescingStrategy(strategy, coalesceWindow, PARKER, logger, displayName);
    }
}
