GOSSIP-13 Add two way protocol
diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java
new file mode 100644
index 0000000..ccfa951
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -0,0 +1,266 @@
+package org.apache.gossip.manager;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.gossip.GossipMember;
+import org.apache.gossip.LocalGossipMember;
+import org.apache.gossip.RemoteGossipMember;
+import org.apache.gossip.model.ActiveGossipMessage;
+import org.apache.gossip.model.Base;
+import org.apache.gossip.model.Response;
+import org.apache.gossip.udp.Trackable;
+import org.apache.gossip.udp.UdpActiveGossipMessage;
+import org.apache.gossip.udp.UdpActiveGossipOk;
+import org.apache.gossip.udp.UdpNotAMemberFault;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
+
+public class GossipCore {
+  
+  public static final Logger LOGGER = Logger.getLogger(GossipCore.class);
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+  private final GossipManager gossipManager;
+
+  private ConcurrentHashMap<String, Base> requests;
+  
+  private ExecutorService service;
+  
+  public GossipCore(GossipManager manager){
+    this.gossipManager = manager;
+    requests = new ConcurrentHashMap<>();
+    service = Executors.newFixedThreadPool(500);
+  }
+  
+  public void shutdown(){
+    service.shutdown();
+    try {
+      service.awaitTermination(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOGGER.warn(e);
+    }
+  }
+  
+  public void recieve(Base base){
+    if (base instanceof Response){
+      if (base instanceof Trackable){
+        Trackable t = (Trackable) base;
+        requests.put(t.getUuid() + "/" + t.getUriFrom(), (Base) t);
+      }
+    }
+    if (base instanceof ActiveGossipMessage){
+      List<GossipMember> remoteGossipMembers = new ArrayList<>();
+      RemoteGossipMember senderMember = null;
+      UdpActiveGossipMessage activeGossipMessage = (UdpActiveGossipMessage) base;
+      for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) {
+        URI u = null;
+        try {
+          u = new URI(activeGossipMessage.getMembers().get(i).getUri());
+        } catch (URISyntaxException e) {
+          LOGGER.debug("Gossip message with faulty URI", e);
+          continue;
+        }
+        RemoteGossipMember member = new RemoteGossipMember(
+                activeGossipMessage.getMembers().get(i).getCluster(),
+                u,
+                activeGossipMessage.getMembers().get(i).getId(),
+                activeGossipMessage.getMembers().get(i).getHeartbeat());
+        if (i == 0) {
+          senderMember = member;
+        } 
+        if (!(member.getClusterName().equals(gossipManager.getMyself().getClusterName()))){
+          UdpNotAMemberFault f = new UdpNotAMemberFault();
+          f.setException("Not a member of this cluster " + i);
+          f.setUriFrom(activeGossipMessage.getUriFrom());
+          f.setUuid(activeGossipMessage.getUuid());
+          LOGGER.warn(f);
+          sendOneWay(f, member.getUri());
+          continue;
+        }
+        remoteGossipMembers.add(member);
+      }
+      UdpActiveGossipOk o = new UdpActiveGossipOk();
+      o.setUriFrom(activeGossipMessage.getUriFrom());
+      o.setUuid(activeGossipMessage.getUuid());
+      sendOneWay(o, senderMember.getUri());
+      mergeLists(gossipManager, senderMember, remoteGossipMembers);
+    }
+  }
+  
+  private void sendInternal(Base message, URI uri){
+    byte[] json_bytes;
+    try {
+      json_bytes = MAPPER.writeValueAsString(message).getBytes();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    int packet_length = json_bytes.length;
+    if (packet_length < GossipManager.MAX_PACKET_SIZE) {
+      byte[] buf = UdpUtil.createBuffer(packet_length, json_bytes);
+      try (DatagramSocket socket = new DatagramSocket()) {
+        InetAddress dest = InetAddress.getByName(uri.getHost());
+        DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, uri.getPort());
+        socket.send(datagramPacket);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      } 
+    }
+  }
+  
+  public Response send(Base message, URI uri){
+    final Trackable t;
+    if (message instanceof Trackable){
+      t = (Trackable) message;
+    } else {
+      t = null;
+    }
+    sendInternal(message, uri);
+    if (t == null){
+      return null;
+    }
+    Future<Response> response = service.submit( new Callable<Response>(){
+      @Override
+      public Response call() throws Exception {
+        while(true){
+          Base b = requests.remove(t.getUuid() + "/" + t.getUriFrom());
+          if (b != null){
+            return (Response) b;
+          }
+          try {
+            Thread.sleep(0, 1000);
+          } catch (InterruptedException e) {
+            
+          }
+        }
+      }
+    });
+    
+    try {
+      return response.get(10, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    } catch (ExecutionException e) {
+      System.err.println(e);
+      return null;
+    } catch (TimeoutException e) {
+      System.err.println(e);
+      return null; 
+    } finally {
+      if (t != null){
+        requests.remove(t.getUuid() + "/" + t.getUriFrom());
+      }
+    }
+    
+  }
+  
+  public void sendOneWay(Base message, URI u){
+    byte[] json_bytes;
+    try {
+      json_bytes = MAPPER.writeValueAsString(message).getBytes();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    int packet_length = json_bytes.length;
+    if (packet_length < GossipManager.MAX_PACKET_SIZE) {
+      byte[] buf = UdpUtil.createBuffer(packet_length, json_bytes);
+      try (DatagramSocket socket = new DatagramSocket()) {
+        InetAddress dest = InetAddress.getByName(u.getHost());
+        DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, u.getPort());
+        socket.send(datagramPacket);
+      } catch (IOException ex) { }
+    }
+  }
+  
+
+  /**
+   * Merge remote list (received from peer), and our local member list. Simply, we must update the
+   * heartbeats that the remote list has with our list. Also, some additional logic is needed to
+   * make sure we have not timed out a member and then immediately received a list with that member.
+   * 
+   * @param gossipManager
+   * @param senderMember
+   * @param remoteList
+   * 
+   * COPIED FROM PASSIVE GOSSIP THREAD
+   */
+  protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
+          List<GossipMember> remoteList) {
+
+    // if the person sending to us is in the dead list consider them up
+    for (LocalGossipMember i : gossipManager.getDeadList()) {
+      if (i.getId().equals(senderMember.getId())) {
+        LOGGER.info(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri());
+        LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getClusterName(),
+                senderMember.getUri(), senderMember.getId(),
+                senderMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
+                        .getCleanupInterval());
+        gossipManager.revivieMember(newLocalMember);
+        newLocalMember.startTimeoutTimer();
+      }
+    }
+    for (GossipMember remoteMember : remoteList) {
+      if (remoteMember.getId().equals(gossipManager.getMyself().getId())) {
+        continue;
+      }
+      if (gossipManager.getMemberList().contains(remoteMember)) {
+        LocalGossipMember localMember = gossipManager.getMemberList().get(
+                gossipManager.getMemberList().indexOf(remoteMember));
+        if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) {
+          localMember.setHeartbeat(remoteMember.getHeartbeat());
+          localMember.resetTimeoutTimer();
+        }
+      } else if (!gossipManager.getMemberList().contains(remoteMember)
+              && !gossipManager.getDeadList().contains(remoteMember)) {
+        LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
+                remoteMember.getUri(), remoteMember.getId(),
+                remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
+                        .getCleanupInterval());
+        gossipManager.createOrRevivieMember(newLocalMember);
+        newLocalMember.startTimeoutTimer();
+      } else {
+        if (gossipManager.getDeadList().contains(remoteMember)) {
+          LocalGossipMember localDeadMember = gossipManager.getDeadList().get(
+                  gossipManager.getDeadList().indexOf(remoteMember));
+          if (remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) {
+            LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
+                    remoteMember.getUri(), remoteMember.getId(),
+                    remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
+                            .getCleanupInterval());
+            gossipManager.revivieMember(newLocalMember);
+            newLocalMember.startTimeoutTimer();
+            LOGGER.debug("Removed remote member " + remoteMember.getAddress()
+                    + " from dead list and added to local member list.");
+          } else {
+            LOGGER.debug("me " + gossipManager.getMyself());
+            LOGGER.debug("sender " + senderMember);
+            LOGGER.debug("remote " + remoteList);
+            LOGGER.debug("live " + gossipManager.getMemberList());
+            LOGGER.debug("dead " + gossipManager.getDeadList());
+          }
+        } else {
+          LOGGER.debug("me " + gossipManager.getMyself());
+          LOGGER.debug("sender " + senderMember);
+          LOGGER.debug("remote " + remoteList);
+          LOGGER.debug("live " + gossipManager.getMemberList());
+          LOGGER.debug("dead " + gossipManager.getDeadList());
+          // throw new IllegalArgumentException("wtf");
+        }
+      }
+    }
+  }
+
+  
+}
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
index 363a4a9..7a10c91 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -17,8 +17,6 @@
  */
 package org.apache.gossip.manager;
 
