[BK-SERVER] Clean up over-replicated ledgers owned by different bookies

### Motivation
As described at: https://github.com/apache/pulsar/issues/4632
- Sometimes due to overreplication, bookie contains ledgers which are not owned by that bookie anymore and that bookie is not part of the ensemble-list of those ledgers. In this case, GC finds out those overreplicated ledgers and 
- deletes their index from dbStorage (rocksDB) and 
- tries to delete them from entrylog files.

However, bookie doesn't delete them from entry-log files due to change made in [#870](https://github.com/apache/bookkeeper/issues/870) where bookie avoids deleting ledger if znode of that ledger exists.

Because of that bookie ends up storing large number entrylog files with ledgers which are owned by different bookies. It also cause OOM when GC tries to deal with large number of entry log files.

### Modification

Delete the ledgers if bookie is not part of ensemble list of over-replicated ledgers

Reviewers: Enrico Olivelli <eolivelli@gmail.com>, Sijie Guo <sijie@apache.org>

This closes #2119 from rdhabalia/overRepl
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java
index 02e47ae..53685be 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java
@@ -34,8 +34,10 @@
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
@@ -45,6 +47,7 @@
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
@@ -149,6 +152,8 @@
             long start;
             long end = -1;
             boolean done = false;
+            AtomicBoolean isBookieInEnsembles = new AtomicBoolean(false);
+            Versioned<LedgerMetadata> metadata = null;
             while (!done) {
                 start = end + 1;
                 if (ledgerRangeIterator.hasNext()) {
@@ -169,9 +174,12 @@
                 for (Long bkLid : subBkActiveLedgers) {
                     if (!ledgersInMetadata.contains(bkLid)) {
                         if (verifyMetadataOnGc) {
+                            isBookieInEnsembles.set(false);
+                            metadata = null;
                             int rc = BKException.Code.OK;
                             try {
-                                result(ledgerManager.readLedgerMetadata(bkLid), zkOpTimeoutMs, TimeUnit.MILLISECONDS);
+                                metadata = result(ledgerManager.readLedgerMetadata(bkLid), zkOpTimeoutMs,
+                                        TimeUnit.MILLISECONDS);
                             } catch (BKException | TimeoutException e) {
                                 if (e instanceof BKException) {
                                     rc = ((BKException) e).getCode();
@@ -182,7 +190,19 @@
                                     continue;
                                 }
                             }
-                            if (rc != BKException.Code.NoSuchLedgerExistsOnMetadataServerException) {
+                            // check bookie should be part of ensembles in one
+                            // of the segment else ledger should be deleted from
+                            // local storage
+                            if (metadata != null && metadata.getValue() != null) {
+                                metadata.getValue().getAllEnsembles().forEach((entryId, ensembles) -> {
+                                    if (ensembles != null && ensembles.contains(selfBookieAddress)) {
+                                        isBookieInEnsembles.set(true);
+                                    }
+                                });
+                                if (isBookieInEnsembles.get()) {
+                                    continue;
+                                }
+                            } else if (rc != BKException.Code.NoSuchLedgerExistsOnMetadataServerException) {
                                 LOG.warn("Ledger {} Missing in metadata list, but ledgerManager returned rc: {}.",
                                         bkLid, rc);
                                 continue;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index 30121b9..2388772 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -52,6 +52,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.CheckpointSource;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
@@ -93,12 +94,18 @@
         super(lmFactoryCls);
     }
 
+    private void createLedgers(int numLedgers, final Set<Long> createdLedgers) throws IOException {
+        BookieSocketAddress selfBookie = Bookie.getBookieAddress(baseConf);
+        createLedgers(numLedgers, createdLedgers, selfBookie);
+    }
+
     /**
      * Create ledgers.
      */
-    private void createLedgers(int numLedgers, final Set<Long> createdLedgers) throws IOException {
+    private void createLedgers(int numLedgers, final Set<Long> createdLedgers, BookieSocketAddress selfBookie)
+            throws IOException {
         final AtomicInteger expected = new AtomicInteger(numLedgers);
-        List<BookieSocketAddress> ensemble = Lists.newArrayList(new BookieSocketAddress("192.0.2.1", 1234));
+        List<BookieSocketAddress> ensemble = Lists.newArrayList(selfBookie);
 
         for (int i = 0; i < numLedgers; i++) {
             getLedgerIdGenerator().generateLedgerId(new GenericCallback<Long>() {
@@ -692,4 +699,55 @@
             return null;
         }
     }
+
+    /**
+     * Verifies that gc should cleaned up overreplicatd ledgers which is not
+     * owned by the bookie anymore.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testGcLedgersForOverreplicated() throws Exception {
+        baseConf.setVerifyMetadataOnGc(true);
+        final SortedSet<Long> createdLedgers = Collections.synchronizedSortedSet(new TreeSet<Long>());
+        final SortedSet<Long> cleaned = Collections.synchronizedSortedSet(new TreeSet<Long>());
+
+        // Create few ledgers
+        final int numLedgers = 5;
+
+        BookieSocketAddress bookieAddress = new BookieSocketAddress("192.0.0.1", 1234);
+        createLedgers(numLedgers, createdLedgers, bookieAddress);
+
+        LedgerManager mockLedgerManager = new CleanupLedgerManager(getLedgerManager()) {
+            @Override
+            public LedgerRangeIterator getLedgerRanges(long zkOpTimeout) {
+                return new LedgerRangeIterator() {
+                    @Override
+                    public LedgerRange next() throws IOException {
+                        return null;
+                    }
+
+                    @Override
+                    public boolean hasNext() throws IOException {
+                        return false;
+                    }
+                };
+            }
+        };
+
+        final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(mockLedgerManager,
+                new MockLedgerStorage(), baseConf, NullStatsLogger.INSTANCE);
+        GarbageCollector.GarbageCleaner cleaner = new GarbageCollector.GarbageCleaner() {
+            @Override
+            public void clean(long ledgerId) {
+                LOG.info("Cleaned {}", ledgerId);
+                cleaned.add(ledgerId);
+            }
+        };
+
+        validateLedgerRangeIterator(createdLedgers);
+
+        garbageCollector.gc(cleaner);
+        assertEquals("Should have cleaned all ledgers", cleaned.size(), numLedgers);
+    }
 }