Merge pull request #5 from edwardcapriolo/callback-for-state
Callback for state
diff --git a/README.md b/README.md
index 1fea9d4..690e08d 100644
--- a/README.md
+++ b/README.md
@@ -21,7 +21,7 @@
int clusterMembers = 5;
for (int i = 1; i < clusterMembers+1; ++i) {
GossipService gossipService = new GossipService("127.0.0." + i, 2000, i + "",
- LogLevel.DEBUG, startupMembers, settings);
+ LogLevel.DEBUG, startupMembers, settings, null);
clients.add(gossipService);
gossipService.start();
}
@@ -33,9 +33,30 @@
Assert.assertEquals(4, clients.get(i).get_gossipManager().getMemberList().size());
}
+Event Listener
+------
+
+The status can be polled using the getters that return immutable lists.
+
+ public List<LocalGossipMember> getMemberList()
+ public List<LocalGossipMember> getDeadList()
+
+Users can also attach an event listener:
+
+ GossipService gossipService = new GossipService("127.0.0." + i, 2000, i + "", LogLevel.DEBUG,
+ startupMembers, settings,
+ new GossipListener(){
+ @Override
+ public void gossipEvent(GossipMember member, GossipState state) {
+ System.out.println(member+" "+ state);
+ }
+ });
+
+
Maven
------
+
You can get this software from maven central.
<dependency>
diff --git a/src/main/java/com/google/code/gossip/GossipMember.java b/src/main/java/com/google/code/gossip/GossipMember.java
index e24964e..50e9af3 100644
--- a/src/main/java/com/google/code/gossip/GossipMember.java
+++ b/src/main/java/com/google/code/gossip/GossipMember.java
@@ -10,25 +10,15 @@
*
* @author joshclemm, harmenw
*/
-public abstract class GossipMember {
- /** The JSON key for the host property. */
+public abstract class GossipMember implements Comparable<GossipMember>{
+
public static final String JSON_HOST = "host";
- /** The JSON key for the port property. */
public static final String JSON_PORT = "port";
- /** The JSON key for the heartbeat property. */
public static final String JSON_HEARTBEAT = "heartbeat";
-
public static final String JSON_ID = "id";
-
- /** The hostname or IP address of this gossip member. */
protected String _host;
-
- /** The port number of this gossip member. */
protected int _port;
-
- /** The current heartbeat of this gossip member. */
protected int _heartbeat;
-
protected String _id;
/**
@@ -146,4 +136,8 @@
throw new RuntimeException(e);
}
}
+
+  /**
+   * Orders gossip members by the string returned from {@link #getAddress()}.
+   *
+   * <p>GossipManager keeps its members as keys of a sorted map, so two members whose addresses
+   * compare equal are treated as the same key there.
+   * NOTE(review): equals() is not visible in this hunk; confirm it is address-based as well so
+   * that the ordering stays consistent with equals.
+   *
+   * @param other the member to compare against; must not be null
+   * @return a negative, zero or positive value as this member's address sorts before, equal to
+   *         or after the other member's address
+   */
+  @Override
+  public int compareTo(GossipMember other){
+    return this.getAddress().compareTo(other.getAddress());
+  }
}
diff --git a/src/main/java/com/google/code/gossip/GossipService.java b/src/main/java/com/google/code/gossip/GossipService.java
index 00027bc..1f05c25 100644
--- a/src/main/java/com/google/code/gossip/GossipService.java
+++ b/src/main/java/com/google/code/gossip/GossipService.java
@@ -6,6 +6,7 @@
import java.util.ArrayList;
import org.apache.log4j.Logger;
+import com.google.code.gossip.event.GossipListener;
import com.google.code.gossip.manager.GossipManager;
import com.google.code.gossip.manager.random.RandomGossipManager;
@@ -30,7 +31,7 @@
UnknownHostException {
this(InetAddress.getLocalHost().getHostAddress(), startupSettings.getPort(), "",
startupSettings.getLogLevel(), startupSettings.getGossipMembers(), startupSettings
- .getGossipSettings());
+ .getGossipSettings(), null);
}
/**
@@ -41,9 +42,9 @@
* @throws UnknownHostException
*/
public GossipService(String ipAddress, int port, String id, int logLevel,
- ArrayList<GossipMember> gossipMembers, GossipSettings settings)
+ ArrayList<GossipMember> gossipMembers, GossipSettings settings, GossipListener listener)
throws InterruptedException, UnknownHostException {
- _gossipManager = new RandomGossipManager(ipAddress, port, id, settings, gossipMembers);
+ _gossipManager = new RandomGossipManager(ipAddress, port, id, settings, gossipMembers, listener);
}
public void start() {
diff --git a/src/main/java/com/google/code/gossip/GossipTimeoutTimer.java b/src/main/java/com/google/code/gossip/GossipTimeoutTimer.java
index 94cc3a4..c3613fb 100644
--- a/src/main/java/com/google/code/gossip/GossipTimeoutTimer.java
+++ b/src/main/java/com/google/code/gossip/GossipTimeoutTimer.java
@@ -14,10 +14,7 @@
*/
public class GossipTimeoutTimer extends Timer {
- /** The amount of time this timer waits before generating a wake-up event. */
private long _sleepTime;
-
- /** The gossip member this timer is for. */
private LocalGossipMember _source;
/**
diff --git a/src/main/java/com/google/code/gossip/event/GossipListener.java b/src/main/java/com/google/code/gossip/event/GossipListener.java
new file mode 100644
index 0000000..7d4a462
--- /dev/null
+++ b/src/main/java/com/google/code/gossip/event/GossipListener.java
@@ -0,0 +1,7 @@
+package com.google.code.gossip.event;
+
+import com.google.code.gossip.GossipMember;
+
+public interface GossipListener {
+ void gossipEvent(GossipMember member, GossipState state);
+}
diff --git a/src/main/java/com/google/code/gossip/event/GossipState.java b/src/main/java/com/google/code/gossip/event/GossipState.java
new file mode 100644
index 0000000..9e5db5f
--- /dev/null
+++ b/src/main/java/com/google/code/gossip/event/GossipState.java
@@ -0,0 +1,10 @@
+package com.google.code.gossip.event;
+
+/**
+ * State reported for a gossip member, both in the manager's member map and in the events
+ * delivered to a GossipListener.
+ */
+public enum GossipState {
+  UP("up"), DOWN("down");
+
+  /** Lower-case label of this state; assigned once in the constructor. */
+  private final String state;
+
+  private GossipState(String state){
+    this.state = state;
+  }
+
+  /**
+   * Returns the lower-case label of this state.
+   *
+   * @return "up" for {@link #UP}, "down" for {@link #DOWN}
+   */
+  public String getState() {
+    return state;
+  }
+}
diff --git a/src/main/java/com/google/code/gossip/examples/GossipExample.java b/src/main/java/com/google/code/gossip/examples/GossipExample.java
index abc28e1..ffcf7ca 100644
--- a/src/main/java/com/google/code/gossip/examples/GossipExample.java
+++ b/src/main/java/com/google/code/gossip/examples/GossipExample.java
@@ -58,7 +58,7 @@
// dead list handling.
for (GossipMember member : startupMembers) {
GossipService gossipService = new GossipService(myIpAddress, member.getPort(), "",
- LogLevel.DEBUG, startupMembers, settings);
+ LogLevel.DEBUG, startupMembers, settings, null);
clients.add(gossipService);
gossipService.start();
sleep(settings.getCleanupInterval() + 1000);
diff --git a/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java
index e5ab754..5823c74 100644
--- a/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java
+++ b/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java
@@ -1,6 +1,6 @@
package com.google.code.gossip.manager;
-import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -45,7 +45,7 @@
* Performs the sending of the membership list, after we have incremented our own heartbeat.
*/
abstract protected void sendMembershipList(LocalGossipMember me,
- ArrayList<LocalGossipMember> memberList);
+ List<LocalGossipMember> memberList);
/**
* Abstract method which should be implemented by a subclass. This method should return a member
@@ -55,5 +55,5 @@
* The list of members which are stored in the local list of members.
* @return The chosen LocalGossipMember to gossip with.
*/
- abstract protected LocalGossipMember selectPartner(ArrayList<LocalGossipMember> memberList);
+ abstract protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList);
}
diff --git a/src/main/java/com/google/code/gossip/manager/GossipManager.java b/src/main/java/com/google/code/gossip/manager/GossipManager.java
index 1b76e82..197d624 100644
--- a/src/main/java/com/google/code/gossip/manager/GossipManager.java
+++ b/src/main/java/com/google/code/gossip/manager/GossipManager.java
@@ -2,6 +2,10 @@
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -16,60 +20,46 @@
import com.google.code.gossip.GossipService;
import com.google.code.gossip.GossipSettings;
import com.google.code.gossip.LocalGossipMember;
+import com.google.code.gossip.event.GossipListener;
+import com.google.code.gossip.event.GossipState;
public abstract class GossipManager extends Thread implements NotificationListener {
public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
-
- /** The maximal number of bytes the packet with the GOSSIP may be. (Default is 100 kb) */
public static final int MAX_PACKET_SIZE = 102400;
- /** The list of members which are in the gossip group (not including myself). */
- private ArrayList<LocalGossipMember> _memberList;
-
- /** The list of members which are known to be dead. */
- private ArrayList<LocalGossipMember> _deadList;
-
- /** The member I am representing. */
+ private ConcurrentSkipListMap<LocalGossipMember,GossipState> members;
private LocalGossipMember _me;
-
- /** The settings for gossiping. */
private GossipSettings _settings;
-
- /** A boolean whether the gossip service should keep running. */
private AtomicBoolean _gossipServiceRunning;
-
- /** A ExecutorService used for executing the active and passive gossip threads. */
private ExecutorService _gossipThreadExecutor;
-
private Class<? extends PassiveGossipThread> _passiveGossipThreadClass;
-
private PassiveGossipThread passiveGossipThread;
-
private Class<? extends ActiveGossipThread> _activeGossipThreadClass;
-
private ActiveGossipThread activeGossipThread;
+ private GossipListener listener;
public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass,
Class<? extends ActiveGossipThread> activeGossipThreadClass, String address, int port,
- String id, GossipSettings settings, ArrayList<GossipMember> gossipMembers) {
+ String id, GossipSettings settings, ArrayList<GossipMember> gossipMembers,
+ GossipListener listener) {
_passiveGossipThreadClass = passiveGossipThreadClass;
_activeGossipThreadClass = activeGossipThreadClass;
_settings = settings;
_me = new LocalGossipMember(address, port, id, 0, this, settings.getCleanupInterval());
- _memberList = new ArrayList<LocalGossipMember>();
- _deadList = new ArrayList<LocalGossipMember>();
+ members = new ConcurrentSkipListMap<>();
for (GossipMember startupMember : gossipMembers) {
if (!startupMember.equals(_me)) {
LocalGossipMember member = new LocalGossipMember(startupMember.getHost(),
startupMember.getPort(), startupMember.getId(), 0, this,
settings.getCleanupInterval());
- _memberList.add(member);
+ members.put(member, GossipState.UP);
GossipService.LOGGER.debug(member);
}
}
_gossipServiceRunning = new AtomicBoolean(true);
+ this.listener = listener;
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
GossipService.LOGGER.info("Service has been shutdown...");
@@ -85,33 +75,45 @@
public void handleNotification(Notification notification, Object handback) {
LocalGossipMember deadMember = (LocalGossipMember) notification.getUserData();
GossipService.LOGGER.info("Dead member detected: " + deadMember);
- synchronized (this._memberList) {
- this._memberList.remove(deadMember);
- }
- synchronized (this._deadList) {
- this._deadList.add(deadMember);
+ members.put(deadMember, GossipState.DOWN);
+ if (listener != null) {
+ listener.gossipEvent(deadMember, GossipState.DOWN);
}
}
+  /**
+   * Records the given member as UP (either newly discovered or revived) and notifies the
+   * registered listener, if there is one.
+   *
+   * <p>Any existing entry that compares equal to {@code m} is removed before the insert. A plain
+   * put on a sorted map only replaces the value of an existing equal key and keeps the old key
+   * object, so a revived member would otherwise remain represented by the stale instance while
+   * callers start the timeout timer on {@code m}.
+   *
+   * <p>The remove/put pair is not atomic: a concurrent reader may briefly not see the member.
+   *
+   * @param m the member instance that should represent this address from now on
+   */
+  public void createOrRevivieMember(LocalGossipMember m){
+    // Drop the old key object first so that m itself becomes the key stored in the map.
+    members.remove(m);
+    members.put(m, GossipState.UP);
+    if (listener != null) {
+      listener.gossipEvent(m, GossipState.UP);
+    }
+  }
+
public GossipSettings getSettings() {
return _settings;
}
- /**
- * Get a clone of the memberlist.
- *
- * @return
- */
- public ArrayList<LocalGossipMember> getMemberList() {
- return _memberList;
+  /**
+   * Builds a snapshot of the members whose recorded state is UP.
+   *
+   * @return an unmodifiable list backed by a fresh copy; later membership changes are not
+   *         reflected in it
+   */
+  public List<LocalGossipMember> getMemberList() {
+    List<LocalGossipMember> alive = new ArrayList<>();
+    for (Entry<LocalGossipMember, GossipState> candidate : members.entrySet()) {
+      if (candidate.getValue() != GossipState.UP) {
+        continue;
+      }
+      alive.add(candidate.getKey());
+    }
+    return Collections.unmodifiableList(alive);
   }
public LocalGossipMember getMyself() {
return _me;
}
-
- public ArrayList<LocalGossipMember> getDeadList() {
- return _deadList;
+
+  /**
+   * Builds a snapshot of the members whose recorded state is DOWN.
+   *
+   * @return an unmodifiable list backed by a fresh copy; later membership changes are not
+   *         reflected in it
+   */
+  public List<LocalGossipMember> getDeadList() {
+    List<LocalGossipMember> dead = new ArrayList<>();
+    for (Entry<LocalGossipMember, GossipState> candidate : members.entrySet()) {
+      if (candidate.getValue() != GossipState.DOWN) {
+        continue;
+      }
+      dead.add(candidate.getKey());
+    }
+    return Collections.unmodifiableList(dead);
   }
/**
@@ -121,7 +123,7 @@
* @throws InterruptedException
*/
public void run() {
- for (LocalGossipMember member : _memberList) {
+ for (LocalGossipMember member : members.keySet()) {
if (member != _me) {
member.startTimeoutTimer();
}
diff --git a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java
index 08342b4..314432a 100644
--- a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java
+++ b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java
@@ -7,8 +7,10 @@
import java.net.SocketAddress;
import java.net.SocketException;
import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.log4j.Logger;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
@@ -24,6 +26,8 @@
* determine the incoming message.
*/
abstract public class PassiveGossipThread implements Runnable {
+
+ public static final Logger LOGGER = Logger.getLogger(PassiveGossipThread.class);
/** The socket used for the passive thread of the gossip service. */
private DatagramSocket _server;
@@ -106,8 +110,6 @@
}
}
-
- // Merge our list with the one we just received
mergeLists(_gossipManager, senderMember, remoteGossipMembers);
} catch (JSONException e) {
GossipService.LOGGER
@@ -121,7 +123,7 @@
}
} catch (IOException e) {
- e.printStackTrace();
+ GossipService.LOGGER.error(e);
_keepRunning.set(false);
}
}
@@ -144,5 +146,5 @@
* The list of members known at the remote side.
*/
abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
- ArrayList<GossipMember> remoteList);
+ List<GossipMember> remoteList);
}
\ No newline at end of file
diff --git a/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
index 206d5c5..f0afaf9 100644
--- a/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
+++ b/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
@@ -1,6 +1,6 @@
package com.google.code.gossip.manager.impl;
-import java.util.ArrayList;
+import java.util.List;
import com.google.code.gossip.GossipMember;
import com.google.code.gossip.GossipService;
@@ -23,35 +23,21 @@
* @param remoteList
*/
protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
- ArrayList<GossipMember> remoteList) {
-
- synchronized (gossipManager.getDeadList()) {
-
- synchronized (gossipManager.getMemberList()) {
+ List<GossipMember> remoteList) {
for (GossipMember remoteMember : remoteList) {
// Skip myself. We don't want ourselves in the local member list.
- if (!remoteMember.equals(gossipManager.getMyself())) {
+ if (remoteMember.equals(gossipManager.getMyself())) {
+ continue;
+ }
if (gossipManager.getMemberList().contains(remoteMember)) {
- GossipService.LOGGER.debug("The local list already contains the remote member ("
- + remoteMember + ").");
- // The local memberlist contains the remote member.
LocalGossipMember localMember = gossipManager.getMemberList().get(
gossipManager.getMemberList().indexOf(remoteMember));
-
- // Let's synchronize it's heartbeat.
if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) {
- // update local list with latest heartbeat
localMember.setHeartbeat(remoteMember.getHeartbeat());
- // and reset the timeout of that member
localMember.resetTimeoutTimer();
}
- // TODO: Otherwise, should we inform the other when the heartbeat is already higher?
} else {
- // The local list does not contain the remote member.
- GossipService.LOGGER.debug("The local list does not contain the remote member ("
- + remoteMember + ").");
-
// The remote member is either brand new, or a previously declared dead member.
// If its dead, check the heartbeat because it may have come back from the dead.
if (gossipManager.getDeadList().contains(remoteMember)) {
@@ -80,13 +66,14 @@
.debug("The remote member is back from the dead. We will remove it from the dead list and add it as a new member.");
// The remote member is back from the dead.
// Remove it from the dead list.
- gossipManager.getDeadList().remove(localDeadMember);
+ //gossipManager.getDeadList().remove(localDeadMember);
// Add it as a new member and add it to the member list.
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(),
remoteMember.getPort(), remoteMember.getId(),
remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
.getCleanupInterval());
- gossipManager.getMemberList().add(newLocalMember);
+ //gossipManager.getMemberList().add(newLocalMember);
+ gossipManager.createOrRevivieMember(newLocalMember);
newLocalMember.startTimeoutTimer();
GossipService.LOGGER.info("Removed remote member " + remoteMember.getAddress()
+ " from dead list and added to local member list.");
@@ -96,7 +83,7 @@
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(),
remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(),
gossipManager, gossipManager.getSettings().getCleanupInterval());
- gossipManager.getMemberList().add(newLocalMember);
+ gossipManager.createOrRevivieMember(newLocalMember);
newLocalMember.startTimeoutTimer();
GossipService.LOGGER.info("Added new remote member " + remoteMember.getAddress()
+ " to local member list.");
@@ -104,8 +91,7 @@
}
}
}
- }
- }
- }
+
+
}
diff --git a/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java
index 5235db2..85e4b8a 100644
--- a/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java
+++ b/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java
@@ -5,7 +5,7 @@
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
+import java.util.List;
import org.json.JSONArray;
@@ -23,7 +23,7 @@
/**
* Performs the sending of the membership list, after we have incremented our own heartbeat.
*/
- protected void sendMembershipList(LocalGossipMember me, ArrayList<LocalGossipMember> memberList) {
+ protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) {
GossipService.LOGGER.debug("Send sendMembershipList() is called.");
me.setHeartbeat(me.getHeartbeat() + 1);
synchronized (memberList) {
diff --git a/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java
index 917c362..d232f38 100644
--- a/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java
+++ b/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java
@@ -1,6 +1,6 @@
package com.google.code.gossip.manager.random;
-import java.util.ArrayList;
+import java.util.List;
import java.util.Random;
import com.google.code.gossip.GossipService;
@@ -24,7 +24,7 @@
*
* @return Member random member if list is greater than 1, null otherwise
*/
- protected LocalGossipMember selectPartner(ArrayList<LocalGossipMember> memberList) {
+ protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
LocalGossipMember member = null;
if (memberList.size() > 0) {
int randomNeighborIndex = _random.nextInt(memberList.size());
diff --git a/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java b/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java
index 8893ff5..3d028eb 100644
--- a/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java
+++ b/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java
@@ -4,13 +4,14 @@
import com.google.code.gossip.GossipMember;
import com.google.code.gossip.GossipSettings;
+import com.google.code.gossip.event.GossipListener;
import com.google.code.gossip.manager.GossipManager;
import com.google.code.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
public class RandomGossipManager extends GossipManager {
public RandomGossipManager(String address, int port, String id, GossipSettings settings,
- ArrayList<GossipMember> gossipMembers) {
+ ArrayList<GossipMember> gossipMembers, GossipListener listener) {
super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, address,
- port, id, settings, gossipMembers);
+ port, id, settings, gossipMembers, listener);
}
}
diff --git a/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java b/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java
index e30f39a..93f93c5 100644
--- a/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java
+++ b/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java
@@ -12,6 +12,8 @@
import com.google.code.gossip.GossipSettings;
import com.google.code.gossip.LogLevel;
import com.google.code.gossip.RemoteGossipMember;
+import com.google.code.gossip.event.GossipListener;
+import com.google.code.gossip.event.GossipState;
public class TenNodeThreeSeedTest {
@@ -31,19 +33,25 @@
int seedNodes = 3;
ArrayList<GossipMember> startupMembers = new ArrayList<GossipMember>();
for (int i = 1; i < seedNodes+1; ++i) {
- startupMembers.add(new RemoteGossipMember("127.0.0." + i, 2000, i+""));
+ startupMembers.add(new RemoteGossipMember("127.0.0." + i, 2000, i + ""));
}
ArrayList<GossipService> clients = new ArrayList<GossipService>();
int clusterMembers = 5;
for (int i = 1; i < clusterMembers+1; ++i) {
- GossipService gossipService = new GossipService("127.0.0."+i, 2000, i+"", LogLevel.DEBUG, startupMembers, settings);
+ GossipService gossipService = new GossipService("127.0.0." + i, 2000, i + "", LogLevel.DEBUG,
+ startupMembers, settings,
+ new GossipListener(){
+ @Override
+ public void gossipEvent(GossipMember member, GossipState state) {
+ System.out.println(member+" "+ state);
+ }
+ });
clients.add(gossipService);
gossipService.start();
Thread.sleep(1000);
}
Thread.sleep(10000);
for (int i = 0; i < clusterMembers; ++i) {
- System.out.println(clients.get(i).get_gossipManager().getMemberList());
Assert.assertEquals(4, clients.get(i).get_gossipManager().getMemberList().size());
}
for (int i = 0; i < clusterMembers; ++i) {