/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.distributedlog.common.concurrent;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.LongStream;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.distributedlog.util.OrderedScheduler;
import org.junit.Test;

/**
 * Unit Test for {@link FutureUtils}.
 */
public class TestFutureUtils {

    /**
     * Test Exception.
     */
    static class TestException extends IOException {
        private static final long serialVersionUID = -6256482498453846308L;

        public TestException() {
            super("test-exception");
        }
    }

    @Test
    public void testComplete() throws Exception {
        CompletableFuture<Long> future = FutureUtils.createFuture();
        FutureUtils.complete(future, 1024L);
        assertEquals(1024L, FutureUtils.result(future).longValue());
    }

    @Test(expected = TestException.class)
    public void testCompleteExceptionally() throws Exception {
        CompletableFuture<Long> future = FutureUtils.createFuture();
        FutureUtils.completeExceptionally(future, new TestException());
        FutureUtils.result(future);
    }

    @Test
    public void testWhenCompleteAsync() throws Exception {
        OrderedScheduler scheduler = OrderedScheduler.newBuilder()
            .name("test-when-complete-async")
            .corePoolSize(1)
            .build();
        AtomicLong resultHolder = new AtomicLong(0L);
        CountDownLatch latch = new CountDownLatch(1);
        CompletableFuture<Long> future = FutureUtils.createFuture();
        FutureUtils.whenCompleteAsync(
            future,
            (result, cause) -> {
                resultHolder.set(result);
                latch.countDown();
            },
            scheduler,
            new Object());
        FutureUtils.complete(future, 1234L);
        latch.await();
        assertEquals(1234L, resultHolder.get());
    }

    @Test
    public void testProxyToSuccess() throws Exception {
        CompletableFuture<Long> src = FutureUtils.createFuture();
        CompletableFuture<Long> target = FutureUtils.createFuture();
        FutureUtils.proxyTo(src, target);
        FutureUtils.complete(src, 10L);
        assertEquals(10L, FutureUtils.result(target).longValue());
    }

    @Test(expected = TestException.class)
    public void testProxyToFailure() throws Exception {
        CompletableFuture<Long> src = FutureUtils.createFuture();
        CompletableFuture<Long> target = FutureUtils.createFuture();
        FutureUtils.proxyTo(src, target);
        FutureUtils.completeExceptionally(src, new TestException());
        FutureUtils.result(target);
    }

    @Test
    public void testVoid() throws Exception {
        CompletableFuture<Void> voidFuture = FutureUtils.Void();
        assertTrue(voidFuture.isDone());
        assertFalse(voidFuture.isCompletedExceptionally());
        assertFalse(voidFuture.isCancelled());
    }

    @Test
    public void testCollectEmptyList() throws Exception {
        List<CompletableFuture<Integer>> futures = Lists.newArrayList();
        List<Integer> result = FutureUtils.result(FutureUtils.collect(futures));
        assertTrue(result.isEmpty());
    }

    @Test
    public void testCollectTenItems() throws Exception {
        List<CompletableFuture<Integer>> futures = Lists.newArrayList();
        List<Integer> expectedResults = Lists.newArrayList();
        for (int i = 0; i < 10; i++) {
            futures.add(FutureUtils.value(i));
            expectedResults.add(i);
        }
        List<Integer> results = FutureUtils.result(FutureUtils.collect(futures));
        assertEquals(expectedResults, results);
    }

    @Test(expected = TestException.class)
    public void testCollectFailures() throws Exception {
        List<CompletableFuture<Integer>> futures = Lists.newArrayList();
        List<Integer> expectedResults = Lists.newArrayList();
        for (int i = 0; i < 10; i++) {
            if (i == 9) {
                futures.add(FutureUtils.value(i));
            } else {
                futures.add(FutureUtils.exception(new TestException()));
            }
            expectedResults.add(i);
        }
        FutureUtils.result(FutureUtils.collect(futures));
    }

