Merge branch 'GOSSIP-21' of github.com:edwardcapriolo/incubator-gossip
diff --git a/src/main/java/org/apache/gossip/GossipService.java b/src/main/java/org/apache/gossip/GossipService.java
index 68a4ca2..6c02e2c 100644
--- a/src/main/java/org/apache/gossip/GossipService.java
+++ b/src/main/java/org/apache/gossip/GossipService.java
@@ -24,6 +24,7 @@
 import org.apache.gossip.event.GossipListener;
 import org.apache.gossip.manager.GossipManager;
 import org.apache.gossip.manager.random.RandomGossipManager;
+import org.apache.gossip.model.GossipDataMessage;
 import org.apache.log4j.Logger;
 
 /**
@@ -81,6 +82,19 @@
   public GossipManager get_gossipManager() {
     return gossipManager;
   }
+  
+  /**
+   * Gossip data to the entire cluster
+   * @param message
+   */
+  public void gossipData(GossipDataMessage message){
+    gossipManager.gossipData(message);
+  }
+  
+  
+  public GossipDataMessage findGossipData(String nodeId, String key){
+    return this.get_gossipManager().findGossipData(nodeId, key);
+  }
 
   public void set_gossipManager(GossipManager _gossipManager) {
     this.gossipManager = _gossipManager;
diff --git a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
index b57c25a..19caffe 100644
--- a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
@@ -20,17 +20,23 @@
 import java.io.IOException;
 import java.net.DatagramSocket;
 import java.util.List;
+
+import java.util.Map.Entry;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import org.apache.gossip.GossipService;
+
 import org.apache.gossip.LocalGossipMember;
 import org.apache.gossip.model.ActiveGossipOk;
+import org.apache.gossip.model.GossipDataMessage;
 import org.apache.gossip.model.GossipMember;
 import org.apache.gossip.model.Response;
 import org.apache.gossip.udp.UdpActiveGossipMessage;
+import org.apache.gossip.udp.UdpGossipDataMessage;
+
 import org.apache.log4j.Logger;
 import org.codehaus.jackson.map.ObjectMapper;
 
@@ -41,21 +47,21 @@
  */
 public class ActiveGossipThread {
 
-  public static final Logger LOGGER = Logger.getLogger(ActiveGossipThread.class);
+  private static final Logger LOGGER = Logger.getLogger(ActiveGossipThread.class);
   
-  private ScheduledExecutorService scheduledExecutorService ;
-  private ObjectMapper MAPPER = new ObjectMapper();
+  private final GossipManager gossipManager;
   private final Random random;
-  protected final GossipManager gossipManager;
   private final GossipCore gossipCore;
+  private ScheduledExecutorService scheduledExecutorService;
+  private ObjectMapper MAPPER = new ObjectMapper();
 
   public ActiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) {
     this.gossipManager = gossipManager;
-    this.scheduledExecutorService = Executors.newScheduledThreadPool(1024);
+    random = new Random();
     this.gossipCore = gossipCore;
-    this.random = new Random();
+    this.scheduledExecutorService = Executors.newScheduledThreadPool(1024);
   }
-
+ 
   public void init() {
     scheduledExecutorService.scheduleAtFixedRate(
             () -> sendMembershipList(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0,
@@ -63,29 +69,65 @@
     scheduledExecutorService.scheduleAtFixedRate(
             () -> sendMembershipList(gossipManager.getMyself(), gossipManager.getDeadMembers()), 0,
             gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
+    scheduledExecutorService.scheduleAtFixedRate(
+            () -> sendData(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0,
+            gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
   }
   
   public void shutdown() {
-    this.scheduledExecutorService.shutdown();
+    scheduledExecutorService.shutdown();
     try {
-      this.scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
+      scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
     } catch (InterruptedException e) {
-      LOGGER.warn("Did not complete shutdown", e);
+      LOGGER.debug("Issue during shurdown" + e);
     }
   }
 
+  public void sendData(LocalGossipMember me, List<LocalGossipMember> memberList){
+    LocalGossipMember member = selectPartner(memberList);
+    if (member == null) {
+      LOGGER.debug("Send sendMembershipList() is called without action");
+      return;
+    }
+    try (DatagramSocket socket = new DatagramSocket()) {
+      socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
+      for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){
+        for (Entry<String, GossipDataMessage> innerEntry : entry.getValue().entrySet()){
+          UdpGossipDataMessage message = new UdpGossipDataMessage();
+          message.setUuid(UUID.randomUUID().toString());
+          message.setUriFrom(me.getId());
+          message.setExpireAt(innerEntry.getValue().getExpireAt());
+          message.setKey(innerEntry.getValue().getKey());
+          message.setNodeId(innerEntry.getValue().getNodeId());
+          message.setTimestamp(innerEntry.getValue().getTimestamp());
+          message.setPayload(innerEntry.getValue().getPayload());
+          message.setTimestamp(innerEntry.getValue().getTimestamp());
+          byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
+          int packet_length = json_bytes.length;
+          if (packet_length < GossipManager.MAX_PACKET_SIZE) {
+            gossipCore.sendOneWay(message, member.getUri());
+          } else {
+            LOGGER.error("The length of the to be send message is too large ("
+                    + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
+          }
+        }
+      }
+    } catch (IOException e1) {
+      LOGGER.warn(e1);
+    }
+  }
+  
   /**
    * Performs the sending of the membership list, after we have incremented our own heartbeat.
    */
- protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) {
-    
+  protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) {  
     me.setHeartbeat(System.currentTimeMillis());
     LocalGossipMember member = selectPartner(memberList);
     if (member == null) {
-      GossipService.LOGGER.debug("Send sendMembershipList() is called without action");
+      LOGGER.debug("Send sendMembershipList() is called without action");
       return;
     } else {
-      GossipService.LOGGER.debug("Send sendMembershipList() is called to " + member.toString());
+      LOGGER.debug("Send sendMembershipList() is called to " + member.toString());
     }
     
     try (DatagramSocket socket = new DatagramSocket()) {
@@ -107,31 +149,30 @@
           LOGGER.warn("Message "+ message + " generated response "+ r);
         }
       } else {
-        GossipService.LOGGER.error("The length of the to be send message is too large ("
+        LOGGER.error("The length of the to be send message is too large ("
                 + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
       }
     } catch (IOException e1) {
-      GossipService.LOGGER.warn(e1);
+      LOGGER.warn(e1);
     }
   }
+  
   /**
-   * Abstract method which should be implemented by a subclass. This method should return a member
-   * of the list to gossip with.
    * 
    * @param memberList
    *          The list of members which are stored in the local list of members.
    * @return The chosen LocalGossipMember to gossip with.
    */
- protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
-   LocalGossipMember member = null;
-   if (memberList.size() > 0) {
-     int randomNeighborIndex = random.nextInt(memberList.size());
-     member = memberList.get(randomNeighborIndex);
-   } else {
-     LOGGER.debug("I am alone in this world.");
-   }
-   return member;
- }
+  protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
+    LocalGossipMember member = null;
+    if (memberList.size() > 0) {
+      int randomNeighborIndex = random.nextInt(memberList.size());
+      member = memberList.get(randomNeighborIndex);
+    } else {
+      LOGGER.debug("I am alone in this world.");
+    }
+    return member;
+  }
   
   private GossipMember convert(LocalGossipMember member){
     GossipMember gm = new GossipMember();
diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java
index ab24621..46d855a 100644
--- a/src/main/java/org/apache/gossip/manager/GossipCore.java
+++ b/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -21,10 +21,12 @@
 import org.apache.gossip.RemoteGossipMember;
 import org.apache.gossip.model.ActiveGossipMessage;
 import org.apache.gossip.model.Base;
+import org.apache.gossip.model.GossipDataMessage;
 import org.apache.gossip.model.Response;
 import org.apache.gossip.udp.Trackable;
 import org.apache.gossip.udp.UdpActiveGossipMessage;
 import org.apache.gossip.udp.UdpActiveGossipOk;
+import org.apache.gossip.udp.UdpGossipDataMessage;
 import org.apache.gossip.udp.UdpNotAMemberFault;
 import org.apache.log4j.Logger;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -34,15 +36,32 @@
   public static final Logger LOGGER = Logger.getLogger(GossipCore.class);
   private static final ObjectMapper MAPPER = new ObjectMapper();
   private final GossipManager gossipManager;
-
   private ConcurrentHashMap<String, Base> requests;
-  
   private ExecutorService service;
+  private final ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>> perNodeData;
   
   public GossipCore(GossipManager manager){
     this.gossipManager = manager;
     requests = new ConcurrentHashMap<>();
     service = Executors.newFixedThreadPool(500);
+    perNodeData = new ConcurrentHashMap<>();
+  }
+  
+  /**
+   *  
+   * @param message
+   */
+  public void addPerNodeData(GossipDataMessage message){
+    ConcurrentHashMap<String,GossipDataMessage> m = new ConcurrentHashMap<>();
+    m.put(message.getKey(), message);
+    m = perNodeData.putIfAbsent(message.getNodeId(), m);
+    if (m != null){
+      m.put(message.getKey(), message);    //TODO only put if > ts
+    }
+  }
+  
+  public ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>> getPerNodeData(){
+    return perNodeData;
   }
   
   public void shutdown(){
@@ -61,6 +80,10 @@
         requests.put(t.getUuid() + "/" + t.getUriFrom(), (Base) t);
       }
     }
+    if (base instanceof GossipDataMessage) {
+      UdpGossipDataMessage message = (UdpGossipDataMessage) base;
+      addPerNodeData(message);
+    }
     if (base instanceof ActiveGossipMessage){
       List<GossipMember> remoteGossipMembers = new ArrayList<>();
       RemoteGossipMember senderMember = null;
@@ -153,11 +176,11 @@
     } catch (InterruptedException e) {
       throw new RuntimeException(e);
     } catch (ExecutionException e) {
-      LOGGER.error(e.getMessage(), e);
+      LOGGER.debug(e.getMessage(), e);
       return null;
     } catch (TimeoutException e) {
       boolean cancelled = response.cancel(true);
-      LOGGER.error(String.format("Threadpool timeout attempting to contact %s, cancelled ? %b", uri.toString(), cancelled));
+      LOGGER.debug(String.format("Threadpool timeout attempting to contact %s, cancelled ? %b", uri.toString(), cancelled));
       return null; 
     } finally {
       if (t != null){
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
index 0b2cfd2..94b57d1 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -22,6 +22,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -41,6 +42,9 @@
 import org.apache.gossip.event.GossipState;
 import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
 
+import org.apache.gossip.model.GossipDataMessage;
+
+
 public abstract class GossipManager implements NotificationListener {
 
   public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
@@ -213,4 +217,19 @@
       LOGGER.error(e);
     }
   }
+  
+  public void gossipData(GossipDataMessage message){
+    message.setNodeId(me.getId());
+    gossipCore.addPerNodeData(message);
+  }
+  
+  public GossipDataMessage findGossipData(String nodeId, String key){
+    ConcurrentHashMap<String, GossipDataMessage> j = gossipCore.getPerNodeData().get(nodeId);
+    if (j == null){
+      return null;
+    } else {
+      return j.get(key);
+    }
+  }
+            
 }
diff --git a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
index 6d440de..11c371e 100644
--- a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
@@ -121,17 +121,4 @@
     }
   }
 
-  /**
-   * Abstract method for merging the local and remote list.
-   * 
-   * @param gossipManager
-   *          The GossipManager for retrieving the local members and dead members list.
-   * @param senderMember
-   *          The member who is sending this list, this could be used to send a response if the
-   *          remote list contains out-dated information.
-   * @param remoteList
-   *          The list of members known at the remote side.
-   */
-  abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
-          List<GossipMember> remoteList);
 }
