Merge pull request #21 from ptgoetz/cluster-ids

WIP on configurable cluster names
diff --git a/src/main/java/com/google/code/gossip/GossipMember.java b/src/main/java/com/google/code/gossip/GossipMember.java
index b62313e..0160dea 100644
--- a/src/main/java/com/google/code/gossip/GossipMember.java
+++ b/src/main/java/com/google/code/gossip/GossipMember.java
@@ -33,9 +33,11 @@
 	public static final String JSON_PORT = "port";
 	public static final String JSON_HEARTBEAT = "heartbeat";
 	public static final String JSON_ID = "id";
+	public static final String JSON_CLUSTER = "cluster";
 	protected final String _host;
 	protected final int _port;
 	protected volatile long _heartbeat;
+	protected final String _clusterName;
 	/**
 	 * The purpose of the id field is to be able for nodes to identify themselves beyond there host/port. For example
 	 * an application might generate a persistent id so if they rejoin the cluster at a different host and port we 
@@ -50,7 +52,8 @@
 	 * @param heartbeat The current heartbeat.
 	 * @param id an id that may be replaced after contact
 	 */
-	public GossipMember(String host, int port, String id, long heartbeat) {
+	public GossipMember(String clusterName, String host, int port, String id, long heartbeat) {
+		_clusterName = clusterName;
 		_host = host;
 		_port = port;
 		_id = id;
@@ -58,6 +61,15 @@
 	}
 
 	/**
+	 * Get the name of the cluster the member belongs to.
+	 *
+	 * @return The cluster name
+     */
+	public String getClusterName(){
+		return _clusterName;
+	}
+
+	/**
 	 * Get the hostname or IP address of the remote gossip member.
 	 * @return The hostname or IP address.
 	 */
@@ -119,7 +131,8 @@
 		int result = 1;
 		String address = getAddress();
 		result = prime * result
-		+ ((address == null) ? 0 : address.hashCode());
+		+ ((address == null) ? 0 : address.hashCode())
+		+ _clusterName == null ? 0 : _clusterName.hashCode();
 		return result;
 	}
 
@@ -140,7 +153,8 @@
 			return false;
 		}
 		// The object is the same of they both have the same address (hostname and port).
