/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.utils;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.utils.CoalescingStrategies.Clock;
import org.apache.cassandra.utils.CoalescingStrategies.Coalescable;
import org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy;
import org.apache.cassandra.utils.CoalescingStrategies.Parker;
import org.junit.BeforeClass;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

import static org.junit.Assert.*;

public class CoalescingStrategiesTest
{

    static final ExecutorService ex = Executors.newSingleThreadExecutor();

    private static final Logger logger = LoggerFactory.getLogger(CoalescingStrategiesTest.class);

    static class MockParker implements Parker
    {
        Queue<Long> parks = new ArrayDeque<Long>();
        Semaphore permits = new Semaphore(0);

        Semaphore parked = new Semaphore(0);

        public void park(long nanos)
        {
            parks.offer(nanos);
            parked.release();
            try
            {
                permits.acquire();
            }
            catch (InterruptedException e)
            {
                throw new RuntimeException(e);
            }
        }
    }

    static class SimpleCoalescable implements Coalescable
    {
        final long timestampNanos;

        SimpleCoalescable(long timestampNanos)
        {
            this.timestampNanos = timestampNanos;
        }

        public long timestampNanos()
        {
            return timestampNanos;
        }
    }


    static long toNanos(long micros)
    {
        return TimeUnit.MICROSECONDS.toNanos(micros);
    }

    MockParker parker;

    BlockingQueue<SimpleCoalescable> input;
    List<SimpleCoalescable> output;

    CoalescingStrategy cs;

    Semaphore queueParked = new Semaphore(0);
    Semaphore queueRelease = new Semaphore(0);

    @BeforeClass
    public static void initDD()
    {
        DatabaseDescriptor.forceStaticInitialization();
    }

    @SuppressWarnings({ "serial" })
    @Before
    public void setUp() throws Exception
    {
        cs = null;
        CoalescingStrategies.CLOCK = new Clock()
        {
            @Override
            public long nanoTime()
            {
                return 0;
            }
        };

        parker = new MockParker();
        input = new LinkedBlockingQueue<SimpleCoalescable>()
                {
            @Override
            public SimpleCoalescable take() throws InterruptedException
            {
                queueParked.release();
                queueRelease.acquire();
                return super.take();
            }
        };
        output = new ArrayList<>(128);

        clear();
    }

    CoalescingStrategy newStrategy(String name, int window)
    {
        return CoalescingStrategies.newCoalescingStrategy(name, window, parker, logger, "Stupendopotamus");
    }

    void add(long whenMicros)
    {
        input.offer(new SimpleCoalescable(toNanos(whenMicros)));
    }

    void clear()
    {
        output.clear();
        input.clear();
        parker.parks.clear();
        parker.parked.drainPermits();
        parker.permits.drainPermits();
        queueParked.drainPermits();
        queueRelease.drainPermits();
    }

    void release() throws Exception
    {
        queueRelease.release();
        parker.permits.release();
        fut.get();
    }

    Future<?> fut;
    void runBlocker(Semaphore waitFor) throws Exception
    {
        fut = ex.submit(new Runnable()
        {
            @Override
            public void run()
            {
                try
                {
                    cs.coalesce(input, output, 128);
                }
                catch (Exception ex)
                {
                    ex.printStackTrace();
                    throw new RuntimeException(ex);
                }
            }
        });
        waitFor.acquire();
    }

    @Test
    public void testFixedCoalescingStrategy() throws Exception
    {
        cs = newStrategy("FIXED", 200);

        //Test that when a stream of messages continues arriving it keeps sending until all are drained
        //It does this because it is already awake and sending messages
        add(42);
        add(42);
        cs.coalesce(input, output, 128);
        assertEquals( 2, output.size());
        assertNull(parker.parks.poll());

        clear();

        runBlocker(queueParked);
        add(42);
        add(42);
        add(42);
        release();
        assertEquals( 3, output.size());
        assertEquals(toNanos(200), parker.parks.poll().longValue());

    }

    @Test
    public void testFixedCoalescingStrategyEnough() throws Exception
    {
        int oldValue = DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages();
        DatabaseDescriptor.setOtcCoalescingEnoughCoalescedMessages(1);
        try {
            cs = newStrategy("FIXED", 200);

            //Test that when a stream of messages continues arriving it keeps sending until all are drained
            //It does this because it is already awake and sending messages
            add(42);
            add(42);
            cs.coalesce(input, output, 128);
            assertEquals(2, output.size());
            assertNull(parker.parks.poll());

            clear();

            runBlocker(queueParked);
            add(42);
            add(42);
            add(42);
            release();
            assertEquals(3, output.size());
            assertNull(parker.parks.poll());
        }
        finally {
            DatabaseDescriptor.setOtcCoalescingEnoughCoalescedMessages(oldValue);
        }

    }

    @Test
    public void testDisabledCoalescingStrateg() throws Exception
    {
        cs = newStrategy("DISABLED", 200);

        add(42);
        add(42);
        cs.coalesce(input, output, 128);
        assertEquals( 2, output.size());
        assertNull(parker.parks.poll());

        clear();

        runBlocker(queueParked);
        add(42);
        add(42);
        release();
        assertEquals( 2, output.size());
        assertNull(parker.parks.poll());
    }