-import java.lang.reflect.InvocationTargetException;
-import java.net.BindException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -41,6 +39,8 @@
 import org.apache.gossip.LocalGossipMember;
 import org.apache.gossip.event.GossipListener;
 import org.apache.gossip.event.GossipState;
+import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
+import org.apache.gossip.manager.random.RandomActiveGossipThread;
 
 public abstract class GossipManager extends Thread implements NotificationListener {
 
@@ -56,10 +56,6 @@
 
   private final AtomicBoolean gossipServiceRunning;
 
-  private final Class<? extends PassiveGossipThread> passiveGossipThreadClass;
-
-  private final Class<? extends ActiveGossipThread> activeGossipThreadClass;
-
   private final GossipListener listener;
 
   private ActiveGossipThread activeGossipThread;
@@ -67,14 +63,15 @@
   private PassiveGossipThread passiveGossipThread;
 
   private ExecutorService gossipThreadExecutor;
+  
+  private GossipCore gossipCore;
 
-  public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass,
-          Class<? extends ActiveGossipThread> activeGossipThreadClass, String cluster,
+  public GossipManager(String cluster,
           URI uri, String id, GossipSettings settings,
           List<GossipMember> gossipMembers, GossipListener listener) {
-    this.passiveGossipThreadClass = passiveGossipThreadClass;
-    this.activeGossipThreadClass = activeGossipThreadClass;
+    
     this.settings = settings;
+    this.gossipCore = new GossipCore(this);
     me = new LocalGossipMember(cluster, uri, id, System.currentTimeMillis(), this,
             settings.getCleanupInterval());
     members = new ConcurrentSkipListMap<>();
@@ -173,20 +170,10 @@
         member.startTimeoutTimer();
       }
     }
