/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.distributedlog;

import java.net.URI;
import java.util.ArrayList;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.distributedlog.api.AsyncLogReader;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.exceptions.LockCancelledException;
import org.apache.distributedlog.exceptions.LockingException;
import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
import org.apache.distributedlog.impl.BKNamespaceDriver;
import org.apache.distributedlog.lock.LockClosedException;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.distributedlog.namespace.NamespaceDriver;
import org.apache.distributedlog.api.subscription.SubscriptionsStore;
import org.apache.distributedlog.common.concurrent.FutureEventListener;
import org.apache.distributedlog.util.Utils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;

public class TestAsyncReaderLock extends TestDistributedLogBase {
    static final Logger LOG = LoggerFactory.getLogger(TestAsyncReaderLock.class);

    @Rule
    public TestName runtime = new TestName();

    void assertAcquiredFlagsSet(boolean[] acquiredFlags, int endIndex) {
        for (int i = 0; i < endIndex; i++) {
            assertTrue("reader " + i + " should have acquired lock", acquiredFlags[i]);
        }
        for (int i = endIndex; i < acquiredFlags.length; i++) {
            assertFalse("reader " + i + " should not have acquired lock", acquiredFlags[i]);
        }
    }

    @Test(timeout = 60000)
    public void testReaderLockIfLockPathDoesntExist() throws Exception {
        final String name = runtime.getMethodName();
        DistributedLogManager dlm = createNewDLM(conf, name);
        BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm.startAsyncLogSegmentNonPartitioned());
        writer.write(DLMTestUtil.getLogRecordInstance(1L));
        writer.closeAndComplete();

        CompletableFuture<AsyncLogReader> futureReader1 = dlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
        BKAsyncLogReader reader1 = (BKAsyncLogReader) Utils.ioResult(futureReader1);
        LogRecordWithDLSN record = Utils.ioResult(reader1.readNext());
        assertEquals(1L, record.getTransactionId());
        assertEquals(0L, record.getSequenceId());
        DLMTestUtil.verifyLogRecord(record);

        String readLockPath = reader1.readHandler.getReadLockPath();
        Utils.close(reader1);

