/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

package org.apache.bookkeeper.meta;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
import org.apache.bookkeeper.bookie.GarbageCollector;
import org.apache.bookkeeper.bookie.ScanAndCompareGarbageCollector;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerMetadata;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.versioning.Version;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests garbage collection of ledgers against the ledger manager under test.
 *
 * <p>Each test creates ledger metadata through the ledger manager, removes some of it again and then
 * runs a {@link ScanAndCompareGarbageCollector} to verify which ledgers are handed to the
 * {@link GarbageCollector.GarbageCleaner}.
 */
public class GcLedgersTest extends LedgerManagerTestCase {
    static final Logger LOG = LoggerFactory.getLogger(GcLedgersTest.class);

    public GcLedgersTest(Class<? extends LedgerManagerFactory> lmFactoryCls) {
        super(lmFactoryCls);
    }

    /**
     * Creates ledgers and blocks until every creation attempt has completed.
     *
     * <p>For each ledger an id is requested from the ledger id generator and, if that succeeds, metadata
     * is created through the ledger manager. Every ledger whose metadata was created successfully is
     * recorded in {@code activeLedgers} and added to {@code createdLedgers}. Failed attempts are logged
     * and skipped, so fewer than {@code numLedgers} ids may end up in the set.
     *
     * @param numLedgers number of ledgers to try to create
     * @param createdLedgers receives the ids of the successfully created ledgers; it is updated from the
     *        metadata callbacks, so callers should pass a collection that tolerates that
     * @throws IOException if the calling thread is interrupted while waiting for the attempts to finish
     */
    private void createLedgers(int numLedgers, final Set<Long> createdLedgers) throws IOException {
        // One count per requested ledger; every attempt counts down exactly once, success or failure.
        final CountDownLatch pending = new CountDownLatch(numLedgers);
        for (int i = 0; i < numLedgers; i++) {
            getLedgerIdGenerator().generateLedgerId(new GenericCallback<Long>() {
                @Override
                public void operationComplete(int rc, final Long ledgerId) {
                    if (BKException.Code.OK != rc) {
                        LOG.warn("Failed to generate a ledger id : rc = {}", rc);
                        pending.countDown();
                        return;
                    }

                    getLedgerManager().createLedgerMetadata(ledgerId,
                            new LedgerMetadata(1, 1, 1, DigestType.MAC, "".getBytes()), new GenericCallback<Void>() {
                                @Override
                                public void operationComplete(int rc, Void result) {
                                    if (rc == BKException.Code.OK) {
                                        activeLedgers.put(ledgerId, true);
                                        createdLedgers.add(ledgerId);
                                    } else {
                                        LOG.warn("Failed to create metadata for ledger {} : rc = {}", ledgerId, rc);
                                    }
                                    // Count down only after the bookkeeping above so that a caller
                                    // released by the latch observes the updated collections.
                                    pending.countDown();
                                }
                            });
                }
            });
        }
        try {
            pending.await();
        } catch (InterruptedException ie) {
            // Do not pretend the ledgers exist: keep the interrupt status and report the failure.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while waiting for ledgers to be created", ie);
        }
    }

    /**
     * Removes the metadata of the given ledger, regardless of its version, and waits for the result.
     *
     * @param ledgerId id of the ledger whose metadata is removed
     * @throws Exception if interrupted while waiting; assertion errors are raised when the removal
     *         does not complete within 10 seconds or completes with a non-OK result code
     */
    private void removeLedger(long ledgerId) throws Exception {
        final AtomicInteger resultCode = new AtomicInteger(BKException.Code.OK);
        final CountDownLatch done = new CountDownLatch(1);
        getLedgerManager().removeLedgerMetadata(ledgerId, Version.ANY,
                new GenericCallback<Void>() {
                    @Override
                    public void operationComplete(int rc, Void result) {
                        resultCode.set(rc);
                        done.countDown();
                    }
                });
        assertTrue("Remove should have completed in time", done.await(10, TimeUnit.SECONDS));
        assertEquals("Remove should have succeeded", BKException.Code.OK, resultCode.get());
    }

