QuorumCoverage should only count unknown nodes

The original patch was contributed by ivankelly in PR #2303, I have only fixed checkstyle and removed two tests that were wrong.

Quorum coverage checks if we have heard from enough nodes to know that
there is no entry that can have been written to enough nodes that we
haven't heard from to have formed an ack quorum.

The coverage algorithm was correct pre-5e399df.

5e399df(BOOKKEEPER-759: Delay Ensemble Change & Disable Ensemble
Change) broke this, but it still seems to have worked because they had
a broken else statement at the end. Why a change which is 100% about
the write-path changed something in the read-path is a mystery.

dcdd1e(Small fix wrong nodesUninitialized count when checkCovered)
went on to fix the broken fix, so the whole thing ended up broke.

The change also modifies ReadLastConfirmedOp to make it testable.

Reviewers: Sijie Guo <None>, Rajan Dhabalia <rdhabalia@apache.org>

This closes #2333 from eolivelli/pr2303
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index b71ac76..f80d0bd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -1407,7 +1407,13 @@
                 }
             };
 
-        new ReadLastConfirmedOp(this, clientCtx.getBookieClient(), getCurrentEnsemble(), innercb).initiate();
+        new ReadLastConfirmedOp(clientCtx.getBookieClient(),
+                                distributionSchedule,
+                                macManager,
+                                ledgerId,
+                                getCurrentEnsemble(),
+                                ledgerKey,
+                                innercb).initiate();
     }
 
     /**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
index 2dd6ea2..bd19a6b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
@@ -93,7 +93,12 @@
     }
 
     public CompletableFuture<LedgerHandle> initiate() {
-        ReadLastConfirmedOp rlcop = new ReadLastConfirmedOp(lh, clientCtx.getBookieClient(), lh.getCurrentEnsemble(),
+        ReadLastConfirmedOp rlcop = new ReadLastConfirmedOp(clientCtx.getBookieClient(),
+                                                            lh.distributionSchedule,
+                                                            lh.macManager,
+                                                            lh.ledgerId,
+                                                            lh.getCurrentEnsemble(),
+                                                            lh.ledgerKey,
                 new ReadLastConfirmedOp.LastConfirmedDataCallback() {
                     public void readLastConfirmedDataComplete(int rc, RecoveryData data) {
                         if (rc == BKException.Code.OK) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
index 2b75403..e39bf60 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
@@ -17,6 +17,8 @@
  */
 package org.apache.bookkeeper.client;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import io.netty.buffer.ByteBuf;
 import java.util.List;
 
