WIP on configurable cluster names
diff --git a/src/main/java/com/google/code/gossip/GossipMember.java b/src/main/java/com/google/code/gossip/GossipMember.java
index b62313e..0160dea 100644
--- a/src/main/java/com/google/code/gossip/GossipMember.java
+++ b/src/main/java/com/google/code/gossip/GossipMember.java
@@ -33,9 +33,11 @@
public static final String JSON_PORT = "port";
public static final String JSON_HEARTBEAT = "heartbeat";
public static final String JSON_ID = "id";
+ public static final String JSON_CLUSTER = "cluster";
protected final String _host;
protected final int _port;
protected volatile long _heartbeat;
+ protected final String _clusterName;
/**
* The purpose of the id field is to be able for nodes to identify themselves beyond there host/port. For example
* an application might generate a persistent id so if they rejoin the cluster at a different host and port we
@@ -50,7 +52,8 @@
* @param heartbeat The current heartbeat.
* @param id an id that may be replaced after contact
*/
- public GossipMember(String host, int port, String id, long heartbeat) {
+ public GossipMember(String clusterName, String host, int port, String id, long heartbeat) {
+ _clusterName = clusterName;
_host = host;
_port = port;
_id = id;
@@ -58,6 +61,15 @@
}
/**
+ * Get the name of the cluster the member belongs to.
+ *
+ * @return The cluster name
+ */
+ public String getClusterName(){
+ return _clusterName;
+ }
+
+ /**
* Get the hostname or IP address of the remote gossip member.
* @return The hostname or IP address.
*/
@@ -119,7 +131,8 @@
int result = 1;
String address = getAddress();
result = prime * result
- + ((address == null) ? 0 : address.hashCode());
+ + ((address == null) ? 0 : address.hashCode())
+ + _clusterName == null ? 0 : _clusterName.hashCode();
return result;
}
@@ -140,7 +153,8 @@
return false;
}
// The object is the same of they both have the same address (hostname and port).
- return getAddress().equals(((LocalGossipMember) obj).getAddress());
+ return getAddress().equals(((LocalGossipMember) obj).getAddress()) &&
+ getClusterName().equals(((LocalGossipMember) obj).getClusterName());
}
/**
@@ -150,6 +164,7 @@
public JSONObject toJSONObject() {
try {
JSONObject jsonObject = new JSONObject();
+ jsonObject.put(JSON_CLUSTER, _clusterName);
jsonObject.put(JSON_HOST, _host);
jsonObject.put(JSON_PORT, _port);
jsonObject.put(JSON_ID, _id);
diff --git a/src/main/java/com/google/code/gossip/GossipService.java b/src/main/java/com/google/code/gossip/GossipService.java
index 13be3ec..3d578a4 100644
--- a/src/main/java/com/google/code/gossip/GossipService.java
+++ b/src/main/java/com/google/code/gossip/GossipService.java
@@ -46,7 +46,7 @@
*/
public GossipService(StartupSettings startupSettings) throws InterruptedException,
UnknownHostException {
- this(InetAddress.getLocalHost().getHostAddress(), startupSettings.getPort(), startupSettings.getId(),
+ this(startupSettings.getCluster(), InetAddress.getLocalHost().getHostAddress(), startupSettings.getPort(), startupSettings.getId(),
startupSettings.getGossipMembers(), startupSettings
.getGossipSettings(), null);
}
@@ -57,10 +57,10 @@
* @throws InterruptedException
* @throws UnknownHostException
*/
- public GossipService(String ipAddress, int port, String id,
+ public GossipService(String cluster, String ipAddress, int port, String id,
List<GossipMember> gossipMembers, GossipSettings settings, GossipListener listener)
throws InterruptedException, UnknownHostException {
- _gossipManager = new RandomGossipManager(ipAddress, port, id, settings, gossipMembers, listener);
+ _gossipManager = new RandomGossipManager(cluster, ipAddress, port, id, settings, gossipMembers, listener);
}
public void start() {
diff --git a/src/main/java/com/google/code/gossip/LocalGossipMember.java b/src/main/java/com/google/code/gossip/LocalGossipMember.java
index 719c1f3..978e822 100644
--- a/src/main/java/com/google/code/gossip/LocalGossipMember.java
+++ b/src/main/java/com/google/code/gossip/LocalGossipMember.java
@@ -43,9 +43,9 @@
* @param cleanupTimeout
* The cleanup timeout for this gossip member.
*/
- public LocalGossipMember(String hostname, int port, String id, long heartbeat,
+ public LocalGossipMember(String clusterName, String hostname, int port, String id, long heartbeat,
NotificationListener notificationListener, int cleanupTimeout) {
- super(hostname, port, id, heartbeat);
+ super(clusterName, hostname, port, id, heartbeat);
this.timeoutTimer = new GossipTimeoutTimer(cleanupTimeout, notificationListener, this);
}
diff --git a/src/main/java/com/google/code/gossip/RemoteGossipMember.java b/src/main/java/com/google/code/gossip/RemoteGossipMember.java
index 6435bf2..d3848fd 100644
--- a/src/main/java/com/google/code/gossip/RemoteGossipMember.java
+++ b/src/main/java/com/google/code/gossip/RemoteGossipMember.java
@@ -35,8 +35,8 @@
* @param heartbeat
* The current heartbeat.
*/
- public RemoteGossipMember(String hostname, int port, String id, long heartbeat) {
- super(hostname, port, id, heartbeat);
+ public RemoteGossipMember(String clusterName, String hostname, int port, String id, long heartbeat) {
+ super(clusterName, hostname, port, id, heartbeat);
}
/**
@@ -47,7 +47,7 @@
* @param port
* The port number.
*/
- public RemoteGossipMember(String hostname, int port, String id) {
- super(hostname, port, id, System.currentTimeMillis());
+ public RemoteGossipMember(String clusterName, String hostname, int port, String id) {
+ super(clusterName, hostname, port, id, System.currentTimeMillis());
}
}
diff --git a/src/main/java/com/google/code/gossip/StartupSettings.java b/src/main/java/com/google/code/gossip/StartupSettings.java
index b6475de..5e0a674 100644
--- a/src/main/java/com/google/code/gossip/StartupSettings.java
+++ b/src/main/java/com/google/code/gossip/StartupSettings.java
@@ -44,6 +44,8 @@
/** The port to start the gossip service on. */
private int _port;
+ private String cluster;
+
/** The gossip settings used at startup. */
private final GossipSettings _gossipSettings;
@@ -79,6 +81,14 @@
_gossipMembers = new ArrayList<>();
}
+ public void setCluster(String cluster){
+ this.cluster = cluster;
+ }
+
+ public String getCluster(){
+ return this.cluster;
+ }
+
/**
* Set the id to be used for this service.
*
@@ -192,7 +202,7 @@
JSONArray membersJSON = jsonObject.getJSONArray("members");
for (int i = 0; i < membersJSON.length(); i++) {
JSONObject memberJSON = membersJSON.getJSONObject(i);
- RemoteGossipMember member = new RemoteGossipMember(memberJSON.getString("host"),
+ RemoteGossipMember member = new RemoteGossipMember(memberJSON.getString("cluster"), memberJSON.getString("host"),
memberJSON.getInt("port"), "");
settings.addGossipMember(member);
configMembersDetails += member.getAddress();
diff --git a/src/main/java/com/google/code/gossip/examples/GossipExample.java b/src/main/java/com/google/code/gossip/examples/GossipExample.java
index 2c8ea94..bb5c884 100644
--- a/src/main/java/com/google/code/gossip/examples/GossipExample.java
+++ b/src/main/java/com/google/code/gossip/examples/GossipExample.java
@@ -63,18 +63,20 @@
// Get my ip address.
String myIpAddress = InetAddress.getLocalHost().getHostAddress();
+ String cluster = "My Gossip Cluster";
+
// Create the gossip members and put them in a list and give them a port number starting with
// 2000.
List<GossipMember> startupMembers = new ArrayList<>();
for (int i = 0; i < NUMBER_OF_CLIENTS; ++i) {
- startupMembers.add(new RemoteGossipMember(myIpAddress, 2000 + i, ""));
+ startupMembers.add(new RemoteGossipMember(cluster, myIpAddress, 2000 + i, ""));
}
// Lets start the gossip clients.
// Start the clients, waiting cleaning-interval + 1 second between them which will show the
// dead list handling.
for (GossipMember member : startupMembers) {
- GossipService gossipService = new GossipService(myIpAddress, member.getPort(), "",
+ GossipService gossipService = new GossipService(cluster, myIpAddress, member.getPort(), "",
startupMembers, settings, null);
clients.add(gossipService);
gossipService.start();
diff --git a/src/main/java/com/google/code/gossip/manager/GossipManager.java b/src/main/java/com/google/code/gossip/manager/GossipManager.java
index f3ee91b..ca88feb 100644
--- a/src/main/java/com/google/code/gossip/manager/GossipManager.java
+++ b/src/main/java/com/google/code/gossip/manager/GossipManager.java
@@ -57,17 +57,17 @@
private ExecutorService _gossipThreadExecutor;
public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass,
- Class<? extends ActiveGossipThread> activeGossipThreadClass, String address, int port,
+ Class<? extends ActiveGossipThread> activeGossipThreadClass, String cluster, String address, int port,
String id, GossipSettings settings, List<GossipMember> gossipMembers,
GossipListener listener) {
_passiveGossipThreadClass = passiveGossipThreadClass;
_activeGossipThreadClass = activeGossipThreadClass;
_settings = settings;
- _me = new LocalGossipMember(address, port, id, System.currentTimeMillis(), this, settings.getCleanupInterval());
+ _me = new LocalGossipMember(cluster, address, port, id, System.currentTimeMillis(), this, settings.getCleanupInterval());
members = new ConcurrentSkipListMap<>();
for (GossipMember startupMember : gossipMembers) {
if (!startupMember.equals(_me)) {
- LocalGossipMember member = new LocalGossipMember(startupMember.getHost(),
+ LocalGossipMember member = new LocalGossipMember(startupMember.getClusterName(), startupMember.getHost(),
startupMember.getPort(), startupMember.getId(), System.currentTimeMillis(), this,
settings.getCleanupInterval());
members.put(member, GossipState.UP);
diff --git a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java
index b466bc7..bfb8732 100644
--- a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java
+++ b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java
@@ -53,6 +53,8 @@
private AtomicBoolean _keepRunning;
+ private final String _cluster;
+
public PassiveGossipThread(GossipManager gossipManager) {
_gossipManager = gossipManager;
try {
@@ -62,6 +64,7 @@
GossipService.LOGGER.debug("Gossip service successfully initialized on port "
+ _gossipManager.getMyself().getPort());
GossipService.LOGGER.debug("I am " + _gossipManager.getMyself());
+ _cluster = _gossipManager.getMyself().getClusterName();
} catch (SocketException ex) {
GossipService.LOGGER.warn(ex);
_server = null;
@@ -96,8 +99,9 @@
JSONArray jsonArray = new JSONArray(receivedMessage);
for (int i = 0; i < jsonArray.length(); i++) {
JSONObject memberJSONObject = jsonArray.getJSONObject(i);
- if (memberJSONObject.length() == 4) {
+ if (memberJSONObject.length() == 5 && _cluster.equals(memberJSONObject.get(GossipMember.JSON_CLUSTER))) {
RemoteGossipMember member = new RemoteGossipMember(
+ memberJSONObject.getString(GossipMember.JSON_CLUSTER),
memberJSONObject.getString(GossipMember.JSON_HOST),
memberJSONObject.getInt(GossipMember.JSON_PORT),
memberJSONObject.getString(GossipMember.JSON_ID),
@@ -109,9 +113,11 @@
senderMember = member;
}
remoteGossipMembers.add(member);
+ } else if(memberJSONObject.length() == 5) {
+ GossipService.LOGGER.warn("The member object does not belong to this cluster.");
} else {
GossipService.LOGGER
- .error("The received member object does not contain 4 objects:\n"
+ .error("The received member object does not contain 5 objects:\n"
+ memberJSONObject.toString());
}
diff --git a/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
index acfc43e..16584f9 100644
--- a/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
+++ b/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
@@ -48,7 +48,7 @@
for (LocalGossipMember i : gossipManager.getDeadList()){
if (i.getId().equals(senderMember.getId())){
System.out.println(gossipManager.getMyself() +" caught a live one!");
- LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getHost(),
+ LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getClusterName(), senderMember.getHost(),
senderMember.getPort(), senderMember.getId(), senderMember.getHeartbeat(),
gossipManager, gossipManager.getSettings().getCleanupInterval());
gossipManager.revivieMember(newLocalMember);
@@ -68,7 +68,7 @@
}
} else if (!gossipManager.getMemberList().contains(remoteMember)
&& !gossipManager.getDeadList().contains(remoteMember) ){
- LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(),
+ LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(), remoteMember.getHost(),
remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(),
gossipManager, gossipManager.getSettings().getCleanupInterval());
gossipManager.createOrRevivieMember(newLocalMember);
@@ -78,7 +78,7 @@
LocalGossipMember localDeadMember = gossipManager.getDeadList().get(
gossipManager.getDeadList().indexOf(remoteMember));
if (remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) {
- LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(),
+ LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(), remoteMember.getHost(),
remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(),
gossipManager, gossipManager.getSettings().getCleanupInterval());
gossipManager.revivieMember(newLocalMember);
diff --git a/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java b/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java
index d04f913..e256ec2 100644
--- a/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java
+++ b/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java
@@ -26,9 +26,9 @@
import java.util.List;
public class RandomGossipManager extends GossipManager {
- public RandomGossipManager(String address, int port, String id, GossipSettings settings,
+ public RandomGossipManager(String cluster, String address, int port, String id, GossipSettings settings,
List<GossipMember> gossipMembers, GossipListener listener) {
- super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, address,
+ super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, cluster, address,
port, id, settings, gossipMembers, listener);
}
}
diff --git a/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java b/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java
index ddd8364..af30eb7 100644
--- a/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java
+++ b/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java
@@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
@@ -44,12 +45,13 @@
//@Ignore
public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, UnknownHostException {
GossipSettings settings = new GossipSettings(1000, 10000);
+ String cluster = UUID.randomUUID().toString();
log.info( "Adding seed nodes" );
int seedNodes = 3;
List<GossipMember> startupMembers = new ArrayList<>();
for (int i = 1; i < seedNodes + 1; ++i) {
- startupMembers.add(new RemoteGossipMember("127.0.0.1", 50000 + i, i + ""));
+ startupMembers.add(new RemoteGossipMember(cluster, "127.0.0.1", 50000 + i, i + ""));
}
log.info( "Adding clients" );
@@ -57,7 +59,7 @@
final int clusterMembers = 5;
for (int i = 1; i < clusterMembers+1; ++i) {
final int j = i;
- GossipService gossipService = new GossipService("127.0.0.1", 50000 + i, i + "",
+ GossipService gossipService = new GossipService(cluster, "127.0.0.1", 50000 + i, i + "",
startupMembers, settings,
new GossipListener(){
@Override
@@ -104,7 +106,7 @@
}}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(4);
// start client again
- GossipService gossipService = new GossipService("127.0.0.1", shutdownPort, shutdownId + "",
+ GossipService gossipService = new GossipService(cluster, "127.0.0.1", shutdownPort, shutdownId + "",
startupMembers, settings,
new GossipListener(){
@Override
diff --git a/src/test/java/io/teknek/gossip/StartupSettingsTest.java b/src/test/java/io/teknek/gossip/StartupSettingsTest.java
index 5cb52e0..5f25dd9 100644
--- a/src/test/java/io/teknek/gossip/StartupSettingsTest.java
+++ b/src/test/java/io/teknek/gossip/StartupSettingsTest.java
@@ -44,6 +44,7 @@
*/
public class StartupSettingsTest {
private static final Logger log = Logger.getLogger( StartupSettingsTest.class );
+ private static final String CLUSTER = UUID.randomUUID().toString();
@Test
public void testUsingSettingsFile() throws IOException, InterruptedException, JSONException {
@@ -52,7 +53,7 @@
settingsFile.deleteOnExit();
writeSettingsFile(settingsFile);
final GossipService firstService = new GossipService(
- "127.0.0.1", 50000, UUID.randomUUID().toString(),
+ CLUSTER, "127.0.0.1", 50000, UUID.randomUUID().toString(),
new ArrayList<GossipMember>(), new GossipSettings(), null);
firstService.start();
@@ -76,12 +77,13 @@
private void writeSettingsFile( File target ) throws IOException {
String settings =
"[{\n" + // It is odd that this is meant to be in an array, but oh well.
+ " \"cluster\":\"" + CLUSTER + "\",\n" +
" \"id\":\"" + UUID.randomUUID() + "\",\n" +
" \"port\":50001,\n" +
" \"gossip_interval\":1000,\n" +
" \"cleanup_interval\":10000,\n" +
" \"members\":[\n" +
- " {\"host\":\"127.0.0.1\", \"port\":50000}\n" +
+ " {\"cluster\": \"" + CLUSTER + "\",\"host\":\"127.0.0.1\", \"port\":50000}\n" +
" ]\n" +
"}]";
diff --git a/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java b/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java
index 39844ac..0065ade 100644
--- a/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java
+++ b/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java
@@ -22,6 +22,7 @@
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
@@ -52,19 +53,20 @@
public void abc() throws InterruptedException, UnknownHostException{
GossipSettings settings = new GossipSettings();
+ String cluster = UUID.randomUUID().toString();
log.info( "Adding seed nodes" );
int seedNodes = 3;
List<GossipMember> startupMembers = new ArrayList<>();
for (int i = 1; i < seedNodes+1; ++i) {
- startupMembers.add(new RemoteGossipMember("127.0.0.1", 50000 + i, i + ""));
+ startupMembers.add(new RemoteGossipMember(cluster, "127.0.0.1", 50000 + i, i + ""));
}
log.info( "Adding clients" );
final List<GossipService> clients = new ArrayList<>();
final int clusterMembers = 5;
for (int i = 1; i < clusterMembers+1; ++i) {
- GossipService gossipService = new GossipService("127.0.0.1", 50000 + i, i + "",
+ GossipService gossipService = new GossipService(cluster, "127.0.0.1", 50000 + i, i + "",
startupMembers, settings,
new GossipListener(){
@Override