GOSSIP-38 Multiple async GossipListeners
diff --git a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
index 792af85..32c00c9 100644
--- a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
+++ b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
@@ -43,7 +43,7 @@
   
   private String distribution = "normal";
   
-  private String activeGossipClass = "org.apache.gossip.manager.SimpleActiveGossipper";
+  private String activeGossipClass = "org.apache.gossip.manager.SimpleActiveGossiper";
 
   private String transportManagerClass = "org.apache.gossip.transport.udp.UdpTransportManager";
   private String protocolManagerClass = "org.apache.gossip.protocol.json.JacksonProtocolManager";
diff --git a/gossip-base/src/main/java/org/apache/gossip/Member.java b/gossip-base/src/main/java/org/apache/gossip/Member.java
index d04a7b6..54a6737 100644
--- a/gossip-base/src/main/java/org/apache/gossip/Member.java
+++ b/gossip-base/src/main/java/org/apache/gossip/Member.java
@@ -22,7 +22,7 @@
 import java.util.Map;
 
 /**
- * A abstract class representing a gossip member.
+ * An abstract class representing a gossip member.
  * 
  */
 public abstract class Member implements Comparable<Member> {
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
index d839b2e..db442c6 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -185,7 +185,7 @@
     if (settings.isPersistDataState()) {
       scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, TimeUnit.SECONDS);
     }
-    scheduledServiced.scheduleAtFixedRate(memberStateRefresher, 0, 100, TimeUnit.MILLISECONDS);
+    memberStateRefresher.init();
     LOGGER.debug("The GossipManager is started.");
   }
   
@@ -224,6 +224,7 @@
     gossipCore.shutdown();
     transportManager.shutdown();
     dataReaper.close();
+    memberStateRefresher.shutdown();
     scheduledServiced.shutdown();
     try {
       scheduledServiced.awaitTermination(1, TimeUnit.SECONDS);
@@ -366,4 +367,8 @@
   public void unregisterSharedDataSubscriber(UpdateSharedDataEventHandler handler){
     gossipCore.unregisterSharedDataSubscriber(handler);
   }
+
+  public void registerGossipListener(GossipListener listener) {
+    memberStateRefresher.register(listener);
+  }
 }
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
index 86dca57..f3ca23a 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
@@ -18,10 +18,11 @@
 package org.apache.gossip.manager;
 
 import com.codahale.metrics.MetricRegistry;
-import org.apache.gossip.Member;
 import org.apache.gossip.GossipSettings;
+import org.apache.gossip.Member;
 import org.apache.gossip.StartupSettings;
 import org.apache.gossip.event.GossipListener;
+import org.apache.gossip.event.GossipState;
 import org.apache.gossip.manager.handlers.MessageHandler;
 import org.apache.gossip.manager.handlers.MessageHandlerFactory;
 
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java
index 1836309..652bf5c 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java
@@ -26,27 +26,40 @@
 import org.apache.gossip.model.ShutdownMessage;
 import org.apache.log4j.Logger;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.function.BiFunction;
 