@@ -25,6 +27,7 @@
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.proto.checksum.DigestManager;
 import org.apache.bookkeeper.proto.checksum.DigestManager.RecoveryData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,17 +38,18 @@
  */
 class ReadLastConfirmedOp implements ReadEntryCallback {
     static final Logger LOG = LoggerFactory.getLogger(ReadLastConfirmedOp.class);
-    LedgerHandle lh;
-    BookieClient bookieClient;
-    int numResponsesPending;
-    int numSuccessfulResponse;
-    RecoveryData maxRecoveredData;
-    volatile boolean completed = false;
-    int lastSeenError = BKException.Code.ReadException;
+    private final long ledgerId;
+    private final byte[] ledgerKey;
+    private final BookieClient bookieClient;
+    private final DigestManager digestManager;
+    private int numResponsesPending;
+    private RecoveryData maxRecoveredData;
+    private volatile boolean completed = false;
+    private int lastSeenError = BKException.Code.ReadException;
 
-    LastConfirmedDataCallback cb;
-    final DistributionSchedule.QuorumCoverageSet coverageSet;
-    final List<BookieSocketAddress> currentEnsemble;
+    private final LastConfirmedDataCallback cb;
+    private final DistributionSchedule.QuorumCoverageSet coverageSet;
+    private final List<BookieSocketAddress> currentEnsemble;
 
     /**
      * Wrapper to get all recovered data from the request.
@@ -54,22 +58,28 @@
         void readLastConfirmedDataComplete(int rc, RecoveryData data);
     }
 
-    public ReadLastConfirmedOp(LedgerHandle lh, BookieClient bookieClient,
-                               List<BookieSocketAddress> ensemble, LastConfirmedDataCallback cb) {
+    public ReadLastConfirmedOp(BookieClient bookieClient,
+                               DistributionSchedule schedule,
+                               DigestManager digestManager,
+                               long ledgerId,
+                               List<BookieSocketAddress> ensemble,
+                               byte[] ledgerKey,
+                               LastConfirmedDataCallback cb) {
         this.cb = cb;
         this.bookieClient = bookieClient;
         this.maxRecoveredData = new RecoveryData(LedgerHandle.INVALID_ENTRY_ID, 0);
-        this.lh = lh;
-        this.numSuccessfulResponse = 0;
-        this.numResponsesPending = lh.getLedgerMetadata().getEnsembleSize();
-        this.coverageSet = lh.distributionSchedule.getCoverageSet();
+        this.numResponsesPending = ensemble.size();
+        this.coverageSet = schedule.getCoverageSet();
         this.currentEnsemble = ensemble;
+        this.ledgerId = ledgerId;
+        this.ledgerKey = ledgerKey;
+        this.digestManager = digestManager;
     }
 
     public void initiate() {
         for (int i = 0; i < currentEnsemble.size(); i++) {
             bookieClient.readEntry(currentEnsemble.get(i),
-                                   lh.ledgerId,
+                                   ledgerId,
                                    BookieProtocol.LAST_ADD_CONFIRMED,
                                    this, i, BookieProtocol.FLAG_NONE);
         }
@@ -78,10 +88,10 @@
     public void initiateWithFencing() {
         for (int i = 0; i < currentEnsemble.size(); i++) {
             bookieClient.readEntry(currentEnsemble.get(i),
-                                   lh.ledgerId,
+                                   ledgerId,
                                    BookieProtocol.LAST_ADD_CONFIRMED,
                                    this, i, BookieProtocol.FLAG_DO_FENCING,
-                                   lh.ledgerKey);
+                                   ledgerKey);
         }
     }
 
@@ -96,12 +106,11 @@
         boolean heardValidResponse = false;
         if (rc == BKException.Code.OK) {
             try {
-                RecoveryData recoveryData = lh.macManager.verifyDigestAndReturnLastConfirmed(buffer);
+                RecoveryData recoveryData = digestManager.verifyDigestAndReturnLastConfirmed(buffer);
                 if (recoveryData.getLastAddConfirmed() > maxRecoveredData.getLastAddConfirmed()) {
                     maxRecoveredData = recoveryData;
                 }
                 heardValidResponse = true;
-                numSuccessfulResponse++;
             } catch (BKDigestMatchException e) {
                 // Too bad, this bookie didn't give us a valid answer, we
                 // still might be able to recover though so continue
@@ -140,17 +149,15 @@
         }
 
         if (numResponsesPending == 0 && !completed) {
-            int totalExepctedResponse = lh.getLedgerMetadata().getWriteQuorumSize()
-                    - lh.getLedgerMetadata().getAckQuorumSize() + 1;
-            if (numSuccessfulResponse >= totalExepctedResponse) {
-                cb.readLastConfirmedDataComplete(BKException.Code.OK, maxRecoveredData);
-                return;
-            }
-            // Have got all responses back but was still not enough, just fail the operation
-            LOG.error("While readLastConfirmed ledger: {} did not hear success responses from all quorums {}", ledgerId,
-                    lastSeenError);
+            LOG.error("While readLastConfirmed ledger: {} did not hear success responses from all quorums, {}",
+                      ledgerId, coverageSet);
             cb.readLastConfirmedDataComplete(lastSeenError, maxRecoveredData);
         }
 
     }
+
+    @VisibleForTesting
+    synchronized int getNumResponsesPending() {
+        return numResponsesPending;
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
index 05711fd..e4ae4f2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
@@ -373,29 +373,43 @@
         public synchronized boolean checkCovered() {
             // now check if there are any write quorums, with |ackQuorum| nodes available
             for (int i = 0; i < ensembleSize; i++) {
-                int nodesNotCovered = 0;
-                int nodesOkay = 0;
-                int nodesUninitialized = 0;
+                /* Nodes which have either responded with an error other than NoSuch{Entry,Ledger},
+                   or have not responded at all. We cannot know if these nodes ever accepted a entry. */
+                int nodesUnknown = 0;
+
                 for (int j = 0; j < writeQuorumSize; j++) {
                     int nodeIndex = (i + j) % ensembleSize;
-                    if (covered[nodeIndex] == BKException.Code.OK) {
-                        nodesOkay++;
-                    } else if (covered[nodeIndex] != BKException.Code.NoSuchEntryException
-                            && covered[nodeIndex] != BKException.Code.NoSuchLedgerExistsException) {
-                        nodesNotCovered++;
-                        if (covered[nodeIndex] == BKException.Code.UNINITIALIZED) {
-                            nodesUninitialized++;
-                        }
+                    if (covered[nodeIndex] != BKException.Code.OK
+                        && covered[nodeIndex] != BKException.Code.NoSuchEntryException
+                        && covered[nodeIndex] != BKException.Code.NoSuchLedgerExistsException) {
+                        nodesUnknown++;
                     }
                 }
-                // if we haven't seen any OK responses and there are still nodes not heard from,
-                // let's wait until
-                if (nodesNotCovered >= ackQuorumSize || (nodesOkay == 0 && nodesUninitialized > 0)) {
+
+                /* If nodesUnknown is greater than the ack quorum size, then
+                   it is possible those two unknown nodes accepted an entry which
+                   we do not know about */
+                if (nodesUnknown >= ackQuorumSize) {
                     return false;
                 }
             }
             return true;
         }
+
+        @Override
+        public String toString() {
+            StringBuilder buffer = new StringBuilder();
+            buffer.append("QuorumCoverage(e:").append(ensembleSize)
+                .append(",w:").append(writeQuorumSize)
+                .append(",a:").append(ackQuorumSize)
+                .append(") = [");
+            int i = 0;
+            for (; i < covered.length - 1; i++) {
+                buffer.append(covered[i]).append(", ");
+            }
+            buffer.append(covered[i]).append("]");
+            return buffer.toString();
+        }
     }
 
     @Override
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
index ab69fa7..710756e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
@@ -675,8 +675,12 @@
         final AtomicInteger rcHolder = new AtomicInteger(-1234);
         final CountDownLatch doneLatch = new CountDownLatch(1);
 