\ No newline at end of file
diff --git a/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
index 0b51573..dff5056 100644
--- a/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
@@ -17,11 +17,6 @@
  */
 package org.apache.gossip.manager.impl;
 
-import java.util.List;
-
-import org.apache.gossip.GossipMember;
-import org.apache.gossip.LocalGossipMember;
-import org.apache.gossip.RemoteGossipMember;
 import org.apache.gossip.manager.GossipCore;
 import org.apache.gossip.manager.GossipManager;
 import org.apache.gossip.manager.PassiveGossipThread;
@@ -35,79 +30,4 @@
     super(gossipManager, gossipCore);
   }
 
-  /**
-   * Merge remote list (received from peer), and our local member list. Simply, we must update the
-   * heartbeats that the remote list has with our list. Also, some additional logic is needed to
-   * make sure we have not timed out a member and then immediately received a list with that member.
-   * 
-   * @param gossipManager
-   * @param senderMember
-   * @param remoteList
-   */
-  protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
-          List<GossipMember> remoteList) {
-
-    // if the person sending to us is in the dead list consider them up
-    for (LocalGossipMember i : gossipManager.getDeadList()) {
-      if (i.getId().equals(senderMember.getId())) {
-        LOGGER.info(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri());
-        LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getClusterName(),
-                senderMember.getUri(), senderMember.getId(),
-                senderMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
-                        .getCleanupInterval());
-        gossipManager.reviveMember(newLocalMember);
-        newLocalMember.startTimeoutTimer();
-      }
-    }
-    for (GossipMember remoteMember : remoteList) {
-      if (remoteMember.getId().equals(gossipManager.getMyself().getId())) {
-        continue;
-      }
-      if (gossipManager.getLiveMembers().contains(remoteMember)) {
-        LocalGossipMember localMember = gossipManager.getLiveMembers().get(
-                gossipManager.getLiveMembers().indexOf(remoteMember));
-        if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) {
-          localMember.setHeartbeat(remoteMember.getHeartbeat());
-          localMember.resetTimeoutTimer();
-        }
-      } else if (!gossipManager.getLiveMembers().contains(remoteMember)
-              && !gossipManager.getDeadList().contains(remoteMember)) {
-        LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
-                remoteMember.getUri(), remoteMember.getId(),
-                remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
-                        .getCleanupInterval());
-        gossipManager.createOrReviveMember(newLocalMember);
-        newLocalMember.startTimeoutTimer();
-      } else {
-        if (gossipManager.getDeadList().contains(remoteMember)) {
-          LocalGossipMember localDeadMember = gossipManager.getDeadList().get(
-                  gossipManager.getDeadList().indexOf(remoteMember));
-          if (remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) {
-            LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
-                    remoteMember.getUri(), remoteMember.getId(),
-                    remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
-                            .getCleanupInterval());
-            gossipManager.reviveMember(newLocalMember);
-            newLocalMember.startTimeoutTimer();
-            LOGGER.debug("Removed remote member " + remoteMember.getAddress()
-                    + " from dead list and added to local member list.");
-          } else {
-            LOGGER.debug("me " + gossipManager.getMyself());
-            LOGGER.debug("sender " + senderMember);
-            LOGGER.debug("remote " + remoteList);
-            LOGGER.debug("live " + gossipManager.getLiveMembers());
-            LOGGER.debug("dead " + gossipManager.getDeadList());
-          }
-        } else {
-          LOGGER.debug("me " + gossipManager.getMyself());
-          LOGGER.debug("sender " + senderMember);
-          LOGGER.debug("remote " + remoteList);
-          LOGGER.debug("live " + gossipManager.getLiveMembers());
-          LOGGER.debug("dead " + gossipManager.getDeadList());
-          // throw new IllegalArgumentException("wtf");
-        }
-      }
-    }
-  }
-
 }
