BOOKKEEPER-799: Distribution schedule coverage sets don't take gaps in response lists into account when writequorum > ackquorum (ivank)
diff --git a/CHANGES.txt b/CHANGES.txt
index a6c26b4..9dc12c3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,8 @@
 
       BOOKKEEPER-815: Ledger fence state is lost when the ledger file is evicted (Charles Xie via ivank)
 
+      BOOKKEEPER-799: Distribution schedule coverage sets don't take gaps in response lists into account when writequorum > ackquorum (ivank)
+
     IMPROVEMENTS:
 
       BOOKKEEPER-800: Expose whether a ledger is closed or not (ivank)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
index b34ff75..82f300b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
@@ -67,33 +67,31 @@
     }
 
     private class RRQuorumCoverageSet implements QuorumCoverageSet {
-        // covered[i] is true if the quorum starting at bookie index i has been
-        // covered by a recovery reply
-        private boolean[] covered = null;
-        private int numQuorumsUncovered;
+        private final boolean[] covered = new boolean[ensembleSize];
 
         private RRQuorumCoverageSet() {
-            covered = new boolean[ensembleSize];
-            numQuorumsUncovered = ensembleSize;
+            for (int i = 0; i < covered.length; i++) {
+                covered[i] = false;
+            }
         }
 
         public synchronized boolean addBookieAndCheckCovered(int bookieIndexHeardFrom) {
-            if (numQuorumsUncovered == 0) {
-                return true;
-            }
+            covered[bookieIndexHeardFrom] = true;
 
-            for (int i = 0; i < ackQuorumSize; i++) {
-                int quorumStartIndex = MathUtils.signSafeMod(bookieIndexHeardFrom - i, ensembleSize);
-                if (!covered[quorumStartIndex]) {
-                    covered[quorumStartIndex] = true;
-                    numQuorumsUncovered--;
-
-                    if (numQuorumsUncovered == 0) {
-                        return true;
+            // now check if there are any write quorums, with |ackQuorum| nodes available
+            for (int i = 0; i < ensembleSize; i++) {
+                int nodesNotCovered = 0;
+                for (int j = 0; j < writeQuorumSize; j++) {
+                    int nodeIndex = (i + j) % ensembleSize;
+                    if (!covered[nodeIndex]) {
+                        nodesNotCovered++;
                     }
                 }
+                if (nodesNotCovered >= ackQuorumSize) {
+                    return false;
+                }
             }
-            return false;
+            return true;
         }
     }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/RoundRobinDistributionScheduleTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/RoundRobinDistributionScheduleTest.java
index 8f173ef..23cf9d8 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/RoundRobinDistributionScheduleTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/RoundRobinDistributionScheduleTest.java
@@ -22,6 +22,10 @@
 package org.apache.bookkeeper.client;
 
 import java.util.List;
+import java.util.Set;
+import java.util.HashSet;
+import com.google.common.collect.Sets;
+
 import org.junit.Test;
 import static org.junit.Assert.*;
 
@@ -44,21 +48,86 @@
         assertFalse("Shouldn't ack yet", ackSet.addBookieAndCheck(wSet.get(0)));
         assertTrue("Should ack after 2 unique", ackSet.addBookieAndCheck(wSet.get(2)));
         assertTrue("Should still be acking", ackSet.addBookieAndCheck(wSet.get(1)));
-
-        DistributionSchedule.QuorumCoverageSet covSet = schedule.getCoverageSet();
-        assertFalse("Shouldn't cover yet", covSet.addBookieAndCheckCovered(0));
-        assertFalse("Shouldn't cover yet", covSet.addBookieAndCheckCovered(2));
-        assertTrue("Should cover now", covSet.addBookieAndCheckCovered(3));
-
-        covSet = schedule.getCoverageSet();
-        assertFalse("Shouldn't cover yet", covSet.addBookieAndCheckCovered(0));
-        assertFalse("Shouldn't cover yet", covSet.addBookieAndCheckCovered(1));
-        assertFalse("Shouldn't cover yet", covSet.addBookieAndCheckCovered(2));
-        assertTrue("Should cover now", covSet.addBookieAndCheckCovered(3));
-
-        covSet = schedule.getCoverageSet();
-        assertFalse("Shouldn't cover yet", covSet.addBookieAndCheckCovered(4));
-        assertFalse("Shouldn't cover yet", covSet.addBookieAndCheckCovered(0));
-        assertTrue("Should cover now", covSet.addBookieAndCheckCovered(2));
     }
-}
\ No newline at end of file
+
+    /**
+     * Test that coverage sets only respond as covered when it has
+     * heard from enough bookies that no ack quorum can exist without these bookies.
+     */
+    @Test(timeout=60000)
+    public void testCoverageSets() {
+        int errors = 0;
+        for (int e = 6; e > 0; e--) {
+            for (int w = e; w > 0; w--) {
+                for (int a = w; a > 0; a--) {
+                    errors += testCoverageForConfiguration(e, w, a);
+                }
+            }
+        }
+        assertEquals("Should be no errors", 0, errors);
+    }
+
+    /**
+     * Build a boolean array of which nodes have not responded
+     * and thus are available to build a quorum.
+     */
+    boolean[] buildAvailable(int ensemble, Set<Integer> responses) {
+        boolean[] available = new boolean[ensemble];
+        for (int i = 0; i < ensemble; i++) {
+            if (responses.contains(i)) {
+                available[i] = false;
+            } else {
+                available[i] = true;
+            }
+        }
+        return available;
+    }
+
+    /**
+     * Check whether it is possible for a write to reach
+     * a quorum with a given set of nodes available
+     */
+    boolean canGetAckQuorum(int ensemble, int writeQuorum, int ackQuorum, boolean[] available) {
+        for (int i = 0; i < ensemble; i++) {
+            int count = 0;
+            for (int j = 0; j < writeQuorum; j++) {
+                if (available[(i+j)%ensemble]) {
+                    count++;
+                }
+            }
+            if (count >= ackQuorum) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private int testCoverageForConfiguration(int ensemble, int writeQuorum, int ackQuorum) {
+        RoundRobinDistributionSchedule schedule = new RoundRobinDistributionSchedule(
+                writeQuorum, ackQuorum, ensemble);
+        Set<Integer> indexes = new HashSet<Integer>();
+        for (int i = 0; i < ensemble; i++) {
+            indexes.add(i);
+        }
+        Set<Set<Integer>> subsets = Sets.powerSet(indexes);
+
+        int errors = 0;
+        for (Set<Integer> subset : subsets) {
+            DistributionSchedule.QuorumCoverageSet covSet = schedule.getCoverageSet();
+            boolean covSetSays = false;
+            for (Integer i : subset) {
+                covSetSays = covSet.addBookieAndCheckCovered(i);
+            }
+
+            boolean[] nodesAvailable = buildAvailable(ensemble, subset);
+            boolean canGetAck = canGetAckQuorum(ensemble, writeQuorum, ackQuorum, nodesAvailable);
+            if (canGetAck == covSetSays) {
+                LOG.error("e{}:w{}:a{} available {}    canGetAck {} covSetSays {}",
+                          new Object[] { ensemble, writeQuorum, ackQuorum,
+                                         nodesAvailable, canGetAck, covSetSays });
+                errors++;
+            }
+        }
+        return errors;
+    }
+}