        // simulate a old stream created without readlock path
        NamespaceDriver driver = dlm.getNamespaceDriver();
        ((BKNamespaceDriver) driver).getWriterZKC().get().delete(readLockPath, -1);
        CompletableFuture<AsyncLogReader> futureReader2 = dlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
        AsyncLogReader reader2 = Utils.ioResult(futureReader2);
        record = Utils.ioResult(reader2.readNext());
        assertEquals(1L, record.getTransactionId());
        assertEquals(0L, record.getSequenceId());
        DLMTestUtil.verifyLogRecord(record);
    }

    @Test(timeout = 60000)
    public void testReaderLockCloseInAcquireCallback() throws Exception {
        final String name = runtime.getMethodName();
        DistributedLogManager dlm = createNewDLM(conf, name);
        BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm.startAsyncLogSegmentNonPartitioned());
        writer.write(DLMTestUtil.getLogRecordInstance(1L));
        writer.closeAndComplete();

        final CountDownLatch latch = new CountDownLatch(1);

        CompletableFuture<AsyncLogReader> futureReader1 = dlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
        futureReader1
            .thenCompose(
                reader -> reader.asyncClose()
                    .thenApply(result -> {
                        latch.countDown();
                        return null;
                    }));

        latch.await();
        dlm.close();
    }

    @Test(timeout = 60000)
    public void testReaderLockBackgroundReaderLockAcquire() throws Exception {
        final String name = runtime.getMethodName();
        DistributedLogManager dlm = createNewDLM(conf, name);
        BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm.startAsyncLogSegmentNonPartitioned());
        writer.write(DLMTestUtil.getLogRecordInstance(1L));
        writer.closeAndComplete();

        CompletableFuture<AsyncLogReader> futureReader1 = dlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
        AsyncLogReader reader1 = Utils.ioResult(futureReader1);
        reader1.readNext();

        final CountDownLatch acquiredLatch = new CountDownLatch(1);
        final AtomicBoolean acquired = new AtomicBoolean(false);
        Thread acquireThread = new Thread(new Runnable() {
            @Override
            public void run() {
                CompletableFuture<AsyncLogReader> futureReader2 = null;
                DistributedLogManager dlm2 = null;
                try {
                    dlm2 = createNewDLM(conf, name);
                    futureReader2 = dlm2.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
                    AsyncLogReader reader2 = Utils.ioResult(futureReader2);
                    acquired.set(true);
                    acquiredLatch.countDown();
                } catch (Exception ex) {
                    fail("shouldn't reach here");
                } finally {
                    try {
                        dlm2.close();
                    } catch (Exception ex) {
                        fail("shouldn't reach here");
                    }
                }
            }
        }, "acquire-thread");
        acquireThread.start();

        Thread.sleep(1000);
        assertEquals(false, acquired.get());
        Utils.close(reader1);

        acquiredLatch.await();
        assertEquals(true, acquired.get());
        dlm.close();
    }

    int countDefined(ArrayList<CompletableFuture<AsyncLogReader>> readers) {
        int done = 0;
        for (CompletableFuture<AsyncLogReader> futureReader : readers) {
            if (futureReader.isDone()) {
                done++;
            }
        }
        return done;
    }

    @Test(timeout = 60000)
    public void testReaderLockManyLocks() throws Exception {
        String name = runtime.getMethodName();
        DistributedLogManager dlm = createNewDLM(conf, name);
        BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm.startAsyncLogSegmentNonPartitioned());
        writer.write(DLMTestUtil.getLogRecordInstance(1L));
        writer.write(DLMTestUtil.getLogRecordInstance(2L));
        writer.closeAndComplete();

        int count = 5;
        final CountDownLatch acquiredLatch = new CountDownLatch(count);
        final ArrayList<CompletableFuture<AsyncLogReader>> readers = new ArrayList<CompletableFuture<AsyncLogReader>>(count);
        for (int i = 0; i < count; i++) {
            readers.add(null);
        }
        final DistributedLogManager[] dlms = new DistributedLogManager[count];
        for (int i = 0; i < count; i++) {
            dlms[i] = createNewDLM(conf, name);
            readers.set(i, dlms[i].getAsyncLogReaderWithLock(DLSN.InitialDLSN));
            readers.get(i).whenComplete(new FutureEventListener<AsyncLogReader>() {
                @Override
                public void onSuccess(AsyncLogReader reader) {
                    acquiredLatch.countDown();
                    reader.asyncClose();
                }
                @Override
                public void onFailure(Throwable cause) {
                    fail("acquire shouldnt have failed");
                }
            });
        }

        acquiredLatch.await();
        for (int i = 0; i < count; i++) {
            dlms[i].close();
        }

        dlm.close();
    }

    @Test(timeout = 60000)
    public void testReaderLockDlmClosed() throws Exception {
        String name = runtime.getMethodName();
        DistributedLogManager dlm0 = createNewDLM(conf, name);
        BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm0.startAsyncLogSegmentNonPartitioned());
        writer.write(DLMTestUtil.getLogRecordInstance(1L));
        writer.write(DLMTestUtil.getLogRecordInstance(2L));
        writer.closeAndComplete();

        DistributedLogManager dlm1 = createNewDLM(conf, name);
        CompletableFuture<AsyncLogReader> futureReader1 = dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
        AsyncLogReader reader1 = Utils.ioResult(futureReader1);

        BKDistributedLogManager dlm2 = (BKDistributedLogManager) createNewDLM(conf, name);
        CompletableFuture<AsyncLogReader> futureReader2 = dlm2.getAsyncLogReaderWithLock(DLSN.InitialDLSN);

        dlm2.close();
        try {
            Utils.ioResult(futureReader2);
            fail("should have thrown exception!");
        } catch (CancellationException ce) {
        } catch (LockClosedException ex) {
        } catch (LockCancelledException ex) {
        }

        Utils.close(reader1);
        dlm0.close();
        dlm1.close();
    }

    @Test(timeout = 60000)
    public void testReaderLockSessionExpires() throws Exception {
        String name = runtime.getMethodName();
        URI uri = createDLMURI("/" + name);
        ensureURICreated(uri);
        Namespace ns0 = NamespaceBuilder.newBuilder()
                .conf(conf)
                .uri(uri)
                .build();
        DistributedLogManager dlm0 = ns0.openLog(name);
        BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm0.startAsyncLogSegmentNonPartitioned());
        writer.write(DLMTestUtil.getLogRecordInstance(1L));
        writer.write(DLMTestUtil.getLogRecordInstance(2L));
        writer.closeAndComplete();

        Namespace ns1 = NamespaceBuilder.newBuilder()
                .conf(conf)
                .uri(uri)
                .build();
        DistributedLogManager dlm1 = ns1.openLog(name);
        CompletableFuture<AsyncLogReader> futureReader1 = dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
        AsyncLogReader reader1 = Utils.ioResult(futureReader1);
        ZooKeeperClientUtils.expireSession(((BKNamespaceDriver) ns1.getNamespaceDriver()).getWriterZKC(), zkServers, 1000);

        // The result of expireSession is somewhat non-deterministic with this lock.
        // It may fail with LockingException or it may succesfully reacquire, so for
        // the moment rather than make it deterministic we accept either result.
        boolean success = false;
        try {
            Utils.ioResult(reader1.readNext());
            success = true;
        } catch (LockingException ex) {
        }
        if (success) {
            Utils.ioResult(reader1.readNext());
        }

        Utils.close(reader1);
        dlm0.close();
        ns0.close();
        dlm1.close();
        ns1.close();
    }

    @Test(timeout = 60000)
    public void testReaderLockFutureCancelledWhileWaiting() throws Exception {
        String name = runtime.getMethodName();
        DistributedLogManager dlm0 = createNewDLM(conf, name);
        BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm0.startAsyncLogSegmentNonPartitioned());
        writer.write(DLMTestUtil.getLogRecordInstance(1L));
        writer.write(DLMTestUtil.getLogRecordInstance(2L));
        writer.closeAndComplete();

        DistributedLogManager dlm1 = createNewDLM(conf, name);
        CompletableFuture<AsyncLogReader> futureReader1 = dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
        AsyncLogReader reader1 = Utils.ioResult(futureReader1);

        DistributedLogManager dlm2 = createNewDLM(conf, name);
        CompletableFuture<AsyncLogReader> futureReader2 = dlm2.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
        try {
            futureReader2.cancel(true);
            Utils.ioResult(futureReader2);
            fail("Should fail getting log reader as it is cancelled");
        } catch (CancellationException ce) {
        } catch (LockClosedException ex) {
        } catch (LockCancelledException ex) {
        } catch (OwnershipAcquireFailedException oafe) {
        }

        futureReader2 = dlm2.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
        Utils.close(reader1);

        Utils.ioResult(futureReader2);

        dlm0.close();
        dlm1.close();
        dlm2.close();
    }

    @Test(timeout = 60000)
    public void testReaderLockFutureCancelledWhileLocked() throws Exception {
        String name = runtime.getMethodName();
        DistributedLogManager dlm0 = createNewDLM(conf, name);
        BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm0.startAsyncLogSegmentNonPartitioned());
        writer.write(DLMTestUtil.getLogRecordInstance(1L));
        writer.write(DLMTestUtil.getLogRecordInstance(2L));
        writer.closeAndComplete();

        DistributedLogManager dlm1 = createNewDLM(conf, name);
        CompletableFuture<AsyncLogReader> futureReader1 = dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN);

        // Must not throw or cancel or do anything bad, future already completed.
        Utils.ioResult(futureReader1);
        futureReader1.cancel(true);
        AsyncLogReader reader1 = Utils.ioResult(futureReader1);
        Utils.ioResult(reader1.readNext());

        dlm0.close();
        dlm1.close();
    }

    @Test(timeout = 60000)
    public void testReaderLockSharedDlmDoesNotConflict() throws Exception {
        String name = runtime.getMethodName();
        DistributedLogManager dlm0 = createNewDLM(conf, name);
        BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm0.startAsyncLogSegmentNonPartitioned());
        writer.write(DLMTestUtil.getLogRecordInstance(1L));
        writer.write(DLMTestUtil.getLogRecordInstance(2L));
        writer.closeAndComplete();

        DistributedLogManager dlm1 = createNewDLM(conf, name);
        CompletableFuture<AsyncLogReader> futureReader1 = dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
        CompletableFuture<AsyncLogReader> futureReader2 = dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN);

        // Both use the same client id, so there's no lock conflict. Not necessarily ideal, but how the
        // system currently works.
        Utils.ioResult(futureReader1);
        Utils.ioResult(futureReader2);

        dlm0.close();
        dlm1.close();
    }

    static class ReadRecordsListener implements FutureEventListener<AsyncLogReader> {

        final AtomicReference<DLSN> currentDLSN;
        final String name;
        final ExecutorService executorService;

        final CountDownLatch latch = new CountDownLatch(1);
        boolean failed = false;

        public ReadRecordsListener(AtomicReference<DLSN> currentDLSN,
                                   String name,
                                   ExecutorService executorService) {
            this.currentDLSN = currentDLSN;
            this.name = name;
            this.executorService = executorService;
        }
        public CountDownLatch getLatch() {
            return latch;
        }
        public boolean failed() {
            return failed;
        }
        public boolean done() {
            return latch.getCount() == 0;
        }

        @Override
        public void onSuccess(final AsyncLogReader reader) {
            LOG.info("Reader {} is ready to read entries", name);
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    readEntries(reader);
                }
            });
        }

        private void readEntries(AsyncLogReader reader) {
            try {
                for (int i = 0; i < 300; i++) {
                    LogRecordWithDLSN record = Utils.ioResult(reader.readNext());
                    currentDLSN.set(record.getDlsn());
                }
            } catch (Exception ex) {
                failed = true;
            } finally {
                latch.countDown();
            }
        }

        @Override
        public void onFailure(Throwable cause) {
            LOG.error("{} failed to open reader", name, cause);
            failed = true;
            latch.countDown();
        }
    }

    @Test(timeout = 60000)
    public void testReaderLockMultiReadersScenario() throws Exception {
        final String name = runtime.getMethodName();
        URI uri = createDLMURI("/" + name);
        ensureURICreated(uri);

        DistributedLogConfiguration localConf = new DistributedLogConfiguration();
        localConf.addConfiguration(conf);
        localConf.setImmediateFlushEnabled(false);
        localConf.setPeriodicFlushFrequencyMilliSeconds(60 * 1000);
        localConf.setOutputBufferSize(0);
        // Otherwise, we won't be able to run scheduled threads for readahead when we're in a callback.
        localConf.setNumWorkerThreads(2);
        localConf.setLockTimeout(Long.MAX_VALUE);

        Namespace namespace = NamespaceBuilder.newBuilder()
                .conf(localConf).uri(uri).clientId("main").build();

        DistributedLogManager dlm0 = namespace.openLog(name);
        DLMTestUtil.generateCompletedLogSegments(dlm0, localConf, 9, 100);
        dlm0.close();

        int recordCount = 0;
        AtomicReference<DLSN> currentDLSN = new AtomicReference<DLSN>(DLSN.InitialDLSN);

        String clientId1 = "reader1";
        Namespace namespace1 = NamespaceBuilder.newBuilder()
                .conf(localConf).uri(uri).clientId(clientId1).build();
        DistributedLogManager dlm1 = namespace1.openLog(name);
        String clientId2 = "reader2";
        Namespace namespace2 = NamespaceBuilder.newBuilder()
                .conf(localConf).uri(uri).clientId(clientId2).build();
        DistributedLogManager dlm2 = namespace2.openLog(name);
        String clientId3 = "reader3";
        Namespace namespace3 = NamespaceBuilder.newBuilder()
                .conf(localConf).uri(uri).clientId(clientId3).build();
        DistributedLogManager dlm3 = namespace3.openLog(name);

        LOG.info("{} is opening reader on stream {}", clientId1, name);
        CompletableFuture<AsyncLogReader> futureReader1 = dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
        AsyncLogReader reader1 = Utils.ioResult(futureReader1);
        LOG.info("{} opened reader on stream {}", clientId1, name);

        LOG.info("{} is opening reader on stream {}", clientId2, name);
        CompletableFuture<AsyncLogReader> futureReader2 = dlm2.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
        LOG.info("{} is opening reader on stream {}", clientId3, name);
        CompletableFuture<AsyncLogReader> futureReader3 = dlm3.getAsyncLogReaderWithLock(DLSN.InitialDLSN);

        ExecutorService executorService = Executors.newCachedThreadPool();

        ReadRecordsListener listener2 =
                new ReadRecordsListener(currentDLSN, clientId2, executorService);
        ReadRecordsListener listener3 =
                new ReadRecordsListener(currentDLSN, clientId3, executorService);
        futureReader2.whenComplete(listener2);
        futureReader3.whenComplete(listener3);

        // Get reader1 and start reading.
        for ( ; recordCount < 200; recordCount++) {
            LogRecordWithDLSN record = Utils.ioResult(reader1.readNext());
            currentDLSN.set(record.getDlsn());
        }

        // Take a break, reader2 decides to stop waiting and cancels.
        Thread.sleep(1000);
        assertFalse(listener2.done());
        futureReader2.cancel(true);
        listener2.getLatch().await();
        assertTrue(listener2.done());
        assertTrue(listener2.failed());

        // Reader1 starts reading again.
        for (; recordCount < 300; recordCount++) {
            LogRecordWithDLSN record = Utils.ioResult(reader1.readNext());
            currentDLSN.set(record.getDlsn());
        }

        // Reader1 is done, someone else can take over. Since reader2 was
        // aborted, reader3 should take its place.
        assertFalse(listener3.done());
        Utils.close(reader1);
        listener3.getLatch().await();
        assertTrue(listener3.done());
        assertFalse(listener3.failed());

        assertEquals(new DLSN(3, 99, 0), currentDLSN.get());

        try {
            Utils.ioResult(futureReader2);
        } catch (Exception ex) {
            // Can't get this one to close it--the dlm will take care of it.
        }

        Utils.close(Utils.ioResult(futureReader3));

        dlm1.close();
        namespace1.close();
        dlm2.close();
        namespace2.close();
        dlm3.close();
        namespace3.close();

        executorService.shutdown();
    }

    @Test(timeout = 60000)
    public void testAsyncReadWithSubscriberId() throws Exception {
        String name = "distrlog-asyncread-with-sbuscriber-id";
        String subscriberId = "asyncreader";
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.addConfiguration(conf);
        confLocal.setOutputBufferSize(0);
        confLocal.setImmediateFlushEnabled(true);

        DistributedLogManager dlm = createNewDLM(confLocal, name);

        DLSN readDLSN = DLSN.InitialDLSN;

        int txid = 1;
        for (long i = 0; i < 3; i++) {
            BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned();
            for (long j = 1; j <= 10; j++) {
                DLSN dlsn = Utils.ioResult(writer.write(DLMTestUtil.getEmptyLogRecordInstance(txid++)));
                if (i == 1 && j == 1L) {
                    readDLSN = dlsn;
                }
            }
            writer.closeAndComplete();
        }

        BKAsyncLogReader reader0 = (BKAsyncLogReader) Utils.ioResult(dlm.getAsyncLogReaderWithLock(subscriberId));
        assertEquals(DLSN.NonInclusiveLowerBound, reader0.getStartDLSN());
        long numTxns = 0;
        LogRecordWithDLSN record = Utils.ioResult(reader0.readNext());
        while (null != record) {
            DLMTestUtil.verifyEmptyLogRecord(record);
            ++numTxns;
            assertEquals(numTxns, record.getTransactionId());
            assertEquals(record.getTransactionId() - 1, record.getSequenceId());

            if (txid - 1 == numTxns) {
                break;
            }
            record = Utils.ioResult(reader0.readNext());
        }
        assertEquals(txid - 1, numTxns);
        Utils.close(reader0);

        SubscriptionsStore subscriptionsStore = dlm.getSubscriptionsStore();
        Utils.ioResult(subscriptionsStore.advanceCommitPosition(subscriberId, readDLSN));
        BKAsyncLogReader reader1 = (BKAsyncLogReader) Utils.ioResult(dlm.getAsyncLogReaderWithLock(subscriberId));
        assertEquals(readDLSN, reader1.getStartDLSN());
        numTxns = 0;
        long startTxID =  10L;
        record = Utils.ioResult(reader1.readNext());
        while (null != record) {
            DLMTestUtil.verifyEmptyLogRecord(record);
            ++numTxns;
            ++startTxID;
            assertEquals(startTxID, record.getTransactionId());
            assertEquals(record.getTransactionId() - 1L, record.getSequenceId());

            if (startTxID == txid - 1) {
                break;
            }
            record = Utils.ioResult(reader1.readNext());
        }
        assertEquals(txid - 1, startTxID);
        assertEquals(20, numTxns);
        Utils.close(reader1);

        dlm.close();
    }
}
