/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

package org.apache.bookkeeper.meta;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.CheckpointSource;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.bookie.Checkpointer;
import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
import org.apache.bookkeeper.bookie.EntryLocation;
import org.apache.bookkeeper.bookie.EntryLogger;
import org.apache.bookkeeper.bookie.GarbageCollector;
import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
import org.apache.bookkeeper.bookie.ScanAndCompareGarbageCollector;
import org.apache.bookkeeper.bookie.StateManager;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerMetadataBuilder;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.test.TestStatsProvider;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test garbage collection ledgers in ledger manager.
 */
public class GcLedgersTest extends LedgerManagerTestCase {
    static final Logger LOG = LoggerFactory.getLogger(GcLedgersTest.class);

    public GcLedgersTest(Class<? extends LedgerManagerFactory> lmFactoryCls) {
        super(lmFactoryCls);
    }

    /**
     * Create ledgers.
     */
    private void createLedgers(int numLedgers, final Set<Long> createdLedgers) throws IOException {
        final AtomicInteger expected = new AtomicInteger(numLedgers);
        List<BookieSocketAddress> ensemble = Lists.newArrayList(new BookieSocketAddress("192.0.2.1", 1234));

        for (int i = 0; i < numLedgers; i++) {
            getLedgerIdGenerator().generateLedgerId(new GenericCallback<Long>() {
                @Override
                public void operationComplete(int rc, final Long ledgerId) {
                    if (BKException.Code.OK != rc) {
                        synchronized (expected) {
                            int num = expected.decrementAndGet();
                            if (num == 0) {
                                expected.notify();
                            }
                        }
                        return;
                    }

                    LedgerMetadata md = LedgerMetadataBuilder.create()
                        .withDigestType(DigestType.CRC32C)
                        .withPassword(new byte[0])
                        .withEnsembleSize(1).withWriteQuorumSize(1).withAckQuorumSize(1)
                        .newEnsembleEntry(0L, ensemble).build();

                    getLedgerManager().createLedgerMetadata(ledgerId, md)
                        .whenComplete((result, exception) -> {
                                if (exception == null) {
                                    activeLedgers.put(ledgerId, true);
                                    createdLedgers.add(ledgerId);
                                }
                                synchronized (expected) {
                                    int num = expected.decrementAndGet();
                                    if (num == 0) {
                                        expected.notify();
                                    }
                                }
                            });
                }
            });
        }
        synchronized (expected) {
            try {
                while (expected.get() > 0) {
                    expected.wait(100);
                }
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private void removeLedger(long ledgerId) throws Exception {
        getLedgerManager().removeLedgerMetadata(ledgerId, Version.ANY).get(10, TimeUnit.SECONDS);
    }

    @Test
    public void testGarbageCollectLedgers() throws Exception {
        int numLedgers = 100;
        int numRemovedLedgers = 10;

        final Set<Long> createdLedgers = new HashSet<Long>();
        final Set<Long> removedLedgers = new HashSet<Long>();

        // create 100 ledgers
        createLedgers(numLedgers, createdLedgers);

        Random r = new Random(System.currentTimeMillis());
        final List<Long> tmpList = new ArrayList<Long>();
        tmpList.addAll(createdLedgers);
        Collections.shuffle(tmpList, r);
        // random remove several ledgers
        for (int i = 0; i < numRemovedLedgers; i++) {
            long ledgerId = tmpList.get(i);
            getLedgerManager().removeLedgerMetadata(ledgerId, Version.ANY).get();
            removedLedgers.add(ledgerId);
            createdLedgers.remove(ledgerId);
        }
        final CountDownLatch inGcProgress = new CountDownLatch(1);
        final CountDownLatch createLatch = new CountDownLatch(1);
        final CountDownLatch endLatch = new CountDownLatch(2);
        final CompactableLedgerStorage mockLedgerStorage = new MockLedgerStorage();
        TestStatsProvider stats = new TestStatsProvider();
        final ScanAndCompareGarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(getLedgerManager(),
                mockLedgerStorage, baseConf, stats.getStatsLogger("gc"));
        Thread gcThread = new Thread() {
            @Override
            public void run() {
                garbageCollector.gc(new GarbageCollector.GarbageCleaner() {
                    boolean paused = false;

                    @Override
                    public void clean(long ledgerId) {
                        try {
                            mockLedgerStorage.deleteLedger(ledgerId);
                        } catch (IOException e) {
                            e.printStackTrace();
                            return;
                        }

                        if (!paused) {
                            inGcProgress.countDown();
                            try {
                                createLatch.await();
                            } catch (InterruptedException ie) {
                                Thread.currentThread().interrupt();
                            }
                            paused = true;
                        }
                        LOG.info("Garbage Collected ledger {}", ledgerId);
                    }
                });
                LOG.info("Gc Thread quits.");
                endLatch.countDown();
            }
        };

        Thread createThread = new Thread() {
            @Override
            public void run() {
                try {
                    inGcProgress.await();
                    // create 10 more ledgers
                    createLedgers(10, createdLedgers);
                    LOG.info("Finished creating 10 more ledgers.");
                    createLatch.countDown();
                } catch (Exception e) {
                }
                LOG.info("Create Thread quits.");
                endLatch.countDown();
            }
        };

        createThread.start();
        gcThread.start();

        endLatch.await();

        // test ledgers
        for (Long ledger : removedLedgers) {
            assertFalse(activeLedgers.containsKey(ledger));
        }
        for (Long ledger : createdLedgers) {
            assertTrue(activeLedgers.containsKey(ledger));
        }
        assertTrue(
                "Wrong ACTIVE_LEDGER_COUNT",
                garbageCollector.getNumActiveLedgers() == createdLedgers.size());
    }

    @Test
    public void testGcLedgersOutsideRange() throws Exception {
        final SortedSet<Long> createdLedgers = Collections.synchronizedSortedSet(new TreeSet<Long>());
        final Queue<Long> cleaned = new LinkedList<Long>();
        int numLedgers = 100;

        createLedgers(numLedgers, createdLedgers);

        MockLedgerStorage mockLedgerStorage = new MockLedgerStorage();
        TestStatsProvider stats = new TestStatsProvider();
        final ScanAndCompareGarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(getLedgerManager(),
                mockLedgerStorage, baseConf, stats.getStatsLogger("gc"));
        GarbageCollector.GarbageCleaner cleaner = new GarbageCollector.GarbageCleaner() {
                @Override
                public void clean(long ledgerId) {
                    LOG.info("Cleaned {}", ledgerId);
                    cleaned.add(ledgerId);
                    try {
                        mockLedgerStorage.deleteLedger(ledgerId);
                    } catch (IOException e) {
                        e.printStackTrace();
                        fail("Exception from deleteLedger");
                    }
                }
            };

        garbageCollector.gc(cleaner);
        assertNull("Should have cleaned nothing", cleaned.poll());
        assertTrue(
                "Wrong ACTIVE_LEDGER_COUNT",
                garbageCollector.getNumActiveLedgers() == numLedgers);

        long last = createdLedgers.last();
        removeLedger(last);
        garbageCollector.gc(cleaner);
        assertNotNull("Should have cleaned something", cleaned.peek());
        assertEquals("Should have cleaned last ledger" + last, (long) last, (long) cleaned.poll());

        long first = createdLedgers.first();
        removeLedger(first);
        garbageCollector.gc(cleaner);
        assertNotNull("Should have cleaned something", cleaned.peek());
        assertEquals("Should have cleaned first ledger" + first, (long) first, (long) cleaned.poll());

        garbageCollector.gc(cleaner);
        assertTrue(
                "Wrong ACTIVE_LEDGER_COUNT",
                garbageCollector.getNumActiveLedgers() == (numLedgers - 2));

    }

    @Test
    public void testGcLedgersNotLast() throws Exception {
        final SortedSet<Long> createdLedgers = Collections.synchronizedSortedSet(new TreeSet<Long>());
        final List<Long> cleaned = new ArrayList<Long>();

        // Create enough ledgers to span over 4 ranges in the hierarchical ledger manager implementation
        final int numLedgers = 30001;

        createLedgers(numLedgers, createdLedgers);

        final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(getLedgerManager(),
                new MockLedgerStorage(), baseConf, NullStatsLogger.INSTANCE);
        GarbageCollector.GarbageCleaner cleaner = new GarbageCollector.GarbageCleaner() {
                @Override
                public void clean(long ledgerId) {
                    LOG.info("Cleaned {}", ledgerId);
                    cleaned.add(ledgerId);
                }
            };

        SortedSet<Long> scannedLedgers = new TreeSet<Long>();
        LedgerRangeIterator iterator = getLedgerManager().getLedgerRanges();
        while (iterator.hasNext()) {
            LedgerRange ledgerRange = iterator.next();
            scannedLedgers.addAll(ledgerRange.getLedgers());
        }

        assertEquals(createdLedgers, scannedLedgers);

        garbageCollector.gc(cleaner);
        assertTrue("Should have cleaned nothing", cleaned.isEmpty());

        long first = createdLedgers.first();
        removeLedger(first);
        garbageCollector.gc(cleaner);
        assertEquals("Should have cleaned something", 1, cleaned.size());
        assertEquals("Should have cleaned first ledger" + first, (long) first, (long) cleaned.get(0));
    }

    /*
     * in this scenario no ledger is created, so ledgeriterator's hasNext call would return false and next would be
     * null. GarbageCollector.gc is expected to behave normally
     */
    @Test
    public void testGcLedgersWithNoLedgers() throws Exception {
        final SortedSet<Long> createdLedgers = Collections.synchronizedSortedSet(new TreeSet<Long>());
        final List<Long> cleaned = new ArrayList<Long>();

        // no ledger created

        final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(getLedgerManager(),
                new MockLedgerStorage(), baseConf, NullStatsLogger.INSTANCE);
        AtomicBoolean cleanerCalled = new AtomicBoolean(false);

        GarbageCollector.GarbageCleaner cleaner = new GarbageCollector.GarbageCleaner() {
            @Override
            public void clean(long ledgerId) {
                LOG.info("Cleaned {}", ledgerId);
                cleanerCalled.set(true);
            }
        };

        validateLedgerRangeIterator(createdLedgers);

        garbageCollector.gc(cleaner);
        assertFalse("Should have cleaned nothing, since no ledger is created", cleanerCalled.get());
    }

    // in this scenario all the created ledgers are in one single ledger range.
    @Test
    public void testGcLedgersWithLedgersInSameLedgerRange() throws Exception {
        baseConf.setVerifyMetadataOnGc(true);
        final SortedSet<Long> createdLedgers = Collections.synchronizedSortedSet(new TreeSet<Long>());
        final SortedSet<Long> cleaned = Collections.synchronizedSortedSet(new TreeSet<Long>());

        // Create few ledgers which span over just one ledger range in the hierarchical ledger manager implementation
        final int numLedgers = 5;

        createLedgers(numLedgers, createdLedgers);

        final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(getLedgerManager(),
                new MockLedgerStorage(), baseConf, NullStatsLogger.INSTANCE);
        GarbageCollector.GarbageCleaner cleaner = new GarbageCollector.GarbageCleaner() {
            @Override
            public void clean(long ledgerId) {
                LOG.info("Cleaned {}", ledgerId);
                cleaned.add(ledgerId);
            }
        };

        validateLedgerRangeIterator(createdLedgers);

        garbageCollector.gc(cleaner);
        assertTrue("Should have cleaned nothing", cleaned.isEmpty());

        for (long ledgerId : createdLedgers) {
            removeLedger(ledgerId);
        }

        garbageCollector.gc(cleaner);
        assertEquals("Should have cleaned all the created ledgers", createdLedgers, cleaned);
    }

    /*
     * in this test scenario no created ledger is deleted, but ledgeriterator is screwed up and returns hasNext to be
     * false and next to be null. So even in this case it is expected not to clean any ledger's data.
     *
     * This testcase is needed for validating fix of bug - W-4292747.
     *
     * ScanAndCompareGarbageCollector/GC should clean data of ledger only if both the LedgerManager.getLedgerRanges says
     * that ledger is not existing and also ledgerManager.readLedgerMetadata fails with error
     * NoSuchLedgerExistsException.
     *
     */
    @Test
    public void testGcLedgersIfLedgerManagerIteratorFails() throws Exception {
        baseConf.setVerifyMetadataOnGc(true);
        final SortedSet<Long> createdLedgers = Collections.synchronizedSortedSet(new TreeSet<Long>());
        final SortedSet<Long> cleaned = Collections.synchronizedSortedSet(new TreeSet<Long>());

        // Create few ledgers
        final int numLedgers = 5;

        createLedgers(numLedgers, createdLedgers);

        LedgerManager mockLedgerManager = new CleanupLedgerManager(getLedgerManager()) {
            @Override
            public LedgerRangeIterator getLedgerRanges() {
                return new LedgerRangeIterator() {
                    @Override
                    public LedgerRange next() throws IOException {
                        return null;
                    }

                    @Override
                    public boolean hasNext() throws IOException {
                        return false;
                    }
                };
            }
        };

        final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(mockLedgerManager,
                new MockLedgerStorage(), baseConf, NullStatsLogger.INSTANCE);
        GarbageCollector.GarbageCleaner cleaner = new GarbageCollector.GarbageCleaner() {
            @Override
            public void clean(long ledgerId) {
                LOG.info("Cleaned {}", ledgerId);
                cleaned.add(ledgerId);
            }
        };

        validateLedgerRangeIterator(createdLedgers);

        garbageCollector.gc(cleaner);
        assertTrue("Should have cleaned nothing", cleaned.isEmpty());
    }

    /*
     * In this test scenario no ledger is deleted, but LedgerManager.readLedgerMetadata says there is NoSuchLedger. So
     * even in that case, GarbageCollector.gc shouldn't delete ledgers data.
     *
     * Consider the possible scenario - when the LedgerIterator is created that ledger is not deleted, so as per
     * LedgerIterator that is live ledger. But right after the LedgerIterator creation that ledger is deleted, so
     * readLedgerMetadata call would return NoSuchLedger. In this testscenario we are validating that as per Iterator if
     * that ledger is alive though currently that ledger is deleted, we should not clean data of that ledger.
     *
     * ScanAndCompareGarbageCollector/GC should clean data of ledger only if both the LedgerManager.getLedgerRanges says
     * that ledger is not existing and also ledgerManager.readLedgerMetadata fails with error
     * NoSuchLedgerExistsException.
     *
     */
    @Test
    public void testGcLedgersIfReadLedgerMetadataSaysNoSuchLedger() throws Exception {
        final SortedSet<Long> createdLedgers = Collections.synchronizedSortedSet(new TreeSet<Long>());
        final SortedSet<Long> cleaned = Collections.synchronizedSortedSet(new TreeSet<Long>());

        // Create few ledgers
        final int numLedgers = 5;

        createLedgers(numLedgers, createdLedgers);

        CompletableFuture<Versioned<LedgerMetadata>> errorFuture = new CompletableFuture<>();
        errorFuture.completeExceptionally(new BKException.BKNoSuchLedgerExistsException());
        LedgerManager mockLedgerManager = new CleanupLedgerManager(getLedgerManager()) {
            @Override
            public CompletableFuture<Versioned<LedgerMetadata>> readLedgerMetadata(long ledgerId) {
                return errorFuture;
            }
        };

        final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(mockLedgerManager,
                new MockLedgerStorage(), baseConf, NullStatsLogger.INSTANCE);
        GarbageCollector.GarbageCleaner cleaner = new GarbageCollector.GarbageCleaner() {
            @Override
            public void clean(long ledgerId) {
                LOG.info("Cleaned {}", ledgerId);
                cleaned.add(ledgerId);
            }
        };

        validateLedgerRangeIterator(createdLedgers);

        garbageCollector.gc(cleaner);
        assertTrue("Should have cleaned nothing", cleaned.isEmpty());
    }

    /*
     * In this test scenario all the created ledgers are deleted, but LedgerManager.readLedgerMetadata fails with
     * ZKException. So even in this case, GarbageCollector.gc shouldn't delete ledgers data.
     *
     * ScanAndCompareGarbageCollector/GC should clean data of ledger only if both the LedgerManager.getLedgerRanges says
     * that ledger is not existing and also ledgerManager.readLedgerMetadata fails with error
     * NoSuchLedgerExistsException, but is shouldn't delete if the readLedgerMetadata fails with any other error.
     */
    @Test
    public void testGcLedgersIfReadLedgerMetadataFailsForDeletedLedgers() throws Exception {
        baseConf.setVerifyMetadataOnGc(true);
        final SortedSet<Long> createdLedgers = Collections.synchronizedSortedSet(new TreeSet<Long>());
        final SortedSet<Long> cleaned = Collections.synchronizedSortedSet(new TreeSet<Long>());

        // Create few ledgers
        final int numLedgers = 5;

        createLedgers(numLedgers, createdLedgers);

        CompletableFuture<Versioned<LedgerMetadata>> errorFuture = new CompletableFuture<>();
        errorFuture.completeExceptionally(new BKException.ZKException());
        LedgerManager mockLedgerManager = new CleanupLedgerManager(getLedgerManager()) {
            @Override
            public CompletableFuture<Versioned<LedgerMetadata>> readLedgerMetadata(long ledgerId) {
                return errorFuture;
            }
        };

        final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(mockLedgerManager,
                new MockLedgerStorage(), baseConf, NullStatsLogger.INSTANCE);
        GarbageCollector.GarbageCleaner cleaner = new GarbageCollector.GarbageCleaner() {
            @Override
            public void clean(long ledgerId) {
                LOG.info("Cleaned {}", ledgerId);
                cleaned.add(ledgerId);
            }
        };

        validateLedgerRangeIterator(createdLedgers);

        for (long ledgerId : createdLedgers) {
            removeLedger(ledgerId);
        }

        garbageCollector.gc(cleaner);
        assertTrue("Should have cleaned nothing", cleaned.isEmpty());
    }

    public void validateLedgerRangeIterator(SortedSet<Long> createdLedgers) throws IOException {
        SortedSet<Long> scannedLedgers = new TreeSet<Long>();
        LedgerRangeIterator iterator = getLedgerManager().getLedgerRanges();
        while (iterator.hasNext()) {
            LedgerRange ledgerRange = iterator.next();
            scannedLedgers.addAll(ledgerRange.getLedgers());
        }

        assertEquals(createdLedgers, scannedLedgers);
    }

    class MockLedgerStorage implements CompactableLedgerStorage {

        @Override
        public void initialize(
            ServerConfiguration conf,
            LedgerManager ledgerManager,
            LedgerDirsManager ledgerDirsManager,
            LedgerDirsManager indexDirsManager,
            StateManager stateManager,
            CheckpointSource checkpointSource,
            Checkpointer checkpointer,
            StatsLogger statsLogger,
            ByteBufAllocator allocator) throws IOException {
        }

        @Override
        public void start() {
        }

        @Override
        public void shutdown() throws InterruptedException {
        }

        @Override
        public long getLastAddConfirmed(long ledgerId) throws IOException {
            return 0;
        }

        @Override
        public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException {
        }

        @Override
        public ByteBuf getExplicitLac(long ledgerId) {
            return null;
        }

        @Override
        public boolean ledgerExists(long ledgerId) throws IOException {
            return false;
        }

        @Override
        public boolean setFenced(long ledgerId) throws IOException {
            return false;
        }

        @Override
        public boolean isFenced(long ledgerId) throws IOException {
            return false;
        }

        @Override
        public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
        }

        @Override
        public byte[] readMasterKey(long ledgerId) throws IOException, BookieException {
            return null;
        }

        @Override
        public long addEntry(ByteBuf entry) throws IOException {
            return 0;
        }

        @Override
        public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
            return null;
        }

        @Override
        public void flush() throws IOException {
        }

        @Override
        public void checkpoint(Checkpoint checkpoint) throws IOException {
        }

        @Override
        public void deleteLedger(long ledgerId) throws IOException {
            activeLedgers.remove(ledgerId);
        }

        @Override
        public Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long lastLedgerId) {
            NavigableMap<Long, Boolean> bkActiveLedgersSnapshot = activeLedgers.snapshot();
            Map<Long, Boolean> subBkActiveLedgers = bkActiveLedgersSnapshot
                    .subMap(firstLedgerId, true, lastLedgerId, false);

            return subBkActiveLedgers.keySet();
        }

        @Override
        public EntryLogger getEntryLogger() {
            return null;
        }

        @Override
        public void updateEntriesLocations(Iterable<EntryLocation> locations) throws IOException {
        }

        @Override
        public void registerLedgerDeletionListener(LedgerDeletionListener listener) {
        }

        @Override
        public void flushEntriesLocationsIndex() throws IOException {
        }

        @Override
        public boolean waitForLastAddConfirmedUpdate(long ledgerId,
                                                     long previousLAC,
                                                     Watcher<LastAddConfirmedUpdateNotification> watcher)
                throws IOException {
            return false;
        }
    }
}