    @Test
    public void parkLoop() throws Exception
   {
        final Thread current = Thread.currentThread();
        final Semaphore helperReady = new Semaphore(0);
        final Semaphore helperGo = new Semaphore(0);

        new Thread()
        {
            @Override
            public void run()
            {
                try
                {
                    helperReady.release();
                    helperGo.acquire();
                    Thread.sleep(50);
                    LockSupport.unpark(current);
                }
                catch (Exception e)
                {
                    e.printStackTrace();
                    logger.error("Error", e);
                    System.exit(-1);
                }
            }
        }.start();

        long start = System.nanoTime();
        helperGo.release();

        long parkNanos = TimeUnit.MILLISECONDS.toNanos(500);

        CoalescingStrategies.parkLoop(parkNanos);
        long delta = System.nanoTime() - start;

        assertTrue (delta >= (parkNanos - (parkNanos / 16)));
    }

    @Test
    public void testMovingAverageCoalescingStrategy() throws Exception
    {
        cs = newStrategy("org.apache.cassandra.utils.CoalescingStrategies$MovingAverageCoalescingStrategy", 200);


        //Test that things can be pulled out of the queue if it is non-empty
        add(201);
        add(401);
        cs.coalesce(input, output, 128);
        assertEquals( 2, output.size());
        assertNull(parker.parks.poll());

        //Test that blocking on the queue results in everything drained
        clear();

        runBlocker(queueParked);
        add(601);
        add(801);
        release();
        assertEquals( 2, output.size());
        assertNull(parker.parks.poll());

        clear();

        //Test that out of order samples still flow
        runBlocker(queueParked);
        add(0);
        release();
        assertEquals( 1, output.size());
        assertNull(parker.parks.poll());

        clear();

        add(0);
        cs.coalesce(input, output, 128);
        assertEquals( 1, output.size());
        assertNull(parker.parks.poll());

        clear();

        //Test that too high an average doesn't coalesce
        for (long ii = 0; ii < 128; ii++)
            add(ii * 1000);
        cs.coalesce(input, output, 128);
        assertEquals(output.size(), 128);
        assertTrue(parker.parks.isEmpty());

        clear();

        runBlocker(queueParked);
        add(129 * 1000);
        release();
        assertTrue(parker.parks.isEmpty());

        clear();

        //Test that a low enough average coalesces
        cs = newStrategy("MOVINGAVERAGE", 200);
        for (long ii = 0; ii < 128; ii++)
            add(ii * 99);
        cs.coalesce(input, output, 128);
        assertEquals(output.size(), 128);
        assertTrue(parker.parks.isEmpty());

        clear();

        runBlocker(queueParked);
        add(128 * 99);
        add(129 * 99);
        release();
        assertEquals(2, output.size());
        assertEquals(toNanos(198), parker.parks.poll().longValue());
    }

    @Test
    public void testTimeHorizonStrategy() throws Exception
    {
        cs = newStrategy("TIMEHORIZON", 200);

        //Test that things can be pulled out of the queue if it is non-empty
        add(201);
        add(401);
        cs.coalesce(input, output, 128);
        assertEquals( 2, output.size());
        assertNull(parker.parks.poll());

        //Test that blocking on the queue results in everything drained
        clear();

        runBlocker(queueParked);
        add(601);
        add(801);
        release();
        assertEquals( 2, output.size());
        assertNull(parker.parks.poll());

        clear();

        //Test that out of order samples still flow
        runBlocker(queueParked);
        add(0);
        release();
        assertEquals( 1, output.size());
        assertNull(parker.parks.poll());

        clear();

        add(0);
        cs.coalesce(input, output, 128);
        assertEquals( 1, output.size());
        assertNull(parker.parks.poll());

        clear();

        //Test that too high an average doesn't coalesce
        for (long ii = 0; ii < 128; ii++)
            add(ii * 1000);
        cs.coalesce(input, output, 128);
        assertEquals(output.size(), 128);
        assertTrue(parker.parks.isEmpty());

        clear();

        runBlocker(queueParked);
        add(129 * 1000);
        release();
        assertTrue(parker.parks.isEmpty());

        clear();

        //Test that a low enough average coalesces
        cs = newStrategy("TIMEHORIZON", 200);
        primeTimeHorizonAverage(99);

        clear();

        runBlocker(queueParked);
        add(100000 * 99);
        queueRelease.release();
        parker.parked.acquire();
        add(100001 * 99);
        parker.permits.release();
        fut.get();
        assertEquals(2, output.size());
        assertEquals(toNanos(198), parker.parks.poll().longValue());

        clear();

        //Test far future
        add(Integer.MAX_VALUE);
        cs.coalesce(input, output, 128);
        assertEquals(1, output.size());
        assertTrue(parker.parks.isEmpty());

        clear();

        //Distant past
        add(0);
        cs.coalesce(input, output, 128);
        assertEquals(1, output.size());
        assertTrue(parker.parks.isEmpty());
    }

    void primeTimeHorizonAverage(long micros) throws Exception
    {
        for (long ii = 0; ii < 100000; ii++)
        {
            add(ii * micros);
            if (ii % 128 == 0)
            {
                cs.coalesce(input, output, 128);
                output.clear();
            }
        }
    }
}
