GOSSIP-74 Phi calculation or sampling is wrong in FailureDetector
diff --git a/src/main/java/org/apache/gossip/GossipSettings.java b/src/main/java/org/apache/gossip/GossipSettings.java
index bcea75c..6b2bf8b 100644
--- a/src/main/java/org/apache/gossip/GossipSettings.java
+++ b/src/main/java/org/apache/gossip/GossipSettings.java
@@ -30,19 +30,18 @@
   private int gossipInterval = 10;
 
   /** Time between cleanups in ms. Default is 10 seconds. */
-  private int cleanupInterval = 10000;
+  private int cleanupInterval = 5000;
 
   /** the minimum samples needed before reporting a result */
-  private int minimumSamples = 1;
+  private int minimumSamples = 5;
   
   /** the number of samples to keep per host */
   private int windowSize = 5000;
   
   /** the threshold for the detector */
-  //private double convictThreshold = 2.606201185901408;
-  private double convictThreshold = 2.606201185901408;
+  private double convictThreshold = 10;
   
-  private String distribution = "exponential";
+  private String distribution = "normal";
   
   private String activeGossipClass = "org.apache.gossip.manager.SimpleActiveGossipper";
   
diff --git a/src/main/java/org/apache/gossip/LocalMember.java b/src/main/java/org/apache/gossip/LocalMember.java
index db6e3f7..450bce5 100644
--- a/src/main/java/org/apache/gossip/LocalMember.java
+++ b/src/main/java/org/apache/gossip/LocalMember.java
@@ -43,7 +43,7 @@
   public LocalMember(String clusterName, URI uri, String id,
           long heartbeat, Map<String,String> properties, int windowSize, int minSamples, String distribution) {
     super(clusterName, uri, id, heartbeat, properties );
-    detector = new FailureDetector(this, minSamples, windowSize, distribution);
+    detector = new FailureDetector(minSamples, windowSize, distribution);
   }
 
   protected LocalMember(){
diff --git a/src/main/java/org/apache/gossip/accrual/FailureDetector.java b/src/main/java/org/apache/gossip/accrual/FailureDetector.java
index 22e73db..5abd5c6 100644
--- a/src/main/java/org/apache/gossip/accrual/FailureDetector.java
+++ b/src/main/java/org/apache/gossip/accrual/FailureDetector.java
@@ -21,72 +21,60 @@
 import org.apache.commons.math.distribution.ExponentialDistributionImpl;
 import org.apache.commons.math.distribution.NormalDistributionImpl;
 import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
-import org.apache.gossip.LocalMember;
 import org.apache.log4j.Logger;
 
 public class FailureDetector {
 
-  private static final Logger LOGGER = Logger.getLogger(FailureDetector.class);
+  public static final Logger LOGGER = Logger.getLogger(FailureDetector.class);
   private final DescriptiveStatistics descriptiveStatistics;
   private final long minimumSamples;
   private volatile long latestHeartbeatMs = -1;
-  private final LocalMember parent;
   private final String distribution;
-  
-  public FailureDetector(LocalMember parent, long minimumSamples, int windowSize, String distribution){
-    this.parent = parent;
+
+  public FailureDetector(long minimumSamples, int windowSize, String distribution) {
     descriptiveStatistics = new DescriptiveStatistics(windowSize);
     this.minimumSamples = minimumSamples;
     this.distribution = distribution;
   }
-  
+
   /**
-   * Updates the statistics based on the delta between the last 
+   * Updates the statistics based on the delta between the last
    * heartbeat and supplied time
+   *
    * @param now the time of the heartbeat in milliseconds
    */
-  public void recordHeartbeat(long now){
-    if (now < latestHeartbeatMs)
-      return;
-    if (now - latestHeartbeatMs == 0){
+  public synchronized void recordHeartbeat(long now) {
+    if (now <= latestHeartbeatMs) {
       return;
     }
-    synchronized (descriptiveStatistics) {
-      if (latestHeartbeatMs != -1){
-        descriptiveStatistics.addValue(now - latestHeartbeatMs);
-      } else {
-        latestHeartbeatMs = now;
-      }
+    if (latestHeartbeatMs != -1) {
+      descriptiveStatistics.addValue(now - latestHeartbeatMs);
     }
+    latestHeartbeatMs = now;
   }
-  
-  public Double computePhiMeasure(long now)  {
+
+  public synchronized Double computePhiMeasure(long now) {
     if (latestHeartbeatMs == -1 || descriptiveStatistics.getN() < minimumSamples) {
-      LOGGER.debug(
-              String.format( "%s latests %s samples %s minumumSamples %s", parent.getId(), 
-                      latestHeartbeatMs, descriptiveStatistics.getN(), minimumSamples));
       return null;
     }
-    synchronized (descriptiveStatistics) {
-      long delta = now - latestHeartbeatMs;
-      try {
-        double probability = 0.0;
-        if (distribution.equals("normal")){
-          double variance = descriptiveStatistics.getVariance();
-          probability = 1.0d - new NormalDistributionImpl(descriptiveStatistics.getMean(), 
-                  variance == 0 ? 0.1 : variance).cumulativeProbability(delta);
-        } else {
-          probability = 1.0d - new ExponentialDistributionImpl(descriptiveStatistics.getMean()).cumulativeProbability(delta);
-        }
-        return -1.0d * Math.log10(probability);
-      } catch (MathException | IllegalArgumentException e) {
-        StringBuilder sb = new StringBuilder();
-        for ( double d: descriptiveStatistics.getSortedValues()){
-          sb.append(d + " ");
-        }
-        LOGGER.debug(e.getMessage() +" "+ sb.toString() +" "+ descriptiveStatistics);
-        throw new IllegalArgumentException(e);
+    long delta = now - latestHeartbeatMs;
+    try {
+      double probability;
+      if (distribution.equals("normal")) {
+        double standardDeviation = descriptiveStatistics.getStandardDeviation();
+        standardDeviation = standardDeviation < 0.1 ? 0.1 : standardDeviation;
+        probability = new NormalDistributionImpl(descriptiveStatistics.getMean(), standardDeviation).cumulativeProbability(delta);
+      } else {
+        probability = new ExponentialDistributionImpl(descriptiveStatistics.getMean()).cumulativeProbability(delta);
       }
+      final double eps = 1e-12;
+      if (1 - probability < eps) {
+        probability = 1.0;
+      }
+      return -1.0d * Math.log10(1.0d - probability);
+    } catch (MathException | IllegalArgumentException e) {
+      LOGGER.debug(e);
+      return null;
     }
   }
 }
diff --git a/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java b/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java
index 357e316..497894c 100644
--- a/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java
+++ b/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java
@@ -34,9 +34,8 @@
 
   public static void main (String [] args) throws UnknownHostException, InterruptedException {
     GossipSettings s = new GossipSettings();
-    s.setWindowSize(10);
-    s.setConvictThreshold(1.0);
-    s.setGossipInterval(1000);
+    s.setWindowSize(1000);
+    s.setGossipInterval(100);
     s.setActiveGossipClass(DatacenterRackAwareActiveGossiper.class.getName());
     Map<String, String> gossipProps = new HashMap<>();
     gossipProps.put("sameRackGossipIntervalMs", "2000");
diff --git a/src/main/java/org/apache/gossip/examples/StandAloneNode.java b/src/main/java/org/apache/gossip/examples/StandAloneNode.java
index b38865e..93421b1 100644
--- a/src/main/java/org/apache/gossip/examples/StandAloneNode.java
+++ b/src/main/java/org/apache/gossip/examples/StandAloneNode.java
@@ -28,9 +28,8 @@
 public class StandAloneNode {
   public static void main (String [] args) throws UnknownHostException, InterruptedException{
     GossipSettings s = new GossipSettings();
-    s.setWindowSize(10);
-    s.setConvictThreshold(1.0);
-    s.setGossipInterval(10);
+    s.setWindowSize(1000);
+    s.setGossipInterval(100);
     GossipManager gossipService = GossipManagerBuilder.newBuilder()
             .cluster("mycluster")
             .uri(URI.create(args[0]))
diff --git a/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java b/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java
index 0c5c0d5..d78cf5e 100644
--- a/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java
+++ b/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java
@@ -33,9 +33,8 @@
 public class StandAloneNodeCrdtOrSet {
   public static void main (String [] args) throws InterruptedException, IOException{
     GossipSettings s = new GossipSettings();
-    s.setWindowSize(10);
-    s.setConvictThreshold(1.0);
-    s.setGossipInterval(10);
+    s.setWindowSize(1000);
+    s.setGossipInterval(100);
     GossipManager gossipService = GossipManagerBuilder.newBuilder()
             .cluster("mycluster")
             .uri(URI.create(args[0]))
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
index d2f5d20..c2b50ae 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -51,7 +51,6 @@
   private final LocalMember me;
   private final GossipSettings settings;
   private final AtomicBoolean gossipServiceRunning;
-  private final GossipListener listener;
   private AbstractActiveGossiper activeGossipThread;
   private PassiveGossipThread passiveGossipThread;
   private ExecutorService gossipThreadExecutor;
@@ -92,7 +91,6 @@
     }
     gossipThreadExecutor = Executors.newCachedThreadPool();
     gossipServiceRunning = new AtomicBoolean(true);
-    this.listener = listener;
     this.scheduledServiced = Executors.newScheduledThreadPool(1);
     this.registry = registry;
     this.ringState = new RingStatePersister(this);
diff --git a/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java b/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java
index ad2e055..1836309 100644
--- a/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java
+++ b/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java
@@ -62,27 +62,21 @@
       boolean userDown = processOptimisticShutdown(entry);
       if (userDown)
         continue;
-      try {
-        Double phiMeasure = entry.getKey().detect(clock.nanoTime());
-        if (phiMeasure != null) {
-          GossipState requiredState = calcRequiredState(phiMeasure);
-          if (entry.getValue() != requiredState) {
-            members.put(entry.getKey(), requiredState);
-            listener.gossipEvent(entry.getKey(), requiredState);
-          }
-        }
-      } catch (IllegalArgumentException ex) {
-        //0.0 returns throws exception computing the mean.
 
-        long now = clock.nanoTime();
-        long nowInMillis = TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS);
-        if (nowInMillis - settings.getCleanupInterval() > entry.getKey().getHeartbeat() && entry.getValue() == GossipState.UP) {
-          LOGGER.warn("Marking down");
-          members.put(entry.getKey(), GossipState.DOWN);
-          listener.gossipEvent(entry.getKey(), GossipState.DOWN);
-        }
-      } //end catch
-    } // end for
+      Double phiMeasure = entry.getKey().detect(clock.nanoTime());
+      GossipState requiredState;
+
+      if (phiMeasure != null) {
+        requiredState = calcRequiredState(phiMeasure);
+      } else {
+        requiredState = calcRequiredStateCleanupInterval(entry.getKey(), entry.getValue());
+      }
+
+      if (entry.getValue() != requiredState) {
+        members.put(entry.getKey(), requiredState);
+        listener.gossipEvent(entry.getKey(), requiredState);
+      }
+    }
   }
 
   public GossipState calcRequiredState(Double phiMeasure) {
@@ -92,6 +86,16 @@
       return GossipState.UP;
   }
 
+  public GossipState calcRequiredStateCleanupInterval(LocalMember member, GossipState state) {
+    long now = clock.nanoTime();
+    long nowInMillis = TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS);
+    if (nowInMillis - settings.getCleanupInterval() > member.getHeartbeat()) {
+      return GossipState.DOWN;
+    } else {
+      return state;
+    }
+  }
+
   /**
    * If we have a special key the per-node data that means that the node has sent us
    * a pre-emptive shutdown message. We process this so node is seen down sooner
diff --git a/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java b/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java
index cc6ef52..5faf6a5 100644
--- a/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java
+++ b/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java
@@ -24,7 +24,6 @@
 
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.function.Predicate;
 
 public class MessageInvokerCombiner implements MessageInvoker {
   private final List<MessageInvoker> invokers = new CopyOnWriteArrayList<>();
diff --git a/src/test/java/org/apache/gossip/IdAndPropertyTest.java b/src/test/java/org/apache/gossip/IdAndPropertyTest.java
index 7c02d2d..7f550de 100644
--- a/src/test/java/org/apache/gossip/IdAndPropertyTest.java
+++ b/src/test/java/org/apache/gossip/IdAndPropertyTest.java
@@ -63,7 +63,7 @@
     y.put("datacenter", "dc2");
     y.put("rack", "rack2");
     GossipManager gossipService2 = GossipManagerBuilder.newBuilder().cluster("a")
-            .uri( new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)))
+            .uri( new URI("udp://" + "127.0.0.1" + ":" + (29000 + 10)))
             .id("1")
             .properties(y)
             .gossipMembers(Arrays.asList(new RemoteMember("a",
diff --git a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
index a91480e..54005c3 100644
--- a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
+++ b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
@@ -47,7 +47,7 @@
   @Test
   public void DeadNodesDoNotComeAliveAgain()
           throws InterruptedException, UnknownHostException, URISyntaxException {
-    GossipSettings settings = new GossipSettings(100, 10000, 1000, 1, 2.0, "exponential");
+    GossipSettings settings = new GossipSettings(100, 10000, 1000, 1, 10.0, "normal");
     settings.setPersistRingState(false);
     settings.setPersistDataState(false);
     String cluster = UUID.randomUUID().toString();
diff --git a/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java b/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java
index b00c0c3..3434c17 100644
--- a/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java
+++ b/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java
@@ -17,59 +17,97 @@
  */
 package org.apache.gossip.accrual;
 
-import java.net.URI;
-
-import org.apache.gossip.LocalMember;
+import org.apache.gossip.GossipSettings;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.jupiter.api.Test;
 import org.junit.platform.runner.JUnitPlatform;
 import org.junit.runner.RunWith;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
 @RunWith(JUnitPlatform.class)
 public class FailureDetectorTest {
-  
-  @Test
-  public void aNormalTest(){
-    int samples = 1;
-    int windowSize = 1000;
-    LocalMember member = new LocalMember("", URI.create("udp://127.0.0.1:1000"), 
-            "", 0L, null, windowSize, samples, "normal");
-    member.recordHeartbeat(5);
-    member.recordHeartbeat(10);
-    Assert.assertEquals(new Double(0.3010299956639812), member.detect(10));
+
+  @FunctionalInterface
+  interface TriConsumer<A, B, C> {
+    void accept(A a, B b, C c);
+  }
+
+  static final Double failureThreshold = new GossipSettings().getConvictThreshold();
+
+  List<Integer> generateTimeList(int begin, int end, int step) {
+    List<Integer> values = new ArrayList<>();
+    Random rand = new Random();
+    for (int i = begin; i < end; i += step) {
+      int delta = (int) ((rand.nextDouble() - 0.5) * step / 2);
+
+      values.add(i + delta);
+    }
+    return values;
   }
 
   @Test
-  public void aTest(){
-    int samples = 1;
-    int windowSize = 1000;
-    LocalMember member = new LocalMember("", URI.create("udp://127.0.0.1:1000"), 
-            "", 0L, null, windowSize, samples, "exponential");
-    member.recordHeartbeat(5);
-    member.recordHeartbeat(10);
-    Assert.assertEquals(new Double(0.4342944819032518), member.detect(10));
-    Assert.assertEquals(new Double(0.5211533782839021), member.detect(11));
-    Assert.assertEquals(new Double(1.3028834457097553), member.detect(20));
-    Assert.assertEquals(new Double(3.9), member.detect(50), .01);
-    Assert.assertEquals(new Double(8.25), member.detect(100), .01);
-    Assert.assertEquals(new Double(12.6), member.detect(150), .01);
-    Assert.assertEquals(new Double(14.77), member.detect(175), .01);
-    Assert.assertEquals(new Double(Double.POSITIVE_INFINITY), member.detect(500), .01);
-    member.recordHeartbeat(4);   
-    Assert.assertEquals(new Double(12.6), member.detect(150), .01);
+  public void normalDistribution() {
+    FailureDetector fd = new FailureDetector(1, 1000, "normal");
+    List<Integer> values = generateTimeList(0, 10000, 100);
+    Double deltaSum = 0.0;
+    Integer deltaCount = 0;
+    for (int i = 0; i < values.size() - 1; i++) {
+      fd.recordHeartbeat(values.get(i));
+      if (i != 0) {
+        deltaSum += values.get(i) - values.get(i - 1);
+        deltaCount++;
+      }
+    }
+    Integer lastRecorded = values.get(values.size() - 2);
+
+    //after "step" delay we need to be considered UP
+    Assert.assertTrue(fd.computePhiMeasure(values.get(values.size() - 1)) < failureThreshold);
+
+    //if we check phi-measure after mean delay we get value for 0.5 probability(normal distribution)
+    Assert.assertEquals(fd.computePhiMeasure(lastRecorded + Math.round(deltaSum / deltaCount)), -Math.log10(0.5), 0.1);
   }
-  
-  @Ignore
-  public void sameHeartbeatTest(){
-    int samples = 1;
-    int windowSize = 1000;
-    LocalMember member = new LocalMember("", URI.create("udp://127.0.0.1:1000"), 
-            "", 0L, null, windowSize, samples, "exponential");
-    member.recordHeartbeat(5);
-    member.recordHeartbeat(5);
-    member.recordHeartbeat(5);
-    Assert.assertEquals(new Double(0.4342944819032518), member.detect(10));
+
+  @Test
+  public void checkMinimumSamples() {
+    Integer minimumSamples = 5;
+    FailureDetector fd = new FailureDetector(minimumSamples, 1000, "normal");
+    for (int i = 0; i < minimumSamples + 1; i++) { // +1 because we don't place first heartbeat into structure
+      Assert.assertNull(fd.computePhiMeasure(100));
+      fd.recordHeartbeat(i);
+    }
+    Assert.assertNotNull(fd.computePhiMeasure(100));
   }
-  
+
+  @Test
+  public void checkMonotonicDead() {
+    final FailureDetector fd = new FailureDetector(5, 1000, "normal");
+    TriConsumer<Integer, Integer, Integer> checkAlive = (begin, end, step) -> {
+      List<Integer> times = generateTimeList(begin, end, step);
+      for (int i = 0; i < times.size(); i++) {
+        Double current = fd.computePhiMeasure(times.get(i));
+        if (current != null) {
+          Assert.assertTrue(current < failureThreshold);
+        }
+        fd.recordHeartbeat(times.get(i));
+      }
+    };
+
+    TriConsumer<Integer, Integer, Integer> checkDeadMonotonic = (begin, end, step) -> {
+      List<Integer> times = generateTimeList(begin, end, step);
+      Double prev = null;
+      for (int i = 0; i < times.size(); i++) {
+        Double current = fd.computePhiMeasure(times.get(i));
+        if (current != null && prev != null) {
+          Assert.assertTrue(current >= prev);
+        }
+        prev = current;
+      }
+    };
+
+    checkAlive.accept(0, 20000, 100);
+    checkDeadMonotonic.accept(20000, 20500, 5);
+  }
 }