-		return getAddress().equals(((LocalGossipMember) obj).getAddress());
+		return getAddress().equals(((LocalGossipMember) obj).getAddress()) &&
+				getClusterName().equals(((LocalGossipMember) obj).getClusterName());
 	}
 
 	/**
@@ -150,6 +164,7 @@
 	public JSONObject toJSONObject() {
 		try {
 			JSONObject jsonObject = new JSONObject();
+			jsonObject.put(JSON_CLUSTER, _clusterName);
 			jsonObject.put(JSON_HOST, _host);
 			jsonObject.put(JSON_PORT, _port);
 			jsonObject.put(JSON_ID, _id);
diff --git a/src/main/java/com/google/code/gossip/GossipService.java b/src/main/java/com/google/code/gossip/GossipService.java
index 13be3ec..3d578a4 100644
--- a/src/main/java/com/google/code/gossip/GossipService.java
+++ b/src/main/java/com/google/code/gossip/GossipService.java
@@ -46,7 +46,7 @@
    */
   public GossipService(StartupSettings startupSettings) throws InterruptedException,
           UnknownHostException {
-    this(InetAddress.getLocalHost().getHostAddress(), startupSettings.getPort(), startupSettings.getId(),
+    this(startupSettings.getCluster(), InetAddress.getLocalHost().getHostAddress(), startupSettings.getPort(), startupSettings.getId(),
              startupSettings.getGossipMembers(), startupSettings
                     .getGossipSettings(), null);
   }
@@ -57,10 +57,10 @@
    * @throws InterruptedException
    * @throws UnknownHostException
    */
-  public GossipService(String ipAddress, int port, String id,
+  public GossipService(String cluster, String ipAddress, int port, String id,
           List<GossipMember> gossipMembers, GossipSettings settings, GossipListener listener)
           throws InterruptedException, UnknownHostException {
-    _gossipManager = new RandomGossipManager(ipAddress, port, id, settings, gossipMembers, listener);
+    _gossipManager = new RandomGossipManager(cluster, ipAddress, port, id, settings, gossipMembers, listener);
   }
 
   public void start() {
diff --git a/src/main/java/com/google/code/gossip/LocalGossipMember.java b/src/main/java/com/google/code/gossip/LocalGossipMember.java
index 719c1f3..978e822 100644
--- a/src/main/java/com/google/code/gossip/LocalGossipMember.java
+++ b/src/main/java/com/google/code/gossip/LocalGossipMember.java
@@ -43,9 +43,9 @@
    * @param cleanupTimeout
    *          The cleanup timeout for this gossip member.
    */
-  public LocalGossipMember(String hostname, int port, String id, long heartbeat,
+  public LocalGossipMember(String clusterName, String hostname, int port, String id, long heartbeat,
           NotificationListener notificationListener, int cleanupTimeout) {
-    super(hostname, port, id, heartbeat);
+    super(clusterName, hostname, port, id, heartbeat);
 
     this.timeoutTimer = new GossipTimeoutTimer(cleanupTimeout, notificationListener, this);
   }
diff --git a/src/main/java/com/google/code/gossip/RemoteGossipMember.java b/src/main/java/com/google/code/gossip/RemoteGossipMember.java
index 6435bf2..d3848fd 100644
--- a/src/main/java/com/google/code/gossip/RemoteGossipMember.java
+++ b/src/main/java/com/google/code/gossip/RemoteGossipMember.java
@@ -35,8 +35,8 @@
    * @param heartbeat
    *          The current heartbeat.
    */
-  public RemoteGossipMember(String hostname, int port, String id, long heartbeat) {
-    super(hostname, port, id, heartbeat);
+  public RemoteGossipMember(String clusterName, String hostname, int port, String id, long heartbeat) {
+    super(clusterName, hostname, port, id, heartbeat);
   }
 
   /**
@@ -47,7 +47,7 @@
    * @param port
    *          The port number.
    */
-  public RemoteGossipMember(String hostname, int port, String id) {
-    super(hostname, port, id, System.currentTimeMillis());
+  public RemoteGossipMember(String clusterName, String hostname, int port, String id) {
+    super(clusterName, hostname, port, id, System.currentTimeMillis());
   }
 }
diff --git a/src/main/java/com/google/code/gossip/StartupSettings.java b/src/main/java/com/google/code/gossip/StartupSettings.java
index b6475de..5e0a674 100644
--- a/src/main/java/com/google/code/gossip/StartupSettings.java
+++ b/src/main/java/com/google/code/gossip/StartupSettings.java
@@ -44,6 +44,8 @@
   /** The port to start the gossip service on. */
   private int _port;
 
+  private String cluster;
+
   /** The gossip settings used at startup. */
   private final GossipSettings _gossipSettings;
 
@@ -79,6 +81,14 @@
     _gossipMembers = new ArrayList<>();
   }
 
+  public void setCluster(String cluster){
+    this.cluster = cluster;
+  }
+
+  public String getCluster(){
+    return this.cluster;
+  }
+
   /**
    * Set the id to be used for this service.
    *
@@ -192,7 +202,7 @@
     JSONArray membersJSON = jsonObject.getJSONArray("members");
     for (int i = 0; i < membersJSON.length(); i++) {
       JSONObject memberJSON = membersJSON.getJSONObject(i);
-      RemoteGossipMember member = new RemoteGossipMember(memberJSON.getString("host"),
+      RemoteGossipMember member = new RemoteGossipMember(memberJSON.getString("cluster"), memberJSON.getString("host"),
               memberJSON.getInt("port"), "");
       settings.addGossipMember(member);
       configMembersDetails += member.getAddress();
diff --git a/src/main/java/com/google/code/gossip/examples/GossipExample.java b/src/main/java/com/google/code/gossip/examples/GossipExample.java
index 2c8ea94..bb5c884 100644
--- a/src/main/java/com/google/code/gossip/examples/GossipExample.java
+++ b/src/main/java/com/google/code/gossip/examples/GossipExample.java
@@ -63,18 +63,20 @@
       // Get my ip address.
       String myIpAddress = InetAddress.getLocalHost().getHostAddress();
 
+      String cluster = "My Gossip Cluster";
+
       // Create the gossip members and put them in a list and give them a port number starting with
       // 2000.
       List<GossipMember> startupMembers = new ArrayList<>();
       for (int i = 0; i < NUMBER_OF_CLIENTS; ++i) {
-        startupMembers.add(new RemoteGossipMember(myIpAddress, 2000 + i, ""));
+        startupMembers.add(new RemoteGossipMember(cluster, myIpAddress, 2000 + i, ""));
       }
 
       // Lets start the gossip clients.
       // Start the clients, waiting cleaning-interval + 1 second between them which will show the
       // dead list handling.
       for (GossipMember member : startupMembers) {
-        GossipService gossipService = new GossipService(myIpAddress, member.getPort(), "",
+        GossipService gossipService = new GossipService(cluster, myIpAddress, member.getPort(), "",
                 startupMembers, settings, null);
         clients.add(gossipService);
         gossipService.start();
diff --git a/src/main/java/com/google/code/gossip/manager/GossipManager.java b/src/main/java/com/google/code/gossip/manager/GossipManager.java
index 99aef68..b695fcc 100644
--- a/src/main/java/com/google/code/gossip/manager/GossipManager.java
+++ b/src/main/java/com/google/code/gossip/manager/GossipManager.java
@@ -57,17 +57,17 @@
   private ExecutorService _gossipThreadExecutor;
 
   public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass,
-          Class<? extends ActiveGossipThread> activeGossipThreadClass, String address, int port,
+          Class<? extends ActiveGossipThread> activeGossipThreadClass, String cluster, String address, int port,
           String id, GossipSettings settings, List<GossipMember> gossipMembers,
           GossipListener listener) {
     _passiveGossipThreadClass = passiveGossipThreadClass;
     _activeGossipThreadClass = activeGossipThreadClass;
     _settings = settings;
-    _me = new LocalGossipMember(address, port, id, System.currentTimeMillis(), this, settings.getCleanupInterval());
+    _me = new LocalGossipMember(cluster, address, port, id, System.currentTimeMillis(), this, settings.getCleanupInterval());
     members = new ConcurrentSkipListMap<>();
     for (GossipMember startupMember : gossipMembers) {
       if (!startupMember.equals(_me)) {
-        LocalGossipMember member = new LocalGossipMember(startupMember.getHost(),
+        LocalGossipMember member = new LocalGossipMember(startupMember.getClusterName(), startupMember.getHost(),
                 startupMember.getPort(), startupMember.getId(), System.currentTimeMillis(), this,
                 settings.getCleanupInterval());
         members.put(member, GossipState.UP);
diff --git a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java
index acffb94..5dcea09 100644
--- a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java
+++ b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java
@@ -53,6 +53,8 @@
 
   private AtomicBoolean _keepRunning;
 
+  private final String _cluster;
+
   public PassiveGossipThread(GossipManager gossipManager) {
     _gossipManager = gossipManager;
     try {
@@ -62,6 +64,7 @@
       GossipService.LOGGER.debug("Gossip service successfully initialized on port "
               + _gossipManager.getMyself().getPort());
       GossipService.LOGGER.debug("I am " + _gossipManager.getMyself());
+      _cluster = _gossipManager.getMyself().getClusterName();
     } catch (SocketException ex) {
       GossipService.LOGGER.warn(ex);
       throw new RuntimeException(ex);
@@ -95,8 +98,9 @@
             JSONArray jsonArray = new JSONArray(receivedMessage);
             for (int i = 0; i < jsonArray.length(); i++) {
               JSONObject memberJSONObject = jsonArray.getJSONObject(i);
-              if (memberJSONObject.length() == 4) {
+              if (memberJSONObject.length() == 5  && _cluster.equals(memberJSONObject.get(GossipMember.JSON_CLUSTER))) {
                 RemoteGossipMember member = new RemoteGossipMember(
+                        memberJSONObject.getString(GossipMember.JSON_CLUSTER),
                         memberJSONObject.getString(GossipMember.JSON_HOST),
                         memberJSONObject.getInt(GossipMember.JSON_PORT),
                         memberJSONObject.getString(GossipMember.JSON_ID),
@@ -108,9 +112,11 @@
                   senderMember = member;
                 }
                 remoteGossipMembers.add(member);
+              } else if(memberJSONObject.length() == 5) {
+                GossipService.LOGGER.warn("The member object does not belong to this cluster.");
               } else {
                 GossipService.LOGGER
-                        .error("The received member object does not contain 4 objects:\n"
+                        .error("The received member object does not contain 5 objects:\n"
                                 + memberJSONObject.toString());
               }
 
diff --git a/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
index acfc43e..16584f9 100644
--- a/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
+++ b/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
@@ -48,7 +48,7 @@
     for (LocalGossipMember i : gossipManager.getDeadList()){
       if (i.getId().equals(senderMember.getId())){
         System.out.println(gossipManager.getMyself() +" caught a live one!");
-        LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getHost(),
+        LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getClusterName(), senderMember.getHost(),
                 senderMember.getPort(), senderMember.getId(), senderMember.getHeartbeat(),
                 gossipManager, gossipManager.getSettings().getCleanupInterval());
         gossipManager.revivieMember(newLocalMember);
@@ -68,7 +68,7 @@
         }
       } else if (!gossipManager.getMemberList().contains(remoteMember) 
               && !gossipManager.getDeadList().contains(remoteMember) ){
-        LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(),
+        LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(), remoteMember.getHost(),
                 remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(),
                 gossipManager, gossipManager.getSettings().getCleanupInterval());
         gossipManager.createOrRevivieMember(newLocalMember);
@@ -78,7 +78,7 @@
           LocalGossipMember localDeadMember = gossipManager.getDeadList().get(
                   gossipManager.getDeadList().indexOf(remoteMember));
           if (remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) {
-            LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(),
+            LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(), remoteMember.getHost(),
                     remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(),
                     gossipManager, gossipManager.getSettings().getCleanupInterval());
             gossipManager.revivieMember(newLocalMember);
diff --git a/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java b/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java
index d04f913..e256ec2 100644
--- a/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java
+++ b/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java
@@ -26,9 +26,9 @@
 import java.util.List;
 
 public class RandomGossipManager extends GossipManager {
-  public RandomGossipManager(String address, int port, String id, GossipSettings settings,
+  public RandomGossipManager(String cluster, String address, int port, String id, GossipSettings settings,
           List<GossipMember> gossipMembers, GossipListener listener) {
-    super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, address,
+    super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, cluster, address,
             port, id, settings, gossipMembers, listener);
   }
 }
diff --git a/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java b/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java
index ddd8364..af30eb7 100644
--- a/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java
+++ b/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java
@@ -23,6 +23,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
@@ -44,12 +45,13 @@
   //@Ignore
   public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, UnknownHostException {
       GossipSettings settings = new GossipSettings(1000, 10000);
+      String cluster = UUID.randomUUID().toString();
       
       log.info( "Adding seed nodes" );
       int seedNodes = 3;
       List<GossipMember> startupMembers = new ArrayList<>();
       for (int i = 1; i < seedNodes + 1; ++i) {
-          startupMembers.add(new RemoteGossipMember("127.0.0.1", 50000 + i, i + ""));
+          startupMembers.add(new RemoteGossipMember(cluster, "127.0.0.1", 50000 + i, i + ""));
       }
 
       log.info( "Adding clients" );
@@ -57,7 +59,7 @@
       final int clusterMembers = 5;
       for (int i = 1; i < clusterMembers+1; ++i) {
           final int j = i;
-          GossipService gossipService = new GossipService("127.0.0.1", 50000 + i, i + "",
+          GossipService gossipService = new GossipService(cluster, "127.0.0.1", 50000 + i, i + "",
                   startupMembers, settings,
                   new GossipListener(){
                       @Override
@@ -104,7 +106,7 @@
         }}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(4);
       
       // start client again
-      GossipService gossipService = new GossipService("127.0.0.1", shutdownPort, shutdownId + "",
+      GossipService gossipService = new GossipService(cluster, "127.0.0.1", shutdownPort, shutdownId + "",
               startupMembers, settings,
               new GossipListener(){
                   @Override
diff --git a/src/test/java/io/teknek/gossip/StartupSettingsTest.java b/src/test/java/io/teknek/gossip/StartupSettingsTest.java
index 5cb52e0..5f25dd9 100644
--- a/src/test/java/io/teknek/gossip/StartupSettingsTest.java
+++ b/src/test/java/io/teknek/gossip/StartupSettingsTest.java
@@ -44,6 +44,7 @@
  */
 public class StartupSettingsTest {
   private static final Logger log = Logger.getLogger( StartupSettingsTest.class );
+  private static final String CLUSTER = UUID.randomUUID().toString();
 
   @Test
   public void testUsingSettingsFile() throws IOException, InterruptedException, JSONException {
@@ -52,7 +53,7 @@
     settingsFile.deleteOnExit();
     writeSettingsFile(settingsFile);
     final GossipService firstService = new GossipService(
-      "127.0.0.1", 50000, UUID.randomUUID().toString(),
+            CLUSTER, "127.0.0.1", 50000, UUID.randomUUID().toString(),
       new ArrayList<GossipMember>(), new GossipSettings(), null);
     
     firstService.start();
@@ -76,12 +77,13 @@
   private void writeSettingsFile( File target ) throws IOException {
     String settings =
             "[{\n" + // It is odd that this is meant to be in an array, but oh well.
+            "  \"cluster\":\"" + CLUSTER + "\",\n" +
             "  \"id\":\"" + UUID.randomUUID() + "\",\n" +
             "  \"port\":50001,\n" +
             "  \"gossip_interval\":1000,\n" +
             "  \"cleanup_interval\":10000,\n" +
             "  \"members\":[\n" +
-            "    {\"host\":\"127.0.0.1\", \"port\":50000}\n" +
+            "    {\"cluster\": \"" + CLUSTER + "\",\"host\":\"127.0.0.1\", \"port\":50000}\n" +
             "  ]\n" +
             "}]";
 
diff --git a/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java b/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java
index 39844ac..0065ade 100644
--- a/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java
+++ b/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java
@@ -22,6 +22,7 @@
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
@@ -52,19 +53,20 @@
 
   public void abc() throws InterruptedException, UnknownHostException{
     GossipSettings settings = new GossipSettings();
+    String cluster = UUID.randomUUID().toString();
 
     log.info( "Adding seed nodes" );
     int seedNodes = 3;
     List<GossipMember> startupMembers = new ArrayList<>();
     for (int i = 1; i < seedNodes+1; ++i) {
-      startupMembers.add(new RemoteGossipMember("127.0.0.1", 50000 + i, i + ""));
+      startupMembers.add(new RemoteGossipMember(cluster, "127.0.0.1", 50000 + i, i + ""));
     }
 
     log.info( "Adding clients" );
     final List<GossipService> clients = new ArrayList<>();
     final int clusterMembers = 5;
     for (int i = 1; i < clusterMembers+1; ++i) {
-      GossipService gossipService = new GossipService("127.0.0.1", 50000 + i, i + "",
+      GossipService gossipService = new GossipService(cluster, "127.0.0.1", 50000 + i, i + "",
               startupMembers, settings,
               new GossipListener(){
         @Override