Ledger having a Empty segment is causing ReplicasCheck scrutiny to fail



Descriptions of the changes in this PR:

- in Auditor.ReadLedgerMetadataCallbackForReplicasCheck, fixing logic for empty segments
- added test for validating areEntriesOfLedgerStoredInTheBookie for empty middle segment
- added testcase for ledger with more empty segments.



Reviewers: Enrico Olivelli <eolivelli@gmail.com>

This closes #2205 from reddycharan/fixreplicascheck
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index eac25b2..db6e331 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -133,6 +133,7 @@
     private static final Logger LOG = LoggerFactory.getLogger(Auditor.class);
     private static final int MAX_CONCURRENT_REPLICAS_CHECK_LEDGER_REQUESTS = 100;
     private static final int REPLICAS_CHECK_TIMEOUT_IN_SECS = 120;
+    private static final BitSet EMPTY_BITSET = new BitSet();
     private final ServerConfiguration conf;
     private final BookKeeper bkc;
     private final boolean ownBkc;
@@ -1489,7 +1490,8 @@
                 return;
             }
 
-            if (metadata.getLastEntryId() == -1) {
+            final long lastEntryId = metadata.getLastEntryId();
+            if (lastEntryId == -1) {
                 LOG.debug("Ledger: {} is closed but it doesn't has any entries, so skipping the replicas check",
                         ledgerInRange);
                 mcbForThisLedgerRange.processResult(BKException.Code.OK, null, null);
@@ -1515,12 +1517,23 @@
                 final Entry<Long, ? extends List<BookieSocketAddress>> segmentEnsemble = segments.get(segmentNum);
                 final List<BookieSocketAddress> ensembleOfSegment = segmentEnsemble.getValue();
                 final long startEntryIdOfSegment = segmentEnsemble.getKey();
-                final long lastEntryIdOfSegment = (segmentNum == (segments.size() - 1)) ? metadata.getLastEntryId()
+                final boolean lastSegment = (segmentNum == (segments.size() - 1));
+                final long lastEntryIdOfSegment = lastSegment ? lastEntryId
                         : segments.get(segmentNum + 1).getKey() - 1;
+                /*
+                 * Segment can be empty. If last segment is empty, then
+                 * startEntryIdOfSegment of it will be greater than lastEntryId
+                 * of the ledger. If the segment in middle is empty, then its
+                 * startEntry will be same as startEntry of the following
+                 * segment.
+                 */
+                final boolean emptySegment = lastSegment ? (startEntryIdOfSegment > lastEntryId)
+                        : (startEntryIdOfSegment == segments.get(segmentNum + 1).getKey());
                 for (int bookieIndex = 0; bookieIndex < ensembleOfSegment.size(); bookieIndex++) {
                     final BookieSocketAddress bookieInEnsemble = ensembleOfSegment.get(bookieIndex);
-                    final BitSet entriesStripedToThisBookie = distributionSchedule
-                            .getEntriesStripedToTheBookie(bookieIndex, startEntryIdOfSegment, lastEntryIdOfSegment);
+                    final BitSet entriesStripedToThisBookie = emptySegment ? EMPTY_BITSET
+                            : distributionSchedule.getEntriesStripedToTheBookie(bookieIndex, startEntryIdOfSegment,
+                                    lastEntryIdOfSegment);
                     if (entriesStripedToThisBookie.cardinality() == 0) {
                         /*
                          * if no entry is expected to contain in this bookie,
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
index 1916880..15253fb 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
@@ -592,7 +592,7 @@
     }
 
     @Test
-    public void testAreEntriesOfLedgerStoredInTheBookieForMultipleSegments() throws Exception {
+    public void testAreEntriesOfLedgerStoredInTheBookieForLastEmptySegment() throws Exception {
         int lastEntryId = 10;
         long ledgerId = 100L;
         BookieSocketAddress bookie0 = new BookieSocketAddress("bookie0:3181");
@@ -728,5 +728,4 @@
     public void testLegacyBookieServiceInfo() throws Exception {
         testBookieServiceInfo(false, true);
     }
-
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTest.java
index 85b2015..fcbde5e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTest.java
@@ -808,4 +808,126 @@
         runTestScenario(returnAvailabilityOfEntriesOfLedger, errorReturnValueForGetAvailabilityOfEntriesOfLedger, 0, 0,
                 numLedgersFoundHavingLessThanWQReplicasOfAnEntry);
     }
+
+    /*
+     * In this testscenario all the ledgers have empty segments.
+     */
+    @Test
+    public void testReplicasCheckForLedgersWithEmptySegments() throws Exception {
+        int numOfBookies = 5;
+        RegistrationManager regManager = driver.getRegistrationManager();
+        MultiKeyMap<String, AvailabilityOfEntriesOfLedger> returnAvailabilityOfEntriesOfLedger =
+                new MultiKeyMap<String, AvailabilityOfEntriesOfLedger>();
+        MultiKeyMap<String, Integer> errorReturnValueForGetAvailabilityOfEntriesOfLedger =
+                new MultiKeyMap<String, Integer>();
+        List<BookieSocketAddress> bookieAddresses = addAndRegisterBookies(regManager, numOfBookies);
+
+        LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
+        LedgerManager lm = mFactory.newLedgerManager();
+        DigestType digestType = DigestType.DUMMY;
+        byte[] password = new byte[0];
+        Collections.shuffle(bookieAddresses);
+
+        int numLedgersFoundHavingNoReplicaOfAnEntry = 0;
+        int numLedgersFoundHavingLessThanAQReplicasOfAnEntry = 0;
+        int numLedgersFoundHavingLessThanWQReplicasOfAnEntry = 0;
+
+        /*
+         * closed ledger.
+         *
+         * This closed Ledger has no entry. So it should not be counted towards
+         * numLedgersFoundHavingNoReplicaOfAnEntry/LessThanAQReplicasOfAnEntry
+         * /WQReplicasOfAnEntry.
+         */
+        Map<Long, List<BookieSocketAddress>> segmentEnsembles = new HashMap<Long, List<BookieSocketAddress>>();
+        int ensembleSize = 4;
+        int writeQuorumSize = 3;
+        int ackQuorumSize = 2;
+        long lastEntryId = -1L;
+        int length = 0;
+        segmentEnsembles.put(0L, bookieAddresses.subList(0, 4));
+        long ledgerId = 1L;
+        createClosedLedgerMetadata(lm, ledgerId, ensembleSize, writeQuorumSize, ackQuorumSize, segmentEnsembles,
+                lastEntryId, length, digestType, password);
+
+        /*
+         * closed ledger with multiple segments.
+         *
+         * This ledger has empty last segment, but all the entries have
+         * writeQuorumSize number of copies, So it should not be counted towards
+         * numLedgersFoundHavingNoReplicaOfAnEntry/LessThanAQReplicasOfAnEntry/
+         * WQReplicasOfAnEntry.
+         */
+        lastEntryId = 2;
+        segmentEnsembles.clear();
+        segmentEnsembles.put(0L, bookieAddresses.subList(0, 4));
+        segmentEnsembles.put((lastEntryId + 1), bookieAddresses.subList(1, 5));
+        ledgerId = 2L;
+        createClosedLedgerMetadata(lm, ledgerId, ensembleSize, writeQuorumSize, ackQuorumSize, segmentEnsembles,
+                lastEntryId, length, digestType, password);
+        returnAvailabilityOfEntriesOfLedger.put(bookieAddresses.get(0).toString(), Long.toString(ledgerId),
+                new AvailabilityOfEntriesOfLedger(new long[] { 0, 2 }));
+        returnAvailabilityOfEntriesOfLedger.put(bookieAddresses.get(1).toString(), Long.toString(ledgerId),
+                new AvailabilityOfEntriesOfLedger(new long[] { 0, 1 }));
+        returnAvailabilityOfEntriesOfLedger.put(bookieAddresses.get(2).toString(), Long.toString(ledgerId),
+                new AvailabilityOfEntriesOfLedger(new long[] { 0, 1, 2 }));
+        returnAvailabilityOfEntriesOfLedger.put(bookieAddresses.get(3).toString(), Long.toString(ledgerId),
+                new AvailabilityOfEntriesOfLedger(new long[] { 1, 2 }));
+
+        /*
+         * Closed ledger with multiple segments.
+         *
+         * Segment0, Segment1, Segment3, Segment5 and Segment6 are empty.
+         * Entries from entryid 3 are missing. So it should be counted towards
+         * numLedgersFoundHavingNoReplicaOfAnEntry.
+         */
+        lastEntryId = 5;
+        segmentEnsembles.clear();
+        segmentEnsembles.put(0L, bookieAddresses.subList(1, 5));
+        segmentEnsembles.put(0L, bookieAddresses.subList(0, 4));
+        segmentEnsembles.put(0L, bookieAddresses.subList(0, 4));
+        segmentEnsembles.put(4L, bookieAddresses.subList(1, 5));
+        segmentEnsembles.put(4L, bookieAddresses.subList(0, 4));
+        segmentEnsembles.put((lastEntryId + 1), bookieAddresses.subList(1, 5));
+        segmentEnsembles.put((lastEntryId + 1), bookieAddresses.subList(0, 4));
+        ledgerId = 3L;
+        createClosedLedgerMetadata(lm, ledgerId, ensembleSize, writeQuorumSize, ackQuorumSize, segmentEnsembles,
+                lastEntryId, length, digestType, password);
+        returnAvailabilityOfEntriesOfLedger.put(bookieAddresses.get(0).toString(), Long.toString(ledgerId),
+                new AvailabilityOfEntriesOfLedger(new long[] { 0, 2 }));
+        returnAvailabilityOfEntriesOfLedger.put(bookieAddresses.get(1).toString(), Long.toString(ledgerId),
+                new AvailabilityOfEntriesOfLedger(new long[] { 0, 1 }));
+        returnAvailabilityOfEntriesOfLedger.put(bookieAddresses.get(2).toString(), Long.toString(ledgerId),
+                new AvailabilityOfEntriesOfLedger(new long[] { 0, 1, 2 }));
+        returnAvailabilityOfEntriesOfLedger.put(bookieAddresses.get(3).toString(), Long.toString(ledgerId),
+                new AvailabilityOfEntriesOfLedger(new long[] { 1, 2 }));
+        numLedgersFoundHavingNoReplicaOfAnEntry++;
+
+        /*
+         * non-closed ledger with multiple segments
+         *
+         * since this is non-closed ledger, it should not be counted towards
+         * ledgersFoundHavingLessThanWQReplicasOfAnEntry
+         */
+        lastEntryId = 2;
+        segmentEnsembles.clear();
+        segmentEnsembles.put(0L, bookieAddresses.subList(0, 4));
+        segmentEnsembles.put(0L, bookieAddresses.subList(1, 5));
+        segmentEnsembles.put((lastEntryId + 1), bookieAddresses.subList(1, 5));
+        ledgerId = 4L;
+        createNonClosedLedgerMetadata(lm, ledgerId, ensembleSize, writeQuorumSize, ackQuorumSize, segmentEnsembles,
+                digestType, password);
+        returnAvailabilityOfEntriesOfLedger.put(bookieAddresses.get(0).toString(), Long.toString(ledgerId),
+                new AvailabilityOfEntriesOfLedger(new long[] { 0, 2 }));
+        returnAvailabilityOfEntriesOfLedger.put(bookieAddresses.get(1).toString(), Long.toString(ledgerId),
+                new AvailabilityOfEntriesOfLedger(new long[] { 0, 1 }));
+        returnAvailabilityOfEntriesOfLedger.put(bookieAddresses.get(2).toString(), Long.toString(ledgerId),
+                new AvailabilityOfEntriesOfLedger(new long[] { 0, 1, 2 }));
+        returnAvailabilityOfEntriesOfLedger.put(bookieAddresses.get(3).toString(), Long.toString(ledgerId),
+                new AvailabilityOfEntriesOfLedger(new long[] { 1, 2 }));
+
+        runTestScenario(returnAvailabilityOfEntriesOfLedger, errorReturnValueForGetAvailabilityOfEntriesOfLedger,
+                numLedgersFoundHavingNoReplicaOfAnEntry, numLedgersFoundHavingLessThanAQReplicasOfAnEntry,
+                numLedgersFoundHavingLessThanWQReplicasOfAnEntry);
+    }
 }