Apply formatting
diff --git a/src/main/java/com/google/code/gossip/GossipSettings.java b/src/main/java/com/google/code/gossip/GossipSettings.java
index 9694724..10c20ff 100644
--- a/src/main/java/com/google/code/gossip/GossipSettings.java
+++ b/src/main/java/com/google/code/gossip/GossipSettings.java
@@ -7,58 +7,67 @@
*/
public class GossipSettings {
- /** Time between gossip'ing in ms. Default is 1 second. */
- private int _gossipInterval = 1000;
-
- /** Time between cleanups in ms. Default is 10 seconds. */
- private int _cleanupInterval = 10000;
-
- /**
- * Construct GossipSettings with default settings.
- */
- public GossipSettings() {}
-
- /**
- * Construct GossipSettings with given settings.
- * @param gossipInterval The gossip interval in ms.
- * @param cleanupInterval The cleanup interval in ms.
- */
- public GossipSettings(int gossipInterval, int cleanupInterval) {
- _gossipInterval = gossipInterval;
- _cleanupInterval = cleanupInterval;
- }
-
- /**
- * Set the gossip interval.
- * This is the time between a gossip message is send.
- * @param gossipInterval The gossip interval in ms.
- */
- public void setGossipTimeout(int gossipInterval) {
- _gossipInterval = gossipInterval;
- }
-
- /**
- * Set the cleanup interval.
- * This is the time between the last heartbeat received from a member and when it will be marked as dead.
- * @param cleanupInterval The cleanup interval in ms.
- */
- public void setCleanupInterval(int cleanupInterval) {
- _cleanupInterval = cleanupInterval;
- }
-
- /**
- * Get the gossip interval.
- * @return The gossip interval in ms.
- */
- public int getGossipInterval() {
- return _gossipInterval;
- }
-
- /**
- * Get the clean interval.
- * @return The cleanup interval.
- */
- public int getCleanupInterval() {
- return _cleanupInterval;
- }
+ /** Time between gossip'ing in ms. Default is 1 second. */
+ private int _gossipInterval = 1000;
+
+ /** Time between cleanups in ms. Default is 10 seconds. */
+ private int _cleanupInterval = 10000;
+
+ /**
+ * Construct GossipSettings with default settings.
+ */
+ public GossipSettings() {
+ }
+
+ /**
+ * Construct GossipSettings with given settings.
+ *
+ * @param gossipInterval
+ * The gossip interval in ms.
+ * @param cleanupInterval
+ * The cleanup interval in ms.
+ */
+ public GossipSettings(int gossipInterval, int cleanupInterval) {
+ _gossipInterval = gossipInterval;
+ _cleanupInterval = cleanupInterval;
+ }
+
+ /**
+ * Set the gossip interval. This is the time between a gossip message is send.
+ *
+ * @param gossipInterval
+ * The gossip interval in ms.
+ */
+ public void setGossipTimeout(int gossipInterval) {
+ _gossipInterval = gossipInterval;
+ }
+
+ /**
+ * Set the cleanup interval. This is the time between the last heartbeat received from a member
+ * and when it will be marked as dead.
+ *
+ * @param cleanupInterval
+ * The cleanup interval in ms.
+ */
+ public void setCleanupInterval(int cleanupInterval) {
+ _cleanupInterval = cleanupInterval;
+ }
+
+ /**
+ * Get the gossip interval.
+ *
+ * @return The gossip interval in ms.
+ */
+ public int getGossipInterval() {
+ return _gossipInterval;
+ }
+
+ /**
+ * Get the clean interval.
+ *
+ * @return The cleanup interval.
+ */
+ public int getCleanupInterval() {
+ return _cleanupInterval;
+ }
}
diff --git a/src/main/java/com/google/code/gossip/GossipTimeoutTimer.java b/src/main/java/com/google/code/gossip/GossipTimeoutTimer.java
index a58d330..94cc3a4 100644
--- a/src/main/java/com/google/code/gossip/GossipTimeoutTimer.java
+++ b/src/main/java/com/google/code/gossip/GossipTimeoutTimer.java
@@ -6,56 +6,58 @@
import javax.management.timer.Timer;
/**
- * This object represents a timer for a gossip member.
- * When the timer has elapsed without being reset in the meantime, it will inform the GossipService about this
- * who in turn will put the gossip member on the dead list, because it is apparantly not alive anymore.
+ * This object represents a timer for a gossip member. When the timer has elapsed without being
+ * reset in the meantime, it will inform the GossipService about this who in turn will put the
+ * gossip member on the dead list, because it is apparantly not alive anymore.
*
* @author joshclemm, harmenw
*/
public class GossipTimeoutTimer extends Timer {
- /** The amount of time this timer waits before generating a wake-up event. */
- private long _sleepTime;
+ /** The amount of time this timer waits before generating a wake-up event. */
+ private long _sleepTime;
- /** The gossip member this timer is for. */
- private LocalGossipMember _source;
+ /** The gossip member this timer is for. */
+ private LocalGossipMember _source;
- /**
- * Constructor.
- * Creates a reset-able timer that wakes up after millisecondsSleepTime.
- * @param millisecondsSleepTime The time for this timer to wait before an event.
- * @param service
- * @param member
- */
- public GossipTimeoutTimer(long millisecondsSleepTime, NotificationListener notificationListener, LocalGossipMember member) {
- super();
- _sleepTime = millisecondsSleepTime;
- _source = member;
- addNotificationListener(notificationListener, null, null);
- }
+ /**
+ * Constructor. Creates a reset-able timer that wakes up after millisecondsSleepTime.
+ *
+ * @param millisecondsSleepTime
+ * The time for this timer to wait before an event.
+ * @param service
+ * @param member
+ */
+ public GossipTimeoutTimer(long millisecondsSleepTime, NotificationListener notificationListener,
+ LocalGossipMember member) {
+ super();
+ _sleepTime = millisecondsSleepTime;
+ _source = member;
+ addNotificationListener(notificationListener, null, null);
+ }
- /**
- * @see javax.management.timer.Timer#start()
- */
- public void start() {
- this.reset();
- super.start();
- }
+ /**
+ * @see javax.management.timer.Timer#start()
+ */
+ public void start() {
+ this.reset();
+ super.start();
+ }
- /**
- * Resets timer to start counting down from original time.
- */
- public void reset() {
- removeAllNotifications();
- setWakeupTime(_sleepTime);
- }
+ /**
+ * Resets timer to start counting down from original time.
+ */
+ public void reset() {
+ removeAllNotifications();
+ setWakeupTime(_sleepTime);
+ }
- /**
- * Adds a new wake-up time for this timer.
- * @param milliseconds
- */
- private void setWakeupTime(long milliseconds) {
- addNotification("type", "message", _source, new Date(System.currentTimeMillis()+milliseconds));
- }
+ /**
+ * Adds a new wake-up time for this timer.
+ *
+ * @param milliseconds
+ */
+ private void setWakeupTime(long milliseconds) {
+ addNotification("type", "message", _source, new Date(System.currentTimeMillis() + milliseconds));
+ }
}
-
diff --git a/src/main/java/com/google/code/gossip/LocalGossipMember.java b/src/main/java/com/google/code/gossip/LocalGossipMember.java
index 9c08856..6d040ae 100644
--- a/src/main/java/com/google/code/gossip/LocalGossipMember.java
+++ b/src/main/java/com/google/code/gossip/LocalGossipMember.java
@@ -3,40 +3,47 @@
import javax.management.NotificationListener;
/**
- * This object represent a gossip member with the properties known locally.
- * These objects are stored in the local list of gossip member.s
+ * This object represent a gossip member with the properties known locally. These objects are stored
+ * in the local list of gossip member.s
*
* @author harmenw
*/
public class LocalGossipMember extends GossipMember {
- /** The timeout timer for this gossip member. */
- private transient GossipTimeoutTimer timeoutTimer;
+ /** The timeout timer for this gossip member. */
+ private transient GossipTimeoutTimer timeoutTimer;
- /**
- * Constructor.
- * @param host The hostname or IP address.
- * @param port The port number.
- * @param heartbeat The current heartbeat.
- * @param gossipService The GossipService object.
- * @param cleanupTimeout The cleanup timeout for this gossip member.
- */
- public LocalGossipMember(String hostname, int port, String id, int heartbeat, NotificationListener notificationListener, int cleanupTimeout) {
- super(hostname, port, id, heartbeat);
-
- this.timeoutTimer = new GossipTimeoutTimer(cleanupTimeout, notificationListener, this);
- }
+ /**
+ * Constructor.
+ *
+ * @param host
+ * The hostname or IP address.
+ * @param port
+ * The port number.
+ * @param heartbeat
+ * The current heartbeat.
+ * @param gossipService
+ * The GossipService object.
+ * @param cleanupTimeout
+ * The cleanup timeout for this gossip member.
+ */
+ public LocalGossipMember(String hostname, int port, String id, int heartbeat,
+ NotificationListener notificationListener, int cleanupTimeout) {
+ super(hostname, port, id, heartbeat);
- /**
- * Start the timeout timer.
- */
- public void startTimeoutTimer() {
- this.timeoutTimer.start();
- }
+ this.timeoutTimer = new GossipTimeoutTimer(cleanupTimeout, notificationListener, this);
+ }
- /**
- * Reset the timeout timer.
- */
- public void resetTimeoutTimer() {
- this.timeoutTimer.reset();
- }
+ /**
+ * Start the timeout timer.
+ */
+ public void startTimeoutTimer() {
+ this.timeoutTimer.start();
+ }
+
+ /**
+ * Reset the timeout timer.
+ */
+ public void resetTimeoutTimer() {
+ this.timeoutTimer.reset();
+ }
}
diff --git a/src/main/java/com/google/code/gossip/LogLevel.java b/src/main/java/com/google/code/gossip/LogLevel.java
index d486df4..bb22dac 100644
--- a/src/main/java/com/google/code/gossip/LogLevel.java
+++ b/src/main/java/com/google/code/gossip/LogLevel.java
@@ -2,24 +2,26 @@
public class LogLevel {
- public static final String CONFIG_ERROR = "ERROR";
-
- public static final String CONFIG_INFO = "INFO";
-
- public static final String CONFIG_DEBUG = "DEBUG";
-
- public static final int ERROR = 1;
- public static final int INFO = 2;
- public static final int DEBUG = 3;
-
- public static int fromString(String logLevel) {
- if (logLevel.equals(CONFIG_ERROR))
- return ERROR;
- else if (logLevel.equals(CONFIG_INFO))
- return INFO;
- else if (logLevel.equals(CONFIG_DEBUG))
- return DEBUG;
- else
- return INFO;
- }
+ public static final String CONFIG_ERROR = "ERROR";
+
+ public static final String CONFIG_INFO = "INFO";
+
+ public static final String CONFIG_DEBUG = "DEBUG";
+
+ public static final int ERROR = 1;
+
+ public static final int INFO = 2;
+
+ public static final int DEBUG = 3;
+
+ public static int fromString(String logLevel) {
+ if (logLevel.equals(CONFIG_ERROR))
+ return ERROR;
+ else if (logLevel.equals(CONFIG_INFO))
+ return INFO;
+ else if (logLevel.equals(CONFIG_DEBUG))
+ return DEBUG;
+ else
+ return INFO;
+ }
}
diff --git a/src/main/java/com/google/code/gossip/RemoteGossipMember.java b/src/main/java/com/google/code/gossip/RemoteGossipMember.java
index 37cdb35..f42f699 100644
--- a/src/main/java/com/google/code/gossip/RemoteGossipMember.java
+++ b/src/main/java/com/google/code/gossip/RemoteGossipMember.java
@@ -1,28 +1,36 @@
package com.google.code.gossip;
/**
- * The object represents a gossip member with the properties as received from a remote gossip member.
+ * The object represents a gossip member with the properties as received from a remote gossip
+ * member.
*
* @author harmenw
*/
public class RemoteGossipMember extends GossipMember {
- /**
- * Constructor.
- * @param host The hostname or IP address.
- * @param port The port number.
- * @param heartbeat The current heartbeat.
- */
- public RemoteGossipMember(String hostname, int port, String id, int heartbeat) {
- super(hostname, port, id, heartbeat);
- }
-
- /**
- * Construct a RemoteGossipMember with a heartbeat of 0.
- * @param host The hostname or IP address.
- * @param port The port number.
- */
- public RemoteGossipMember(String hostname, int port, String id) {
- super(hostname, port, id, 0);
- }
+ /**
+ * Constructor.
+ *
+ * @param host
+ * The hostname or IP address.
+ * @param port
+ * The port number.
+ * @param heartbeat
+ * The current heartbeat.
+ */
+ public RemoteGossipMember(String hostname, int port, String id, int heartbeat) {
+ super(hostname, port, id, heartbeat);
+ }
+
+ /**
+ * Construct a RemoteGossipMember with a heartbeat of 0.
+ *
+ * @param host
+ * The hostname or IP address.
+ * @param port
+ * The port number.
+ */
+ public RemoteGossipMember(String hostname, int port, String id) {
+ super(hostname, port, id, 0);
+ }
}
diff --git a/src/main/java/com/google/code/gossip/StartupSettings.java b/src/main/java/com/google/code/gossip/StartupSettings.java
index 3fa261d..2e558ef 100644
--- a/src/main/java/com/google/code/gossip/StartupSettings.java
+++ b/src/main/java/com/google/code/gossip/StartupSettings.java
@@ -17,145 +17,168 @@
* @author harmenw
*/
public class StartupSettings {
-
- /** The port to start the gossip service on. */
- private int _port;
-
- /** The logging level of the gossip service. */
- private int _logLevel;
-
- /** The gossip settings used at startup. */
- private GossipSettings _gossipSettings;
-
- /** The list with gossip members to start with. */
- private ArrayList<GossipMember> _gossipMembers;
-
- /**
- * Constructor.
- * @param port The port to start the service on.
- */
- public StartupSettings(int port, int logLevel) {
- this(port, logLevel, new GossipSettings());
- }
-
- /**
- * Constructor.
- * @param port The port to start the service on.
- */
- public StartupSettings(int port, int logLevel, GossipSettings gossipSettings) {
- _port = port;
- _logLevel = logLevel;
- _gossipSettings = gossipSettings;
- _gossipMembers = new ArrayList<GossipMember>();
- }
-
- /**
- * Set the port of the gossip service.
- * @param port The port for the gossip service.
- */
- public void setPort(int port) {
- _port = port;
- }
-
- /**
- * Get the port for the gossip service.
- * @return The port of the gossip service.
- */
- public int getPort() {
- return _port;
- }
-
- /**
- * Set the log level of the gossip service.
- * @param logLevel The log level({LogLevel}).
- */
- public void setLogLevel(int logLevel) {
- _logLevel = logLevel;
- }
-
- /**
- * Get the log level of the gossip service.
- * @return The log level.
- */
- public int getLogLevel() {
- return _logLevel;
- }
-
- /**
- * Get the GossipSettings.
- * @return The GossipSettings object.
- */
- public GossipSettings getGossipSettings() {
- return _gossipSettings;
- }
-
- /**
- * Add a gossip member to the list of members to start with.
- * @param member The member to add.
- */
- public void addGossipMember(GossipMember member) {
- _gossipMembers.add(member);
- }
-
- /**
- * Get the list with gossip members.
- * @return The gossip members.
- */
- public ArrayList<GossipMember> getGossipMembers() {
- return _gossipMembers;
- }
-
- /**
- * Parse the settings for the gossip service from a JSON file.
- * @param jsonFile The file object which refers to the JSON config file.
- * @return The StartupSettings object with the settings from the config file.
- * @throws JSONException Thrown when the file is not well-formed JSON.
- * @throws FileNotFoundException Thrown when the file cannot be found.
- * @throws IOException Thrown when reading the file gives problems.
- */
- public static StartupSettings fromJSONFile(File jsonFile) throws JSONException, FileNotFoundException, IOException {
- // Read the file to a String.
- BufferedReader br = new BufferedReader(new FileReader(jsonFile));
- StringBuffer buffer = new StringBuffer();
- String line;
- while((line = br.readLine()) != null) {
- buffer.append(line.trim());
- }
-
- // Lets parse the String as JSON.
- JSONObject jsonObject = new JSONArray(buffer.toString()).getJSONObject(0);
-
- // Now get the port number.
- int port = jsonObject.getInt("port");
- // Get the log level from the config file.
- int logLevel = LogLevel.fromString(jsonObject.getString("log_level"));
-
- // Get the gossip_interval from the config file.
- int gossipInterval = jsonObject.getInt("gossip_interval");
-
- // Get the cleanup_interval from the config file.
- int cleanupInterval = jsonObject.getInt("cleanup_interval");
-
- System.out.println("Config [port: " + port + ", log_level: " + logLevel + ", gossip_interval: " + gossipInterval + ", cleanup_interval: " + cleanupInterval + "]");
-
- // Initiate the settings with the port number.
- StartupSettings settings = new StartupSettings(port, logLevel, new GossipSettings(gossipInterval, cleanupInterval));
-
- // Now iterate over the members from the config file and add them to the settings.
- System.out.print("Config-members [");
- JSONArray membersJSON = jsonObject.getJSONArray("members");
- for (int i=0; i<membersJSON.length(); i++) {
- JSONObject memberJSON = membersJSON.getJSONObject(i);
- RemoteGossipMember member = new RemoteGossipMember(memberJSON.getString("host"), memberJSON.getInt("port"), "");
- settings.addGossipMember(member);
- System.out.print(member.getAddress());
- if (i < (membersJSON.length() - 1))
- System.out.print(", ");
- }
- System.out.println("]");
-
- // Return the created settings object.
- return settings;
- }
+ /** The port to start the gossip service on. */
+ private int _port;
+
+ /** The logging level of the gossip service. */
+ private int _logLevel;
+
+ /** The gossip settings used at startup. */
+ private GossipSettings _gossipSettings;
+
+ /** The list with gossip members to start with. */
+ private ArrayList<GossipMember> _gossipMembers;
+
+ /**
+ * Constructor.
+ *
+ * @param port
+ * The port to start the service on.
+ */
+ public StartupSettings(int port, int logLevel) {
+ this(port, logLevel, new GossipSettings());
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param port
+ * The port to start the service on.
+ */
+ public StartupSettings(int port, int logLevel, GossipSettings gossipSettings) {
+ _port = port;
+ _logLevel = logLevel;
+ _gossipSettings = gossipSettings;
+ _gossipMembers = new ArrayList<GossipMember>();
+ }
+
+ /**
+ * Set the port of the gossip service.
+ *
+ * @param port
+ * The port for the gossip service.
+ */
+ public void setPort(int port) {
+ _port = port;
+ }
+
+ /**
+ * Get the port for the gossip service.
+ *
+ * @return The port of the gossip service.
+ */
+ public int getPort() {
+ return _port;
+ }
+
+ /**
+ * Set the log level of the gossip service.
+ *
+ * @param logLevel
+ * The log level({LogLevel}).
+ */
+ public void setLogLevel(int logLevel) {
+ _logLevel = logLevel;
+ }
+
+ /**
+ * Get the log level of the gossip service.
+ *
+ * @return The log level.
+ */
+ public int getLogLevel() {
+ return _logLevel;
+ }
+
+ /**
+ * Get the GossipSettings.
+ *
+ * @return The GossipSettings object.
+ */
+ public GossipSettings getGossipSettings() {
+ return _gossipSettings;
+ }
+
+ /**
+ * Add a gossip member to the list of members to start with.
+ *
+ * @param member
+ * The member to add.
+ */
+ public void addGossipMember(GossipMember member) {
+ _gossipMembers.add(member);
+ }
+
+ /**
+ * Get the list with gossip members.
+ *
+ * @return The gossip members.
+ */
+ public ArrayList<GossipMember> getGossipMembers() {
+ return _gossipMembers;
+ }
+
+ /**
+ * Parse the settings for the gossip service from a JSON file.
+ *
+ * @param jsonFile
+ * The file object which refers to the JSON config file.
+ * @return The StartupSettings object with the settings from the config file.
+ * @throws JSONException
+ * Thrown when the file is not well-formed JSON.
+ * @throws FileNotFoundException
+ * Thrown when the file cannot be found.
+ * @throws IOException
+ * Thrown when reading the file gives problems.
+ */
+ public static StartupSettings fromJSONFile(File jsonFile) throws JSONException,
+ FileNotFoundException, IOException {
+ // Read the file to a String.
+ BufferedReader br = new BufferedReader(new FileReader(jsonFile));
+ StringBuffer buffer = new StringBuffer();
+ String line;
+ while ((line = br.readLine()) != null) {
+ buffer.append(line.trim());
+ }
+
+ // Lets parse the String as JSON.
+ JSONObject jsonObject = new JSONArray(buffer.toString()).getJSONObject(0);
+
+ // Now get the port number.
+ int port = jsonObject.getInt("port");
+
+ // Get the log level from the config file.
+ int logLevel = LogLevel.fromString(jsonObject.getString("log_level"));
+
+ // Get the gossip_interval from the config file.
+ int gossipInterval = jsonObject.getInt("gossip_interval");
+
+ // Get the cleanup_interval from the config file.
+ int cleanupInterval = jsonObject.getInt("cleanup_interval");
+
+ System.out.println("Config [port: " + port + ", log_level: " + logLevel + ", gossip_interval: "
+ + gossipInterval + ", cleanup_interval: " + cleanupInterval + "]");
+
+ // Initiate the settings with the port number.
+ StartupSettings settings = new StartupSettings(port, logLevel, new GossipSettings(
+ gossipInterval, cleanupInterval));
+
+ // Now iterate over the members from the config file and add them to the settings.
+ System.out.print("Config-members [");
+ JSONArray membersJSON = jsonObject.getJSONArray("members");
+ for (int i = 0; i < membersJSON.length(); i++) {
+ JSONObject memberJSON = membersJSON.getJSONObject(i);
+ RemoteGossipMember member = new RemoteGossipMember(memberJSON.getString("host"),
+ memberJSON.getInt("port"), "");
+ settings.addGossipMember(member);
+ System.out.print(member.getAddress());
+ if (i < (membersJSON.length() - 1))
+ System.out.print(", ");
+ }
+ System.out.println("]");
+
+ // Return the created settings object.
+ return settings;
+ }
}
diff --git a/src/main/java/com/google/code/gossip/examples/GossipExample.java b/src/main/java/com/google/code/gossip/examples/GossipExample.java
index 6fb1cd5..abc28e1 100644
--- a/src/main/java/com/google/code/gossip/examples/GossipExample.java
+++ b/src/main/java/com/google/code/gossip/examples/GossipExample.java
@@ -1,6 +1,5 @@
package com.google.code.gossip.examples;
-
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
@@ -12,67 +11,70 @@
import com.google.code.gossip.RemoteGossipMember;
/**
- * This class is an example of how one could use the gossip service.
- * Here we start multiple gossip clients on this host as specified in the config file.
+ * This class is an example of how one could use the gossip service. Here we start multiple gossip
+ * clients on this host as specified in the config file.
*
* @author harmenw
*/
public class GossipExample extends Thread {
- /** The number of clients to start. */
- private static final int NUMBER_OF_CLIENTS = 4;
+ /** The number of clients to start. */
+ private static final int NUMBER_OF_CLIENTS = 4;
- /**
- * @param args
- */
- public static void main(String[] args) {
- new GossipExample();
- }
-
- /**
- * Constructor.
- * This will start the this thread.
- */
- public GossipExample() {
- start();
- }
+ /**
+ * @param args
+ */
+ public static void main(String[] args) {
+ new GossipExample();
+ }
- /**
- * @see java.lang.Thread#run()
- */
- public void run() {
- try {
- GossipSettings settings = new GossipSettings();
-
- ArrayList<GossipService> clients = new ArrayList<GossipService>();
-
- // Get my ip address.
- String myIpAddress = InetAddress.getLocalHost().getHostAddress();
-
- // Create the gossip members and put them in a list and give them a port number starting with 2000.
- ArrayList<GossipMember> startupMembers = new ArrayList<GossipMember>();
- for (int i=0; i<NUMBER_OF_CLIENTS; ++i) {
- startupMembers.add(new RemoteGossipMember(myIpAddress, 2000+i, ""));
- }
-
- // Lets start the gossip clients.
- // Start the clients, waiting cleaning-interval + 1 second between them which will show the dead list handling.
- for (GossipMember member : startupMembers) {
- GossipService gossipService = new GossipService(myIpAddress, member.getPort(), "", LogLevel.DEBUG, startupMembers, settings);
- clients.add(gossipService);
- gossipService.start();
- sleep(settings.getCleanupInterval() + 1000);
- }
-
- // After starting all gossip clients, first wait 10 seconds and then shut them down.
- sleep(10000);
- System.err.println("Going to shutdown all services...");
- // Since they all run in the same virtual machine and share the same executor, if one is shutdown they will all stop.
- clients.get(0).shutdown();
-
- } catch (UnknownHostException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
+ /**
+ * Constructor. This will start the this thread.
+ */
+ public GossipExample() {
+ start();
+ }
+
+ /**
+ * @see java.lang.Thread#run()
+ */
+ public void run() {
+ try {
+ GossipSettings settings = new GossipSettings();
+
+ ArrayList<GossipService> clients = new ArrayList<GossipService>();
+
+ // Get my ip address.
+ String myIpAddress = InetAddress.getLocalHost().getHostAddress();
+
+ // Create the gossip members and put them in a list and give them a port number starting with
+ // 2000.
+ ArrayList<GossipMember> startupMembers = new ArrayList<GossipMember>();
+ for (int i = 0; i < NUMBER_OF_CLIENTS; ++i) {
+ startupMembers.add(new RemoteGossipMember(myIpAddress, 2000 + i, ""));
+ }
+
+ // Lets start the gossip clients.
+ // Start the clients, waiting cleaning-interval + 1 second between them which will show the
+ // dead list handling.
+ for (GossipMember member : startupMembers) {
+ GossipService gossipService = new GossipService(myIpAddress, member.getPort(), "",
+ LogLevel.DEBUG, startupMembers, settings);
+ clients.add(gossipService);
+ gossipService.start();
+ sleep(settings.getCleanupInterval() + 1000);
+ }
+
+ // After starting all gossip clients, first wait 10 seconds and then shut them down.
+ sleep(10000);
+ System.err.println("Going to shutdown all services...");
+ // Since they all run in the same virtual machine and share the same executor, if one is
+ // shutdown they will all stop.
+ clients.get(0).shutdown();
+
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
}
diff --git a/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java
index 49516d5..e5ab754 100644
--- a/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java
+++ b/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java
@@ -8,51 +8,52 @@
import com.google.code.gossip.LocalGossipMember;
/**
- * [The active thread: periodically send gossip request.]
- * The class handles gossiping the membership list.
- * This information is important to maintaining a common
- * state among all the nodes, and is important for detecting
- * failures.
+ * [The active thread: periodically send gossip request.] The class handles gossiping the membership
+ * list. This information is important to maintaining a common state among all the nodes, and is
+ * important for detecting failures.
*/
abstract public class ActiveGossipThread implements Runnable {
-
- private GossipManager _gossipManager;
-
- private final AtomicBoolean _keepRunning;
- public ActiveGossipThread(GossipManager gossipManager) {
- _gossipManager = gossipManager;
- _keepRunning = new AtomicBoolean(true);
- }
+ private GossipManager _gossipManager;
- @Override
- public void run() {
- while(_keepRunning.get()) {
- try {
- TimeUnit.MILLISECONDS.sleep(_gossipManager.getSettings().getGossipInterval());
- sendMembershipList(_gossipManager.getMyself(), _gossipManager.getMemberList());
- } catch (InterruptedException e) {
- GossipService.LOGGER.error(e);
- _keepRunning.set(false);
- }
- }
- shutdown();
- }
-
- public void shutdown(){
- _keepRunning.set(false);
- }
- /**
- * Performs the sending of the membership list, after we have
- * incremented our own heartbeat.
- */
- abstract protected void sendMembershipList(LocalGossipMember me, ArrayList<LocalGossipMember> memberList);
+ private final AtomicBoolean _keepRunning;
- /**
- * Abstract method which should be implemented by a subclass.
- * This method should return a member of the list to gossip with.
- * @param memberList The list of members which are stored in the local list of members.
- * @return The chosen LocalGossipMember to gossip with.
- */
- abstract protected LocalGossipMember selectPartner(ArrayList<LocalGossipMember> memberList);
+ public ActiveGossipThread(GossipManager gossipManager) {
+ _gossipManager = gossipManager;
+ _keepRunning = new AtomicBoolean(true);
+ }
+
+ @Override
+ public void run() {
+ while (_keepRunning.get()) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(_gossipManager.getSettings().getGossipInterval());
+ sendMembershipList(_gossipManager.getMyself(), _gossipManager.getMemberList());
+ } catch (InterruptedException e) {
+ GossipService.LOGGER.error(e);
+ _keepRunning.set(false);
+ }
+ }
+ shutdown();
+ }
+
+ public void shutdown() {
+ _keepRunning.set(false);
+ }
+
+ /**
+ * Performs the sending of the membership list, after we have incremented our own heartbeat.
+ */
+ abstract protected void sendMembershipList(LocalGossipMember me,
+ ArrayList<LocalGossipMember> memberList);
+
+ /**
+ * Abstract method which should be implemented by a subclass. This method should return a member
+ * of the list to gossip with.
+ *
+ * @param memberList
+ * The list of members which are stored in the local list of members.
+ * @return The chosen LocalGossipMember to gossip with.
+ */
+ abstract protected LocalGossipMember selectPartner(ArrayList<LocalGossipMember> memberList);
}
diff --git a/src/main/java/com/google/code/gossip/manager/GossipManager.java b/src/main/java/com/google/code/gossip/manager/GossipManager.java
index f0e3356..99e308c 100644
--- a/src/main/java/com/google/code/gossip/manager/GossipManager.java
+++ b/src/main/java/com/google/code/gossip/manager/GossipManager.java
@@ -16,142 +16,147 @@
import com.google.code.gossip.LocalGossipMember;
public abstract class GossipManager extends Thread implements NotificationListener {
- /** The maximal number of bytes the packet with the GOSSIP may be. (Default is 100 kb) */
- public static final int MAX_PACKET_SIZE = 102400;
-
- /** The list of members which are in the gossip group (not including myself). */
- private ArrayList<LocalGossipMember> _memberList;
+ /** The maximal number of bytes the packet with the GOSSIP may be. (Default is 100 kb) */
+ public static final int MAX_PACKET_SIZE = 102400;
- /** The list of members which are known to be dead. */
- private ArrayList<LocalGossipMember> _deadList;
-
- /** The member I am representing. */
- private LocalGossipMember _me;
-
- /** The settings for gossiping. */
- private GossipSettings _settings;
-
- /** A boolean whether the gossip service should keep running. */
- private AtomicBoolean _gossipServiceRunning;
+ /** The list of members which are in the gossip group (not including myself). */
+ private ArrayList<LocalGossipMember> _memberList;
- /** A ExecutorService used for executing the active and passive gossip threads. */
- private ExecutorService _gossipThreadExecutor;
-
- private Class<? extends PassiveGossipThread> _passiveGossipThreadClass;
- private PassiveGossipThread passiveGossipThread;
-
- private Class<? extends ActiveGossipThread> _activeGossipThreadClass;
- private ActiveGossipThread activeGossipThread;
-
+ /** The list of members which are known to be dead. */
+ private ArrayList<LocalGossipMember> _deadList;
+
+ /** The member I am representing. */
+ private LocalGossipMember _me;
+
+ /** The settings for gossiping. */
+ private GossipSettings _settings;
+
+ /** A boolean whether the gossip service should keep running. */
+ private AtomicBoolean _gossipServiceRunning;
+
+ /** A ExecutorService used for executing the active and passive gossip threads. */
+ private ExecutorService _gossipThreadExecutor;
+
+ private Class<? extends PassiveGossipThread> _passiveGossipThreadClass;
+
+ private PassiveGossipThread passiveGossipThread;
+
+ private Class<? extends ActiveGossipThread> _activeGossipThreadClass;
+
+ private ActiveGossipThread activeGossipThread;
+
public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass,
Class<? extends ActiveGossipThread> activeGossipThreadClass, String address, int port,
String id, GossipSettings settings, ArrayList<GossipMember> gossipMembers) {
- _passiveGossipThreadClass = passiveGossipThreadClass;
- _activeGossipThreadClass = activeGossipThreadClass;
- _settings = settings;
- _me = new LocalGossipMember(address, port, id, 0, this, settings.getCleanupInterval());
- _memberList = new ArrayList<LocalGossipMember>();
- _deadList = new ArrayList<LocalGossipMember>();
- for (GossipMember startupMember : gossipMembers) {
- if (!startupMember.equals(_me)) {
+ _passiveGossipThreadClass = passiveGossipThreadClass;
+ _activeGossipThreadClass = activeGossipThreadClass;
+ _settings = settings;
+ _me = new LocalGossipMember(address, port, id, 0, this, settings.getCleanupInterval());
+ _memberList = new ArrayList<LocalGossipMember>();
+ _deadList = new ArrayList<LocalGossipMember>();
+ for (GossipMember startupMember : gossipMembers) {
+ if (!startupMember.equals(_me)) {
LocalGossipMember member = new LocalGossipMember(startupMember.getHost(),
startupMember.getPort(), startupMember.getId(), 0, this,
settings.getCleanupInterval());
- _memberList.add(member);
- GossipService.LOGGER.debug(member);
- }
- }
-
- _gossipServiceRunning = new AtomicBoolean(true);
- Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
- public void run() {
- GossipService.LOGGER.info("Service has been shutdown...");
- }
- }));
- }
-
- /**
- * All timers associated with a member will trigger this method when it goes
- * off. The timer will go off if we have not heard from this member in
- * <code> _settings.T_CLEANUP </code> time.
- */
- @Override
- public void handleNotification(Notification notification, Object handback) {
- LocalGossipMember deadMember = (LocalGossipMember) notification.getUserData();
- GossipService.LOGGER.info("Dead member detected: " + deadMember);
- synchronized (this._memberList) {
- this._memberList.remove(deadMember);
- }
- synchronized (this._deadList) {
- this._deadList.add(deadMember);
- }
- }
+ _memberList.add(member);
+ GossipService.LOGGER.debug(member);
+ }
+ }
- public GossipSettings getSettings() {
- return _settings;
- }
+ _gossipServiceRunning = new AtomicBoolean(true);
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ public void run() {
+ GossipService.LOGGER.info("Service has been shutdown...");
+ }
+ }));
+ }
- /**
- * Get a clone of the memberlist.
- * @return
- */
- public ArrayList<LocalGossipMember> getMemberList() {
- return _memberList;
- }
-
- public LocalGossipMember getMyself() {
- return _me;
- }
+ /**
+ * All timers associated with a member will trigger this method when it goes off. The timer will
+ * go off if we have not heard from this member in <code> _settings.T_CLEANUP </code> time.
+ */
+ @Override
+ public void handleNotification(Notification notification, Object handback) {
+ LocalGossipMember deadMember = (LocalGossipMember) notification.getUserData();
+ GossipService.LOGGER.info("Dead member detected: " + deadMember);
+ synchronized (this._memberList) {
+ this._memberList.remove(deadMember);
+ }
+ synchronized (this._deadList) {
+ this._deadList.add(deadMember);
+ }
+ }
- public ArrayList<LocalGossipMember> getDeadList() {
- return _deadList;
- }
-
- /**
- * Starts the client. Specifically, start the various cycles for this protocol.
- * Start the gossip thread and start the receiver thread.
- * @throws InterruptedException
- */
- public void run() {
- for (LocalGossipMember member : _memberList) {
- if (member != _me) {
- member.startTimeoutTimer();
- }
- }
+ public GossipSettings getSettings() {
+ return _settings;
+ }
+
+ /**
+ * Get a clone of the memberlist.
+ *
+ * @return
+ */
+ public ArrayList<LocalGossipMember> getMemberList() {
+ return _memberList;
+ }
+
+ public LocalGossipMember getMyself() {
+ return _me;
+ }
+
+ public ArrayList<LocalGossipMember> getDeadList() {
+ return _deadList;
+ }
+
+ /**
+ * Starts the client. Specifically, start the various cycles for this protocol. Start the gossip
+ * thread and start the receiver thread.
+ *
+ * @throws InterruptedException
+ */
+ public void run() {
+ for (LocalGossipMember member : _memberList) {
+ if (member != _me) {
+ member.startTimeoutTimer();
+ }
+ }
_gossipThreadExecutor = Executors.newCachedThreadPool();
try {
- passiveGossipThread = _passiveGossipThreadClass.getConstructor(GossipManager.class).newInstance(this);
+ passiveGossipThread = _passiveGossipThreadClass.getConstructor(GossipManager.class)
+ .newInstance(this);
_gossipThreadExecutor.execute(passiveGossipThread);
- activeGossipThread = _activeGossipThreadClass.getConstructor(GossipManager.class).newInstance(this);
+ activeGossipThread = _activeGossipThreadClass.getConstructor(GossipManager.class)
+ .newInstance(this);
_gossipThreadExecutor.execute(activeGossipThread);
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException
| InvocationTargetException | NoSuchMethodException | SecurityException e1) {
throw new RuntimeException(e1);
}
- GossipService.LOGGER.info("The GossipService is started.");
- while(_gossipServiceRunning.get()) {
- try {
- //TODO
- TimeUnit.MILLISECONDS.sleep(1);
- } catch (InterruptedException e) {
- GossipService.LOGGER.info("The GossipClient was interrupted.");
- }
- }
- }
-
- /**
- * Shutdown the gossip service.
- */
- public void shutdown() {
- _gossipThreadExecutor.shutdown();
- passiveGossipThread.shutdown();
- activeGossipThread.shutdown();
- try {
+ GossipService.LOGGER.info("The GossipService is started.");
+ while (_gossipServiceRunning.get()) {
+ try {
+ // TODO
+ TimeUnit.MILLISECONDS.sleep(1);
+ } catch (InterruptedException e) {
+ GossipService.LOGGER.info("The GossipClient was interrupted.");
+ }
+ }
+ }
+
+ /**
+ * Shutdown the gossip service.
+ */
+ public void shutdown() {
+ _gossipThreadExecutor.shutdown();
+ passiveGossipThread.shutdown();
+ activeGossipThread.shutdown();
+ try {
boolean result = _gossipThreadExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS);
System.err.println("Terminate retuned " + result);
} catch (InterruptedException e) {
e.printStackTrace();
}
- _gossipServiceRunning.set(false);
- }
+ _gossipServiceRunning.set(false);
+ }
}
diff --git a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java
index 7992bf5..08342b4 100644
--- a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java
+++ b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java
@@ -18,36 +18,36 @@
import com.google.code.gossip.RemoteGossipMember;
/**
- * [The passive thread: reply to incoming gossip request.]
- * This class handles the passive cycle, where this client
- * has received an incoming message. For now, this message
- * is always the membership list, but if you choose to gossip
- * additional information, you will need some logic to determine
- * the incoming message.
+ * [The passive thread: reply to incoming gossip request.] This class handles the passive cycle,
+ * where this client has received an incoming message. For now, this message is always the
+ * membership list, but if you choose to gossip additional information, you will need some logic to
+ * determine the incoming message.
*/
abstract public class PassiveGossipThread implements Runnable {
-
- /** The socket used for the passive thread of the gossip service. */
- private DatagramSocket _server;
-
- private GossipManager _gossipManager;
- private AtomicBoolean _keepRunning;
+ /** The socket used for the passive thread of the gossip service. */
+ private DatagramSocket _server;
- public PassiveGossipThread(GossipManager gossipManager) {
- _gossipManager = gossipManager;
- try {
- SocketAddress socketAddress = new InetSocketAddress(_gossipManager.getMyself().getHost(), _gossipManager.getMyself().getPort());
- _server = new DatagramSocket(socketAddress);
- GossipService.LOGGER.info("Gossip service successfully initialized on port " + _gossipManager.getMyself().getPort());
- GossipService.LOGGER.debug("I am " + _gossipManager.getMyself());
- } catch (SocketException ex) {
- GossipService.LOGGER.error(ex);
- _server = null;
- throw new RuntimeException(ex);
- }
- _keepRunning = new AtomicBoolean(true);
- }
+ private GossipManager _gossipManager;
+
+ private AtomicBoolean _keepRunning;
+
+ public PassiveGossipThread(GossipManager gossipManager) {
+ _gossipManager = gossipManager;
+ try {
+ SocketAddress socketAddress = new InetSocketAddress(_gossipManager.getMyself().getHost(),
+ _gossipManager.getMyself().getPort());
+ _server = new DatagramSocket(socketAddress);
+ GossipService.LOGGER.info("Gossip service successfully initialized on port "
+ + _gossipManager.getMyself().getPort());
+ GossipService.LOGGER.debug("I am " + _gossipManager.getMyself());
+ } catch (SocketException ex) {
+ GossipService.LOGGER.error(ex);
+ _server = null;
+ throw new RuntimeException(ex);
+ }
+ _keepRunning = new AtomicBoolean(true);
+ }
@Override
public void run() {
@@ -127,16 +127,22 @@
}
shutdown();
}
-
- public void shutdown(){
+
+ public void shutdown() {
_server.close();
}
-
- /**
- * Abstract method for merging the local and remote list.
- * @param gossipManager The GossipManager for retrieving the local members and dead members list.
- * @param senderMember The member who is sending this list, this could be used to send a response if the remote list contains out-dated information.
- * @param remoteList The list of members known at the remote side.
- */
- abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, ArrayList<GossipMember> remoteList);
+
+ /**
+ * Abstract method for merging the local and remote list.
+ *
+ * @param gossipManager
+ * The GossipManager for retrieving the local members and dead members list.
+ * @param senderMember
+ * The member who is sending this list, this could be used to send a response if the
+ * remote list contains out-dated information.
+ * @param remoteList
+ * The list of members known at the remote side.
+ */
+ abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
+ ArrayList<GossipMember> remoteList);
}
\ No newline at end of file
diff --git a/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
index e915240..206d5c5 100644
--- a/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
+++ b/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
@@ -11,83 +11,101 @@
public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread {
- public OnlyProcessReceivedPassiveGossipThread(GossipManager gossipManager) {
- super(gossipManager);
- }
+ public OnlyProcessReceivedPassiveGossipThread(GossipManager gossipManager) {
+ super(gossipManager);
+ }
- /**
- * Merge remote list (received from peer), and our local member list.
- * Simply, we must update the heartbeats that the remote list has with
- * our list. Also, some additional logic is needed to make sure we have
- * not timed out a member and then immediately received a list with that
- * member.
- * @param remoteList
- */
- protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, ArrayList<GossipMember> remoteList) {
+ /**
+ * Merge remote list (received from peer), and our local member list. Simply, we must update the
+ * heartbeats that the remote list has with our list. Also, some additional logic is needed to
+ * make sure we have not timed out a member and then immediately received a list with that member.
+ *
+ * @param remoteList
+ */
+ protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
+ ArrayList<GossipMember> remoteList) {
- synchronized (gossipManager.getDeadList()) {
+ synchronized (gossipManager.getDeadList()) {
- synchronized (gossipManager.getMemberList()) {
+ synchronized (gossipManager.getMemberList()) {
- for (GossipMember remoteMember : remoteList) {
- // Skip myself. We don't want ourselves in the local member list.
- if (!remoteMember.equals(gossipManager.getMyself())) {
- if (gossipManager.getMemberList().contains(remoteMember)) {
- GossipService.LOGGER.debug("The local list already contains the remote member (" + remoteMember + ").");
- // The local memberlist contains the remote member.
- LocalGossipMember localMember = gossipManager.getMemberList().get(gossipManager.getMemberList().indexOf(remoteMember));
+ for (GossipMember remoteMember : remoteList) {
+ // Skip myself. We don't want ourselves in the local member list.
+ if (!remoteMember.equals(gossipManager.getMyself())) {
+ if (gossipManager.getMemberList().contains(remoteMember)) {
+ GossipService.LOGGER.debug("The local list already contains the remote member ("
+ + remoteMember + ").");
+ // The local memberlist contains the remote member.
+ LocalGossipMember localMember = gossipManager.getMemberList().get(
+ gossipManager.getMemberList().indexOf(remoteMember));
- // Let's synchronize it's heartbeat.
- if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) {
- // update local list with latest heartbeat
- localMember.setHeartbeat(remoteMember.getHeartbeat());
- // and reset the timeout of that member
- localMember.resetTimeoutTimer();
- }
- // TODO: Otherwise, should we inform the other when the heartbeat is already higher?
- } else {
- // The local list does not contain the remote member.
- GossipService.LOGGER.debug("The local list does not contain the remote member (" + remoteMember + ").");
+ // Let's synchronize it's heartbeat.
+ if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) {
+ // update local list with latest heartbeat
+ localMember.setHeartbeat(remoteMember.getHeartbeat());
+ // and reset the timeout of that member
+ localMember.resetTimeoutTimer();
+ }
+ // TODO: Otherwise, should we inform the other when the heartbeat is already higher?
+ } else {
+ // The local list does not contain the remote member.
+ GossipService.LOGGER.debug("The local list does not contain the remote member ("
+ + remoteMember + ").");
- // The remote member is either brand new, or a previously declared dead member.
- // If its dead, check the heartbeat because it may have come back from the dead.
- if (gossipManager.getDeadList().contains(remoteMember)) {
- // The remote member is known here as a dead member.
- GossipService.LOGGER.debug("The remote member is known here as a dead member.");
- LocalGossipMember localDeadMember = gossipManager.getDeadList().get(gossipManager.getDeadList().indexOf(remoteMember));
- // If a member is restarted the heartbeat will restart from 1, so we should check that here.
- // So a member can become from the dead when it is either larger than a previous heartbeat (due to network failure)
- // or when the heartbeat is 1 (after a restart of the service).
- // TODO: What if the first message of a gossip service is sent to a dead node? The second member will receive a heartbeat of two.
- // TODO: The above does happen. Maybe a special message for a revived member?
- // TODO: Or maybe when a member is declared dead for more than _settings.getCleanupInterval() ms, reset the heartbeat to 0.
- // It will then accept a revived member.
- // The above is now handle by checking whether the heartbeat differs _settings.getCleanupInterval(), it must be restarted.
- if (remoteMember.getHeartbeat() == 1
- || ((localDeadMember.getHeartbeat() - remoteMember.getHeartbeat()) * -1) > (gossipManager.getSettings().getCleanupInterval() / 1000)
- || remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) {
- GossipService.LOGGER.debug("The remote member is back from the dead. We will remove it from the dead list and add it as a new member.");
- // The remote member is back from the dead.
- // Remove it from the dead list.
- gossipManager.getDeadList().remove(localDeadMember);
- // Add it as a new member and add it to the member list.
- LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings().getCleanupInterval());
- gossipManager.getMemberList().add(newLocalMember);
- newLocalMember.startTimeoutTimer();
- GossipService.LOGGER.info("Removed remote member " + remoteMember.getAddress() + " from dead list and added to local member list.");
- }
- } else {
- // Brand spanking new member - welcome.
- LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings().getCleanupInterval());
- gossipManager.getMemberList().add(newLocalMember);
- newLocalMember.startTimeoutTimer();
- GossipService.LOGGER.info("Added new remote member " + remoteMember.getAddress() + " to local member list.");
- }
- }
- }
- }
- }
- }
- }
-
+ // The remote member is either brand new, or a previously declared dead member.
+ // If its dead, check the heartbeat because it may have come back from the dead.
+ if (gossipManager.getDeadList().contains(remoteMember)) {
+ // The remote member is known here as a dead member.
+ GossipService.LOGGER.debug("The remote member is known here as a dead member.");
+ LocalGossipMember localDeadMember = gossipManager.getDeadList().get(
+ gossipManager.getDeadList().indexOf(remoteMember));
+ // If a member is restarted the heartbeat will restart from 1, so we should check
+ // that here.
+ // So a member can become from the dead when it is either larger than a previous
+ // heartbeat (due to network failure)
+ // or when the heartbeat is 1 (after a restart of the service).
+ // TODO: What if the first message of a gossip service is sent to a dead node? The
+ // second member will receive a heartbeat of two.
+ // TODO: The above does happen. Maybe a special message for a revived member?
+ // TODO: Or maybe when a member is declared dead for more than
+ // _settings.getCleanupInterval() ms, reset the heartbeat to 0.
+ // It will then accept a revived member.
+ // The above is now handle by checking whether the heartbeat differs
+ // _settings.getCleanupInterval(), it must be restarted.
+ if (remoteMember.getHeartbeat() == 1
+ || ((localDeadMember.getHeartbeat() - remoteMember.getHeartbeat()) * -1) > (gossipManager
+ .getSettings().getCleanupInterval() / 1000)
+ || remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) {
+ GossipService.LOGGER
+ .debug("The remote member is back from the dead. We will remove it from the dead list and add it as a new member.");
+ // The remote member is back from the dead.
+ // Remove it from the dead list.
+ gossipManager.getDeadList().remove(localDeadMember);
+ // Add it as a new member and add it to the member list.
+ LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(),
+ remoteMember.getPort(), remoteMember.getId(),
+ remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
+ .getCleanupInterval());
+ gossipManager.getMemberList().add(newLocalMember);
+ newLocalMember.startTimeoutTimer();
+ GossipService.LOGGER.info("Removed remote member " + remoteMember.getAddress()
+ + " from dead list and added to local member list.");
+ }
+ } else {
+ // Brand spanking new member - welcome.
+ LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(),
+ remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(),
+ gossipManager, gossipManager.getSettings().getCleanupInterval());
+ gossipManager.getMemberList().add(newLocalMember);
+ newLocalMember.startTimeoutTimer();
+ GossipService.LOGGER.info("Added new remote member " + remoteMember.getAddress()
+ + " to local member list.");
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
}
diff --git a/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java
index 0b5c9be..3e2baf9 100644
--- a/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java
+++ b/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java
@@ -16,79 +16,80 @@
abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
- public SendMembersActiveGossipThread(GossipManager gossipManager) {
- super(gossipManager);
- }
-
- /**
- * Performs the sending of the membership list, after we have
- * incremented our own heartbeat.
- */
- protected void sendMembershipList(LocalGossipMember me, ArrayList<LocalGossipMember> memberList) {
- GossipService.LOGGER.debug("Send sendMembershipList() is called.");
+ public SendMembersActiveGossipThread(GossipManager gossipManager) {
+ super(gossipManager);
+ }
- // Increase the heartbeat of myself by 1.
- me.setHeartbeat(me.getHeartbeat() + 1);
-
- synchronized (memberList) {
- try {
- LocalGossipMember member = selectPartner(memberList);
+ /**
+ * Performs the sending of the membership list, after we have incremented our own heartbeat.
+ */
+ protected void sendMembershipList(LocalGossipMember me, ArrayList<LocalGossipMember> memberList) {
+ GossipService.LOGGER.debug("Send sendMembershipList() is called.");
- if (member != null) {
- InetAddress dest = InetAddress.getByName(member.getHost());
-
- // Create a StringBuffer for the JSON message.
- JSONArray jsonArray = new JSONArray();
- GossipService.LOGGER.debug("Sending memberlist to " + dest + ":" + member.getPort());
- GossipService.LOGGER.debug("---------------------");
-
- // First write myself, append the JSON representation of the member to the buffer.
- jsonArray.put(me.toJSONObject());
- GossipService.LOGGER.debug(me);
-
- // Then write the others.
- for (int i=0; i<memberList.size(); i++) {
- LocalGossipMember other = memberList.get(i);
- // Append the JSON representation of the member to the buffer.
- jsonArray.put(other.toJSONObject());
- GossipService.LOGGER.debug(other);
- }
- GossipService.LOGGER.debug("---------------------");
-
- // Write the objects to a byte array.
- byte[] json_bytes = jsonArray.toString().getBytes();
-
- int packet_length = json_bytes.length;
-
- if (packet_length < GossipManager.MAX_PACKET_SIZE) {
-
- // Convert the packet length to the byte representation of the int.
- byte[] length_bytes = new byte[4];
- length_bytes[0] =(byte)( packet_length >> 24 );
- length_bytes[1] =(byte)( (packet_length << 8) >> 24 );
- length_bytes[2] =(byte)( (packet_length << 16) >> 24 );
- length_bytes[3] =(byte)( (packet_length << 24) >> 24 );
-
-
- GossipService.LOGGER.debug("Sending message ("+packet_length+" bytes): " + jsonArray.toString());
-
+ // Increase the heartbeat of myself by 1.
+ me.setHeartbeat(me.getHeartbeat() + 1);
+
+ synchronized (memberList) {
+ try {
+ LocalGossipMember member = selectPartner(memberList);
+
+ if (member != null) {
+ InetAddress dest = InetAddress.getByName(member.getHost());
+
+ // Create a StringBuffer for the JSON message.
+ JSONArray jsonArray = new JSONArray();
+ GossipService.LOGGER.debug("Sending memberlist to " + dest + ":" + member.getPort());
+ GossipService.LOGGER.debug("---------------------");
+
+ // First write myself, append the JSON representation of the member to the buffer.
+ jsonArray.put(me.toJSONObject());
+ GossipService.LOGGER.debug(me);
+
+ // Then write the others.
+ for (int i = 0; i < memberList.size(); i++) {
+ LocalGossipMember other = memberList.get(i);
+ // Append the JSON representation of the member to the buffer.
+ jsonArray.put(other.toJSONObject());
+ GossipService.LOGGER.debug(other);
+ }
+ GossipService.LOGGER.debug("---------------------");
+
+ // Write the objects to a byte array.
+ byte[] json_bytes = jsonArray.toString().getBytes();
+
+ int packet_length = json_bytes.length;
+
+ if (packet_length < GossipManager.MAX_PACKET_SIZE) {
+
+ // Convert the packet length to the byte representation of the int.
+ byte[] length_bytes = new byte[4];
+ length_bytes[0] = (byte) (packet_length >> 24);
+ length_bytes[1] = (byte) ((packet_length << 8) >> 24);
+ length_bytes[2] = (byte) ((packet_length << 16) >> 24);
+ length_bytes[3] = (byte) ((packet_length << 24) >> 24);
+
+ GossipService.LOGGER.debug("Sending message (" + packet_length + " bytes): "
+ + jsonArray.toString());
+
ByteBuffer byteBuffer = ByteBuffer.allocate(4 + json_bytes.length);
- byteBuffer.put(length_bytes);
- byteBuffer.put(json_bytes);
- byte[] buf = byteBuffer.array();
-
- DatagramSocket socket = new DatagramSocket();
- DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, member.getPort());
- socket.send(datagramPacket);
- socket.close();
- } else {
- GossipService.LOGGER.error("The length of the to be send message is too large (" + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
- }
- }
+ byteBuffer.put(length_bytes);
+ byteBuffer.put(json_bytes);
+ byte[] buf = byteBuffer.array();
- } catch (IOException e1) {
- e1.printStackTrace();
- }
- }
- }
+ DatagramSocket socket = new DatagramSocket();
+ DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest,
+ member.getPort());
+ socket.send(datagramPacket);
+ socket.close();
+ } else {
+ GossipService.LOGGER.error("The length of the to be send message is too large ("
+ + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
+ }
+ }
+
+ } catch (IOException e1) {
+ e1.printStackTrace();
+ }
+ }
+ }
}
diff --git a/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java
index fa2ec70..917c362 100644
--- a/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java
+++ b/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java
@@ -9,30 +9,30 @@
import com.google.code.gossip.manager.impl.SendMembersActiveGossipThread;
public class RandomActiveGossipThread extends SendMembersActiveGossipThread {
-
- /** The Random used for choosing a member to gossip with. */
- private Random _random;
- public RandomActiveGossipThread(GossipManager gossipManager) {
- super(gossipManager);
- _random = new Random();
- }
-
- /**
- * [The selectToSend() function.]
- * Find a random peer from the local membership list.
- * In the case where this client is the only member in the list, this method will return null.
- * @return Member random member if list is greater than 1, null otherwise
- */
- protected LocalGossipMember selectPartner(ArrayList<LocalGossipMember> memberList) {
- LocalGossipMember member = null;
- if (memberList.size() > 0) {
- int randomNeighborIndex = _random.nextInt(memberList.size());
- member = memberList.get(randomNeighborIndex);
- } else {
- GossipService.LOGGER.debug("I am alone in this world.");
- }
- return member;
- }
+ /** The Random used for choosing a member to gossip with. */
+ private Random _random;
+
+ public RandomActiveGossipThread(GossipManager gossipManager) {
+ super(gossipManager);
+ _random = new Random();
+ }
+
+ /**
+ * [The selectToSend() function.] Find a random peer from the local membership list. In the case
+ * where this client is the only member in the list, this method will return null.
+ *
+ * @return Member random member if list is greater than 1, null otherwise
+ */
+ protected LocalGossipMember selectPartner(ArrayList<LocalGossipMember> memberList) {
+ LocalGossipMember member = null;
+ if (memberList.size() > 0) {
+ int randomNeighborIndex = _random.nextInt(memberList.size());
+ member = memberList.get(randomNeighborIndex);
+ } else {
+ GossipService.LOGGER.debug("I am alone in this world.");
+ }
+ return member;
+ }
}
diff --git a/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java b/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java
index d09bd83..8893ff5 100644
--- a/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java
+++ b/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java
@@ -8,7 +8,9 @@
import com.google.code.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
public class RandomGossipManager extends GossipManager {
- public RandomGossipManager(String address, int port, String id, GossipSettings settings, ArrayList<GossipMember> gossipMembers) {
- super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, address, port, id, settings, gossipMembers);
- }
+ public RandomGossipManager(String address, int port, String id, GossipSettings settings,
+ ArrayList<GossipMember> gossipMembers) {
+ super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, address,
+ port, id, settings, gossipMembers);
+ }
}