GOSSIP-74 Phi calculation or sampling is wrong in FailureDetector
diff --git a/src/main/java/org/apache/gossip/GossipSettings.java b/src/main/java/org/apache/gossip/GossipSettings.java
index bcea75c..6b2bf8b 100644
--- a/src/main/java/org/apache/gossip/GossipSettings.java
+++ b/src/main/java/org/apache/gossip/GossipSettings.java
@@ -30,19 +30,18 @@
private int gossipInterval = 10;
/** Time between cleanups in ms. Default is 10 seconds. */
- private int cleanupInterval = 10000;
+ private int cleanupInterval = 5000;
/** the minimum samples needed before reporting a result */
- private int minimumSamples = 1;
+ private int minimumSamples = 5;
/** the number of samples to keep per host */
private int windowSize = 5000;
/** the threshold for the detector */
- //private double convictThreshold = 2.606201185901408;
- private double convictThreshold = 2.606201185901408;
+ private double convictThreshold = 10;
- private String distribution = "exponential";
+ private String distribution = "normal";
private String activeGossipClass = "org.apache.gossip.manager.SimpleActiveGossipper";
diff --git a/src/main/java/org/apache/gossip/LocalMember.java b/src/main/java/org/apache/gossip/LocalMember.java
index db6e3f7..450bce5 100644
--- a/src/main/java/org/apache/gossip/LocalMember.java
+++ b/src/main/java/org/apache/gossip/LocalMember.java
@@ -43,7 +43,7 @@
public LocalMember(String clusterName, URI uri, String id,
long heartbeat, Map<String,String> properties, int windowSize, int minSamples, String distribution) {
super(clusterName, uri, id, heartbeat, properties );
- detector = new FailureDetector(this, minSamples, windowSize, distribution);
+ detector = new FailureDetector(minSamples, windowSize, distribution);
}
protected LocalMember(){
diff --git a/src/main/java/org/apache/gossip/accrual/FailureDetector.java b/src/main/java/org/apache/gossip/accrual/FailureDetector.java
index 22e73db..5abd5c6 100644
--- a/src/main/java/org/apache/gossip/accrual/FailureDetector.java
+++ b/src/main/java/org/apache/gossip/accrual/FailureDetector.java
@@ -21,72 +21,60 @@
import org.apache.commons.math.distribution.ExponentialDistributionImpl;
import org.apache.commons.math.distribution.NormalDistributionImpl;
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
-import org.apache.gossip.LocalMember;
import org.apache.log4j.Logger;
public class FailureDetector {
- private static final Logger LOGGER = Logger.getLogger(FailureDetector.class);
+ public static final Logger LOGGER = Logger.getLogger(FailureDetector.class);
private final DescriptiveStatistics descriptiveStatistics;
private final long minimumSamples;
private volatile long latestHeartbeatMs = -1;
- private final LocalMember parent;
private final String distribution;
-
- public FailureDetector(LocalMember parent, long minimumSamples, int windowSize, String distribution){
- this.parent = parent;
+
+ public FailureDetector(long minimumSamples, int windowSize, String distribution) {
descriptiveStatistics = new DescriptiveStatistics(windowSize);
this.minimumSamples = minimumSamples;
this.distribution = distribution;
}
-
+
/**
- * Updates the statistics based on the delta between the last
+ * Updates the statistics based on the delta between the last
* heartbeat and supplied time
+ *
* @param now the time of the heartbeat in milliseconds
*/
- public void recordHeartbeat(long now){
- if (now < latestHeartbeatMs)
- return;
- if (now - latestHeartbeatMs == 0){
+ public synchronized void recordHeartbeat(long now) {
+ if (now <= latestHeartbeatMs) {
return;
}
- synchronized (descriptiveStatistics) {
- if (latestHeartbeatMs != -1){
- descriptiveStatistics.addValue(now - latestHeartbeatMs);
- } else {
- latestHeartbeatMs = now;
- }
+ if (latestHeartbeatMs != -1) {
+ descriptiveStatistics.addValue(now - latestHeartbeatMs);
}
+ latestHeartbeatMs = now;
}
-
- public Double computePhiMeasure(long now) {
+
+ public synchronized Double computePhiMeasure(long now) {
if (latestHeartbeatMs == -1 || descriptiveStatistics.getN() < minimumSamples) {
- LOGGER.debug(
- String.format( "%s latests %s samples %s minumumSamples %s", parent.getId(),
- latestHeartbeatMs, descriptiveStatistics.getN(), minimumSamples));
return null;
}
- synchronized (descriptiveStatistics) {
- long delta = now - latestHeartbeatMs;
- try {
- double probability = 0.0;
- if (distribution.equals("normal")){
- double variance = descriptiveStatistics.getVariance();
- probability = 1.0d - new NormalDistributionImpl(descriptiveStatistics.getMean(),
- variance == 0 ? 0.1 : variance).cumulativeProbability(delta);
- } else {
- probability = 1.0d - new ExponentialDistributionImpl(descriptiveStatistics.getMean()).cumulativeProbability(delta);
- }
- return -1.0d * Math.log10(probability);
- } catch (MathException | IllegalArgumentException e) {
- StringBuilder sb = new StringBuilder();
- for ( double d: descriptiveStatistics.getSortedValues()){
- sb.append(d + " ");
- }
- LOGGER.debug(e.getMessage() +" "+ sb.toString() +" "+ descriptiveStatistics);
- throw new IllegalArgumentException(e);
+ long delta = now - latestHeartbeatMs;
+ try {
+ double probability;
+ if (distribution.equals("normal")) {
+ double standardDeviation = descriptiveStatistics.getStandardDeviation();
+ standardDeviation = standardDeviation < 0.1 ? 0.1 : standardDeviation;
+ probability = new NormalDistributionImpl(descriptiveStatistics.getMean(), standardDeviation).cumulativeProbability(delta);
+ } else {
+ probability = new ExponentialDistributionImpl(descriptiveStatistics.getMean()).cumulativeProbability(delta);
}
+ final double eps = 1e-12;
+ if (1 - probability < eps) {
+ probability = 1.0;
+ }
+ return -1.0d * Math.log10(1.0d - probability);
+ } catch (MathException | IllegalArgumentException e) {
+ LOGGER.debug(e);
+ return null;
}
}
}
diff --git a/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java b/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java
index 357e316..497894c 100644
--- a/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java
+++ b/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java
@@ -34,9 +34,8 @@
public static void main (String [] args) throws UnknownHostException, InterruptedException {
GossipSettings s = new GossipSettings();
- s.setWindowSize(10);
- s.setConvictThreshold(1.0);
- s.setGossipInterval(1000);
+ s.setWindowSize(1000);
+ s.setGossipInterval(100);
s.setActiveGossipClass(DatacenterRackAwareActiveGossiper.class.getName());
Map<String, String> gossipProps = new HashMap<>();
gossipProps.put("sameRackGossipIntervalMs", "2000");
diff --git a/src/main/java/org/apache/gossip/examples/StandAloneNode.java b/src/main/java/org/apache/gossip/examples/StandAloneNode.java
index b38865e..93421b1 100644
--- a/src/main/java/org/apache/gossip/examples/StandAloneNode.java
+++ b/src/main/java/org/apache/gossip/examples/StandAloneNode.java
@@ -28,9 +28,8 @@
public class StandAloneNode {
public static void main (String [] args) throws UnknownHostException, InterruptedException{
GossipSettings s = new GossipSettings();
- s.setWindowSize(10);
- s.setConvictThreshold(1.0);
- s.setGossipInterval(10);
+ s.setWindowSize(1000);
+ s.setGossipInterval(100);
GossipManager gossipService = GossipManagerBuilder.newBuilder()
.cluster("mycluster")
.uri(URI.create(args[0]))
diff --git a/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java b/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java
index 0c5c0d5..d78cf5e 100644
--- a/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java
+++ b/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java
@@ -33,9 +33,8 @@
public class StandAloneNodeCrdtOrSet {
public static void main (String [] args) throws InterruptedException, IOException{
GossipSettings s = new GossipSettings();
- s.setWindowSize(10);
- s.setConvictThreshold(1.0);
- s.setGossipInterval(10);
+ s.setWindowSize(1000);
+ s.setGossipInterval(100);
GossipManager gossipService = GossipManagerBuilder.newBuilder()
.cluster("mycluster")
.uri(URI.create(args[0]))
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
index d2f5d20..c2b50ae 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -51,7 +51,6 @@
private final LocalMember me;
private final GossipSettings settings;
private final AtomicBoolean gossipServiceRunning;
- private final GossipListener listener;
private AbstractActiveGossiper activeGossipThread;
private PassiveGossipThread passiveGossipThread;
private ExecutorService gossipThreadExecutor;
@@ -92,7 +91,6 @@
}
gossipThreadExecutor = Executors.newCachedThreadPool();
gossipServiceRunning = new AtomicBoolean(true);
- this.listener = listener;
this.scheduledServiced = Executors.newScheduledThreadPool(1);
this.registry = registry;
this.ringState = new RingStatePersister(this);
diff --git a/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java b/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java
index ad2e055..1836309 100644
--- a/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java
+++ b/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java
@@ -62,27 +62,21 @@
boolean userDown = processOptimisticShutdown(entry);
if (userDown)
continue;
- try {
- Double phiMeasure = entry.getKey().detect(clock.nanoTime());
- if (phiMeasure != null) {
- GossipState requiredState = calcRequiredState(phiMeasure);
- if (entry.getValue() != requiredState) {
- members.put(entry.getKey(), requiredState);
- listener.gossipEvent(entry.getKey(), requiredState);
- }
- }
- } catch (IllegalArgumentException ex) {
- //0.0 returns throws exception computing the mean.
- long now = clock.nanoTime();
- long nowInMillis = TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS);
- if (nowInMillis - settings.getCleanupInterval() > entry.getKey().getHeartbeat() && entry.getValue() == GossipState.UP) {
- LOGGER.warn("Marking down");
- members.put(entry.getKey(), GossipState.DOWN);
- listener.gossipEvent(entry.getKey(), GossipState.DOWN);
- }
- } //end catch
- } // end for
+ Double phiMeasure = entry.getKey().detect(clock.nanoTime());
+ GossipState requiredState;
+
+ if (phiMeasure != null) {
+ requiredState = calcRequiredState(phiMeasure);
+ } else {
+ requiredState = calcRequiredStateCleanupInterval(entry.getKey(), entry.getValue());
+ }
+
+ if (entry.getValue() != requiredState) {
+ members.put(entry.getKey(), requiredState);
+ listener.gossipEvent(entry.getKey(), requiredState);
+ }
+ }
}
public GossipState calcRequiredState(Double phiMeasure) {
@@ -92,6 +86,16 @@
return GossipState.UP;
}
+ public GossipState calcRequiredStateCleanupInterval(LocalMember member, GossipState state) {
+ long now = clock.nanoTime();
+ long nowInMillis = TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS);
+ if (nowInMillis - settings.getCleanupInterval() > member.getHeartbeat()) {
+ return GossipState.DOWN;
+ } else {
+ return state;
+ }
+ }
+
/**
* If we have a special key the per-node data that means that the node has sent us
* a pre-emptive shutdown message. We process this so node is seen down sooner
diff --git a/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java b/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java
index cc6ef52..5faf6a5 100644
--- a/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java
+++ b/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java
@@ -24,7 +24,6 @@
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.function.Predicate;
public class MessageInvokerCombiner implements MessageInvoker {
private final List<MessageInvoker> invokers = new CopyOnWriteArrayList<>();
diff --git a/src/test/java/org/apache/gossip/IdAndPropertyTest.java b/src/test/java/org/apache/gossip/IdAndPropertyTest.java
index 7c02d2d..7f550de 100644
--- a/src/test/java/org/apache/gossip/IdAndPropertyTest.java
+++ b/src/test/java/org/apache/gossip/IdAndPropertyTest.java
@@ -63,7 +63,7 @@
y.put("datacenter", "dc2");
y.put("rack", "rack2");
GossipManager gossipService2 = GossipManagerBuilder.newBuilder().cluster("a")
- .uri( new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)))
+ .uri( new URI("udp://" + "127.0.0.1" + ":" + (29000 + 10)))
.id("1")
.properties(y)
.gossipMembers(Arrays.asList(new RemoteMember("a",
diff --git a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
index a91480e..54005c3 100644
--- a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
+++ b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
@@ -47,7 +47,7 @@
@Test
public void DeadNodesDoNotComeAliveAgain()
throws InterruptedException, UnknownHostException, URISyntaxException {
- GossipSettings settings = new GossipSettings(100, 10000, 1000, 1, 2.0, "exponential");
+ GossipSettings settings = new GossipSettings(100, 10000, 1000, 1, 10.0, "normal");
settings.setPersistRingState(false);
settings.setPersistDataState(false);
String cluster = UUID.randomUUID().toString();
diff --git a/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java b/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java
index b00c0c3..3434c17 100644
--- a/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java
+++ b/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java
@@ -17,59 +17,97 @@
*/
package org.apache.gossip.accrual;
-import java.net.URI;
-
-import org.apache.gossip.LocalMember;
+import org.apache.gossip.GossipSettings;
import org.junit.Assert;
-import org.junit.Ignore;
import org.junit.jupiter.api.Test;
import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
@RunWith(JUnitPlatform.class)
public class FailureDetectorTest {
-
- @Test
- public void aNormalTest(){
- int samples = 1;
- int windowSize = 1000;
- LocalMember member = new LocalMember("", URI.create("udp://127.0.0.1:1000"),
- "", 0L, null, windowSize, samples, "normal");
- member.recordHeartbeat(5);
- member.recordHeartbeat(10);
- Assert.assertEquals(new Double(0.3010299956639812), member.detect(10));
+
+ @FunctionalInterface
+ interface TriConsumer<A, B, C> {
+ void accept(A a, B b, C c);
+ }
+
+ static final Double failureThreshold = new GossipSettings().getConvictThreshold();
+
+ List<Integer> generateTimeList(int begin, int end, int step) {
+ List<Integer> values = new ArrayList<>();
+ Random rand = new Random();
+ for (int i = begin; i < end; i += step) {
+ int delta = (int) ((rand.nextDouble() - 0.5) * step / 2);
+
+ values.add(i + delta);
+ }
+ return values;
}
@Test
- public void aTest(){
- int samples = 1;
- int windowSize = 1000;
- LocalMember member = new LocalMember("", URI.create("udp://127.0.0.1:1000"),
- "", 0L, null, windowSize, samples, "exponential");
- member.recordHeartbeat(5);
- member.recordHeartbeat(10);
- Assert.assertEquals(new Double(0.4342944819032518), member.detect(10));
- Assert.assertEquals(new Double(0.5211533782839021), member.detect(11));
- Assert.assertEquals(new Double(1.3028834457097553), member.detect(20));
- Assert.assertEquals(new Double(3.9), member.detect(50), .01);
- Assert.assertEquals(new Double(8.25), member.detect(100), .01);
- Assert.assertEquals(new Double(12.6), member.detect(150), .01);
- Assert.assertEquals(new Double(14.77), member.detect(175), .01);
- Assert.assertEquals(new Double(Double.POSITIVE_INFINITY), member.detect(500), .01);
- member.recordHeartbeat(4);
- Assert.assertEquals(new Double(12.6), member.detect(150), .01);
+ public void normalDistribution() {
+ FailureDetector fd = new FailureDetector(1, 1000, "normal");
+ List<Integer> values = generateTimeList(0, 10000, 100);
+ Double deltaSum = 0.0;
+ Integer deltaCount = 0;
+ for (int i = 0; i < values.size() - 1; i++) {
+ fd.recordHeartbeat(values.get(i));
+ if (i != 0) {
+ deltaSum += values.get(i) - values.get(i - 1);
+ deltaCount++;
+ }
+ }
+ Integer lastRecorded = values.get(values.size() - 2);
+
+ //after "step" delay we need to be considered UP
+ Assert.assertTrue(fd.computePhiMeasure(values.get(values.size() - 1)) < failureThreshold);
+
+ //if we check phi-measure after mean delay we get value for 0.5 probability(normal distribution)
+ Assert.assertEquals(fd.computePhiMeasure(lastRecorded + Math.round(deltaSum / deltaCount)), -Math.log10(0.5), 0.1);
}
-
- @Ignore
- public void sameHeartbeatTest(){
- int samples = 1;
- int windowSize = 1000;
- LocalMember member = new LocalMember("", URI.create("udp://127.0.0.1:1000"),
- "", 0L, null, windowSize, samples, "exponential");
- member.recordHeartbeat(5);
- member.recordHeartbeat(5);
- member.recordHeartbeat(5);
- Assert.assertEquals(new Double(0.4342944819032518), member.detect(10));
+
+ @Test
+ public void checkMinimumSamples() {
+ Integer minimumSamples = 5;
+ FailureDetector fd = new FailureDetector(minimumSamples, 1000, "normal");
+ for (int i = 0; i < minimumSamples + 1; i++) { // +1 because we don't place first heartbeat into structure
+ Assert.assertNull(fd.computePhiMeasure(100));
+ fd.recordHeartbeat(i);
+ }
+ Assert.assertNotNull(fd.computePhiMeasure(100));
}
-
+
+ @Test
+ public void checkMonotonicDead() {
+ final FailureDetector fd = new FailureDetector(5, 1000, "normal");
+ TriConsumer<Integer, Integer, Integer> checkAlive = (begin, end, step) -> {
+ List<Integer> times = generateTimeList(begin, end, step);
+ for (int i = 0; i < times.size(); i++) {
+ Double current = fd.computePhiMeasure(times.get(i));
+ if (current != null) {
+ Assert.assertTrue(current < failureThreshold);
+ }
+ fd.recordHeartbeat(times.get(i));
+ }
+ };
+
+ TriConsumer<Integer, Integer, Integer> checkDeadMonotonic = (begin, end, step) -> {
+ List<Integer> times = generateTimeList(begin, end, step);
+ Double prev = null;
+ for (int i = 0; i < times.size(); i++) {
+ Double current = fd.computePhiMeasure(times.get(i));
+ if (current != null && prev != null) {
+ Assert.assertTrue(current >= prev);
+ }
+ prev = current;
+ }
+ };
+
+ checkAlive.accept(0, 20000, 100);
+ checkDeadMonotonic.accept(20000, 20500, 5);
+ }
}