-        new ReadLastConfirmedOp(readLh, bkc.getBookieClient(),
+        new ReadLastConfirmedOp(bkc.getBookieClient(),
+                                readLh.distributionSchedule,
+                                readLh.macManager,
+                                readLh.ledgerId,
                                 readLh.getLedgerMetadata().getAllEnsembles().lastEntry().getValue(),
+                                readLh.ledgerKey,
                 new ReadLastConfirmedOp.LastConfirmedDataCallback() {
                     @Override
                     public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData data) {
@@ -760,8 +764,13 @@
     private int readLACFromQuorum(LedgerHandle ledger, int... bookieLACResponse) throws Exception {
         MutableInt responseCode = new MutableInt(100);
         CountDownLatch responseLatch = new CountDownLatch(1);
-        ReadLastConfirmedOp readLCOp = new ReadLastConfirmedOp(ledger, bkc.getBookieClient(),
+        ReadLastConfirmedOp readLCOp = new ReadLastConfirmedOp(
+                bkc.getBookieClient(),
+                ledger.getDistributionSchedule(),
+                ledger.getDigestManager(),
+                ledger.getId(),
                 ledger.getLedgerMetadata().getAllEnsembles().lastEntry().getValue(),
+                ledger.getLedgerKey(),
                 new ReadLastConfirmedOp.LastConfirmedDataCallback() {
                     @Override
                     public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData data) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedOpTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedOpTest.java
new file mode 100644
index 0000000..e553e52
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedOpTest.java
@@ -0,0 +1,111 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import com.google.common.collect.Lists;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType;
+import org.apache.bookkeeper.proto.MockBookieClient;
+import org.apache.bookkeeper.proto.checksum.DigestManager;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests about ReadLastConfirmedOp.
+ */
+public class ReadLastConfirmedOpTest {
+    private static final Logger log = LoggerFactory.getLogger(ReadLastConfirmedOpTest.class);
+    private final BookieSocketAddress bookie1 = new BookieSocketAddress("bookie1", 3181);
+    private final BookieSocketAddress bookie2 = new BookieSocketAddress("bookie2", 3181);
+
+    OrderedExecutor executor = null;
+
+    @Before
+    public void setup() throws Exception {
+        executor = OrderedExecutor.newBuilder()
+                .name("BookKeeperClientWorker")
+                .numThreads(1)
+                .build();
+    }
+
+    @After
+    public void teardown() throws Exception {
+        if (executor != null) {
+            executor.shutdown();
+        }
+    }
+
+    /**
+     * Test for specific bug that was introduced with dcdd1e88.
+     */
+    @Test
+    public void testBookieFailsAfterLedgerMissingOnFirst() throws Exception {
+        long ledgerId = 0xf00b;
+        List<BookieSocketAddress> ensemble = Lists.newArrayList(bookie1, bookie2);
+        byte[] ledgerKey = new byte[0];
+
+        MockBookieClient bookieClient = new MockBookieClient(executor);
+        DistributionSchedule schedule = new RoundRobinDistributionSchedule(2, 2, 2);
+        DigestManager digestManager = DigestManager.instantiate(ledgerId, ledgerKey,
+                                                                DigestType.CRC32C,
+                                                                UnpooledByteBufAllocator.DEFAULT,
+                                                                true /* useV2 */);
+
+        CompletableFuture<Void> blocker = new CompletableFuture<>();
+        bookieClient.setPreReadHook((bookie, lId, entryId) -> {
+                if (bookie.equals(bookie1)) {
+                    return CompletableFuture.completedFuture(null);
+                } else {
+                    return blocker;
+                }
+            });
+        CompletableFuture<DigestManager.RecoveryData> promise = new CompletableFuture<>();
+        ReadLastConfirmedOp op = new ReadLastConfirmedOp(
+                bookieClient, schedule,
+                digestManager, ledgerId, ensemble,
+                ledgerKey,
+                (rc, data) -> {
+                    if (rc != BKException.Code.OK) {
+                        promise.completeExceptionally(
+                                BKException.create(rc));
+                    } else {
+                        promise.complete(data);
+                    }
+                });
+        op.initiateWithFencing();
+
+        while (op.getNumResponsesPending() > 1) {
+            Thread.sleep(100);
+        }
+        blocker.completeExceptionally(
+                new BKException.BKBookieHandleNotAvailableException());
+        promise.get();
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/RoundRobinDistributionScheduleTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/RoundRobinDistributionScheduleTest.java
index 1b989f5..391df80 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/RoundRobinDistributionScheduleTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/RoundRobinDistributionScheduleTest.java
@@ -74,16 +74,6 @@
             }
         }
         assertEquals("Should be no errors", 0, errors);
-
-        RoundRobinDistributionSchedule schedule = new RoundRobinDistributionSchedule(
-            5, 3, 5);
-        DistributionSchedule.QuorumCoverageSet covSet = schedule.getCoverageSet();
-        covSet.addBookie(0, BKException.Code.NoSuchLedgerExistsException);
-        covSet.addBookie(1, BKException.Code.NoSuchEntryException);
-        covSet.addBookie(2, BKException.Code.NoSuchLedgerExistsException);
-        covSet.addBookie(3, BKException.Code.UNINITIALIZED);
-        covSet.addBookie(4, BKException.Code.UNINITIALIZED);
-        assertFalse(covSet.checkCovered());
     }
 
     /**
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieFailureTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieFailureTest.java
index 255e4f3..c60a2db 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieFailureTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieFailureTest.java
@@ -301,21 +301,6 @@
         LedgerHandle afterlh = bkc.openLedgerNoRecovery(beforelh.getId(), digestType, "".getBytes());
 
         assertEquals(numEntries - 2, afterlh.getLastAddConfirmed());
-
-        startNewBookie();
-        LedgerHandle beforelh2 = bkc.createLedger(numBookies, 1, digestType, "".getBytes());
-
-        for (int i = 0; i < numEntries; i++) {
-            beforelh2.addEntry(tmp.getBytes());
-        }
-
-        // shutdown first bookie server
-        killBookie(0);
-
-        // try to open ledger no recovery
-        // should be able to open ledger with one bookie down:
-        // Total bookies available 3 >= 1 (Qw(4) - Qa(4) + 1)
-        bkc.openLedgerNoRecovery(beforelh2.getId(), digestType, "".getBytes());
     }
 
     @Test
@@ -333,24 +318,10 @@
         killBookie(0);
         startNewBookie();
 
-        // try to open ledger no recovery
+        // try to open ledger with recovery
         LedgerHandle afterlh = bkc.openLedger(beforelh.getId(), digestType, "".getBytes());
 
         assertEquals(beforelh.getLastAddPushed(), afterlh.getLastAddConfirmed());
-
-        LedgerHandle beforelh2 = bkc.createLedger(numBookies, 1, digestType, "".getBytes());
-
-        for (int i = 0; i < numEntries; i++) {
-            beforelh2.addEntry(tmp.getBytes());
-        }
-
-        // shutdown first bookie server
-        killBookie(0);
-
-        // try to open ledger no recovery
-        // should be able to open ledger with one bookie down:
-        // Total bookies available 3 >= 1 (Qw(4) - Qa(4) + 1)
-        bkc.openLedger(beforelh2.getId(), digestType, "".getBytes());
     }
 
     /**
diff --git a/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java b/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
index b7a6ba9..06ed4d2 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
@@ -197,7 +197,13 @@
             op.submit();
         };
         // Read Last AddConfirmed
-        new ReadLastConfirmedOp(lh, clientCtx.getBookieClient(), lh.getCurrentEnsemble(), readLACCallback).initiate();
+        new ReadLastConfirmedOp(clientCtx.getBookieClient(),
+                                lh.distributionSchedule,
+                                lh.macManager,
+                                lh.ledgerId,
+                                lh.getCurrentEnsemble(),
+                                lh.ledgerKey,
+                                readLACCallback).initiate();
     }
 
     public void readLacs(final LedgerHandle lh, long eid,