    /**
     * Removes the metadata of a few random ledgers and runs a garbage collection while further ledgers
     * are created concurrently. Afterwards none of the removed ledgers may be left in
     * {@code activeLedgers}, while every surviving or newly created ledger must still be there.
     */
    @Test(timeout=60000)
    public void testGarbageCollectLedgers() throws Exception {
        int numLedgers = 100;
        int numRemovedLedgers = 10;

        // Updated from metadata callbacks and from the create thread, hence synchronized.
        final Set<Long> createdLedgers = Collections.synchronizedSet(new HashSet<Long>());
        final Set<Long> removedLedgers = new HashSet<Long>();

        // create 100 ledgers
        createLedgers(numLedgers, createdLedgers);

        Random r = new Random(System.currentTimeMillis());
        final List<Long> tmpList = new ArrayList<Long>(createdLedgers);
        assertTrue("Should have created at least " + numRemovedLedgers + " ledgers",
                tmpList.size() >= numRemovedLedgers);
        Collections.shuffle(tmpList, r);
        // random remove several ledgers
        for (int i = 0; i < numRemovedLedgers; i++) {
            long ledgerId = tmpList.get(i);
            // The helper waits on a latch and checks the result code, so a callback that completes
            // before we start waiting (or a spurious wakeup) cannot break the test.
            removeLedger(ledgerId);
            removedLedgers.add(ledgerId);
            createdLedgers.remove(ledgerId);
        }

        // Released once the collector has started cleaning (or has finished without cleaning).
        final CountDownLatch inGcProgress = new CountDownLatch(1);
        // Released once the create thread is done, which lets the paused collector continue.
        final CountDownLatch createLatch = new CountDownLatch(1);
        // Counted down once by each of the two background threads.
        final CountDownLatch endLatch = new CountDownLatch(2);
        // Number of unexpected errors seen by the background threads.
        final AtomicInteger backgroundFailures = new AtomicInteger(0);

        // NOTE(review): MockLedgerStorage is inherited from the test base class; deleteLedger presumably
        // drops the ledger from activeLedgers - confirm in LedgerManagerTestCase.
        final CompactableLedgerStorage mockLedgerStorage = new MockLedgerStorage();
        final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(getLedgerManager(),
                mockLedgerStorage, baseConf);
        Thread gcThread = new Thread() {
            @Override
            public void run() {
                try {
                    garbageCollector.gc(new GarbageCollector.GarbageCleaner() {
                        // Only touched by the thread running gc(); pauses the collection exactly once.
                        boolean paused = false;

                        @Override
                        public void clean(long ledgerId) {
                            try {
                                mockLedgerStorage.deleteLedger(ledgerId);
                            } catch (IOException e) {
                                LOG.error("Failed to delete ledger " + ledgerId, e);
                                return;
                            }

                            if (!paused) {
                                // Let the create thread add ledgers in the middle of the collection.
                                inGcProgress.countDown();
                                try {
                                    createLatch.await();
                                } catch (InterruptedException ie) {
                                    Thread.currentThread().interrupt();
                                }
                                paused = true;
                            }
                            LOG.info("Garbage Collected ledger {}", ledgerId);
                        }
                    });
                } catch (RuntimeException e) {
                    backgroundFailures.incrementAndGet();
                    LOG.error("Garbage collection failed", e);
                } finally {
                    // If nothing was cleaned the create thread would otherwise wait forever.
                    inGcProgress.countDown();
                    LOG.info("Gc Thread quits.");
                    endLatch.countDown();
                }
            }
        };

        Thread createThread = new Thread() {
            @Override
            public void run() {
                try {
                    inGcProgress.await();
                    // create 10 more ledgers
                    createLedgers(10, createdLedgers);
                    LOG.info("Finished creating 10 more ledgers.");
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    backgroundFailures.incrementAndGet();
                    LOG.error("Interrupted while waiting for the garbage collection to start", ie);
                } catch (Exception e) {
                    backgroundFailures.incrementAndGet();
                    LOG.error("Failed to create more ledgers", e);
                } finally {
                    // Always release the collector, even on failure, so it cannot block forever.
                    createLatch.countDown();
                    LOG.info("Create Thread quits.");
                    endLatch.countDown();
                }
            }
        };

        createThread.start();
        gcThread.start();

        endLatch.await();

        assertEquals("Background threads should not have failed", 0, backgroundFailures.get());

        // test ledgers
        for (Long ledger : removedLedgers) {
            assertFalse(activeLedgers.containsKey(ledger));
        }
        // Iterating a synchronized collection requires holding its lock.
        synchronized (createdLedgers) {
            for (Long ledger : createdLedgers) {
                assertTrue(activeLedgers.containsKey(ledger));
            }
        }
    }

