BOOKKEEPER-799: Distribution schedule coverage sets don't take gaps in response lists into account when writequorum > ackquorum (ivank)
diff --git a/CHANGES.txt b/CHANGES.txt
index a6c26b4..9dc12c3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,8 @@
BOOKKEEPER-815: Ledger fence state is lost when the ledger file is evicted (Charles Xie via ivank)
+ BOOKKEEPER-799: Distribution schedule coverage sets don't take gaps in response lists into account when writequorum > ackquorum (ivank)
+
IMPROVEMENTS:
BOOKKEEPER-800: Expose whether a ledger is closed or not (ivank)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
index b34ff75..82f300b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
@@ -67,33 +67,31 @@
}
private class RRQuorumCoverageSet implements QuorumCoverageSet {
- // covered[i] is true if the quorum starting at bookie index i has been
- // covered by a recovery reply
- private boolean[] covered = null;
- private int numQuorumsUncovered;
+ private final boolean[] covered = new boolean[ensembleSize]; // covered[i] is true once bookie index i has been passed to addBookieAndCheckCovered
private RRQuorumCoverageSet() {
- covered = new boolean[ensembleSize];
- numQuorumsUncovered = ensembleSize;
+ for (int i = 0; i < covered.length; i++) { // explicit reset; a freshly allocated boolean[] already holds false in every slot
+ covered[i] = false;
+ }
}
public synchronized boolean addBookieAndCheckCovered(int bookieIndexHeardFrom) {
- if (numQuorumsUncovered == 0) {
- return true;
- }
+ covered[bookieIndexHeardFrom] = true; // record this bookie as heard from before re-evaluating coverage
- for (int i = 0; i < ackQuorumSize; i++) {
- int quorumStartIndex = MathUtils.signSafeMod(bookieIndexHeardFrom - i, ensembleSize);
- if (!covered[quorumStartIndex]) {
- covered[quorumStartIndex] = true;
- numQuorumsUncovered--;
-
- if (numQuorumsUncovered == 0) {
- return true;
+ // covered only if no run of writeQuorumSize consecutive bookies (wrapping) still has ackQuorumSize or more unheard bookies
+ for (int i = 0; i < ensembleSize; i++) {
+ int nodesNotCovered = 0; // bookies in the write quorum starting at index i that have not been heard from
+ for (int j = 0; j < writeQuorumSize; j++) {
+ int nodeIndex = (i + j) % ensembleSize; // wrap around the end of the ensemble
+ if (!covered[nodeIndex]) {
+ nodesNotCovered++;
}
}
+ if (nodesNotCovered >= ackQuorumSize) { // enough unheard bookies remain in this write quorum to make up an ack quorum
+ return false;
+ }
}
- return false;
+ return true; // every write quorum has fewer than ackQuorumSize unheard bookies
}
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/RoundRobinDistributionScheduleTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/RoundRobinDistributionScheduleTest.java
index 8f173ef..23cf9d8 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/RoundRobinDistributionScheduleTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/RoundRobinDistributionScheduleTest.java
@@ -22,6 +22,10 @@
package org.apache.bookkeeper.client;
import java.util.List;
+import java.util.Set;
+import java.util.HashSet;
+import com.google.common.collect.Sets;
+
import org.junit.Test;
import static org.junit.Assert.*;
@@ -44,21 +48,86 @@
assertFalse("Shouldn't ack yet", ackSet.addBookieAndCheck(wSet.get(0)));
assertTrue("Should ack after 2 unique", ackSet.addBookieAndCheck(wSet.get(2)));
assertTrue("Should still be acking", ackSet.addBookieAndCheck(wSet.get(1)));
-
- DistributionSchedule.QuorumCoverageSet covSet = schedule.getCoverageSet();
- assertFalse("Shouldn't cover yet", covSet.addBookieAndCheckCovered(0));
- assertFalse("Shouldn't cover yet", covSet.addBookieAndCheckCovered(2));
- assertTrue("Should cover now", covSet.addBookieAndCheckCovered(3));
-
- covSet = schedule.getCoverageSet();
- assertFalse("Shouldn't cover yet", covSet.addBookieAndCheckCovered(0));
- assertFalse("Shouldn't cover yet", covSet.addBookieAndCheckCovered(1));
- assertFalse("Shouldn't cover yet", covSet.addBookieAndCheckCovered(2));
- assertTrue("Should cover now", covSet.addBookieAndCheckCovered(3));
-
- covSet = schedule.getCoverageSet();
- assertFalse("Shouldn't cover yet", covSet.addBookieAndCheckCovered(4));
- assertFalse("Shouldn't cover yet", covSet.addBookieAndCheckCovered(0));
- assertTrue("Should cover now", covSet.addBookieAndCheckCovered(2));
}
-}
\ No newline at end of file
+
+ /**
+ * Test that a coverage set reports covered only once it has heard from enough
+ * bookies that no ack quorum can exist without at least one of these bookies.
+ */
+ @Test(timeout=60000)
+ public void testCoverageSets() {
+ int errors = 0;
+ for (int e = 6; e > 0; e--) { // ensemble sizes 6 down to 1
+ for (int w = e; w > 0; w--) { // write quorum never larger than the ensemble
+ for (int a = w; a > 0; a--) { // ack quorum never larger than the write quorum
+ errors += testCoverageForConfiguration(e, w, a); // adds the number of mismatching response subsets
+ }
+ }
+ }
+ assertEquals("Should be no errors", 0, errors);
+ }
+
+ /**
+ * Build a boolean array, indexed by node, that is true for every node which
+ * has not responded and is thus still available to build a quorum.
+ */
+ boolean[] buildAvailable(int ensemble, Set<Integer> responses) {
+ boolean[] available = new boolean[ensemble];
+ for (int i = 0; i < ensemble; i++) {
+ if (responses.contains(i)) {
+ available[i] = false; // node i already responded
+ } else {
+ available[i] = true; // node i has not been heard from
+ }
+ }
+ return available;
+ }
+
+ /**
+ * Check whether some write quorum (writeQuorum consecutive nodes, wrapping
+ * around the ensemble) contains at least ackQuorum available nodes.
+ */
+ boolean canGetAckQuorum(int ensemble, int writeQuorum, int ackQuorum, boolean[] available) {
+ for (int i = 0; i < ensemble; i++) { // i is the first node of the candidate write quorum
+ int count = 0; // available nodes found in this write quorum
+ for (int j = 0; j < writeQuorum; j++) {
+ if (available[(i+j)%ensemble]) {
+ count++;
+ }
+ }
+ if (count >= ackQuorum) { // one qualifying write quorum is enough
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private int testCoverageForConfiguration(int ensemble, int writeQuorum, int ackQuorum) { // returns how many response subsets the coverage set got wrong
+ RoundRobinDistributionSchedule schedule = new RoundRobinDistributionSchedule(
+ writeQuorum, ackQuorum, ensemble);
+ Set<Integer> indexes = new HashSet<Integer>();
+ for (int i = 0; i < ensemble; i++) {
+ indexes.add(i);
+ }
+ Set<Set<Integer>> subsets = Sets.powerSet(indexes); // every possible set of responding bookie indexes, including the empty set
+
+ int errors = 0;
+ for (Set<Integer> subset : subsets) {
+ DistributionSchedule.QuorumCoverageSet covSet = schedule.getCoverageSet(); // obtained anew for every subset
+ boolean covSetSays = false; // stays false when the subset is empty
+ for (Integer i : subset) {
+ covSetSays = covSet.addBookieAndCheckCovered(i); // only the answer after the last response is kept
+ }
+
+ boolean[] nodesAvailable = buildAvailable(ensemble, subset);
+ boolean canGetAck = canGetAckQuorum(ensemble, writeQuorum, ackQuorum, nodesAvailable);
+ if (canGetAck == covSetSays) { // the two answers must be opposites, so equality is a mismatch
+ LOG.error("e{}:w{}:a{} available {} canGetAck {} covSetSays {}",
+ new Object[] { ensemble, writeQuorum, ackQuorum,
+ nodesAvailable, canGetAck, covSetSays });
+ errors++;
+ }
+ }
+ return errors;
+ }
+}