    @Test
    public void testWithinAlreadyDone() throws Exception {
        OrderedScheduler scheduler = mock(OrderedScheduler.class);
        CompletableFuture<Long> doneFuture = FutureUtils.value(1234L);
        CompletableFuture<Long> withinFuture = FutureUtils.within(
            doneFuture,
            10,
            TimeUnit.MILLISECONDS,
            new TestException(),
            scheduler,
            1234L);
        TimeUnit.MILLISECONDS.sleep(20);
        assertTrue(withinFuture.isDone());
        assertFalse(withinFuture.isCancelled());
        assertFalse(withinFuture.isCompletedExceptionally());
        verify(scheduler, times(0))
            .schedule(eq(1234L), isA(Runnable.class), eq(10), eq(TimeUnit.MILLISECONDS));
    }

    @Test
    public void testWithinZeroTimeout() throws Exception {
        OrderedScheduler scheduler = mock(OrderedScheduler.class);
        CompletableFuture<Long> newFuture = FutureUtils.createFuture();
        CompletableFuture<Long> withinFuture = FutureUtils.within(
            newFuture,
            0,
            TimeUnit.MILLISECONDS,
            new TestException(),
            scheduler,
            1234L);
        TimeUnit.MILLISECONDS.sleep(20);
        assertFalse(withinFuture.isDone());
        assertFalse(withinFuture.isCancelled());
        assertFalse(withinFuture.isCompletedExceptionally());
        verify(scheduler, times(0))
            .schedule(eq(1234L), isA(Runnable.class), eq(10), eq(TimeUnit.MILLISECONDS));
    }

    @Test
    public void testWithinCompleteBeforeTimeout() throws Exception {
        OrderedScheduler scheduler = mock(OrderedScheduler.class);
        ScheduledFuture<?> scheduledFuture = mock(ScheduledFuture.class);
        when(scheduler.schedule(anyObject(), any(Runnable.class), anyLong(), any(TimeUnit.class)))
            .thenAnswer(invocationOnMock -> scheduledFuture);
        CompletableFuture<Long> newFuture = FutureUtils.createFuture();
        CompletableFuture<Long> withinFuture = FutureUtils.within(
            newFuture,
            Long.MAX_VALUE,
            TimeUnit.MILLISECONDS,
            new TestException(),
            scheduler,
            1234L);
        assertFalse(withinFuture.isDone());
        assertFalse(withinFuture.isCancelled());
        assertFalse(withinFuture.isCompletedExceptionally());

        newFuture.complete(5678L);

        assertTrue(withinFuture.isDone());
        assertFalse(withinFuture.isCancelled());
        assertFalse(withinFuture.isCompletedExceptionally());
        assertEquals((Long) 5678L, FutureUtils.result(withinFuture));

        verify(scheduledFuture, times(1))
            .cancel(eq(true));
    }

    @Test
    public void testIgnoreSuccess() {
        CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
        CompletableFuture<Void> ignoredFuture = FutureUtils.ignore(underlyFuture);
        underlyFuture.complete(1234L);
        assertTrue(ignoredFuture.isDone());
        assertFalse(ignoredFuture.isCompletedExceptionally());
        assertFalse(ignoredFuture.isCancelled());
    }

    @Test
    public void testIgnoreFailure() {
        CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
        CompletableFuture<Void> ignoredFuture = FutureUtils.ignore(underlyFuture);
        underlyFuture.completeExceptionally(new TestException());
        assertTrue(ignoredFuture.isDone());
        assertFalse(ignoredFuture.isCompletedExceptionally());
        assertFalse(ignoredFuture.isCancelled());
    }

    @Test
    public void testEnsureSuccess() throws Exception {
        CountDownLatch ensureLatch = new CountDownLatch(1);
        CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
        CompletableFuture<Long> ensuredFuture = FutureUtils.ensure(underlyFuture, () -> {
            ensureLatch.countDown();
        });
        underlyFuture.complete(1234L);
        FutureUtils.result(ensuredFuture);
        assertTrue(ensuredFuture.isDone());
        assertFalse(ensuredFuture.isCompletedExceptionally());
        assertFalse(ensuredFuture.isCancelled());
        ensureLatch.await();
    }

    @Test
    public void testEnsureFailure() throws Exception {
        CountDownLatch ensureLatch = new CountDownLatch(1);
        CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
        CompletableFuture<Long> ensuredFuture = FutureUtils.ensure(underlyFuture, () -> {
            ensureLatch.countDown();
        });
        underlyFuture.completeExceptionally(new TestException());
        FutureUtils.result(FutureUtils.ignore(ensuredFuture));
        assertTrue(ensuredFuture.isDone());
        assertTrue(ensuredFuture.isCompletedExceptionally());
        assertFalse(ensuredFuture.isCancelled());
        ensureLatch.await();
    }