    /**
     * Checks that a collection run cleans nothing while all metadata exists, and that removing the
     * metadata of the highest and then of the lowest created ledger id makes exactly that ledger show
     * up at the cleaner.
     */
    @Test(timeout=60000)
    public void testGcLedgersOutsideRange() throws Exception {
        final SortedSet<Long> createdLedgers = Collections.synchronizedSortedSet(new TreeSet<Long>());
        final Queue<Long> cleaned = new LinkedList<Long>();
        int numLedgers = 100;

        createLedgers(numLedgers, createdLedgers);

        final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(getLedgerManager(),
                new MockLedgerStorage(), baseConf);
        // Records every ledger id the collector asks to clean, in call order.
        GarbageCollector.GarbageCleaner cleaner = new GarbageCollector.GarbageCleaner() {
                @Override
                public void clean(long ledgerId) {
                    LOG.info("Cleaned {}", ledgerId);
                    cleaned.add(ledgerId);
                }
            };

        garbageCollector.gc(cleaner);
        assertNull("Should have cleaned nothing", cleaned.poll());

        long last = createdLedgers.last();
        removeLedger(last);
        garbageCollector.gc(cleaner);
        assertNotNull("Should have cleaned something", cleaned.peek());
        assertEquals("Should have cleaned last ledger" + last, last, cleaned.poll().longValue());

        long first = createdLedgers.first();
        removeLedger(first);
        garbageCollector.gc(cleaner);
        assertNotNull("Should have cleaned something", cleaned.peek());
        assertEquals("Should have cleaned first ledger" + first, first, cleaned.poll().longValue());
    }

    /**
     * Checks, with enough ledgers to span several ledger ranges, that the range iterator reports
     * exactly the created ledgers and that removing the metadata of the lowest ledger id makes the
     * collector clean that ledger and nothing else.
     */
    @Test(timeout=60000)
    public void testGcLedgersNotLast() throws Exception {
        final SortedSet<Long> createdLedgers = Collections.synchronizedSortedSet(new TreeSet<Long>());
        final List<Long> cleaned = new ArrayList<Long>();

        // Create enough ledgers to span over 4 ranges in the hierarchical ledger manager implementation
        final int numLedgers = 30001;

        createLedgers(numLedgers, createdLedgers);

        final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(getLedgerManager(),
                new MockLedgerStorage(), baseConf);
        // Records every ledger id the collector asks to clean, in call order.
        GarbageCollector.GarbageCleaner cleaner = new GarbageCollector.GarbageCleaner() {
                @Override
                public void clean(long ledgerId) {
                    LOG.info("Cleaned {}", ledgerId);
                    cleaned.add(ledgerId);
                }
            };

        // Collect every ledger id reported by the ledger manager's range iterator.
        SortedSet<Long> scannedLedgers = new TreeSet<Long>();
        LedgerRangeIterator iterator = getLedgerManager().getLedgerRanges();
        while (iterator.hasNext()) {
            LedgerRange ledgerRange = iterator.next();
            scannedLedgers.addAll(ledgerRange.getLedgers());
        }

        assertEquals(createdLedgers, scannedLedgers);

        garbageCollector.gc(cleaner);
        assertTrue("Should have cleaned nothing", cleaned.isEmpty());

        long first = createdLedgers.first();
        removeLedger(first);
        garbageCollector.gc(cleaner);
        assertEquals("Should have cleaned something", 1, cleaned.size());
        assertEquals("Should have cleaned first ledger" + first, first, cleaned.get(0).longValue());
    }
}