-    try {
-      passiveGossipThread = passiveGossipThreadClass.getConstructor(GossipManager.class)
-              .newInstance(this);
-      gossipThreadExecutor.execute(passiveGossipThread);
-      activeGossipThread = activeGossipThreadClass.getConstructor(GossipManager.class)
-              .newInstance(this);
-      gossipThreadExecutor.execute(activeGossipThread);
-    } catch (InstantiationException | IllegalAccessException | IllegalArgumentException
-            | InvocationTargetException | NoSuchMethodException | SecurityException e1) {
-      if (e1 instanceof BindException){
-        LOGGER.fatal("could not bind to "+ me.getUri() + " " + me.getAddress());
-      }
-      throw new RuntimeException(e1);
-    }
+    passiveGossipThread = new OnlyProcessReceivedPassiveGossipThread(this, gossipCore);
+    gossipThreadExecutor.execute(passiveGossipThread);
+    activeGossipThread = new RandomActiveGossipThread(this, this.gossipCore);
+    gossipThreadExecutor.execute(activeGossipThread);
     GossipService.LOGGER.debug("The GossipService is started.");
     while (gossipServiceRunning.get()) {
       try {
@@ -204,6 +191,7 @@
   public void shutdown() {
     gossipServiceRunning.set(false);
     gossipThreadExecutor.shutdown();
+    gossipCore.shutdown();
     if (passiveGossipThread != null) {
       passiveGossipThread.shutdown();
     }
diff --git a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
index 0b12ee4..6d440de 100644
--- a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
@@ -23,15 +23,12 @@
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.SocketException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.gossip.GossipMember;
 import org.apache.gossip.GossipService;
-import org.apache.gossip.model.ActiveGossipMessage;
+import org.apache.gossip.model.Base;
 import org.apache.log4j.Logger;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.apache.gossip.RemoteGossipMember;
@@ -49,16 +46,16 @@
   /** The socket used for the passive thread of the gossip service. */
   private final DatagramSocket server;
 
-  private final GossipManager gossipManager;
-
   private final AtomicBoolean keepRunning;
 
   private final String cluster;
   
   private final ObjectMapper MAPPER = new ObjectMapper();
+  
+  private final GossipCore gossipCore;
 
-  public PassiveGossipThread(GossipManager gossipManager) {
-    this.gossipManager = gossipManager;
+  public PassiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) {
+    this.gossipCore = gossipCore;
     try {
       SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getUri().getHost(),
               gossipManager.getMyself().getUri().getPort());
@@ -84,57 +81,21 @@
         byte[] buf = new byte[server.getReceiveBufferSize()];
         DatagramPacket p = new DatagramPacket(buf, buf.length);
         server.receive(p);
-        int packet_length = 0;
-        for (int i = 0; i < 4; i++) {
-          int shift = (4 - 1 - i) * 8;
-          packet_length += (buf[i] & 0x000000FF) << shift;
-        }
+        int packet_length = UdpUtil.readPacketLengthFromBuffer(buf);
         if (packet_length <= GossipManager.MAX_PACKET_SIZE) {
           byte[] json_bytes = new byte[packet_length];
           for (int i = 0; i < packet_length; i++) {
             json_bytes[i] = buf[i + 4];
           }
-          if (GossipService.LOGGER.isDebugEnabled()){
-            String receivedMessage = new String(json_bytes);
-            GossipService.LOGGER.debug("Received message (" + packet_length + " bytes): "
-                  + receivedMessage);
-          }
+          debug(packet_length, json_bytes);
           try {
-            List<GossipMember> remoteGossipMembers = new ArrayList<>();
-            RemoteGossipMember senderMember = null;
-            ActiveGossipMessage activeGossipMessage = MAPPER.readValue(json_bytes,
-                    ActiveGossipMessage.class);
-            for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) {
-              URI u = null;
-              try {
-                u = new URI(activeGossipMessage.getMembers().get(i).getUri());
-              } catch (URISyntaxException e) {
-                LOGGER.debug("Gossip message with faulty URI", e);
-                continue;
-              }
-              RemoteGossipMember member = new RemoteGossipMember(
-                      activeGossipMessage.getMembers().get(i).getCluster(),
-                      u,
-                      activeGossipMessage.getMembers().get(i).getId(),
-                      activeGossipMessage.getMembers().get(i).getHeartbeat());
-              if (!(member.getClusterName().equals(cluster))){
-                LOGGER.warn("Note a member of this cluster " + i);
-                continue;
-              }
-              // This is the first member found, so this should be the member who is communicating
-              // with me.
-              if (i == 0) {
-                senderMember = member;
-              } 
-              remoteGossipMembers.add(member);
-            }
-            mergeLists(gossipManager, senderMember, remoteGossipMembers);
-          } catch (RuntimeException ex) {
+            Base activeGossipMessage = MAPPER.readValue(json_bytes, Base.class);
+            gossipCore.recieve(activeGossipMessage);
+          } catch (RuntimeException ex) {//TODO trap json exception
             LOGGER.error("Unable to process message", ex);
           }
         } else {
-          LOGGER
-                  .error("The received message is not of the expected size, it has been dropped.");
+          LOGGER.error("The received message is not of the expected size, it has been dropped.");
         }
 
       } catch (IOException e) {
@@ -145,6 +106,14 @@
     shutdown();
   }
 
+  private void debug(int packetLength, byte[] jsonBytes) {
+    if (GossipService.LOGGER.isDebugEnabled()){
+      String receivedMessage = new String(jsonBytes);
+      GossipService.LOGGER.debug("Received message (" + packetLength + " bytes): "
+            + receivedMessage);
+    }
+  }
+
   public void shutdown() {
     try {
       server.close();
diff --git a/src/main/java/org/apache/gossip/manager/Transport.java b/src/main/java/org/apache/gossip/manager/Transport.java
new file mode 100644
index 0000000..72b90df
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/Transport.java
@@ -0,0 +1,5 @@
+package org.apache.gossip.manager;
+
+public class Transport {
+
+}
diff --git a/src/main/java/org/apache/gossip/manager/UdpUtil.java b/src/main/java/org/apache/gossip/manager/UdpUtil.java
new file mode 100644
index 0000000..a6a0174
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/UdpUtil.java
@@ -0,0 +1,28 @@
+package org.apache.gossip.manager;
+
+import java.nio.ByteBuffer;
+
+public class UdpUtil {
+
+  public static int readPacketLengthFromBuffer(byte [] buffer){
+    int packetLength = 0;
+    for (int i = 0; i < 4; i++) {
+      int shift = (4 - 1 - i) * 8;
+      packetLength += (buffer[i] & 0x000000FF) << shift;
+    }
+    return packetLength;
+  }
+  
+  public static byte[] createBuffer(int packetLength, byte[] jsonBytes) {
+    byte[] lengthBytes = new byte[4];
+    lengthBytes[0] = (byte) (packetLength >> 24);
+    lengthBytes[1] = (byte) ((packetLength << 8) >> 24);
+    lengthBytes[2] = (byte) ((packetLength << 16) >> 24);
+    lengthBytes[3] = (byte) ((packetLength << 24) >> 24);
+    ByteBuffer byteBuffer = ByteBuffer.allocate(4 + jsonBytes.length);
+    byteBuffer.put(lengthBytes);
+    byteBuffer.put(jsonBytes);
+    byte[] buf = byteBuffer.array();
+    return buf;
+  }
+}
diff --git a/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
index bde497f..79b04ce 100644
--- a/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
@@ -22,6 +22,7 @@
 import org.apache.gossip.GossipMember;
 import org.apache.gossip.LocalGossipMember;
 import org.apache.gossip.RemoteGossipMember;
+import org.apache.gossip.manager.GossipCore;
 import org.apache.gossip.manager.GossipManager;
 import org.apache.gossip.manager.PassiveGossipThread;
 import org.apache.log4j.Logger;
@@ -30,8 +31,8 @@
   
   public static final Logger LOGGER = Logger.getLogger(OnlyProcessReceivedPassiveGossipThread.class);
 
-  public OnlyProcessReceivedPassiveGossipThread(GossipManager gossipManager) {
-    super(gossipManager);
+  public OnlyProcessReceivedPassiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) {
+    super(gossipManager, gossipCore);
   }
 
   /**
@@ -110,21 +111,3 @@
   }
 
 }
-
-/**
- * old comment section: // If a member is restarted the heartbeat will restart from 1, so we should
- * check // that here. // So a member can become from the dead when it is either larger than a
- * previous // heartbeat (due to network failure) // or when the heartbeat is 1 (after a restart of
- * the service). // TODO: What if the first message of a gossip service is sent to a dead node? The
- * // second member will receive a heartbeat of two. // TODO: The above does happen. Maybe a special
- * message for a revived member? // TODO: Or maybe when a member is declared dead for more than //
- * _settings.getCleanupInterval() ms, reset the heartbeat to 0. // It will then accept a revived
- * member. // The above is now handle by checking whether the heartbeat differs //
- * _settings.getCleanupInterval(), it must be restarted.
- */
-
-/*
- * // The remote member is back from the dead. // Remove it from the dead list. //
- * gossipManager.getDeadList().remove(localDeadMember); // Add it as a new member and add it to the
- * member list.
- */
\ No newline at end of file
diff --git a/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java
index 53885b6..da8ed22 100644
--- a/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java
@@ -18,31 +18,38 @@
 package org.apache.gossip.manager.random;
 
 import java.io.IOException;
-import java.net.DatagramPacket;
 import java.net.DatagramSocket;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Random;
+import java.util.UUID;
 
 import org.apache.gossip.GossipService;
 import org.apache.gossip.LocalGossipMember;
 import org.apache.gossip.manager.ActiveGossipThread;
+import org.apache.gossip.manager.GossipCore;
 import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.model.ActiveGossipMessage;
+import org.apache.gossip.model.ActiveGossipOk;
 import org.apache.gossip.model.GossipMember;
+import org.apache.gossip.model.Response;
+import org.apache.gossip.udp.UdpActiveGossipMessage;
+import org.apache.log4j.Logger;
 import org.codehaus.jackson.map.ObjectMapper;
 
 public class RandomActiveGossipThread extends ActiveGossipThread {
 
-  protected ObjectMapper om = new ObjectMapper();
+  public static final Logger LOGGER = Logger.getLogger(RandomActiveGossipThread.class);
+  protected ObjectMapper MAPPER = new ObjectMapper();
   
   /** The Random used for choosing a member to gossip with. */
   private final Random random;
+  private final GossipCore gossipCore;
 
-  public RandomActiveGossipThread(GossipManager gossipManager) {
+  public RandomActiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) {
     super(gossipManager);
     random = new Random();
+    this.gossipCore = gossipCore;
   }
 
   /**
@@ -71,18 +78,22 @@
     }
     try (DatagramSocket socket = new DatagramSocket()) {
       socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
-      InetAddress dest = InetAddress.getByName(member.getUri().getHost());
-      ActiveGossipMessage message = new ActiveGossipMessage();
+      UdpActiveGossipMessage message = new UdpActiveGossipMessage();
+      message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
+      message.setUuid(UUID.randomUUID().toString());
       message.getMembers().add(convert(me));
       for (LocalGossipMember other : memberList) {
         message.getMembers().add(convert(other));
       }
-      byte[] json_bytes = om.writeValueAsString(message).getBytes();
+      byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
       int packet_length = json_bytes.length;
       if (packet_length < GossipManager.MAX_PACKET_SIZE) {
-        byte[] buf = createBuffer(packet_length, json_bytes);
-        DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, member.getUri().getPort());
-        socket.send(datagramPacket);
+        Response r = gossipCore.send(message, member.getUri());
+        if (r instanceof ActiveGossipOk){
+          //maybe count metrics here
+        } else {
+          LOGGER.warn("Message "+ message + " generated response "+ r);
+        }
       } else {
         GossipService.LOGGER.error("The length of the to be send message is too large ("
                 + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
@@ -91,19 +102,6 @@
       GossipService.LOGGER.warn(e1);
     }
   }
-
-  private byte[] createBuffer(int packetLength, byte[] jsonBytes) {
-    byte[] lengthBytes = new byte[4];
-    lengthBytes[0] = (byte) (packetLength >> 24);
-    lengthBytes[1] = (byte) ((packetLength << 8) >> 24);
-    lengthBytes[2] = (byte) ((packetLength << 16) >> 24);
-    lengthBytes[3] = (byte) ((packetLength << 24) >> 24);
-    ByteBuffer byteBuffer = ByteBuffer.allocate(4 + jsonBytes.length);
-    byteBuffer.put(lengthBytes);
-    byteBuffer.put(jsonBytes);
-    byte[] buf = byteBuffer.array();
-    return buf;
-  }
   
   private GossipMember convert(LocalGossipMember member){
     GossipMember gm = new GossipMember();
diff --git a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
index fa2b1c5..e7e19da 100644
--- a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
@@ -85,18 +85,15 @@
       checkArgument(cluster != null, "You must specify a cluster name");
       checkArgument(settings != null, "You must specify gossip settings");
       checkArgument(uri != null, "You must specify a uri");
-
       if (this.gossipMembers == null) {
         this.gossipMembers = new ArrayList<>();
       }
-
       return new RandomGossipManager(cluster, uri, id, settings, gossipMembers, listener);
     }
   }
 
   private RandomGossipManager(String cluster, URI uri, String id, GossipSettings settings, 
           List<GossipMember> gossipMembers, GossipListener listener) {
-    super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, cluster,
-            uri, id, settings, gossipMembers, listener);
+    super(cluster, uri, id, settings, gossipMembers, listener);
   }
 }
diff --git a/src/main/java/org/apache/gossip/model/ActiveGossipMessage.java b/src/main/java/org/apache/gossip/model/ActiveGossipMessage.java
index ac940d8..1927371 100644
--- a/src/main/java/org/apache/gossip/model/ActiveGossipMessage.java
+++ b/src/main/java/org/apache/gossip/model/ActiveGossipMessage.java
@@ -3,7 +3,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
-public class ActiveGossipMessage {
+public class ActiveGossipMessage extends Base {
 
   private List<GossipMember> members = new ArrayList<>();
   
diff --git a/src/main/java/org/apache/gossip/model/ActiveGossipOk.java b/src/main/java/org/apache/gossip/model/ActiveGossipOk.java
new file mode 100644
index 0000000..256ccd6
--- /dev/null
+++ b/src/main/java/org/apache/gossip/model/ActiveGossipOk.java
@@ -0,0 +1,5 @@
+package org.apache.gossip.model;
+
+public class ActiveGossipOk extends Response {
+
+}
diff --git a/src/main/java/org/apache/gossip/model/Base.java b/src/main/java/org/apache/gossip/model/Base.java
new file mode 100644
index 0000000..ebb3215
--- /dev/null
+++ b/src/main/java/org/apache/gossip/model/Base.java
@@ -0,0 +1,24 @@
+package org.apache.gossip.model;
+
+import org.apache.gossip.udp.UdpActiveGossipMessage;
+import org.apache.gossip.udp.UdpActiveGossipOk;
+import org.apache.gossip.udp.UdpNotAMemberFault;
+import org.codehaus.jackson.annotate.JsonSubTypes;
+import org.codehaus.jackson.annotate.JsonSubTypes.Type;
+import org.codehaus.jackson.annotate.JsonTypeInfo;
+
+@JsonTypeInfo(  
+        use = JsonTypeInfo.Id.CLASS,  
+        include = JsonTypeInfo.As.PROPERTY,  
+        property = "type") 
+@JsonSubTypes({
+        @Type(value = ActiveGossipMessage.class, name = "ActiveGossipMessage"),
+        @Type(value = Fault.class, name = "Fault"),
+        @Type(value = ActiveGossipOk.class, name = "ActiveGossipOk"),
+        @Type(value = UdpActiveGossipOk.class, name = "UdpActiveGossipOk"),
+        @Type(value = UdpActiveGossipMessage.class, name = "UdpActiveGossipMessage"),
+        @Type(value = UdpNotAMemberFault.class, name = "UdpNotAMemberFault")
+        })
+public class Base {
+
+}
diff --git a/src/main/java/org/apache/gossip/model/Fault.java b/src/main/java/org/apache/gossip/model/Fault.java
new file mode 100644
index 0000000..ea00ea0
--- /dev/null
+++ b/src/main/java/org/apache/gossip/model/Fault.java
@@ -0,0 +1,23 @@
+package org.apache.gossip.model;
+
+public abstract class Fault extends Response {
+
+  private String exception;
+
+  public Fault(){}
+
+  public String getException() {
+    return exception;
+  }
+
+  public void setException(String exception) {
+    this.exception = exception;
+  }
+
+  @Override
+  public String toString() {
+    return "Fault [exception=" + exception + "]";
+  }
+
+}
+
diff --git a/src/main/java/org/apache/gossip/model/Message.java b/src/main/java/org/apache/gossip/model/Message.java
new file mode 100644
index 0000000..5eb59fa
--- /dev/null
+++ b/src/main/java/org/apache/gossip/model/Message.java
@@ -0,0 +1,5 @@
+package org.apache.gossip.model;
+
+public class Message extends Base{
+
+}
diff --git a/src/main/java/org/apache/gossip/model/NotAMemberFault.java b/src/main/java/org/apache/gossip/model/NotAMemberFault.java
new file mode 100644
index 0000000..e7badc1
--- /dev/null
+++ b/src/main/java/org/apache/gossip/model/NotAMemberFault.java
@@ -0,0 +1,12 @@
+package org.apache.gossip.model;
+
+public class NotAMemberFault extends Fault {
+
+  public NotAMemberFault(){
+    
+  }
+  
+  public NotAMemberFault(String message){
+    this.setException(message);
+  }
+}
diff --git a/src/main/java/org/apache/gossip/model/Response.java b/src/main/java/org/apache/gossip/model/Response.java
new file mode 100644
index 0000000..ab46b48
--- /dev/null
+++ b/src/main/java/org/apache/gossip/model/Response.java
@@ -0,0 +1,5 @@
+package org.apache.gossip.model;
+
+public abstract class Response extends Base {
+
+}
diff --git a/src/main/java/org/apache/gossip/udp/Trackable.java b/src/main/java/org/apache/gossip/udp/Trackable.java
new file mode 100644
index 0000000..e76e2c3
--- /dev/null
+++ b/src/main/java/org/apache/gossip/udp/Trackable.java
@@ -0,0 +1,13 @@
+package org.apache.gossip.udp;
+
+public interface Trackable {
+
+  public String getUriFrom();
+  
+  public void setUriFrom(String uriFrom);
+  
+  public String getUuid();
+  
+  public void setUuid(String uuid);
+  
+}
diff --git a/src/main/java/org/apache/gossip/udp/UdpActiveGossipMessage.java b/src/main/java/org/apache/gossip/udp/UdpActiveGossipMessage.java
new file mode 100644
index 0000000..1532294
--- /dev/null
+++ b/src/main/java/org/apache/gossip/udp/UdpActiveGossipMessage.java
@@ -0,0 +1,31 @@
+package org.apache.gossip.udp;
+
+import org.apache.gossip.model.ActiveGossipMessage;
+
+public class UdpActiveGossipMessage extends ActiveGossipMessage implements Trackable {
+
+  private String uriFrom;
+  private String uuid;
+  
+  public String getUriFrom() {
+    return uriFrom;
+  }
+  
+  public void setUriFrom(String uriFrom) {
+    this.uriFrom = uriFrom;
+  }
+  
+  public String getUuid() {
+    return uuid;
+  }
+  
+  public void setUuid(String uuid) {
+    this.uuid = uuid;
+  }
+
+  @Override
+  public String toString() {
+    return "UdpActiveGossipMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + "]";
+  }
+  
+}
diff --git a/src/main/java/org/apache/gossip/udp/UdpActiveGossipOk.java b/src/main/java/org/apache/gossip/udp/UdpActiveGossipOk.java
new file mode 100644
index 0000000..119bc50
--- /dev/null
+++ b/src/main/java/org/apache/gossip/udp/UdpActiveGossipOk.java
@@ -0,0 +1,27 @@
+package org.apache.gossip.udp;
+
+import org.apache.gossip.model.ActiveGossipOk;
+
+public class UdpActiveGossipOk extends ActiveGossipOk implements Trackable {
+
+
+  private String uriFrom;
+  private String uuid;
+  
+  public String getUriFrom() {
+    return uriFrom;
+  }
+  
+  public void setUriFrom(String uriFrom) {
+    this.uriFrom = uriFrom;
+  }
+  
+  public String getUuid() {
+    return uuid;
+  }
+  
+  public void setUuid(String uuid) {
+    this.uuid = uuid;
+  }
+
+}
diff --git a/src/main/java/org/apache/gossip/udp/UdpNotAMemberFault.java b/src/main/java/org/apache/gossip/udp/UdpNotAMemberFault.java
new file mode 100644
index 0000000..7b4d5ba
--- /dev/null
+++ b/src/main/java/org/apache/gossip/udp/UdpNotAMemberFault.java
@@ -0,0 +1,29 @@
+package org.apache.gossip.udp;
+
+import org.apache.gossip.model.NotAMemberFault;
+
+public class UdpNotAMemberFault extends NotAMemberFault implements Trackable{
+
+  public UdpNotAMemberFault(){
+    
+  }
+  private String uriFrom;
+  private String uuid;
+  
+  public String getUriFrom() {
+    return uriFrom;
+  }
+  
+  public void setUriFrom(String uriFrom) {
+    this.uriFrom = uriFrom;
+  }
+  
+  public String getUuid() {
+    return uuid;
+  }
+  
+  public void setUuid(String uuid) {
+    this.uuid = uuid;
+  }
+  
+}