Merge branch 'GOSSIP-21' of github.com:edwardcapriolo/incubator-gossip
diff --git a/src/main/java/org/apache/gossip/GossipService.java b/src/main/java/org/apache/gossip/GossipService.java
index 68a4ca2..6c02e2c 100644
--- a/src/main/java/org/apache/gossip/GossipService.java
+++ b/src/main/java/org/apache/gossip/GossipService.java
@@ -24,6 +24,7 @@
import org.apache.gossip.event.GossipListener;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.random.RandomGossipManager;
+import org.apache.gossip.model.GossipDataMessage;
import org.apache.log4j.Logger;
/**
@@ -81,6 +82,19 @@
public GossipManager get_gossipManager() {
return gossipManager;
}
+
+ /**
+ * Gossip data to the entire cluster
+ * @param message
+ */
+ public void gossipData(GossipDataMessage message){
+ gossipManager.gossipData(message);
+ }
+
+
+ public GossipDataMessage findGossipData(String nodeId, String key){
+ return this.get_gossipManager().findGossipData(nodeId, key);
+ }
public void set_gossipManager(GossipManager _gossipManager) {
this.gossipManager = _gossipManager;
diff --git a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
index b57c25a..19caffe 100644
--- a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
@@ -20,17 +20,23 @@
import java.io.IOException;
import java.net.DatagramSocket;
import java.util.List;
+
+import java.util.Map.Entry;
import java.util.Random;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import org.apache.gossip.GossipService;
+
import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.model.ActiveGossipOk;
+import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.GossipMember;
import org.apache.gossip.model.Response;
import org.apache.gossip.udp.UdpActiveGossipMessage;
+import org.apache.gossip.udp.UdpGossipDataMessage;
+
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
@@ -41,21 +47,21 @@
*/
public class ActiveGossipThread {
- public static final Logger LOGGER = Logger.getLogger(ActiveGossipThread.class);
+ private static final Logger LOGGER = Logger.getLogger(ActiveGossipThread.class);
- private ScheduledExecutorService scheduledExecutorService ;
- private ObjectMapper MAPPER = new ObjectMapper();
+ private final GossipManager gossipManager;
private final Random random;
- protected final GossipManager gossipManager;
private final GossipCore gossipCore;
+ private ScheduledExecutorService scheduledExecutorService;
+ private ObjectMapper MAPPER = new ObjectMapper();
public ActiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) {
this.gossipManager = gossipManager;
- this.scheduledExecutorService = Executors.newScheduledThreadPool(1024);
+ random = new Random();
this.gossipCore = gossipCore;
- this.random = new Random();
+ this.scheduledExecutorService = Executors.newScheduledThreadPool(1024);
}
-
+
public void init() {
scheduledExecutorService.scheduleAtFixedRate(
() -> sendMembershipList(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0,
@@ -63,29 +69,65 @@
scheduledExecutorService.scheduleAtFixedRate(
() -> sendMembershipList(gossipManager.getMyself(), gossipManager.getDeadMembers()), 0,
gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
+ scheduledExecutorService.scheduleAtFixedRate(
+ () -> sendData(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0,
+ gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
}
public void shutdown() {
- this.scheduledExecutorService.shutdown();
+ scheduledExecutorService.shutdown();
try {
- this.scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
+ scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
- LOGGER.warn("Did not complete shutdown", e);
+ LOGGER.debug("Issue during shurdown" + e);
}
}
+ public void sendData(LocalGossipMember me, List<LocalGossipMember> memberList){
+ LocalGossipMember member = selectPartner(memberList);
+ if (member == null) {
+ LOGGER.debug("Send sendMembershipList() is called without action");
+ return;
+ }
+ try (DatagramSocket socket = new DatagramSocket()) {
+ socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
+ for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){
+ for (Entry<String, GossipDataMessage> innerEntry : entry.getValue().entrySet()){
+ UdpGossipDataMessage message = new UdpGossipDataMessage();
+ message.setUuid(UUID.randomUUID().toString());
+ message.setUriFrom(me.getId());
+ message.setExpireAt(innerEntry.getValue().getExpireAt());
+ message.setKey(innerEntry.getValue().getKey());
+ message.setNodeId(innerEntry.getValue().getNodeId());
+ message.setTimestamp(innerEntry.getValue().getTimestamp());
+ message.setPayload(innerEntry.getValue().getPayload());
+ message.setTimestamp(innerEntry.getValue().getTimestamp());
+ byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
+ int packet_length = json_bytes.length;
+ if (packet_length < GossipManager.MAX_PACKET_SIZE) {
+ gossipCore.sendOneWay(message, member.getUri());
+ } else {
+ LOGGER.error("The length of the to be send message is too large ("
+ + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
+ }
+ }
+ }
+ } catch (IOException e1) {
+ LOGGER.warn(e1);
+ }
+ }
+
/**
* Performs the sending of the membership list, after we have incremented our own heartbeat.
*/
- protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) {
-
+ protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) {
me.setHeartbeat(System.currentTimeMillis());
LocalGossipMember member = selectPartner(memberList);
if (member == null) {
- GossipService.LOGGER.debug("Send sendMembershipList() is called without action");
+ LOGGER.debug("Send sendMembershipList() is called without action");
return;
} else {
- GossipService.LOGGER.debug("Send sendMembershipList() is called to " + member.toString());
+ LOGGER.debug("Send sendMembershipList() is called to " + member.toString());
}
try (DatagramSocket socket = new DatagramSocket()) {
@@ -107,31 +149,30 @@
LOGGER.warn("Message "+ message + " generated response "+ r);
}
} else {
- GossipService.LOGGER.error("The length of the to be send message is too large ("
+ LOGGER.error("The length of the to be send message is too large ("
+ packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
}
} catch (IOException e1) {
- GossipService.LOGGER.warn(e1);
+ LOGGER.warn(e1);
}
}
+
/**
- * Abstract method which should be implemented by a subclass. This method should return a member
- * of the list to gossip with.
*
* @param memberList
* The list of members which are stored in the local list of members.
* @return The chosen LocalGossipMember to gossip with.
*/
- protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
- LocalGossipMember member = null;
- if (memberList.size() > 0) {
- int randomNeighborIndex = random.nextInt(memberList.size());
- member = memberList.get(randomNeighborIndex);
- } else {
- LOGGER.debug("I am alone in this world.");
- }
- return member;
- }
+ protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
+ LocalGossipMember member = null;
+ if (memberList.size() > 0) {
+ int randomNeighborIndex = random.nextInt(memberList.size());
+ member = memberList.get(randomNeighborIndex);
+ } else {
+ LOGGER.debug("I am alone in this world.");
+ }
+ return member;
+ }
private GossipMember convert(LocalGossipMember member){
GossipMember gm = new GossipMember();
diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java
index ab24621..46d855a 100644
--- a/src/main/java/org/apache/gossip/manager/GossipCore.java
+++ b/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -21,10 +21,12 @@
import org.apache.gossip.RemoteGossipMember;
import org.apache.gossip.model.ActiveGossipMessage;
import org.apache.gossip.model.Base;
+import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.Response;
import org.apache.gossip.udp.Trackable;
import org.apache.gossip.udp.UdpActiveGossipMessage;
import org.apache.gossip.udp.UdpActiveGossipOk;
+import org.apache.gossip.udp.UdpGossipDataMessage;
import org.apache.gossip.udp.UdpNotAMemberFault;
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
@@ -34,15 +36,32 @@
public static final Logger LOGGER = Logger.getLogger(GossipCore.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
private final GossipManager gossipManager;
-
private ConcurrentHashMap<String, Base> requests;
-
private ExecutorService service;
+ private final ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>> perNodeData;
public GossipCore(GossipManager manager){
this.gossipManager = manager;
requests = new ConcurrentHashMap<>();
service = Executors.newFixedThreadPool(500);
+ perNodeData = new ConcurrentHashMap<>();
+ }
+
+ /**
+ *
+ * @param message
+ */
+ public void addPerNodeData(GossipDataMessage message){
+ ConcurrentHashMap<String,GossipDataMessage> m = new ConcurrentHashMap<>();
+ m.put(message.getKey(), message);
+ m = perNodeData.putIfAbsent(message.getNodeId(), m);
+ if (m != null){
+ m.put(message.getKey(), message); //TODO only put if > ts
+ }
+ }
+
+ public ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>> getPerNodeData(){
+ return perNodeData;
}
public void shutdown(){
@@ -61,6 +80,10 @@
requests.put(t.getUuid() + "/" + t.getUriFrom(), (Base) t);
}
}
+ if (base instanceof GossipDataMessage) {
+ UdpGossipDataMessage message = (UdpGossipDataMessage) base;
+ addPerNodeData(message);
+ }
if (base instanceof ActiveGossipMessage){
List<GossipMember> remoteGossipMembers = new ArrayList<>();
RemoteGossipMember senderMember = null;
@@ -153,11 +176,11 @@
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
- LOGGER.error(e.getMessage(), e);
+ LOGGER.debug(e.getMessage(), e);
return null;
} catch (TimeoutException e) {
boolean cancelled = response.cancel(true);
- LOGGER.error(String.format("Threadpool timeout attempting to contact %s, cancelled ? %b", uri.toString(), cancelled));
+ LOGGER.debug(String.format("Threadpool timeout attempting to contact %s, cancelled ? %b", uri.toString(), cancelled));
return null;
} finally {
if (t != null){
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
index 0b2cfd2..94b57d1 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -22,6 +22,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -41,6 +42,9 @@
import org.apache.gossip.event.GossipState;
import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
+import org.apache.gossip.model.GossipDataMessage;
+
+
public abstract class GossipManager implements NotificationListener {
public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
@@ -213,4 +217,19 @@
LOGGER.error(e);
}
}
+
+ public void gossipData(GossipDataMessage message){
+ message.setNodeId(me.getId());
+ gossipCore.addPerNodeData(message);
+ }
+
+ public GossipDataMessage findGossipData(String nodeId, String key){
+ ConcurrentHashMap<String, GossipDataMessage> j = gossipCore.getPerNodeData().get(nodeId);
+ if (j == null){
+ return null;
+ } else {
+ return j.get(key);
+ }
+ }
+
}
diff --git a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
index 6d440de..11c371e 100644
--- a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
@@ -121,17 +121,4 @@
}
}
- /**
- * Abstract method for merging the local and remote list.
- *
- * @param gossipManager
- * The GossipManager for retrieving the local members and dead members list.
- * @param senderMember
- * The member who is sending this list, this could be used to send a response if the
- * remote list contains out-dated information.
- * @param remoteList
- * The list of members known at the remote side.
- */
- abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
- List<GossipMember> remoteList);
}
\ No newline at end of file
diff --git a/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
index 0b51573..dff5056 100644
--- a/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
@@ -17,11 +17,6 @@
*/
package org.apache.gossip.manager.impl;
-import java.util.List;
-
-import org.apache.gossip.GossipMember;
-import org.apache.gossip.LocalGossipMember;
-import org.apache.gossip.RemoteGossipMember;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.PassiveGossipThread;
@@ -35,79 +30,4 @@
super(gossipManager, gossipCore);
}
- /**
- * Merge remote list (received from peer), and our local member list. Simply, we must update the
- * heartbeats that the remote list has with our list. Also, some additional logic is needed to
- * make sure we have not timed out a member and then immediately received a list with that member.
- *
- * @param gossipManager
- * @param senderMember
- * @param remoteList
- */
- protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
- List<GossipMember> remoteList) {
-
- // if the person sending to us is in the dead list consider them up
- for (LocalGossipMember i : gossipManager.getDeadList()) {
- if (i.getId().equals(senderMember.getId())) {
- LOGGER.info(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri());
- LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getClusterName(),
- senderMember.getUri(), senderMember.getId(),
- senderMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
- .getCleanupInterval());
- gossipManager.reviveMember(newLocalMember);
- newLocalMember.startTimeoutTimer();
- }
- }
- for (GossipMember remoteMember : remoteList) {
- if (remoteMember.getId().equals(gossipManager.getMyself().getId())) {
- continue;
- }
- if (gossipManager.getLiveMembers().contains(remoteMember)) {
- LocalGossipMember localMember = gossipManager.getLiveMembers().get(
- gossipManager.getLiveMembers().indexOf(remoteMember));
- if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) {
- localMember.setHeartbeat(remoteMember.getHeartbeat());
- localMember.resetTimeoutTimer();
- }
- } else if (!gossipManager.getLiveMembers().contains(remoteMember)
- && !gossipManager.getDeadList().contains(remoteMember)) {
- LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
- remoteMember.getUri(), remoteMember.getId(),
- remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
- .getCleanupInterval());
- gossipManager.createOrReviveMember(newLocalMember);
- newLocalMember.startTimeoutTimer();
- } else {
- if (gossipManager.getDeadList().contains(remoteMember)) {
- LocalGossipMember localDeadMember = gossipManager.getDeadList().get(
- gossipManager.getDeadList().indexOf(remoteMember));
- if (remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) {
- LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
- remoteMember.getUri(), remoteMember.getId(),
- remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
- .getCleanupInterval());
- gossipManager.reviveMember(newLocalMember);
- newLocalMember.startTimeoutTimer();
- LOGGER.debug("Removed remote member " + remoteMember.getAddress()
- + " from dead list and added to local member list.");
- } else {
- LOGGER.debug("me " + gossipManager.getMyself());
- LOGGER.debug("sender " + senderMember);
- LOGGER.debug("remote " + remoteList);
- LOGGER.debug("live " + gossipManager.getLiveMembers());
- LOGGER.debug("dead " + gossipManager.getDeadList());
- }
- } else {
- LOGGER.debug("me " + gossipManager.getMyself());
- LOGGER.debug("sender " + senderMember);
- LOGGER.debug("remote " + remoteList);
- LOGGER.debug("live " + gossipManager.getLiveMembers());
- LOGGER.debug("dead " + gossipManager.getDeadList());
- // throw new IllegalArgumentException("wtf");
- }
- }
- }
- }
-
}
diff --git a/src/main/java/org/apache/gossip/model/Base.java b/src/main/java/org/apache/gossip/model/Base.java
index ebb3215..66c2be6 100644
--- a/src/main/java/org/apache/gossip/model/Base.java
+++ b/src/main/java/org/apache/gossip/model/Base.java
@@ -2,6 +2,7 @@
import org.apache.gossip.udp.UdpActiveGossipMessage;
import org.apache.gossip.udp.UdpActiveGossipOk;
+import org.apache.gossip.udp.UdpGossipDataMessage;
import org.apache.gossip.udp.UdpNotAMemberFault;
import org.codehaus.jackson.annotate.JsonSubTypes;
import org.codehaus.jackson.annotate.JsonSubTypes.Type;
@@ -17,7 +18,9 @@
@Type(value = ActiveGossipOk.class, name = "ActiveGossipOk"),
@Type(value = UdpActiveGossipOk.class, name = "UdpActiveGossipOk"),
@Type(value = UdpActiveGossipMessage.class, name = "UdpActiveGossipMessage"),
- @Type(value = UdpNotAMemberFault.class, name = "UdpNotAMemberFault")
+ @Type(value = UdpNotAMemberFault.class, name = "UdpNotAMemberFault"),
+ @Type(value = GossipDataMessage.class, name = "GossipDataMessage"),
+ @Type(value = UdpGossipDataMessage.class, name = "UdpGossipDataMessage")
})
public class Base {
diff --git a/src/main/java/org/apache/gossip/model/GossipDataMessage.java b/src/main/java/org/apache/gossip/model/GossipDataMessage.java
new file mode 100644
index 0000000..835c668
--- /dev/null
+++ b/src/main/java/org/apache/gossip/model/GossipDataMessage.java
@@ -0,0 +1,49 @@
+package org.apache.gossip.model;
+
+public class GossipDataMessage extends Base {
+
+ private String nodeId;
+ private String key;
+ private Object payload;
+ private Long timestamp;
+ private Long expireAt;
+
+ public String getNodeId() {
+ return nodeId;
+ }
+ public void setNodeId(String nodeId) {
+ this.nodeId = nodeId;
+ }
+ public String getKey() {
+ return key;
+ }
+ public void setKey(String key) {
+ this.key = key;
+ }
+ public Object getPayload() {
+ return payload;
+ }
+ public void setPayload(Object payload) {
+ this.payload = payload;
+ }
+ public Long getTimestamp() {
+ return timestamp;
+ }
+ public void setTimestamp(Long timestamp) {
+ this.timestamp = timestamp;
+ }
+ public Long getExpireAt() {
+ return expireAt;
+ }
+ public void setExpireAt(Long expireAt) {
+ this.expireAt = expireAt;
+ }
+ @Override
+ public String toString() {
+ return "GossipDataMessage [nodeId=" + nodeId + ", key=" + key + ", payload=" + payload
+ + ", timestamp=" + timestamp + ", expireAt=" + expireAt + "]";
+ }
+
+
+
+}
diff --git a/src/main/java/org/apache/gossip/udp/UdpGossipDataMessage.java b/src/main/java/org/apache/gossip/udp/UdpGossipDataMessage.java
new file mode 100644
index 0000000..2ee4bbf
--- /dev/null
+++ b/src/main/java/org/apache/gossip/udp/UdpGossipDataMessage.java
@@ -0,0 +1,31 @@
+package org.apache.gossip.udp;
+
+import org.apache.gossip.model.GossipDataMessage;
+
+public class UdpGossipDataMessage extends GossipDataMessage implements Trackable {
+
+ private String uriFrom;
+ private String uuid;
+
+ public String getUriFrom() {
+ return uriFrom;
+ }
+
+ public void setUriFrom(String uriFrom) {
+ this.uriFrom = uriFrom;
+ }
+
+ public String getUuid() {
+ return uuid;
+ }
+
+ public void setUuid(String uuid) {
+ this.uuid = uuid;
+ }
+
+ @Override
+ public String toString() {
+ return "UdpGossipDataMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + "]";
+ }
+
+}
diff --git a/src/test/java/org/apache/gossip/DataTest.java b/src/test/java/org/apache/gossip/DataTest.java
new file mode 100644
index 0000000..6260f9b
--- /dev/null
+++ b/src/test/java/org/apache/gossip/DataTest.java
@@ -0,0 +1,81 @@
+package org.apache.gossip;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.gossip.event.GossipListener;
+import org.apache.gossip.event.GossipState;
+import org.apache.gossip.model.GossipDataMessage;
+import org.junit.Test;
+
+import io.teknek.tunit.TUnit;
+
+public class DataTest {
+
+ @Test
+ public void abc() throws InterruptedException, UnknownHostException, URISyntaxException{
+ GossipSettings settings = new GossipSettings();
+ String cluster = UUID.randomUUID().toString();
+ int seedNodes = 1;
+ List<GossipMember> startupMembers = new ArrayList<>();
+ for (int i = 1; i < seedNodes+1; ++i) {
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+ startupMembers.add(new RemoteGossipMember(cluster, uri, i + ""));
+ }
+ final List<GossipService> clients = new ArrayList<>();
+ final int clusterMembers = 2;
+ for (int i = 1; i < clusterMembers+1; ++i) {
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+ GossipService gossipService = new GossipService(cluster, uri, i + "",
+ startupMembers, settings,
+ new GossipListener(){
+ public void gossipEvent(GossipMember member, GossipState state) {
+
+ }
+ });
+ clients.add(gossipService);
+ gossipService.start();
+ }
+ TUnit.assertThat(new Callable<Integer> (){
+ public Integer call() throws Exception {
+ int total = 0;
+ for (int i = 0; i < clusterMembers; ++i) {
+ total += clients.get(i).get_gossipManager().getLiveMembers().size();
+ }
+ return total;
+ }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
+ clients.get(0).gossipData(msg());
+ Thread.sleep(10000);
+ TUnit.assertThat(
+
+ new Callable<Object> (){
+ public Object call() throws Exception {
+ GossipDataMessage x = clients.get(1).findGossipData(1+"" , "a");
+ if (x == null) return "";
+ else return x.getPayload();
+ }})
+
+
+ //() -> clients.get(1).findGossipData(1+"" , "a").getPayload())
+ .afterWaitingAtMost(20, TimeUnit.SECONDS)
+ .isEqualTo("b");
+ for (int i = 0; i < clusterMembers; ++i) {
+ clients.get(i).shutdown();
+ }
+ }
+
+ private GossipDataMessage msg(){
+ GossipDataMessage g = new GossipDataMessage();
+ g.setExpireAt(Long.MAX_VALUE);
+ g.setKey("a");
+ g.setPayload("b");
+ g.setTimestamp(System.currentTimeMillis());
+ return g;
+ }
+}
diff --git a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
index f0d7f10..82cb625 100644
--- a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
+++ b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
@@ -29,7 +29,6 @@
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
-
import org.apache.log4j.Logger;
import org.apache.gossip.event.GossipListener;
@@ -43,97 +42,102 @@
public class ShutdownDeadtimeTest {
private static final Logger log = Logger.getLogger(ShutdownDeadtimeTest.class);
-
+
@Test
- public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, UnknownHostException, URISyntaxException {
- GossipSettings settings = new GossipSettings(1000, 10000);
- String cluster = UUID.randomUUID().toString();
-
- log.info( "Adding seed nodes" );
- int seedNodes = 3;
- List<GossipMember> startupMembers = new ArrayList<>();
- for (int i = 1; i < seedNodes + 1; ++i) {
- URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
- startupMembers.add(new RemoteGossipMember(cluster, uri, i + ""));
- }
+ public void DeadNodesDoNotComeAliveAgain()
+ throws InterruptedException, UnknownHostException, URISyntaxException {
+ GossipSettings settings = new GossipSettings(1000, 10000);
+ String cluster = UUID.randomUUID().toString();
- log.info( "Adding clients" );
- final List<GossipService> clients = new ArrayList<>();
- final int clusterMembers = 5;
- for (int i = 1; i < clusterMembers+1; ++i) {
- final int j = i;
- URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
- GossipService gossipService = new GossipService(cluster, uri, i + "",
- startupMembers, settings,
- new GossipListener(){
- @Override
- public void gossipEvent(GossipMember member, GossipState state) {
- System.out.println(System.currentTimeMillis() + " Member "+j + " reports "+ member+" "+ state);
- }
- });
- clients.add(gossipService);
- gossipService.start();
- }
- TUnit.assertThat(new Callable<Integer> (){
- public Integer call() throws Exception {
- int total = 0;
- for (int i = 0; i < clusterMembers; ++i) {
- total += clients.get(i).get_gossipManager().getLiveMembers().size();
- }
- return total;
- }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20);
+ log.info("Adding seed nodes");
+ int seedNodes = 3;
+ List<GossipMember> startupMembers = new ArrayList<>();
+ for (int i = 1; i < seedNodes + 1; ++i) {
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+ startupMembers.add(new RemoteGossipMember(cluster, uri, i + ""));
+ }
- // shutdown one client and verify that one client is lost.
- Random r = new Random();
- int randomClientId = r.nextInt(clusterMembers);
- log.info( "shutting down " + randomClientId );
- final int shutdownPort = clients.get(randomClientId).get_gossipManager().getMyself().getUri().getPort();
- final String shutdownId = clients.get(randomClientId).get_gossipManager().getMyself().getId();
- clients.get(randomClientId).shutdown();
- TUnit.assertThat(new Callable<Integer> (){
- public Integer call() throws Exception {
- int total = 0;
- for (int i = 0; i < clusterMembers; ++i) {
- total += clients.get(i).get_gossipManager().getLiveMembers().size();
- }
- return total;
- }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(16);
- clients.remove(randomClientId);
-
- TUnit.assertThat(new Callable<Integer> (){
- public Integer call() throws Exception {
- int total = 0;
- for (int i = 0; i < clusterMembers - 1; ++i) {
- total += clients.get(i).get_gossipManager().getDeadList().size();
- }
- return total;
- }}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(4);
-
- URI uri = new URI("udp://" + "127.0.0.1" + ":" + shutdownPort);
- // start client again
- GossipService gossipService = new GossipService(cluster, uri, shutdownId + "",
- startupMembers, settings,
- new GossipListener(){
- @Override
- public void gossipEvent(GossipMember member, GossipState state) {
- //System.out.println("revived " + member+" "+ state);
- }
+ log.info("Adding clients");
+ final List<GossipService> clients = new ArrayList<>();
+ final int clusterMembers = 5;
+ for (int i = 1; i < clusterMembers + 1; ++i) {
+ final int j = i;
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+ GossipService gossipService = new GossipService(cluster, uri, i + "", startupMembers,
+ settings, new GossipListener() {
+ @Override
+ public void gossipEvent(GossipMember member, GossipState state) {
+ System.out.println(System.currentTimeMillis() + " Member " + j + " reports "
+ + member + " " + state);
+ }
});
clients.add(gossipService);
gossipService.start();
-
- // verify that the client is alive again for every node
- TUnit.assertThat(new Callable<Integer> (){
- public Integer call() throws Exception {
- int total = 0;
- for (int i = 0; i < clusterMembers; ++i) {
- total += clients.get(i).get_gossipManager().getLiveMembers().size();
- }
- return total;
- }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20);
-
- for (int i = 0; i < clusterMembers; ++i) {
- clients.get(i).shutdown();
+ }
+ TUnit.assertThat(new Callable<Integer>() {
+ public Integer call() throws Exception {
+ int total = 0;
+ for (int i = 0; i < clusterMembers; ++i) {
+ total += clients.get(i).get_gossipManager().getLiveMembers().size();
+ }
+ return total;
}
+ }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20);
+
+ // shutdown one client and verify that one client is lost.
+ Random r = new Random();
+ int randomClientId = r.nextInt(clusterMembers);
+ log.info("shutting down " + randomClientId);
+ final int shutdownPort = clients.get(randomClientId).get_gossipManager().getMyself().getUri()
+ .getPort();
+ final String shutdownId = clients.get(randomClientId).get_gossipManager().getMyself().getId();
+ clients.get(randomClientId).shutdown();
+ TUnit.assertThat(new Callable<Integer>() {
+ public Integer call() throws Exception {
+ int total = 0;
+ for (int i = 0; i < clusterMembers; ++i) {
+ total += clients.get(i).get_gossipManager().getLiveMembers().size();
+ }
+ return total;
+ }
+ }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(16);
+ clients.remove(randomClientId);
+
+ TUnit.assertThat(new Callable<Integer>() {
+ public Integer call() throws Exception {
+ int total = 0;
+ for (int i = 0; i < clusterMembers - 1; ++i) {
+ total += clients.get(i).get_gossipManager().getDeadList().size();
+ }
+ return total;
+ }
+ }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(4);
+
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + shutdownPort);
+ // start client again
+ GossipService gossipService = new GossipService(cluster, uri, shutdownId + "", startupMembers,
+ settings, new GossipListener() {
+ @Override
+ public void gossipEvent(GossipMember member, GossipState state) {
+ // System.out.println("revived " + member+" "+ state);
+ }
+ });
+ clients.add(gossipService);
+ gossipService.start();
+
+ // verify that the client is alive again for every node
+ TUnit.assertThat(new Callable<Integer>() {
+ public Integer call() throws Exception {
+ int total = 0;
+ for (int i = 0; i < clusterMembers; ++i) {
+ total += clients.get(i).get_gossipManager().getLiveMembers().size();
+ }
+ return total;
+ }
+ }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20);
+
+ for (int i = 0; i < clusterMembers; ++i) {
+ clients.get(i).shutdown();
+ }
}
}
diff --git a/src/test/java/org/apache/gossip/StartupSettingsTest.java b/src/test/java/org/apache/gossip/StartupSettingsTest.java
index 9019ac1..3a52fc7 100644
--- a/src/test/java/org/apache/gossip/StartupSettingsTest.java
+++ b/src/test/java/org/apache/gossip/StartupSettingsTest.java
@@ -21,6 +21,8 @@
import org.json.JSONException;
import io.teknek.tunit.TUnit;
+
+import org.junit.After;
import org.junit.jupiter.api.Test;
import java.io.File;
@@ -44,6 +46,7 @@
private static final Logger log = Logger.getLogger( StartupSettingsTest.class );
private static final String CLUSTER = UUID.randomUUID().toString();
+
@Test
public void testUsingSettingsFile() throws IOException, InterruptedException, JSONException, URISyntaxException {
File settingsFile = File.createTempFile("gossipTest",".json");
diff --git a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
index c98b0d3..0faa968 100644
--- a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
+++ b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gossip;
+package org.apache.gossip;
import io.teknek.tunit.TUnit;
@@ -33,6 +33,7 @@
import org.apache.gossip.event.GossipListener;
import org.apache.gossip.event.GossipState;
+import org.junit.After;
import org.junit.jupiter.api.Test;
@RunWith(JUnitPlatform.class)
diff --git a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
index ab3242c..875a7ab 100644
--- a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
+++ b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
@@ -45,14 +45,12 @@
public static class TestGossipListener implements GossipListener {
@Override
public void gossipEvent(GossipMember member, GossipState state) {
- System.out.println("Got gossip event");
}
}
public static class TestNotificationListener implements NotificationListener {
@Override
public void handleNotification(Notification notification, Object o) {
- System.out.println("Got notification event");
}
}
@@ -75,8 +73,8 @@
expectThrows(IllegalArgumentException.class,() -> {
RandomGossipManager.newBuilder().withId("id").cluster("aCluster").build();
});
-
}
+
@Test
public void createMembersListIfNull() throws URISyntaxException {
RandomGossipManager gossipManager = RandomGossipManager.newBuilder()