-public class GossipMemberStateRefresher implements Runnable {
+public class GossipMemberStateRefresher {
   public static final Logger LOGGER = Logger.getLogger(GossipMemberStateRefresher.class);
 
   private final Map<LocalMember, GossipState> members;
   private final GossipSettings settings;
-  private final GossipListener listener;
+  private final List<GossipListener> listeners = new CopyOnWriteArrayList<>();
   private final Clock clock;
   private final BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData;
+  private final ExecutorService listenerExecutor;
+  private final ScheduledExecutorService scheduledExecutor;
+  private final BlockingQueue<Runnable> workQueue;
 
   public GossipMemberStateRefresher(Map<LocalMember, GossipState> members, GossipSettings settings,
-                                    GossipListener listener, BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData) {
+                                    GossipListener listener,
+                                    BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData) {
     this.members = members;
     this.settings = settings;
-    this.listener = listener;
+    listeners.add(listener);
     this.findPerNodeGossipData = findPerNodeGossipData;
     clock = new SystemClock();
+    workQueue = new ArrayBlockingQueue<>(1024);
+    listenerExecutor = new ThreadPoolExecutor(1, 20, 1, TimeUnit.SECONDS, workQueue,
+            new ThreadPoolExecutor.DiscardOldestPolicy());
+    scheduledExecutor = Executors.newScheduledThreadPool(1);
+  }
+
+  public void init() {
+    scheduledExecutor.scheduleAtFixedRate(() -> run(), 0, 100, TimeUnit.MILLISECONDS);
   }
 
   public void run() {
@@ -74,7 +87,9 @@
 
       if (entry.getValue() != requiredState) {
         members.put(entry.getKey(), requiredState);
-        listener.gossipEvent(entry.getKey(), requiredState);
+        /* Call listeners asynchronously */
+        for (GossipListener listener: listeners)
+          listenerExecutor.execute(() -> listener.gossipEvent(entry.getKey(), requiredState));
       }
     }
   }
@@ -112,10 +127,31 @@
     if (s.getShutdownAtNanos() > l.getKey().getHeartbeat()) {
       members.put(l.getKey(), GossipState.DOWN);
       if (l.getValue() == GossipState.UP) {
-        listener.gossipEvent(l.getKey(), GossipState.DOWN);
+        for (GossipListener listener: listeners)
+          listenerExecutor.execute(() -> listener.gossipEvent(l.getKey(), GossipState.DOWN));
       }
       return true;
     }
     return false;
   }
+
+  public void register(GossipListener listener) {
+    listeners.add(listener);
+  }
+
+  public void shutdown() {
+    scheduledExecutor.shutdown();
+    try {
+      scheduledExecutor.awaitTermination(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOGGER.debug("Issue during shutdown", e);
+    }
+    listenerExecutor.shutdown();
+    try {
+      listenerExecutor.awaitTermination(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOGGER.debug("Issue during shutdown", e);
+    }
+    listenerExecutor.shutdownNow();
+  }
 }
\ No newline at end of file
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java b/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossiper.java
similarity index 94%
rename from gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java
rename to gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossiper.java
index e47fe2a..7d498b4 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossiper.java
@@ -33,14 +33,14 @@
  * Base implementation gossips randomly to live nodes periodically gossips to dead ones
  *
  */
-public class SimpleActiveGossipper extends AbstractActiveGossiper {
+public class SimpleActiveGossiper extends AbstractActiveGossiper {
 
   private ScheduledExecutorService scheduledExecutorService;
   private final BlockingQueue<Runnable> workQueue;
   private ThreadPoolExecutor threadService;
   
-  public SimpleActiveGossipper(GossipManager gossipManager, GossipCore gossipCore,
-          MetricRegistry registry) {
+  public SimpleActiveGossiper(GossipManager gossipManager, GossipCore gossipCore,
+                              MetricRegistry registry) {
     super(gossipManager, gossipCore, registry);
     scheduledExecutorService = Executors.newScheduledThreadPool(2);
     workQueue = new ArrayBlockingQueue<Runnable>(1024);
diff --git a/gossip-base/src/main/java/org/apache/gossip/transport/TransportManager.java b/gossip-base/src/main/java/org/apache/gossip/transport/TransportManager.java
index 031d90e..99354d1 100644
--- a/gossip-base/src/main/java/org/apache/gossip/transport/TransportManager.java
+++ b/gossip-base/src/main/java/org/apache/gossip/transport/TransportManager.java
@@ -20,7 +20,7 @@
 import java.io.IOException;
 import java.net.URI;
 
-/** interface for manage that sends and receives messages that have already been serialized. */
+/** interface for manager that sends and receives messages that have already been serialized. */
 public interface TransportManager {
   
   /** starts the active gossip thread responsible for reaching out to remote nodes. Not related to `startEndpoint()` */