GOSSIP-13 Add two way protocol
diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java
new file mode 100644
index 0000000..ccfa951
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -0,0 +1,266 @@
+package org.apache.gossip.manager;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.gossip.GossipMember;
+import org.apache.gossip.LocalGossipMember;
+import org.apache.gossip.RemoteGossipMember;
+import org.apache.gossip.model.ActiveGossipMessage;
+import org.apache.gossip.model.Base;
+import org.apache.gossip.model.Response;
+import org.apache.gossip.udp.Trackable;
+import org.apache.gossip.udp.UdpActiveGossipMessage;
+import org.apache.gossip.udp.UdpActiveGossipOk;
+import org.apache.gossip.udp.UdpNotAMemberFault;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
+
+public class GossipCore {
+
+ public static final Logger LOGGER = Logger.getLogger(GossipCore.class);
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+ private final GossipManager gossipManager;
+
+ private ConcurrentHashMap<String, Base> requests;
+
+ private ExecutorService service;
+
+ public GossipCore(GossipManager manager){
+ this.gossipManager = manager;
+ requests = new ConcurrentHashMap<>();
+ service = Executors.newFixedThreadPool(500);
+ }
+
+ public void shutdown(){
+ service.shutdown();
+ try {
+ service.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOGGER.warn(e);
+ }
+ }
+
+ public void recieve(Base base){
+ if (base instanceof Response){
+ if (base instanceof Trackable){
+ Trackable t = (Trackable) base;
+ requests.put(t.getUuid() + "/" + t.getUriFrom(), (Base) t);
+ }
+ }
+ if (base instanceof ActiveGossipMessage){
+ List<GossipMember> remoteGossipMembers = new ArrayList<>();
+ RemoteGossipMember senderMember = null;
+ UdpActiveGossipMessage activeGossipMessage = (UdpActiveGossipMessage) base;
+ for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) {
+ URI u = null;
+ try {
+ u = new URI(activeGossipMessage.getMembers().get(i).getUri());
+ } catch (URISyntaxException e) {
+ LOGGER.debug("Gossip message with faulty URI", e);
+ continue;
+ }
+ RemoteGossipMember member = new RemoteGossipMember(
+ activeGossipMessage.getMembers().get(i).getCluster(),
+ u,
+ activeGossipMessage.getMembers().get(i).getId(),
+ activeGossipMessage.getMembers().get(i).getHeartbeat());
+ if (i == 0) {
+ senderMember = member;
+ }
+ if (!(member.getClusterName().equals(gossipManager.getMyself().getClusterName()))){
+ UdpNotAMemberFault f = new UdpNotAMemberFault();
+ f.setException("Not a member of this cluster " + i);
+ f.setUriFrom(activeGossipMessage.getUriFrom());
+ f.setUuid(activeGossipMessage.getUuid());
+ LOGGER.warn(f);
+ sendOneWay(f, member.getUri());
+ continue;
+ }
+ remoteGossipMembers.add(member);
+ }
+ UdpActiveGossipOk o = new UdpActiveGossipOk();
+ o.setUriFrom(activeGossipMessage.getUriFrom());
+ o.setUuid(activeGossipMessage.getUuid());
+ sendOneWay(o, senderMember.getUri());
+ mergeLists(gossipManager, senderMember, remoteGossipMembers);
+ }
+ }
+
+ private void sendInternal(Base message, URI uri){
+ byte[] json_bytes;
+ try {
+ json_bytes = MAPPER.writeValueAsString(message).getBytes();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ int packet_length = json_bytes.length;
+ if (packet_length < GossipManager.MAX_PACKET_SIZE) {
+ byte[] buf = UdpUtil.createBuffer(packet_length, json_bytes);
+ try (DatagramSocket socket = new DatagramSocket()) {
+ InetAddress dest = InetAddress.getByName(uri.getHost());
+ DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, uri.getPort());
+ socket.send(datagramPacket);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public Response send(Base message, URI uri){
+ final Trackable t;
+ if (message instanceof Trackable){
+ t = (Trackable) message;
+ } else {
+ t = null;
+ }
+ sendInternal(message, uri);
+ if (t == null){
+ return null;
+ }
+ Future<Response> response = service.submit( new Callable<Response>(){
+ @Override
+ public Response call() throws Exception {
+ while(true){
+ Base b = requests.remove(t.getUuid() + "/" + t.getUriFrom());
+ if (b != null){
+ return (Response) b;
+ }
+ try {
+ Thread.sleep(0, 1000);
+ } catch (InterruptedException e) {
+
+ }
+ }
+ }
+ });
+
+ try {
+ return response.get(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ System.err.println(e);
+ return null;
+ } catch (TimeoutException e) {
+ System.err.println(e);
+ return null;
+ } finally {
+ if (t != null){
+ requests.remove(t.getUuid() + "/" + t.getUriFrom());
+ }
+ }
+
+ }
+
+ public void sendOneWay(Base message, URI u){
+ byte[] json_bytes;
+ try {
+ json_bytes = MAPPER.writeValueAsString(message).getBytes();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ int packet_length = json_bytes.length;
+ if (packet_length < GossipManager.MAX_PACKET_SIZE) {
+ byte[] buf = UdpUtil.createBuffer(packet_length, json_bytes);
+ try (DatagramSocket socket = new DatagramSocket()) {
+ InetAddress dest = InetAddress.getByName(u.getHost());
+ DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, u.getPort());
+ socket.send(datagramPacket);
+ } catch (IOException ex) { }
+ }
+ }
+
+
+ /**
+ * Merge remote list (received from peer), and our local member list. Simply, we must update the
+ * heartbeats that the remote list has with our list. Also, some additional logic is needed to
+ * make sure we have not timed out a member and then immediately received a list with that member.
+ *
+ * @param gossipManager
+ * @param senderMember
+ * @param remoteList
+ *
+ * COPIED FROM PASSIVE GOSSIP THREAD
+ */
+ protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
+ List<GossipMember> remoteList) {
+
+ // if the person sending to us is in the dead list consider them up
+ for (LocalGossipMember i : gossipManager.getDeadList()) {
+ if (i.getId().equals(senderMember.getId())) {
+ LOGGER.info(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri());
+ LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getClusterName(),
+ senderMember.getUri(), senderMember.getId(),
+ senderMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
+ .getCleanupInterval());
+ gossipManager.revivieMember(newLocalMember);
+ newLocalMember.startTimeoutTimer();
+ }
+ }
+ for (GossipMember remoteMember : remoteList) {
+ if (remoteMember.getId().equals(gossipManager.getMyself().getId())) {
+ continue;
+ }
+ if (gossipManager.getMemberList().contains(remoteMember)) {
+ LocalGossipMember localMember = gossipManager.getMemberList().get(
+ gossipManager.getMemberList().indexOf(remoteMember));
+ if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) {
+ localMember.setHeartbeat(remoteMember.getHeartbeat());
+ localMember.resetTimeoutTimer();
+ }
+ } else if (!gossipManager.getMemberList().contains(remoteMember)
+ && !gossipManager.getDeadList().contains(remoteMember)) {
+ LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
+ remoteMember.getUri(), remoteMember.getId(),
+ remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
+ .getCleanupInterval());
+ gossipManager.createOrRevivieMember(newLocalMember);
+ newLocalMember.startTimeoutTimer();
+ } else {
+ if (gossipManager.getDeadList().contains(remoteMember)) {
+ LocalGossipMember localDeadMember = gossipManager.getDeadList().get(
+ gossipManager.getDeadList().indexOf(remoteMember));
+ if (remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) {
+ LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
+ remoteMember.getUri(), remoteMember.getId(),
+ remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
+ .getCleanupInterval());
+ gossipManager.revivieMember(newLocalMember);
+ newLocalMember.startTimeoutTimer();
+ LOGGER.debug("Removed remote member " + remoteMember.getAddress()
+ + " from dead list and added to local member list.");
+ } else {
+ LOGGER.debug("me " + gossipManager.getMyself());
+ LOGGER.debug("sender " + senderMember);
+ LOGGER.debug("remote " + remoteList);
+ LOGGER.debug("live " + gossipManager.getMemberList());
+ LOGGER.debug("dead " + gossipManager.getDeadList());
+ }
+ } else {
+ LOGGER.debug("me " + gossipManager.getMyself());
+ LOGGER.debug("sender " + senderMember);
+ LOGGER.debug("remote " + remoteList);
+ LOGGER.debug("live " + gossipManager.getMemberList());
+ LOGGER.debug("dead " + gossipManager.getDeadList());
+ // throw new IllegalArgumentException("wtf");
+ }
+ }
+ }
+ }
+
+
+}
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
index 363a4a9..7a10c91 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -17,8 +17,6 @@
*/
package org.apache.gossip.manager;
-import java.lang.reflect.InvocationTargetException;
-import java.net.BindException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
@@ -41,6 +39,8 @@
import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.event.GossipListener;
import org.apache.gossip.event.GossipState;
+import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
+import org.apache.gossip.manager.random.RandomActiveGossipThread;
public abstract class GossipManager extends Thread implements NotificationListener {
@@ -56,10 +56,6 @@
private final AtomicBoolean gossipServiceRunning;
- private final Class<? extends PassiveGossipThread> passiveGossipThreadClass;
-
- private final Class<? extends ActiveGossipThread> activeGossipThreadClass;
-
private final GossipListener listener;
private ActiveGossipThread activeGossipThread;
@@ -67,14 +63,15 @@
private PassiveGossipThread passiveGossipThread;
private ExecutorService gossipThreadExecutor;
+
+ private GossipCore gossipCore;
- public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass,
- Class<? extends ActiveGossipThread> activeGossipThreadClass, String cluster,
+ public GossipManager(String cluster,
URI uri, String id, GossipSettings settings,
List<GossipMember> gossipMembers, GossipListener listener) {
- this.passiveGossipThreadClass = passiveGossipThreadClass;
- this.activeGossipThreadClass = activeGossipThreadClass;
+
this.settings = settings;
+ this.gossipCore = new GossipCore(this);
me = new LocalGossipMember(cluster, uri, id, System.currentTimeMillis(), this,
settings.getCleanupInterval());
members = new ConcurrentSkipListMap<>();
@@ -173,20 +170,10 @@
member.startTimeoutTimer();
}
}
- try {
- passiveGossipThread = passiveGossipThreadClass.getConstructor(GossipManager.class)
- .newInstance(this);
- gossipThreadExecutor.execute(passiveGossipThread);
- activeGossipThread = activeGossipThreadClass.getConstructor(GossipManager.class)
- .newInstance(this);
- gossipThreadExecutor.execute(activeGossipThread);
- } catch (InstantiationException | IllegalAccessException | IllegalArgumentException
- | InvocationTargetException | NoSuchMethodException | SecurityException e1) {
- if (e1 instanceof BindException){
- LOGGER.fatal("could not bind to "+ me.getUri() + " " + me.getAddress());
- }
- throw new RuntimeException(e1);
- }
+ passiveGossipThread = new OnlyProcessReceivedPassiveGossipThread(this, gossipCore);
+ gossipThreadExecutor.execute(passiveGossipThread);
+ activeGossipThread = new RandomActiveGossipThread(this, this.gossipCore);
+ gossipThreadExecutor.execute(activeGossipThread);
GossipService.LOGGER.debug("The GossipService is started.");
while (gossipServiceRunning.get()) {
try {
@@ -204,6 +191,7 @@
public void shutdown() {
gossipServiceRunning.set(false);
gossipThreadExecutor.shutdown();
+ gossipCore.shutdown();
if (passiveGossipThread != null) {
passiveGossipThread.shutdown();
}
diff --git a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
index 0b12ee4..6d440de 100644
--- a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
@@ -23,15 +23,12 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.gossip.GossipMember;
import org.apache.gossip.GossipService;
-import org.apache.gossip.model.ActiveGossipMessage;
+import org.apache.gossip.model.Base;
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
import org.apache.gossip.RemoteGossipMember;
@@ -49,16 +46,16 @@
/** The socket used for the passive thread of the gossip service. */
private final DatagramSocket server;
- private final GossipManager gossipManager;
-
private final AtomicBoolean keepRunning;
private final String cluster;
private final ObjectMapper MAPPER = new ObjectMapper();
+
+ private final GossipCore gossipCore;
- public PassiveGossipThread(GossipManager gossipManager) {
- this.gossipManager = gossipManager;
+ public PassiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) {
+ this.gossipCore = gossipCore;
try {
SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getUri().getHost(),
gossipManager.getMyself().getUri().getPort());
@@ -84,57 +81,21 @@
byte[] buf = new byte[server.getReceiveBufferSize()];
DatagramPacket p = new DatagramPacket(buf, buf.length);
server.receive(p);
- int packet_length = 0;
- for (int i = 0; i < 4; i++) {
- int shift = (4 - 1 - i) * 8;
- packet_length += (buf[i] & 0x000000FF) << shift;
- }
+ int packet_length = UdpUtil.readPacketLengthFromBuffer(buf);
if (packet_length <= GossipManager.MAX_PACKET_SIZE) {
byte[] json_bytes = new byte[packet_length];
for (int i = 0; i < packet_length; i++) {
json_bytes[i] = buf[i + 4];
}
- if (GossipService.LOGGER.isDebugEnabled()){
- String receivedMessage = new String(json_bytes);
- GossipService.LOGGER.debug("Received message (" + packet_length + " bytes): "
- + receivedMessage);
- }
+ debug(packet_length, json_bytes);
try {
- List<GossipMember> remoteGossipMembers = new ArrayList<>();
- RemoteGossipMember senderMember = null;
- ActiveGossipMessage activeGossipMessage = MAPPER.readValue(json_bytes,
- ActiveGossipMessage.class);
- for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) {
- URI u = null;
- try {
- u = new URI(activeGossipMessage.getMembers().get(i).getUri());
- } catch (URISyntaxException e) {
- LOGGER.debug("Gossip message with faulty URI", e);
- continue;
- }
- RemoteGossipMember member = new RemoteGossipMember(
- activeGossipMessage.getMembers().get(i).getCluster(),
- u,
- activeGossipMessage.getMembers().get(i).getId(),
- activeGossipMessage.getMembers().get(i).getHeartbeat());
- if (!(member.getClusterName().equals(cluster))){
- LOGGER.warn("Note a member of this cluster " + i);
- continue;
- }
- // This is the first member found, so this should be the member who is communicating
- // with me.
- if (i == 0) {
- senderMember = member;
- }
- remoteGossipMembers.add(member);
- }
- mergeLists(gossipManager, senderMember, remoteGossipMembers);
- } catch (RuntimeException ex) {
+ Base activeGossipMessage = MAPPER.readValue(json_bytes, Base.class);
+ gossipCore.recieve(activeGossipMessage);
+ } catch (RuntimeException ex) {//TODO trap json exception
LOGGER.error("Unable to process message", ex);
}
} else {
- LOGGER
- .error("The received message is not of the expected size, it has been dropped.");
+ LOGGER.error("The received message is not of the expected size, it has been dropped.");
}
} catch (IOException e) {
@@ -145,6 +106,14 @@
shutdown();
}
+ private void debug(int packetLength, byte[] jsonBytes) {
+ if (GossipService.LOGGER.isDebugEnabled()){
+ String receivedMessage = new String(jsonBytes);
+ GossipService.LOGGER.debug("Received message (" + packetLength + " bytes): "
+ + receivedMessage);
+ }
+ }
+
public void shutdown() {
try {
server.close();
diff --git a/src/main/java/org/apache/gossip/manager/Transport.java b/src/main/java/org/apache/gossip/manager/Transport.java
new file mode 100644
index 0000000..72b90df
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/Transport.java
@@ -0,0 +1,5 @@
+package org.apache.gossip.manager;
+
+public class Transport {
+
+}
diff --git a/src/main/java/org/apache/gossip/manager/UdpUtil.java b/src/main/java/org/apache/gossip/manager/UdpUtil.java
new file mode 100644
index 0000000..a6a0174
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/UdpUtil.java
@@ -0,0 +1,28 @@
+package org.apache.gossip.manager;
+
+import java.nio.ByteBuffer;
+
+public class UdpUtil {
+
+ public static int readPacketLengthFromBuffer(byte [] buffer){
+ int packetLength = 0;
+ for (int i = 0; i < 4; i++) {
+ int shift = (4 - 1 - i) * 8;
+ packetLength += (buffer[i] & 0x000000FF) << shift;
+ }
+ return packetLength;
+ }
+
+ public static byte[] createBuffer(int packetLength, byte[] jsonBytes) {
+ byte[] lengthBytes = new byte[4];
+ lengthBytes[0] = (byte) (packetLength >> 24);
+ lengthBytes[1] = (byte) ((packetLength << 8) >> 24);
+ lengthBytes[2] = (byte) ((packetLength << 16) >> 24);
+ lengthBytes[3] = (byte) ((packetLength << 24) >> 24);
+ ByteBuffer byteBuffer = ByteBuffer.allocate(4 + jsonBytes.length);
+ byteBuffer.put(lengthBytes);
+ byteBuffer.put(jsonBytes);
+ byte[] buf = byteBuffer.array();
+ return buf;
+ }
+}
diff --git a/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
index bde497f..79b04ce 100644
--- a/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
@@ -22,6 +22,7 @@
import org.apache.gossip.GossipMember;
import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.RemoteGossipMember;
+import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.PassiveGossipThread;
import org.apache.log4j.Logger;
@@ -30,8 +31,8 @@
public static final Logger LOGGER = Logger.getLogger(OnlyProcessReceivedPassiveGossipThread.class);
- public OnlyProcessReceivedPassiveGossipThread(GossipManager gossipManager) {
- super(gossipManager);
+ public OnlyProcessReceivedPassiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) {
+ super(gossipManager, gossipCore);
}
/**
@@ -110,21 +111,3 @@
}
}
-
-/**
- * old comment section: // If a member is restarted the heartbeat will restart from 1, so we should
- * check // that here. // So a member can become from the dead when it is either larger than a
- * previous // heartbeat (due to network failure) // or when the heartbeat is 1 (after a restart of
- * the service). // TODO: What if the first message of a gossip service is sent to a dead node? The
- * // second member will receive a heartbeat of two. // TODO: The above does happen. Maybe a special
- * message for a revived member? // TODO: Or maybe when a member is declared dead for more than //
- * _settings.getCleanupInterval() ms, reset the heartbeat to 0. // It will then accept a revived
- * member. // The above is now handle by checking whether the heartbeat differs //
- * _settings.getCleanupInterval(), it must be restarted.
- */
-
-/*
- * // The remote member is back from the dead. // Remove it from the dead list. //
- * gossipManager.getDeadList().remove(localDeadMember); // Add it as a new member and add it to the
- * member list.
- */
\ No newline at end of file
diff --git a/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java
index 53885b6..da8ed22 100644
--- a/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java
@@ -18,31 +18,38 @@
package org.apache.gossip.manager.random;
import java.io.IOException;
-import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Random;
+import java.util.UUID;
import org.apache.gossip.GossipService;
import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.manager.ActiveGossipThread;
+import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.model.ActiveGossipMessage;
+import org.apache.gossip.model.ActiveGossipOk;
import org.apache.gossip.model.GossipMember;
+import org.apache.gossip.model.Response;
+import org.apache.gossip.udp.UdpActiveGossipMessage;
+import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
public class RandomActiveGossipThread extends ActiveGossipThread {
- protected ObjectMapper om = new ObjectMapper();
+ public static final Logger LOGGER = Logger.getLogger(RandomActiveGossipThread.class);
+ protected ObjectMapper MAPPER = new ObjectMapper();
/** The Random used for choosing a member to gossip with. */
private final Random random;
+ private final GossipCore gossipCore;
- public RandomActiveGossipThread(GossipManager gossipManager) {
+ public RandomActiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) {
super(gossipManager);
random = new Random();
+ this.gossipCore = gossipCore;
}
/**
@@ -71,18 +78,22 @@
}
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
- InetAddress dest = InetAddress.getByName(member.getUri().getHost());
- ActiveGossipMessage message = new ActiveGossipMessage();
+ UdpActiveGossipMessage message = new UdpActiveGossipMessage();
+ message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
+ message.setUuid(UUID.randomUUID().toString());
message.getMembers().add(convert(me));
for (LocalGossipMember other : memberList) {
message.getMembers().add(convert(other));
}
- byte[] json_bytes = om.writeValueAsString(message).getBytes();
+ byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
int packet_length = json_bytes.length;
if (packet_length < GossipManager.MAX_PACKET_SIZE) {
- byte[] buf = createBuffer(packet_length, json_bytes);
- DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, member.getUri().getPort());
- socket.send(datagramPacket);
+ Response r = gossipCore.send(message, member.getUri());
+ if (r instanceof ActiveGossipOk){
+ //maybe count metrics here
+ } else {
+ LOGGER.warn("Message "+ message + " generated response "+ r);
+ }
} else {
GossipService.LOGGER.error("The length of the to be send message is too large ("
+ packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
@@ -91,19 +102,6 @@
GossipService.LOGGER.warn(e1);
}
}
-
- private byte[] createBuffer(int packetLength, byte[] jsonBytes) {
- byte[] lengthBytes = new byte[4];
- lengthBytes[0] = (byte) (packetLength >> 24);
- lengthBytes[1] = (byte) ((packetLength << 8) >> 24);
- lengthBytes[2] = (byte) ((packetLength << 16) >> 24);
- lengthBytes[3] = (byte) ((packetLength << 24) >> 24);
- ByteBuffer byteBuffer = ByteBuffer.allocate(4 + jsonBytes.length);
- byteBuffer.put(lengthBytes);
- byteBuffer.put(jsonBytes);
- byte[] buf = byteBuffer.array();
- return buf;
- }
private GossipMember convert(LocalGossipMember member){
GossipMember gm = new GossipMember();
diff --git a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
index fa2b1c5..e7e19da 100644
--- a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
@@ -85,18 +85,15 @@
checkArgument(cluster != null, "You must specify a cluster name");
checkArgument(settings != null, "You must specify gossip settings");
checkArgument(uri != null, "You must specify a uri");
-
if (this.gossipMembers == null) {
this.gossipMembers = new ArrayList<>();
}
-
return new RandomGossipManager(cluster, uri, id, settings, gossipMembers, listener);
}
}
private RandomGossipManager(String cluster, URI uri, String id, GossipSettings settings,
List<GossipMember> gossipMembers, GossipListener listener) {
- super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, cluster,
- uri, id, settings, gossipMembers, listener);
+ super(cluster, uri, id, settings, gossipMembers, listener);
}
}
diff --git a/src/main/java/org/apache/gossip/model/ActiveGossipMessage.java b/src/main/java/org/apache/gossip/model/ActiveGossipMessage.java
index ac940d8..1927371 100644
--- a/src/main/java/org/apache/gossip/model/ActiveGossipMessage.java
+++ b/src/main/java/org/apache/gossip/model/ActiveGossipMessage.java
@@ -3,7 +3,7 @@
import java.util.ArrayList;
import java.util.List;
-public class ActiveGossipMessage {
+public class ActiveGossipMessage extends Base {
private List<GossipMember> members = new ArrayList<>();
diff --git a/src/main/java/org/apache/gossip/model/ActiveGossipOk.java b/src/main/java/org/apache/gossip/model/ActiveGossipOk.java
new file mode 100644
index 0000000..256ccd6
--- /dev/null
+++ b/src/main/java/org/apache/gossip/model/ActiveGossipOk.java
@@ -0,0 +1,5 @@
+package org.apache.gossip.model;
+
+public class ActiveGossipOk extends Response {
+
+}
diff --git a/src/main/java/org/apache/gossip/model/Base.java b/src/main/java/org/apache/gossip/model/Base.java
new file mode 100644
index 0000000..ebb3215
--- /dev/null
+++ b/src/main/java/org/apache/gossip/model/Base.java
@@ -0,0 +1,24 @@
+package org.apache.gossip.model;
+
+import org.apache.gossip.udp.UdpActiveGossipMessage;
+import org.apache.gossip.udp.UdpActiveGossipOk;
+import org.apache.gossip.udp.UdpNotAMemberFault;
+import org.codehaus.jackson.annotate.JsonSubTypes;
+import org.codehaus.jackson.annotate.JsonSubTypes.Type;
+import org.codehaus.jackson.annotate.JsonTypeInfo;
+
+@JsonTypeInfo(
+ use = JsonTypeInfo.Id.CLASS,
+ include = JsonTypeInfo.As.PROPERTY,
+ property = "type")
+@JsonSubTypes({
+ @Type(value = ActiveGossipMessage.class, name = "ActiveGossipMessage"),
+ @Type(value = Fault.class, name = "Fault"),
+ @Type(value = ActiveGossipOk.class, name = "ActiveGossipOk"),
+ @Type(value = UdpActiveGossipOk.class, name = "UdpActiveGossipOk"),
+ @Type(value = UdpActiveGossipMessage.class, name = "UdpActiveGossipMessage"),
+ @Type(value = UdpNotAMemberFault.class, name = "UdpNotAMemberFault")
+ })
+public class Base {
+
+}
diff --git a/src/main/java/org/apache/gossip/model/Fault.java b/src/main/java/org/apache/gossip/model/Fault.java
new file mode 100644
index 0000000..ea00ea0
--- /dev/null
+++ b/src/main/java/org/apache/gossip/model/Fault.java
@@ -0,0 +1,23 @@
+package org.apache.gossip.model;
+
+public abstract class Fault extends Response {
+
+ private String exception;
+
+ public Fault(){}
+
+ public String getException() {
+ return exception;
+ }
+
+ public void setException(String exception) {
+ this.exception = exception;
+ }
+
+ @Override
+ public String toString() {
+ return "Fault [exception=" + exception + "]";
+ }
+
+}
+
diff --git a/src/main/java/org/apache/gossip/model/Message.java b/src/main/java/org/apache/gossip/model/Message.java
new file mode 100644
index 0000000..5eb59fa
--- /dev/null
+++ b/src/main/java/org/apache/gossip/model/Message.java
@@ -0,0 +1,5 @@
+package org.apache.gossip.model;
+
+public class Message extends Base{
+
+}
diff --git a/src/main/java/org/apache/gossip/model/NotAMemberFault.java b/src/main/java/org/apache/gossip/model/NotAMemberFault.java
new file mode 100644
index 0000000..e7badc1
--- /dev/null
+++ b/src/main/java/org/apache/gossip/model/NotAMemberFault.java
@@ -0,0 +1,12 @@
+package org.apache.gossip.model;
+
+public class NotAMemberFault extends Fault {
+
+ public NotAMemberFault(){
+
+ }
+
+ public NotAMemberFault(String message){
+ this.setException(message);
+ }
+}
diff --git a/src/main/java/org/apache/gossip/model/Response.java b/src/main/java/org/apache/gossip/model/Response.java
new file mode 100644
index 0000000..ab46b48
--- /dev/null
+++ b/src/main/java/org/apache/gossip/model/Response.java
@@ -0,0 +1,5 @@
+package org.apache.gossip.model;
+
+public abstract class Response extends Base {
+
+}
diff --git a/src/main/java/org/apache/gossip/udp/Trackable.java b/src/main/java/org/apache/gossip/udp/Trackable.java
new file mode 100644
index 0000000..e76e2c3
--- /dev/null
+++ b/src/main/java/org/apache/gossip/udp/Trackable.java
@@ -0,0 +1,13 @@
+package org.apache.gossip.udp;
+
+public interface Trackable {
+
+ public String getUriFrom();
+
+ public void setUriFrom(String uriFrom);
+
+ public String getUuid();
+
+ public void setUuid(String uuid);
+
+}
diff --git a/src/main/java/org/apache/gossip/udp/UdpActiveGossipMessage.java b/src/main/java/org/apache/gossip/udp/UdpActiveGossipMessage.java
new file mode 100644
index 0000000..1532294
--- /dev/null
+++ b/src/main/java/org/apache/gossip/udp/UdpActiveGossipMessage.java
@@ -0,0 +1,31 @@
+package org.apache.gossip.udp;
+
+import org.apache.gossip.model.ActiveGossipMessage;
+
+public class UdpActiveGossipMessage extends ActiveGossipMessage implements Trackable {
+
+ private String uriFrom;
+ private String uuid;
+
+ public String getUriFrom() {
+ return uriFrom;
+ }
+
+ public void setUriFrom(String uriFrom) {
+ this.uriFrom = uriFrom;
+ }
+
+ public String getUuid() {
+ return uuid;
+ }
+
+ public void setUuid(String uuid) {
+ this.uuid = uuid;
+ }
+
+ @Override
+ public String toString() {
+ return "UdpActiveGossipMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + "]";
+ }
+
+}
diff --git a/src/main/java/org/apache/gossip/udp/UdpActiveGossipOk.java b/src/main/java/org/apache/gossip/udp/UdpActiveGossipOk.java
new file mode 100644
index 0000000..119bc50
--- /dev/null
+++ b/src/main/java/org/apache/gossip/udp/UdpActiveGossipOk.java
@@ -0,0 +1,27 @@
+package org.apache.gossip.udp;
+
+import org.apache.gossip.model.ActiveGossipOk;
+
+public class UdpActiveGossipOk extends ActiveGossipOk implements Trackable {
+
+
+ private String uriFrom;
+ private String uuid;
+
+ public String getUriFrom() {
+ return uriFrom;
+ }
+
+ public void setUriFrom(String uriFrom) {
+ this.uriFrom = uriFrom;
+ }
+
+ public String getUuid() {
+ return uuid;
+ }
+
+ public void setUuid(String uuid) {
+ this.uuid = uuid;
+ }
+
+}
diff --git a/src/main/java/org/apache/gossip/udp/UdpNotAMemberFault.java b/src/main/java/org/apache/gossip/udp/UdpNotAMemberFault.java
new file mode 100644
index 0000000..7b4d5ba
--- /dev/null
+++ b/src/main/java/org/apache/gossip/udp/UdpNotAMemberFault.java
@@ -0,0 +1,29 @@
+package org.apache.gossip.udp;
+
+import org.apache.gossip.model.NotAMemberFault;
+
+public class UdpNotAMemberFault extends NotAMemberFault implements Trackable{
+
+ public UdpNotAMemberFault(){
+
+ }
+ private String uriFrom;
+ private String uuid;
+
+ public String getUriFrom() {
+ return uriFrom;
+ }
+
+ public void setUriFrom(String uriFrom) {
+ this.uriFrom = uriFrom;
+ }
+
+ public String getUuid() {
+ return uuid;
+ }
+
+ public void setUuid(String uuid) {
+ this.uuid = uuid;
+ }
+
+}