diff --git a/src/main/java/org/apache/gossip/model/Base.java b/src/main/java/org/apache/gossip/model/Base.java
index ebb3215..66c2be6 100644
--- a/src/main/java/org/apache/gossip/model/Base.java
+++ b/src/main/java/org/apache/gossip/model/Base.java
@@ -2,6 +2,7 @@
 
 import org.apache.gossip.udp.UdpActiveGossipMessage;
 import org.apache.gossip.udp.UdpActiveGossipOk;
+import org.apache.gossip.udp.UdpGossipDataMessage;
 import org.apache.gossip.udp.UdpNotAMemberFault;
 import org.codehaus.jackson.annotate.JsonSubTypes;
 import org.codehaus.jackson.annotate.JsonSubTypes.Type;
@@ -17,7 +18,9 @@
         @Type(value = ActiveGossipOk.class, name = "ActiveGossipOk"),
         @Type(value = UdpActiveGossipOk.class, name = "UdpActiveGossipOk"),
         @Type(value = UdpActiveGossipMessage.class, name = "UdpActiveGossipMessage"),
-        @Type(value = UdpNotAMemberFault.class, name = "UdpNotAMemberFault")
+        @Type(value = UdpNotAMemberFault.class, name = "UdpNotAMemberFault"),
+        @Type(value = GossipDataMessage.class, name = "GossipDataMessage"),
+        @Type(value = UdpGossipDataMessage.class, name = "UdpGossipDataMessage")
         })
 public class Base {
 
diff --git a/src/main/java/org/apache/gossip/model/GossipDataMessage.java b/src/main/java/org/apache/gossip/model/GossipDataMessage.java
new file mode 100644
index 0000000..835c668
--- /dev/null
+++ b/src/main/java/org/apache/gossip/model/GossipDataMessage.java
@@ -0,0 +1,49 @@
+package org.apache.gossip.model;
+
+public class GossipDataMessage extends Base {
+
+  private String nodeId;
+  private String key;
+  private Object payload;
+  private Long timestamp;
+  private Long expireAt;
+  
+  public String getNodeId() {
+    return nodeId;
+  }
+  public void setNodeId(String nodeId) {
+    this.nodeId = nodeId;
+  }
+  public String getKey() {
+    return key;
+  }
+  public void setKey(String key) {
+    this.key = key;
+  }
+  public Object getPayload() {
+    return payload;
+  }
+  public void setPayload(Object payload) {
+    this.payload = payload;
+  }
+  public Long getTimestamp() {
+    return timestamp;
+  }
+  public void setTimestamp(Long timestamp) {
+    this.timestamp = timestamp;
+  }
+  public Long getExpireAt() {
+    return expireAt;
+  }
+  public void setExpireAt(Long expireAt) {
+    this.expireAt = expireAt;
+  }
+  @Override
+  public String toString() {
+    return "GossipDataMessage [nodeId=" + nodeId + ", key=" + key + ", payload=" + payload
+            + ", timestamp=" + timestamp + ", expireAt=" + expireAt + "]";
+  }
+
+  
+  
+}
diff --git a/src/main/java/org/apache/gossip/udp/UdpGossipDataMessage.java b/src/main/java/org/apache/gossip/udp/UdpGossipDataMessage.java
new file mode 100644
index 0000000..2ee4bbf
--- /dev/null
+++ b/src/main/java/org/apache/gossip/udp/UdpGossipDataMessage.java
@@ -0,0 +1,31 @@
+package org.apache.gossip.udp;
+
+import org.apache.gossip.model.GossipDataMessage;
+
+public class UdpGossipDataMessage extends GossipDataMessage implements Trackable {
+
+  private String uriFrom;
+  private String uuid;
+  
+  public String getUriFrom() {
+    return uriFrom;
+  }
+  
+  public void setUriFrom(String uriFrom) {
+    this.uriFrom = uriFrom;
+  }
+  
+  public String getUuid() {
+    return uuid;
+  }
+  
+  public void setUuid(String uuid) {
+    this.uuid = uuid;
+  }
+
+  @Override
+  public String toString() {
+    return "UdpGossipDataMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + "]";
+  }
+
+}
diff --git a/src/test/java/org/apache/gossip/DataTest.java b/src/test/java/org/apache/gossip/DataTest.java
new file mode 100644
index 0000000..6260f9b
--- /dev/null
+++ b/src/test/java/org/apache/gossip/DataTest.java
@@ -0,0 +1,81 @@
+package org.apache.gossip;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.gossip.event.GossipListener;
+import org.apache.gossip.event.GossipState;
+import org.apache.gossip.model.GossipDataMessage;
+import org.junit.Test;
+
+import io.teknek.tunit.TUnit;
+
+public class DataTest {
+  
+  @Test
+  public void abc() throws InterruptedException, UnknownHostException, URISyntaxException{
+    GossipSettings settings = new GossipSettings();
+    String cluster = UUID.randomUUID().toString();
+    int seedNodes = 1;
+    List<GossipMember> startupMembers = new ArrayList<>();
+    for (int i = 1; i < seedNodes+1; ++i) {
+      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+      startupMembers.add(new RemoteGossipMember(cluster, uri, i + ""));
+    }
+    final List<GossipService> clients = new ArrayList<>();
+    final int clusterMembers = 2;
+    for (int i = 1; i < clusterMembers+1; ++i) {
+      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+      GossipService gossipService = new GossipService(cluster, uri, i + "",
+              startupMembers, settings,
+              new GossipListener(){
+        public void gossipEvent(GossipMember member, GossipState state) {
+          
+        }
+      });
+      clients.add(gossipService);
+      gossipService.start();
+    }
+    TUnit.assertThat(new Callable<Integer> (){
+      public Integer call() throws Exception {
+        int total = 0;
+        for (int i = 0; i < clusterMembers; ++i) {
+          total += clients.get(i).get_gossipManager().getLiveMembers().size();
+        }
+        return total;
+      }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
+    clients.get(0).gossipData(msg());
+    Thread.sleep(10000);
+    TUnit.assertThat(
+            
+            new Callable<Object> (){
+              public Object call() throws Exception {
+                GossipDataMessage x = clients.get(1).findGossipData(1+"" , "a");
+                if (x == null) return "";
+                else return x.getPayload();
+              }})
+            
+            
+            //() -> clients.get(1).findGossipData(1+"" , "a").getPayload())
+    .afterWaitingAtMost(20, TimeUnit.SECONDS)
+    .isEqualTo("b");
+    for (int i = 0; i < clusterMembers; ++i) {
+      clients.get(i).shutdown();
+    }
+  }
+  
+  private GossipDataMessage msg(){
+    GossipDataMessage g = new GossipDataMessage();
+    g.setExpireAt(Long.MAX_VALUE);
+    g.setKey("a");
+    g.setPayload("b");
+    g.setTimestamp(System.currentTimeMillis());
+    return g;
+  }
+}
diff --git a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
index f0d7f10..82cb625 100644
--- a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
+++ b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
@@ -29,7 +29,6 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
-
 import org.apache.log4j.Logger;
 
 import org.apache.gossip.event.GossipListener;
@@ -43,97 +42,102 @@
 public class ShutdownDeadtimeTest {
 
   private static final Logger log = Logger.getLogger(ShutdownDeadtimeTest.class);
-
+  
   @Test
-  public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, UnknownHostException, URISyntaxException {
-      GossipSettings settings = new GossipSettings(1000, 10000);
-      String cluster = UUID.randomUUID().toString();
-      
-      log.info( "Adding seed nodes" );
-      int seedNodes = 3;
-      List<GossipMember> startupMembers = new ArrayList<>();
-      for (int i = 1; i < seedNodes + 1; ++i) {
-        URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
-        startupMembers.add(new RemoteGossipMember(cluster, uri, i + ""));
-      }
+  public void DeadNodesDoNotComeAliveAgain()
+          throws InterruptedException, UnknownHostException, URISyntaxException {
+    GossipSettings settings = new GossipSettings(1000, 10000);
+    String cluster = UUID.randomUUID().toString();
 
-      log.info( "Adding clients" );
-      final List<GossipService> clients = new ArrayList<>();
-      final int clusterMembers = 5;
-      for (int i = 1; i < clusterMembers+1; ++i) {
-          final int j = i;
-          URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
-          GossipService gossipService = new GossipService(cluster, uri, i + "",
-                  startupMembers, settings,
-                  new GossipListener(){
-                      @Override
-                      public void gossipEvent(GossipMember member, GossipState state) {
-                          System.out.println(System.currentTimeMillis() + " Member "+j + " reports "+ member+" "+ state);
-                      }
-                  });
-          clients.add(gossipService);
-          gossipService.start();
-      }
-      TUnit.assertThat(new Callable<Integer> (){
-          public Integer call() throws Exception {
-              int total = 0;
-              for (int i = 0; i < clusterMembers; ++i) {
-                  total += clients.get(i).get_gossipManager().getLiveMembers().size();
-              }
-              return total;
-          }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20);
+    log.info("Adding seed nodes");
+    int seedNodes = 3;
+    List<GossipMember> startupMembers = new ArrayList<>();
+    for (int i = 1; i < seedNodes + 1; ++i) {
+      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+      startupMembers.add(new RemoteGossipMember(cluster, uri, i + ""));
+    }
 
-      // shutdown one client and verify that one client is lost.
-      Random r = new Random();
-      int randomClientId = r.nextInt(clusterMembers);
-      log.info( "shutting down " + randomClientId );
-      final int shutdownPort = clients.get(randomClientId).get_gossipManager().getMyself().getUri().getPort();
-      final String shutdownId = clients.get(randomClientId).get_gossipManager().getMyself().getId();
-      clients.get(randomClientId).shutdown();
-      TUnit.assertThat(new Callable<Integer> (){
-          public Integer call() throws Exception {
-              int total = 0;
-              for (int i = 0; i < clusterMembers; ++i) {
-                  total += clients.get(i).get_gossipManager().getLiveMembers().size();
-              }
-              return total;
-          }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(16);
-      clients.remove(randomClientId);
-      
-      TUnit.assertThat(new Callable<Integer> (){
-        public Integer call() throws Exception {
-            int total = 0;
-            for (int i = 0; i < clusterMembers - 1; ++i) {
-                total += clients.get(i).get_gossipManager().getDeadList().size();
-            }
-            return total;
-        }}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(4);
-      
-      URI uri = new URI("udp://" + "127.0.0.1" + ":" + shutdownPort);
-      // start client again
-      GossipService gossipService = new GossipService(cluster, uri, shutdownId + "",
-              startupMembers, settings,
-              new GossipListener(){
-                  @Override
-                  public void gossipEvent(GossipMember member, GossipState state) {
-                      //System.out.println("revived " + member+" "+ state);
-                  }
+    log.info("Adding clients");
+    final List<GossipService> clients = new ArrayList<>();
+    final int clusterMembers = 5;
+    for (int i = 1; i < clusterMembers + 1; ++i) {
+      final int j = i;
+      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+      GossipService gossipService = new GossipService(cluster, uri, i + "", startupMembers,
+              settings, new GossipListener() {
+                @Override
+                public void gossipEvent(GossipMember member, GossipState state) {
+                  System.out.println(System.currentTimeMillis() + " Member " + j + " reports "
+                          + member + " " + state);
+                }
               });
       clients.add(gossipService);
       gossipService.start();
-
-      // verify that the client is alive again for every node
-      TUnit.assertThat(new Callable<Integer> (){
-          public Integer call() throws Exception {
-              int total = 0;
-              for (int i = 0; i < clusterMembers; ++i) {
-                  total += clients.get(i).get_gossipManager().getLiveMembers().size();
-              }
-              return total;
-          }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20);
-      
-      for (int i = 0; i < clusterMembers; ++i) {
-        clients.get(i).shutdown();
+    }
+    TUnit.assertThat(new Callable<Integer>() {
+      public Integer call() throws Exception {
+        int total = 0;
+        for (int i = 0; i < clusterMembers; ++i) {
+          total += clients.get(i).get_gossipManager().getLiveMembers().size();
+        }
+        return total;
       }
+    }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20);
+
+    // shutdown one client and verify that one client is lost.
+    Random r = new Random();
+    int randomClientId = r.nextInt(clusterMembers);
+    log.info("shutting down " + randomClientId);
+    final int shutdownPort = clients.get(randomClientId).get_gossipManager().getMyself().getUri()
+            .getPort();
+    final String shutdownId = clients.get(randomClientId).get_gossipManager().getMyself().getId();
+    clients.get(randomClientId).shutdown();
+    TUnit.assertThat(new Callable<Integer>() {
+      public Integer call() throws Exception {
+        int total = 0;
+        for (int i = 0; i < clusterMembers; ++i) {
+          total += clients.get(i).get_gossipManager().getLiveMembers().size();
+        }
+        return total;
+      }
+    }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(16);
+    clients.remove(randomClientId);
+
+    TUnit.assertThat(new Callable<Integer>() {
+      public Integer call() throws Exception {
+        int total = 0;
+        for (int i = 0; i < clusterMembers - 1; ++i) {
+          total += clients.get(i).get_gossipManager().getDeadList().size();
+        }
+        return total;
+      }
+    }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(4);
+
+    URI uri = new URI("udp://" + "127.0.0.1" + ":" + shutdownPort);
+    // start client again
+    GossipService gossipService = new GossipService(cluster, uri, shutdownId + "", startupMembers,
+            settings, new GossipListener() {
+              @Override
+              public void gossipEvent(GossipMember member, GossipState state) {
+                // System.out.println("revived " + member+" "+ state);
+              }
+            });
+    clients.add(gossipService);
+    gossipService.start();
+
+    // verify that the client is alive again for every node
+    TUnit.assertThat(new Callable<Integer>() {
+      public Integer call() throws Exception {
+        int total = 0;
+        for (int i = 0; i < clusterMembers; ++i) {
+          total += clients.get(i).get_gossipManager().getLiveMembers().size();
+        }
+        return total;
+      }
+    }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20);
+
+    for (int i = 0; i < clusterMembers; ++i) {
+      clients.get(i).shutdown();
+    }
   }
 }
diff --git a/src/test/java/org/apache/gossip/StartupSettingsTest.java b/src/test/java/org/apache/gossip/StartupSettingsTest.java
index 9019ac1..3a52fc7 100644
--- a/src/test/java/org/apache/gossip/StartupSettingsTest.java
+++ b/src/test/java/org/apache/gossip/StartupSettingsTest.java
@@ -21,6 +21,8 @@
 import org.json.JSONException;
 
 import io.teknek.tunit.TUnit;
+
+import org.junit.After;
 import org.junit.jupiter.api.Test;
 
 import java.io.File;
@@ -44,6 +46,7 @@
   private static final Logger log = Logger.getLogger( StartupSettingsTest.class );
   private static final String CLUSTER = UUID.randomUUID().toString();
 
+  
   @Test
   public void testUsingSettingsFile() throws IOException, InterruptedException, JSONException, URISyntaxException {
     File settingsFile = File.createTempFile("gossipTest",".json");
diff --git a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
index c98b0d3..0faa968 100644
--- a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
+++ b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gossip;
+package org.apache.gossip; 
 
 import io.teknek.tunit.TUnit;
 
@@ -33,6 +33,7 @@
 
 import org.apache.gossip.event.GossipListener;
 import org.apache.gossip.event.GossipState;
+import org.junit.After;
 import org.junit.jupiter.api.Test;
 
 @RunWith(JUnitPlatform.class)
diff --git a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
index ab3242c..875a7ab 100644
--- a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
+++ b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
@@ -45,14 +45,12 @@
   public static class TestGossipListener implements GossipListener {
     @Override
     public void gossipEvent(GossipMember member, GossipState state) {
-      System.out.println("Got gossip event");
     }
   }
 
   public static class TestNotificationListener implements NotificationListener {
     @Override
     public void handleNotification(Notification notification, Object o) {
-      System.out.println("Got notification event");
     }
   }
 
@@ -75,8 +73,8 @@
       expectThrows(IllegalArgumentException.class,() -> {
           RandomGossipManager.newBuilder().withId("id").cluster("aCluster").build();
       });
-
   }
+  
   @Test
   public void createMembersListIfNull() throws URISyntaxException {
     RandomGossipManager gossipManager = RandomGossipManager.newBuilder()