Merge branch 'master' of https://github.com/goby/incubator-gossip
diff --git a/pom.xml b/pom.xml
index ad78c91..71db8ed 100644
--- a/pom.xml
+++ b/pom.xml
@@ -39,6 +39,7 @@
<!-- dependecy versions -->
<jackson.version>2.8.5</jackson.version>
+ <commons-math.version>1.2</commons-math.version>
<junit.jupiter.version>5.0.0-M2</junit.jupiter.version>
<junit.platform.version>1.0.0-M2</junit.platform.version>
<junit.vintage.version>4.12.0-M2</junit.vintage.version>
@@ -80,6 +81,11 @@
<version>${jackson.version}</version>
</dependency>
<dependency>
+ <groupId>commons-math</groupId>
+ <artifactId>commons-math</artifactId>
+ <version>${commons-math.version}</version>
+ </dependency>
+ <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
diff --git a/src/main/java/org/apache/gossip/GossipSettings.java b/src/main/java/org/apache/gossip/GossipSettings.java
index 99b5807..12037fd 100644
--- a/src/main/java/org/apache/gossip/GossipSettings.java
+++ b/src/main/java/org/apache/gossip/GossipSettings.java
@@ -25,11 +25,20 @@
public class GossipSettings {
/** Time between gossip'ing in ms. Default is 1 second. */
- private int gossipInterval = 1000;
+ private int gossipInterval = 10;
/** Time between cleanups in ms. Default is 10 seconds. */
private int cleanupInterval = 10000;
+ /** the minimum samples needed before reporting a result */
+ private int minimumSamples = 1;
+
+ /** the number of samples to keep per host */
+ private int windowSize = 5000;
+
+ /** the threshold for the detector */
+ //private double convictThreshold = 2.606201185901408;
+ private double convictThreshold = 4.5;
/**
* Construct GossipSettings with default settings.
*/
@@ -44,9 +53,12 @@
* @param cleanupInterval
* The cleanup interval in ms.
*/
- public GossipSettings(int gossipInterval, int cleanupInterval) {
+ public GossipSettings(int gossipInterval, int cleanupInterval, int windowSize, int minimumSamples, double convictThreshold) {
this.gossipInterval = gossipInterval;
this.cleanupInterval = cleanupInterval;
+ this.windowSize = windowSize;
+ this.minimumSamples = minimumSamples;
+ this.convictThreshold = convictThreshold;
}
/**
@@ -87,4 +99,33 @@
public int getCleanupInterval() {
return cleanupInterval;
}
+
+ public int getMinimumSamples() {
+ return minimumSamples;
+ }
+
+ public void setMinimumSamples(int minimumSamples) {
+ this.minimumSamples = minimumSamples;
+ }
+
+ public int getWindowSize() {
+ return windowSize;
+ }
+
+ public void setWindowSize(int windowSize) {
+ this.windowSize = windowSize;
+ }
+
+ public double getConvictThreshold() {
+ return convictThreshold;
+ }
+
+ public void setConvictThreshold(double convictThreshold) {
+ this.convictThreshold = convictThreshold;
+ }
+
+ public void setGossipInterval(int gossipInterval) {
+ this.gossipInterval = gossipInterval;
+ }
+
}
diff --git a/src/main/java/org/apache/gossip/LocalGossipMember.java b/src/main/java/org/apache/gossip/LocalGossipMember.java
index 97f4ab6..df3bb47 100644
--- a/src/main/java/org/apache/gossip/LocalGossipMember.java
+++ b/src/main/java/org/apache/gossip/LocalGossipMember.java
@@ -19,52 +19,50 @@
import java.net.URI;
-import javax.management.NotificationListener;
+import org.apache.gossip.accrual.FailureDetector;
/**
* This object represent a gossip member with the properties known locally. These objects are stored
- * in the local list of gossip member.s
+ * in the local list of gossip members.
*
- * @author harmenw
*/
public class LocalGossipMember extends GossipMember {
- /** The timeout timer for this gossip member. */
- private final transient GossipTimeoutTimer timeoutTimer;
+ /** The failure detector for this member */
+ private transient final FailureDetector detector;
/**
- * Constructor.
*
* @param uri
* The uri of the member
* @param id
* id of the node
* @param heartbeat
- * The current heartbeat.
- * @param notificationListener
- * @param cleanupTimeout
- * The cleanup timeout for this gossip member.
+ * The current heartbeat
*/
public LocalGossipMember(String clusterName, URI uri, String id,
- long heartbeat, NotificationListener notificationListener, int cleanupTimeout) {
+ long heartbeat, int windowSize, int minSamples) {
super(clusterName, uri, id, heartbeat);
- timeoutTimer = new GossipTimeoutTimer(cleanupTimeout, notificationListener, this);
+ detector = new FailureDetector(this, minSamples, windowSize);
}
- /**
- * Start the timeout timer.
- */
- public void startTimeoutTimer() {
- timeoutTimer.start();
+ public void recordHeartbeat(long now){
+ detector.recordHeartbeat(now);
+ }
+
+ public Double detect(long now) {
+ return detector.computePhiMeasure(now);
}
- /**
- * Reset the timeout timer.
- */
- public void resetTimeoutTimer() {
- timeoutTimer.reset();
+ @Override
+ public String toString() {
+ Double d = null;
+ try {
+ d = detect(System.nanoTime());
+ } catch (RuntimeException ex) {}
+ return "LocalGossipMember [uri=" + uri + ", heartbeat=" + heartbeat + ", clusterName="
+ + clusterName + ", id=" + id + ", currentdetect=" + d +" ]";
}
- public void disableTimer() {
- timeoutTimer.removeAllNotifications();
- }
+
+
}
diff --git a/src/main/java/org/apache/gossip/StartupSettings.java b/src/main/java/org/apache/gossip/StartupSettings.java
index 08f7975..38ccbf3 100644
--- a/src/main/java/org/apache/gossip/StartupSettings.java
+++ b/src/main/java/org/apache/gossip/StartupSettings.java
@@ -17,10 +17,8 @@
*/
package org.apache.gossip;
-import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
-import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
@@ -161,28 +159,24 @@
*/
public static StartupSettings fromJSONFile(File jsonFile) throws
FileNotFoundException, IOException, URISyntaxException {
- // Read the file to a String.
- StringBuffer buffer = new StringBuffer();
- try (BufferedReader br = new BufferedReader(new FileReader(jsonFile)) ){
- String line;
- while ((line = br.readLine()) != null) {
- buffer.append(line.trim());
- }
- }
ObjectMapper om = new ObjectMapper();
JsonNode root = om.readTree(jsonFile);
JsonNode jsonObject = root.get(0);
String uri = jsonObject.get("uri").textValue();
String id = jsonObject.get("id").textValue();
+ //TODO constants as defaults?
int gossipInterval = jsonObject.get("gossip_interval").intValue();
int cleanupInterval = jsonObject.get("cleanup_interval").intValue();
+ int windowSize = jsonObject.get("window_size").intValue();
+ int minSamples = jsonObject.get("minimum_samples").intValue();
+ double convictThreshold = jsonObject.get("convict_threshold").asDouble();
String cluster = jsonObject.get("cluster").textValue();
if (cluster == null){
throw new IllegalArgumentException("cluster was null. It is required");
}
URI uri2 = new URI(uri);
- StartupSettings settings = new StartupSettings(id, uri2, new GossipSettings(gossipInterval,
- cleanupInterval), cluster);
+ StartupSettings settings = new StartupSettings(id, uri2,
+ new GossipSettings(gossipInterval, cleanupInterval, windowSize, minSamples, convictThreshold), cluster);
String configMembersDetails = "Config-members [";
JsonNode membersJSON = jsonObject.get("members");
Iterator<JsonNode> it = membersJSON.iterator();
@@ -196,8 +190,6 @@
configMembersDetails += ", ";
}
log.info(configMembersDetails + "]");
-
- // Return the created settings object.
return settings;
}
}
diff --git a/src/main/java/org/apache/gossip/accrual/FailureDetector.java b/src/main/java/org/apache/gossip/accrual/FailureDetector.java
new file mode 100644
index 0000000..296e79f
--- /dev/null
+++ b/src/main/java/org/apache/gossip/accrual/FailureDetector.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.accrual;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.math.MathException;
+import org.apache.commons.math.distribution.ExponentialDistributionImpl;
+import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
+import org.apache.gossip.LocalGossipMember;
+import org.apache.log4j.Logger;
+
+public class FailureDetector {
+
+ private static final Logger LOGGER = Logger.getLogger(FailureDetector.class);
+ private final DescriptiveStatistics descriptiveStatistics;
+ private final long minimumSamples;
+ private volatile long latestHeartbeatMs = -1;
+ private final LocalGossipMember parent;
+
+ public FailureDetector(LocalGossipMember parent, long minimumSamples, int windowSize){
+ this.parent = parent;
+ descriptiveStatistics = new DescriptiveStatistics(windowSize);
+ this.minimumSamples = minimumSamples;
+ }
+
+ /**
+ * Updates the statistics based on the delta between the last
+ * heartbeat and supplied time
+ * @param now the time of the heartbeat in milliseconds
+ */
+ public void recordHeartbeat(long now){
+ if (now < latestHeartbeatMs)
+ return;
+ synchronized (descriptiveStatistics) {
+ if (latestHeartbeatMs != -1){
+ descriptiveStatistics.addValue(now - latestHeartbeatMs);
+ } else {
+ latestHeartbeatMs = now;
+ }
+ }
+ }
+
+ public Double computePhiMeasure(long now) {
+ if (latestHeartbeatMs == -1 || descriptiveStatistics.getN() < minimumSamples) {
+ LOGGER.debug(
+ String.format( "%s latests %s samples %s minumumSamples %s", parent.getId(), latestHeartbeatMs, descriptiveStatistics.getN(), minimumSamples));
+ return null;
+ }
+ synchronized (descriptiveStatistics) {
+ long delta = now - latestHeartbeatMs;
+ try {
+ //double probability = 1.0d - new NormalDistributionImpl(descriptiveStatistics.getMean(), descriptiveStatistics.getVariance()).cumulativeProbability(delta);
+ double probability = 1.0d - new ExponentialDistributionImpl(descriptiveStatistics.getMean()).cumulativeProbability(delta);
+ //LOGGER.warn (parent.getId() + " worked "+ -1.0d * Math.log10(probability));
+ return -1.0d * Math.log10(probability);
+ } catch (MathException | IllegalArgumentException e) {
+ //LOGGER.warn(parent.getId() + " Exception while computing phi", e);
+ //LOGGER.warn(descriptiveStatistics);
+ //LOGGER.warn(descriptiveStatistics.getMean());
+ List<Double> x = new ArrayList<>();
+ for (double z : descriptiveStatistics.getValues()){
+ x.add(z);
+ }
+ //LOGGER.warn(x);
+ //LOGGER.warn(parent.getId() + " " + descriptiveStatistics);
+ throw new IllegalArgumentException(e);
+ }
+ }
+ }
+}
diff --git a/src/main/java/org/apache/gossip/examples/StandAloneNode.java b/src/main/java/org/apache/gossip/examples/StandAloneNode.java
new file mode 100644
index 0000000..b6784cc
--- /dev/null
+++ b/src/main/java/org/apache/gossip/examples/StandAloneNode.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.examples;
+
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+
+import org.apache.gossip.GossipService;
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.RemoteGossipMember;
+
+public class StandAloneNode {
+ public static void main (String [] args) throws UnknownHostException, InterruptedException{
+ GossipSettings s = new GossipSettings();
+ s.setWindowSize(10);
+ s.setConvictThreshold(1.0);
+ s.setGossipInterval(10);
+ GossipService gossipService = new GossipService("mycluster", URI.create(args[0]), args[1],
+ Arrays.asList( new RemoteGossipMember("mycluster", URI.create(args[2]), args[3])), s, (a,b) -> {} );
+ gossipService.start();
+ while (true){
+ System.out.println( "Live: " + gossipService.getGossipManager().getLiveMembers());
+ System.out.println( "Dead: " + gossipService.getGossipManager().getDeadMembers());
+ Thread.sleep(2000);
+ }
+ }
+}
diff --git a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
index 29f4688..e6248dc 100644
--- a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
@@ -24,9 +24,12 @@
import java.util.Map.Entry;
import java.util.Random;
import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.gossip.LocalGossipMember;
@@ -56,6 +59,8 @@
private final Random random;
private final GossipCore gossipCore;
private ScheduledExecutorService scheduledExecutorService;
+ private final BlockingQueue<Runnable> workQueue;
+ private ThreadPoolExecutor threadService;
private ObjectMapper MAPPER = new ObjectMapper();
public ActiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) {
@@ -63,14 +68,18 @@
random = new Random();
this.gossipCore = gossipCore;
this.scheduledExecutorService = Executors.newScheduledThreadPool(2);
+ workQueue = new ArrayBlockingQueue<Runnable>(1024);
+ threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue, new ThreadPoolExecutor.DiscardOldestPolicy());
}
public void init() {
scheduledExecutorService.scheduleAtFixedRate(
- () -> sendMembershipList(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0,
+ () -> {
+ threadService.execute( () -> { sendToALiveMember(); });
+ }, 0,
gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
scheduledExecutorService.scheduleAtFixedRate(
- () -> sendMembershipList(gossipManager.getMyself(), gossipManager.getDeadMembers()), 0,
+ () -> { this.sendToDeadMember(); }, 0,
gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
scheduledExecutorService.scheduleAtFixedRate(
() -> sendPerNodeData(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0,
@@ -155,26 +164,33 @@
}
}
+ protected void sendToALiveMember(){
+ LocalGossipMember member = selectPartner(gossipManager.getLiveMembers());
+ sendMembershipList(gossipManager.getMyself(), member);
+ }
+
+ protected void sendToDeadMember(){
+ LocalGossipMember member = selectPartner(gossipManager.getDeadMembers());
+ sendMembershipList(gossipManager.getMyself(), member);
+ }
/**
* Performs the sending of the membership list, after we have incremented our own heartbeat.
*/
- protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) {
- me.setHeartbeat(System.currentTimeMillis());
- LocalGossipMember member = selectPartner(memberList);
+ protected void sendMembershipList(LocalGossipMember me, LocalGossipMember member) {
+ me.setHeartbeat(System.nanoTime());
if (member == null) {
LOGGER.debug("Send sendMembershipList() is called without action");
return;
} else {
LOGGER.debug("Send sendMembershipList() is called to " + member.toString());
}
-
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
UdpActiveGossipMessage message = new UdpActiveGossipMessage();
message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
message.setUuid(UUID.randomUUID().toString());
message.getMembers().add(convert(me));
- for (LocalGossipMember other : memberList) {
+ for (LocalGossipMember other : gossipManager.getMembers().keySet()) {
message.getMembers().add(convert(other));
}
byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
@@ -184,7 +200,7 @@
if (r instanceof ActiveGossipOk){
//maybe count metrics here
} else {
- LOGGER.warn("Message "+ message + " generated response "+ r);
+ LOGGER.debug("Message " + message + " generated response " + r);
}
} else {
LOGGER.error("The length of the to be send message is too large ("
diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java
index 0ab56a0..eaea8f6 100644
--- a/src/main/java/org/apache/gossip/manager/GossipCore.java
+++ b/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -25,6 +25,7 @@
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map.Entry;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
@@ -33,12 +34,12 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor.DiscardPolicy;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.gossip.GossipMember;
import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.RemoteGossipMember;
+import org.apache.gossip.event.GossipState;
import org.apache.gossip.model.ActiveGossipMessage;
import org.apache.gossip.model.Base;
import org.apache.gossip.model.GossipDataMessage;
@@ -71,7 +72,7 @@
this.gossipManager = manager;
requests = new ConcurrentHashMap<>();
workQueue = new ArrayBlockingQueue<>(1024);
- service = new ThreadPoolExecutor(1, 5, 1, TimeUnit.SECONDS, workQueue, new DiscardPolicy());
+ service = new ThreadPoolExecutor(1, 5, 1, TimeUnit.SECONDS, workQueue, new ThreadPoolExecutor.DiscardOldestPolicy());
perNodeData = new ConcurrentHashMap<>();
sharedData = new ConcurrentHashMap<>();
}
@@ -195,6 +196,11 @@
}
public Response send(Base message, URI uri){
+ if (LOGGER.isDebugEnabled()){
+ LOGGER.debug("Sending " + message);
+ LOGGER.debug("Current request queue " + requests);
+ }
+
final Trackable t;
if (message instanceof Trackable){
t = (Trackable) message;
@@ -223,7 +229,8 @@
});
try {
- return response.get(10, TimeUnit.SECONDS);
+ //TODO this needs to be a setting base on attempts/second
+ return response.get(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
@@ -261,81 +268,67 @@
/**
- * Merge remote list (received from peer), and our local member list. Simply, we must update the
- * heartbeats that the remote list has with our list. Also, some additional logic is needed to
- * make sure we have not timed out a member and then immediately received a list with that member.
+ * Merge lists from remote members and update heartbeats
*
* @param gossipManager
* @param senderMember
* @param remoteList
*
- * COPIED FROM PASSIVE GOSSIP THREAD
*/
protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
List<GossipMember> remoteList) {
-
+ if (LOGGER.isDebugEnabled()){
+ debugState(senderMember, remoteList);
+ }
// if the person sending to us is in the dead list consider them up
for (LocalGossipMember i : gossipManager.getDeadMembers()) {
if (i.getId().equals(senderMember.getId())) {
- LOGGER.info(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri());
- LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getClusterName(),
- senderMember.getUri(), senderMember.getId(),
- senderMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
- .getCleanupInterval());
- gossipManager.reviveMember(newLocalMember);
- newLocalMember.startTimeoutTimer();
+ LOGGER.debug(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri());
+ i.recordHeartbeat(senderMember.getHeartbeat());
+ i.setHeartbeat(senderMember.getHeartbeat());
+ //TODO set node to UP here
+
}
}
for (GossipMember remoteMember : remoteList) {
if (remoteMember.getId().equals(gossipManager.getMyself().getId())) {
continue;
}
- if (gossipManager.getLiveMembers().contains(remoteMember)) {
- LocalGossipMember localMember = gossipManager.getLiveMembers().get(
- gossipManager.getLiveMembers().indexOf(remoteMember));
- if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) {
- localMember.setHeartbeat(remoteMember.getHeartbeat());
- localMember.resetTimeoutTimer();
- }
- } else if (!gossipManager.getLiveMembers().contains(remoteMember)
- && !gossipManager.getDeadMembers().contains(remoteMember)) {
- LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
- remoteMember.getUri(), remoteMember.getId(),
- remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
- .getCleanupInterval());
- gossipManager.createOrReviveMember(newLocalMember);
- newLocalMember.startTimeoutTimer();
- } else {
- if (gossipManager.getDeadMembers().contains(remoteMember)) {
- LocalGossipMember localDeadMember = gossipManager.getDeadMembers().get(
- gossipManager.getDeadMembers().indexOf(remoteMember));
- if (remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) {
- LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
- remoteMember.getUri(), remoteMember.getId(),
- remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
- .getCleanupInterval());
- gossipManager.reviveMember(newLocalMember);
- newLocalMember.startTimeoutTimer();
- LOGGER.debug("Removed remote member " + remoteMember.getAddress()
- + " from dead list and added to local member list.");
- } else {
- LOGGER.debug("me " + gossipManager.getMyself());
- LOGGER.debug("sender " + senderMember);
- LOGGER.debug("remote " + remoteList);
- LOGGER.debug("live " + gossipManager.getLiveMembers());
- LOGGER.debug("dead " + gossipManager.getDeadMembers());
+ LocalGossipMember m = new LocalGossipMember(remoteMember.getClusterName(),
+ remoteMember.getUri(),
+ remoteMember.getId(),
+ remoteMember.getHeartbeat(),
+ gossipManager.getSettings().getWindowSize(),
+ gossipManager.getSettings().getMinimumSamples());
+ m.recordHeartbeat(remoteMember.getHeartbeat());
+
+ Object result = gossipManager.getMembers().putIfAbsent(m, GossipState.UP);
+ if (result != null){
+ for (Entry<LocalGossipMember, GossipState> l : gossipManager.getMembers().entrySet()){
+ if (l.getKey().getId().equals(remoteMember.getId())){
+ //if (l.getKey().getHeartbeat() < remoteMember.getHeartbeat()){
+ l.getKey().recordHeartbeat(remoteMember.getHeartbeat());
+ l.getKey().setHeartbeat(remoteMember.getHeartbeat());
+ //}
}
- } else {
- LOGGER.debug("me " + gossipManager.getMyself());
- LOGGER.debug("sender " + senderMember);
- LOGGER.debug("remote " + remoteList);
- LOGGER.debug("live " + gossipManager.getLiveMembers());
- LOGGER.debug("dead " + gossipManager.getDeadMembers());
- // throw new IllegalArgumentException("wtf");
}
}
}
+ if (LOGGER.isDebugEnabled()){
+ debugState(senderMember, remoteList);
+ }
}
-
+ private void debugState(RemoteGossipMember senderMember,
+ List<GossipMember> remoteList){
+ LOGGER.warn(
+ "-----------------------\n" +
+ "Me " + gossipManager.getMyself() + "\n" +
+ "Sender " + senderMember + "\n" +
+ "RemoteList " + remoteList + "\n" +
+ "Live " + gossipManager.getLiveMembers()+ "\n" +
+ "Dead " + gossipManager.getDeadMembers()+ "\n" +
+ "=======================");
+ }
+
}
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
index cd6e0a9..2b081d0 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -27,16 +27,13 @@
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import javax.management.Notification;
-import javax.management.NotificationListener;
-
import org.apache.log4j.Logger;
import org.apache.gossip.GossipMember;
-import org.apache.gossip.GossipService;
import org.apache.gossip.GossipSettings;
import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.event.GossipListener;
@@ -47,7 +44,7 @@
import org.apache.gossip.model.SharedGossipDataMessage;
-public abstract class GossipManager implements NotificationListener {
+public abstract class GossipManager {
public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
@@ -74,6 +71,8 @@
private final DataReaper dataReaper;
private final Clock clock;
+
+ private final ScheduledExecutorService scheduledServiced;
public GossipManager(String cluster,
URI uri, String id, GossipSettings settings,
@@ -83,60 +82,26 @@
gossipCore = new GossipCore(this);
clock = new SystemClock();
dataReaper = new DataReaper(gossipCore, clock);
- me = new LocalGossipMember(cluster, uri, id, System.currentTimeMillis(), this,
- settings.getCleanupInterval());
+ me = new LocalGossipMember(cluster, uri, id, clock.nanoTime(),
+ + settings.getWindowSize(), settings.getMinimumSamples());
members = new ConcurrentSkipListMap<>();
for (GossipMember startupMember : gossipMembers) {
if (!startupMember.equals(me)) {
LocalGossipMember member = new LocalGossipMember(startupMember.getClusterName(),
startupMember.getUri(), startupMember.getId(),
- System.currentTimeMillis(), this, settings.getCleanupInterval());
- members.put(member, GossipState.UP);
- GossipService.LOGGER.debug(member);
+ clock.nanoTime(), settings.getWindowSize(), settings.getMinimumSamples());
+ //TODO should members start in down state?
+ members.put(member, GossipState.DOWN);
}
}
gossipThreadExecutor = Executors.newCachedThreadPool();
gossipServiceRunning = new AtomicBoolean(true);
this.listener = listener;
- Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
- public void run() {
- GossipService.LOGGER.debug("Service has been shutdown...");
- }
- }));
+ this.scheduledServiced = Executors.newScheduledThreadPool(1);
}
- /**
- * All timers associated with a member will trigger this method when it goes off. The timer will
- * go off if we have not heard from this member in <code> _settings.T_CLEANUP </code> time.
- */
- @Override
- public void handleNotification(Notification notification, Object handback) {
- LocalGossipMember deadMember = (LocalGossipMember) notification.getUserData();
- GossipService.LOGGER.debug("Dead member detected: " + deadMember);
- members.put(deadMember, GossipState.DOWN);
- if (listener != null) {
- listener.gossipEvent(deadMember, GossipState.DOWN);
- }
- }
-
- public void reviveMember(LocalGossipMember m) {
- for (Entry<LocalGossipMember, GossipState> it : this.members.entrySet()) {
- if (it.getKey().getId().equals(m.getId())) {
- it.getKey().disableTimer();
- }
- }
- members.remove(m);
- members.put(m, GossipState.UP);
- if (listener != null) {
- listener.gossipEvent(m, GossipState.UP);
- }
- }
-
- public void createOrReviveMember(LocalGossipMember m) {
- members.put(m, GossipState.UP);
- if (listener != null) {
- listener.gossipEvent(m, GossipState.UP);
- }
+ public ConcurrentSkipListMap<LocalGossipMember, GossipState> getMembers() {
+ return members;
}
public GossipSettings getSettings() {
@@ -181,17 +146,44 @@
* thread and start the receiver thread.
*/
public void init() {
- for (LocalGossipMember member : members.keySet()) {
- if (member != me) {
- member.startTimeoutTimer();
- }
- }
passiveGossipThread = new OnlyProcessReceivedPassiveGossipThread(this, gossipCore);
gossipThreadExecutor.execute(passiveGossipThread);
activeGossipThread = new ActiveGossipThread(this, this.gossipCore);
activeGossipThread.init();
dataReaper.init();
- GossipService.LOGGER.debug("The GossipService is started.");
+ scheduledServiced.scheduleAtFixedRate(() -> {
+ try {
+ for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()) {
+ Double result = null;
+ try {
+ result = entry.getKey().detect(clock.nanoTime());
+ //System.out.println(entry.getKey() +" "+ result);
+ if (result != null) {
+ if (result > settings.getConvictThreshold() && entry.getValue() == GossipState.UP) {
+ members.put(entry.getKey(), GossipState.DOWN);
+ listener.gossipEvent(entry.getKey(), GossipState.DOWN);
+ }
+ if (result <= settings.getConvictThreshold() && entry.getValue() == GossipState.DOWN) {
+ members.put(entry.getKey(), GossipState.UP);
+ listener.gossipEvent(entry.getKey(), GossipState.UP);
+ }
+ }
+ } catch (IllegalArgumentException ex) {
+ //0.0 returns throws exception computing the mean.
+ long now = clock.nanoTime();
+ long nowInMillis = TimeUnit.MILLISECONDS.convert(now,TimeUnit.NANOSECONDS);
+ if (nowInMillis - settings.getCleanupInterval() > entry.getKey().getHeartbeat() && entry.getValue() == GossipState.UP){
+ LOGGER.warn("Marking down");
+ members.put(entry.getKey(), GossipState.DOWN);
+ listener.gossipEvent(entry.getKey(), GossipState.DOWN);
+ }
+ } //end catch
+ } // end for
+ } catch (RuntimeException ex) {
+ LOGGER.warn("scheduled state had exception", ex);
+ }
+ }, 0, 100, TimeUnit.MILLISECONDS);
+ LOGGER.debug("The GossipManager is started.");
}
/**
diff --git a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
index 26541ca..1444181 100644
--- a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
@@ -84,8 +84,8 @@
checkArgument(cluster != null, "You must specify a cluster name");
checkArgument(settings != null, "You must specify gossip settings");
checkArgument(uri != null, "You must specify a uri");
- if (this.gossipMembers == null) {
- this.gossipMembers = new ArrayList<>();
+ if (gossipMembers == null) {
+ gossipMembers = new ArrayList<>();
}
return new RandomGossipManager(cluster, uri, id, settings, gossipMembers, listener);
}
diff --git a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
index 49eac46..1cdb9ac 100644
--- a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
+++ b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
@@ -23,6 +23,7 @@
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.UUID;
@@ -31,8 +32,6 @@
import org.apache.log4j.Logger;
-import org.apache.gossip.event.GossipListener;
-import org.apache.gossip.event.GossipState;
import org.junit.platform.runner.JUnitPlatform;
import org.junit.jupiter.api.Test;
@@ -46,31 +45,20 @@
@Test
public void DeadNodesDoNotComeAliveAgain()
throws InterruptedException, UnknownHostException, URISyntaxException {
- GossipSettings settings = new GossipSettings(1000, 10000);
+ GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 5.0);
String cluster = UUID.randomUUID().toString();
-
- log.info("Adding seed nodes");
int seedNodes = 3;
List<GossipMember> startupMembers = new ArrayList<>();
for (int i = 1; i < seedNodes + 1; ++i) {
- URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30300 + i));
startupMembers.add(new RemoteGossipMember(cluster, uri, i + ""));
}
-
- log.info("Adding clients");
- final List<GossipService> clients = new ArrayList<>();
+ final List<GossipService> clients = Collections.synchronizedList(new ArrayList<GossipService>());
final int clusterMembers = 5;
for (int i = 1; i < clusterMembers + 1; ++i) {
- final int j = i;
- URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30300 + i));
GossipService gossipService = new GossipService(cluster, uri, i + "", startupMembers,
- settings, new GossipListener() {
- @Override
- public void gossipEvent(GossipMember member, GossipState state) {
- System.out.println(System.currentTimeMillis() + " Member " + j + " reports "
- + member + " " + state);
- }
- });
+ settings, (a,b) -> {});
clients.add(gossipService);
gossipService.start();
}
@@ -100,7 +88,7 @@
}
return total;
}
- }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(16);
+ }).afterWaitingAtMost(40, TimeUnit.SECONDS).isEqualTo(16);
clients.remove(randomClientId);
TUnit.assertThat(new Callable<Integer>() {
@@ -111,17 +99,12 @@
}
return total;
}
- }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(4);
+ }).afterWaitingAtMost(30, TimeUnit.SECONDS).isEqualTo(4);
URI uri = new URI("udp://" + "127.0.0.1" + ":" + shutdownPort);
// start client again
GossipService gossipService = new GossipService(cluster, uri, shutdownId + "", startupMembers,
- settings, new GossipListener() {
- @Override
- public void gossipEvent(GossipMember member, GossipState state) {
- // System.out.println("revived " + member+" "+ state);
- }
- });
+ settings, (a,b) -> {});
clients.add(gossipService);
gossipService.start();
@@ -134,7 +117,7 @@
}
return total;
}
- }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20);
+ }).afterWaitingAtMost(60, TimeUnit.SECONDS).isEqualTo(20);
for (int i = 0; i < clusterMembers; ++i) {
clients.get(i).shutdown();
diff --git a/src/test/java/org/apache/gossip/StartupSettingsTest.java b/src/test/java/org/apache/gossip/StartupSettingsTest.java
index 1c0826b..73c758a 100644
--- a/src/test/java/org/apache/gossip/StartupSettingsTest.java
+++ b/src/test/java/org/apache/gossip/StartupSettingsTest.java
@@ -48,28 +48,16 @@
@Test
public void testUsingSettingsFile() throws IOException, InterruptedException, URISyntaxException {
File settingsFile = File.createTempFile("gossipTest",".json");
- log.debug( "Using settings file: " + settingsFile.getAbsolutePath() );
settingsFile.deleteOnExit();
writeSettingsFile(settingsFile);
URI uri = new URI("udp://" + "127.0.0.1" + ":" + 50000);
final GossipService firstService = new GossipService(
- CLUSTER, uri, UUID.randomUUID().toString(),
+ CLUSTER, uri, "1",
new ArrayList<GossipMember>(), new GossipSettings(), null);
-
firstService.start();
-
- TUnit.assertThat(new Callable<Integer> (){
- public Integer call() throws Exception {
- return firstService.getGossipManager().getLiveMembers().size();
- }}).afterWaitingAtMost(30, TimeUnit.SECONDS).isEqualTo(0);
final GossipService serviceUnderTest = new GossipService(
- StartupSettings.fromJSONFile( settingsFile )
- );
+ StartupSettings.fromJSONFile(settingsFile));
serviceUnderTest.start();
- TUnit.assertThat(new Callable<Integer> (){
- public Integer call() throws Exception {
- return serviceUnderTest.getGossipManager().getLiveMembers().size();
- }}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(1);
firstService.shutdown();
serviceUnderTest.shutdown();
}
@@ -78,10 +66,13 @@
String settings =
"[{\n" + // It is odd that this is meant to be in an array, but oh well.
" \"cluster\":\"" + CLUSTER + "\",\n" +
- " \"id\":\"" + UUID.randomUUID() + "\",\n" +
+ " \"id\":\"" + "2" + "\",\n" +
" \"uri\":\"udp://127.0.0.1:50001\",\n" +
" \"gossip_interval\":1000,\n" +
+ " \"window_size\":1000,\n" +
+ " \"minimum_samples\":5,\n" +
" \"cleanup_interval\":10000,\n" +
+ " \"convict_threshold\":2.6,\n" +
" \"members\":[\n" +
" {\"cluster\": \"" + CLUSTER + "\",\"uri\":\"udp://127.0.0.1:5000\"}\n" +
" ]\n" +
diff --git a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
index b663a6e..aa9e2e8 100644
--- a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
+++ b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
@@ -29,54 +29,39 @@
import java.util.concurrent.TimeUnit;
import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith;
-import org.apache.log4j.Logger;
-
-import org.apache.gossip.event.GossipListener;
-import org.apache.gossip.event.GossipState;
import org.junit.jupiter.api.Test;
@RunWith(JUnitPlatform.class)
public class TenNodeThreeSeedTest {
- private static final Logger log = Logger.getLogger( TenNodeThreeSeedTest.class );
@Test
- public void test() throws UnknownHostException, InterruptedException, URISyntaxException{
- abc();
+ public void test() throws UnknownHostException, InterruptedException, URISyntaxException {
+ abc(30150);
}
@Test
public void testAgain() throws UnknownHostException, InterruptedException, URISyntaxException{
- abc();
+ abc(30100);
}
- public void abc() throws InterruptedException, UnknownHostException, URISyntaxException{
+ public void abc(int base) throws InterruptedException, UnknownHostException, URISyntaxException{
GossipSettings settings = new GossipSettings();
String cluster = UUID.randomUUID().toString();
-
- log.info( "Adding seed nodes" );
int seedNodes = 3;
List<GossipMember> startupMembers = new ArrayList<>();
for (int i = 1; i < seedNodes+1; ++i) {
- URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
startupMembers.add(new RemoteGossipMember(cluster, uri, i + ""));
}
-
- log.info( "Adding clients" );
final List<GossipService> clients = new ArrayList<>();
final int clusterMembers = 5;
for (int i = 1; i < clusterMembers+1; ++i) {
- URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
GossipService gossipService = new GossipService(cluster, uri, i + "",
- startupMembers, settings,
- new GossipListener(){
- @Override
- public void gossipEvent(GossipMember member, GossipState state) {
- log.info(member+" "+ state);
- }
- });
+ startupMembers, settings, (a,b) -> {});
clients.add(gossipService);
gossipService.start();
- }
+ }
TUnit.assertThat(new Callable<Integer> (){
public Integer call() throws Exception {
int total = 0;
@@ -84,8 +69,8 @@
total += clients.get(i).getGossipManager().getLiveMembers().size();
}
return total;
- }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20);
-
+ }}).afterWaitingAtMost(40, TimeUnit.SECONDS).isEqualTo(20);
+
for (int i = 0; i < clusterMembers; ++i) {
clients.get(i).shutdown();
}
diff --git a/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java b/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java
new file mode 100644
index 0000000..f9ff9ff
--- /dev/null
+++ b/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.accrual;
+
+import java.net.URI;
+
+import org.apache.gossip.LocalGossipMember;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.jupiter.api.Test;
+import org.junit.platform.runner.JUnitPlatform;
+import org.junit.runner.RunWith;
+
+@RunWith(JUnitPlatform.class)
+public class FailureDetectorTest {
+
+ @Test
+ public void aTest(){
+ int samples = 1;
+ int windowSize = 1000;
+ LocalGossipMember member = new LocalGossipMember("", URI.create("udp://127.0.0.1:1000"), "", 0L, windowSize, samples);
+ member.recordHeartbeat(5);
+ member.recordHeartbeat(10);
+ Assert.assertEquals(new Double(0.4342944819032518), member.detect(10));
+ Assert.assertEquals(new Double(0.5211533782839021), member.detect(11));
+ Assert.assertEquals(new Double(1.3028834457097553), member.detect(20));
+ Assert.assertEquals(new Double(3.9), member.detect(50), .01);
+ Assert.assertEquals(new Double(8.25), member.detect(100), .01);
+ Assert.assertEquals(new Double(12.6), member.detect(150), .01);
+ Assert.assertEquals(new Double(14.77), member.detect(175), .01);
+ Assert.assertEquals(new Double(Double.POSITIVE_INFINITY), member.detect(500), .01);
+ member.recordHeartbeat(4);
+ Assert.assertEquals(new Double(12.6), member.detect(150), .01);
+ }
+
+ @Ignore
+ public void sameHeartbeatTest(){
+ int samples = 1;
+ int windowSize = 1000;
+ LocalGossipMember member = new LocalGossipMember("", URI.create("udp://127.0.0.1:1000"), "", 0L, windowSize, samples);
+ member.recordHeartbeat(5);
+ member.recordHeartbeat(5);
+ member.recordHeartbeat(5);
+ Assert.assertEquals(new Double(0.4342944819032518), member.detect(10));
+ }
+
+}
diff --git a/src/test/java/org/apache/gossip/manager/DataReaperTest.java b/src/test/java/org/apache/gossip/manager/DataReaperTest.java
index d19c802..1b5c35e 100644
--- a/src/test/java/org/apache/gossip/manager/DataReaperTest.java
+++ b/src/test/java/org/apache/gossip/manager/DataReaperTest.java
@@ -37,7 +37,8 @@
String value = "a";
GossipSettings settings = new GossipSettings();
GossipManager gm = RandomGossipManager.newBuilder().cluster("abc").settings(settings)
- .withId(myId).uri(URI.create("udp://localhost:5000")).build();
+ .withId(myId).uri(URI.create("udp://localhost:6000")).build();
+ gm.init();
gm.gossipPerNodeData(perNodeDatum(key, value));
gm.gossipSharedData(sharedDatum(key, value));
Assert.assertEquals(value, gm.findPerNodeGossipData(myId, key).getPayload());
@@ -46,6 +47,7 @@
gm.getDataReaper().runSharedOnce();
TUnit.assertThat(() -> gm.findPerNodeGossipData(myId, key)).equals(null);
TUnit.assertThat(() -> gm.findSharedGossipData(key)).equals(null);
+ gm.shutdown();
}
private GossipDataMessage perNodeDatum(String key, String value) {
@@ -74,7 +76,8 @@
String value = "a";
GossipSettings settings = new GossipSettings();
GossipManager gm = RandomGossipManager.newBuilder().cluster("abc").settings(settings)
- .withId(myId).uri(URI.create("udp://localhost:5000")).build();
+ .withId(myId).uri(URI.create("udp://localhost:7000")).build();
+ gm.init();
GossipDataMessage before = perNodeDatum(key, value);
GossipDataMessage after = perNodeDatum(key, "b");
after.setTimestamp(after.getTimestamp() - 1);
@@ -82,6 +85,7 @@
Assert.assertEquals(value, gm.findPerNodeGossipData(myId, key).getPayload());
gm.gossipPerNodeData(after);
Assert.assertEquals(value, gm.findPerNodeGossipData(myId, key).getPayload());
+ gm.shutdown();
}
}
diff --git a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
index 875a7ab..d9635af 100644
--- a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
+++ b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
@@ -83,14 +83,14 @@
.uri(new URI("udp://localhost:2000"))
.settings(new GossipSettings())
.gossipMembers(null).build();
-
assertNotNull(gossipManager.getLiveMembers());
}
@Test
public void useMemberListIfProvided() throws URISyntaxException {
- LocalGossipMember member = new LocalGossipMember("aCluster", new URI("udp://localhost:2000"), "aGossipMember",
- System.currentTimeMillis(), new TestNotificationListener(), 60000);
+ LocalGossipMember member = new LocalGossipMember(
+ "aCluster", new URI("udp://localhost:2000"), "aGossipMember",
+ System.nanoTime(), 1000, 1);
List<GossipMember> memberList = new ArrayList<>();
memberList.add(member);
RandomGossipManager gossipManager = RandomGossipManager.newBuilder()
@@ -99,8 +99,8 @@
.settings(new GossipSettings())
.uri(new URI("udp://localhost:8000"))
.gossipMembers(memberList).build();
- assertEquals(1, gossipManager.getLiveMembers().size());
- assertEquals(member.getId(), gossipManager.getLiveMembers().get(0).getId());
+ assertEquals(1, gossipManager.getDeadMembers().size());
+ assertEquals(member.getId(), gossipManager.getDeadMembers().get(0).getId());
}
}
\ No newline at end of file