    @Test
    public void testRescueSuccess() throws Exception {
        CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
        Function<Throwable, CompletableFuture<Long>> rescueFuc = mock(Function.class);
        CompletableFuture<Long> rescuedFuture = FutureUtils.rescue(underlyFuture, rescueFuc);
        underlyFuture.complete(1234L);
        FutureUtils.result(rescuedFuture);
        assertTrue(rescuedFuture.isDone());
        assertFalse(rescuedFuture.isCompletedExceptionally());
        assertFalse(rescuedFuture.isCancelled());
        verify(rescueFuc, times(0)).apply(any(Throwable.class));
    }

    @Test
    public void testRescueFailure() throws Exception {
        CompletableFuture<Long> futureCompletedAtRescue = FutureUtils.value(3456L);
        CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
        CompletableFuture<Long> rescuedFuture = FutureUtils.rescue(underlyFuture, (cause) -> futureCompletedAtRescue);
        underlyFuture.completeExceptionally(new TestException());
        FutureUtils.result(rescuedFuture);
        assertTrue(rescuedFuture.isDone());
        assertFalse(rescuedFuture.isCompletedExceptionally());
        assertFalse(rescuedFuture.isCancelled());
        assertEquals((Long) 3456L, FutureUtils.result(rescuedFuture));
    }

    @Test
    public void testStatsSuccess() throws Exception {
        OpStatsLogger statsLogger = mock(OpStatsLogger.class);
        CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
        CompletableFuture<Long> statsFuture = FutureUtils.stats(
            underlyFuture,
            statsLogger,
            Stopwatch.createStarted());
        underlyFuture.complete(1234L);
        FutureUtils.result(statsFuture);
        verify(statsLogger, times(1)).registerSuccessfulEvent(anyLong());
    }

    @Test
    public void testStatsFailure() throws Exception {
        OpStatsLogger statsLogger = mock(OpStatsLogger.class);
        CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
        CompletableFuture<Long> statsFuture = FutureUtils.stats(
            underlyFuture,
            statsLogger,
            Stopwatch.createStarted());
        underlyFuture.completeExceptionally(new TestException());
        FutureUtils.result(FutureUtils.ignore(statsFuture));
        verify(statsLogger, times(1)).registerFailedEvent(anyLong());
    }

    @Test
    public void testProcessListSuccess() throws Exception {
        List<Long> longList = Lists.newArrayList(LongStream.range(0L, 10L).iterator());
        List<Long> expectedList = Lists.transform(
            longList,
            aLong -> 2 * aLong);
        Function<Long, CompletableFuture<Long>> sumFunc = value -> FutureUtils.value(2 * value);
        CompletableFuture<List<Long>> totalFuture = FutureUtils.processList(
            longList,
            sumFunc,
            null);
        assertEquals(expectedList, FutureUtils.result(totalFuture));
    }

    @Test
    public void testProcessEmptyList() throws Exception {
        List<Long> longList = Lists.newArrayList();
        List<Long> expectedList = Lists.transform(
            longList,
            aLong -> 2 * aLong);
        Function<Long, CompletableFuture<Long>> sumFunc = value -> FutureUtils.value(2 * value);
        CompletableFuture<List<Long>> totalFuture = FutureUtils.processList(
            longList,
            sumFunc,
            null);
        assertEquals(expectedList, FutureUtils.result(totalFuture));
    }

    @Test
    public void testProcessListFailures() throws Exception {
        List<Long> longList = Lists.newArrayList(LongStream.range(0L, 10L).iterator());
        AtomicLong total = new AtomicLong(0L);
        Function<Long, CompletableFuture<Long>> sumFunc = value -> {
            if (value < 5) {
                total.addAndGet(value);
                return FutureUtils.value(2 * value);
            } else {
                return FutureUtils.exception(new TestException());
            }
        };
        CompletableFuture<List<Long>> totalFuture = FutureUtils.processList(
            longList,
            sumFunc,
            null);
        try {
            FutureUtils.result(totalFuture);
            fail("Should fail with TestException");
        } catch (TestException te) {
            // as expected
        }
        assertEquals(10L, total.get());
    }

}
