GOSSIP-21 Gossip user defined data
diff --git a/pom.xml b/pom.xml
index 912da3b..581c8b1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -41,6 +41,8 @@
<!-- dependecy versions -->
<jackson-datatype-json-org.version>1.8.0</jackson-datatype-json-org.version>
<junit.jupiter.version>5.0.0-M2</junit.jupiter.version>
+ <junit.platform.version>1.0.0-M2</junit.platform.version>
+ <junit.vintage.version>4.12.0-M2</junit.vintage.version>
<log4j.version>1.2.17</log4j.version>
<tunit.version>0.0.0</tunit.version>
@@ -78,15 +80,30 @@
<artifactId>jackson-datatype-json-org</artifactId>
<version>${jackson-datatype-json-org.version}</version>
</dependency>
-
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${junit.jupiter.version}</version>
<scope>test</scope>
</dependency>
-
-
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <version>${junit.jupiter.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.vintage</groupId>
+ <artifactId>junit-vintage-engine</artifactId>
+ <version>${junit.vintage.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.platform</groupId>
+ <artifactId>junit-platform-runner</artifactId>
+ <version>${junit.platform.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>io.teknek</groupId>
<artifactId>tunit</artifactId>
@@ -119,24 +136,6 @@
<build>
<pluginManagement>
<plugins>
- <!-- <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-release-plugin</artifactId>
- <version>2.1</version> <configuration> <mavenExecutorId>forked-path</mavenExecutorId>
- <useReleaseProfile>false</useReleaseProfile> <arguments>${arguments} -Psonatype-oss-release</arguments>
- </configuration> </plugin> -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-gpg-plugin</artifactId>
- <version>${maven-gpg-plugin.version}</version>
- <executions>
- <execution>
- <id>sign-artifacts</id>
- <phase>verify</phase>
- <goals>
- <goal>sign</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
@@ -153,24 +152,6 @@
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-eclipse-plugin</artifactId>
- <version>${maven-eclipse-plugin.version}</version>
- <configuration>
- <projectNameTemplate>[artifactId]</projectNameTemplate>
- <wtpmanifest>true</wtpmanifest>
- <wtpapplicationxml>true</wtpapplicationxml>
- <additionalBuildcommands>
- <buildcommand>org.eclipse.jdt.core.javabuilder</buildcommand>
- <buildcommand>org.maven.ide.eclipse.maven2Builder</buildcommand>
- </additionalBuildcommands>
- <additionalProjectnatures>
- <projectnature>org.eclipse.jdt.core.javanature</projectnature>
- <projectnature>org.maven.ide.eclipse.maven2Nature</projectnature>
- </additionalProjectnatures>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
<configuration>
@@ -186,6 +167,27 @@
</plugin>
</plugins>
</pluginManagement>
+ <plugins>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <source>${java.version}</source>
+ <target>${java.version}</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.19.1</version>
+ <dependencies>
+ <dependency>
+ <groupId>org.junit.platform</groupId>
+ <artifactId>junit-platform-surefire-provider</artifactId>
+ <version>${junit.platform.version}</version>
+ </dependency>
+ </dependencies>
+ </plugin>
+ </plugins>
</build>
<repositories>
diff --git a/src/main/java/org/apache/gossip/GossipService.java b/src/main/java/org/apache/gossip/GossipService.java
index 6f9b5be..6c02e2c 100644
--- a/src/main/java/org/apache/gossip/GossipService.java
+++ b/src/main/java/org/apache/gossip/GossipService.java
@@ -71,8 +71,8 @@
}
public void start() {
- LOGGER.debug("Starting: " + gossipManager.getName() + " - " + get_gossipManager().getMyself().getUri());
- gossipManager.start();
+ LOGGER.debug("Starting: " + get_gossipManager().getMyself().getUri());
+ gossipManager.init();
}
public void shutdown() {
diff --git a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
index 4790c09..19caffe 100644
--- a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.net.DatagramSocket;
import java.util.List;
+
import java.util.Map.Entry;
import java.util.Random;
import java.util.UUID;
@@ -28,7 +29,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import org.apache.gossip.GossipService;
import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.model.ActiveGossipOk;
import org.apache.gossip.model.GossipDataMessage;
@@ -36,6 +36,7 @@
import org.apache.gossip.model.Response;
import org.apache.gossip.udp.UdpActiveGossipMessage;
import org.apache.gossip.udp.UdpGossipDataMessage;
+
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
@@ -60,60 +61,32 @@
this.gossipCore = gossipCore;
this.scheduledExecutorService = Executors.newScheduledThreadPool(1024);
}
-
- public void init(){
- Runnable liveGossip = new Runnable(){
- @Override
- public void run() {
- try {
- sendMembershipList(gossipManager.getMyself(), gossipManager.getLiveMembers());
- } catch (RuntimeException ex){
- LOGGER.warn(ex);
- }
- }
- };
- scheduledExecutorService.scheduleAtFixedRate(liveGossip, 0,
+
+ public void init() {
+ scheduledExecutorService.scheduleAtFixedRate(
+ () -> { try { sendMembershipList(gossipManager.getMyself(), gossipManager.getLiveMembers()); } catch (RuntimeException ex) { LOGGER.warn(ex); } }, 0,
gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
- Runnable deadGossip = new Runnable(){
- @Override
- public void run() {
- try {
- sendMembershipList(gossipManager.getMyself(), gossipManager.getDeadMembers());
- } catch (RuntimeException ex){
- LOGGER.warn(ex);
- }
- }
- };
- scheduledExecutorService.scheduleAtFixedRate(deadGossip, 0,
+ scheduledExecutorService.scheduleAtFixedRate(
+ () -> { try { sendMembershipList(gossipManager.getMyself(), gossipManager.getDeadMembers()); } catch (RuntimeException ex) { LOGGER.warn(ex); } }, 0,
gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
- Runnable dataGossip = new Runnable(){
- @Override
- public void run() {
- try {
- sendData(gossipManager.getMyself(), gossipManager.getLiveMembers());
- } catch (RuntimeException ex){
- LOGGER.warn(ex);
- }
- }
- };
- scheduledExecutorService.scheduleAtFixedRate(dataGossip, 0,
+ scheduledExecutorService.scheduleAtFixedRate(
+ () -> { try { sendData(gossipManager.getMyself(), gossipManager.getLiveMembers()); } catch (RuntimeException ex) { LOGGER.warn(ex); } }, 0,
gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
-
}
-
+
public void shutdown() {
scheduledExecutorService.shutdown();
try {
scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
- LOGGER.warn(e);
+ LOGGER.debug("Issue during shutdown", e);
}
}
public void sendData(LocalGossipMember me, List<LocalGossipMember> memberList){
LocalGossipMember member = selectPartner(memberList);
if (member == null) {
- GossipService.LOGGER.debug("Send sendMembershipList() is called without action");
+ LOGGER.debug("Send sendData() is called without action");
return;
}
try (DatagramSocket socket = new DatagramSocket()) {
@@ -121,7 +94,6 @@
for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){
for (Entry<String, GossipDataMessage> innerEntry : entry.getValue().entrySet()){
UdpGossipDataMessage message = new UdpGossipDataMessage();
- System.out.println("sending message " + message);
message.setUuid(UUID.randomUUID().toString());
message.setUriFrom(me.getId());
message.setExpireAt(innerEntry.getValue().getExpireAt());
@@ -133,32 +105,29 @@
byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
int packet_length = json_bytes.length;
if (packet_length < GossipManager.MAX_PACKET_SIZE) {
- //Response r = gossipCore.send(message, member.getUri());
gossipCore.sendOneWay(message, member.getUri());
- //TODO: ack this message
} else {
- GossipService.LOGGER.error("The length of the to be send message is too large ("
+ LOGGER.error("The length of the to be send message is too large ("
+ packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
}
}
}
} catch (IOException e1) {
- GossipService.LOGGER.warn(e1);
+ LOGGER.warn(e1);
}
}
/**
* Performs the sending of the membership list, after we have incremented our own heartbeat.
*/
- protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) {
-
+ protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) {
me.setHeartbeat(System.currentTimeMillis());
LocalGossipMember member = selectPartner(memberList);
if (member == null) {
- GossipService.LOGGER.debug("Send sendMembershipList() is called without action");
+ LOGGER.debug("Send sendMembershipList() is called without action");
return;
} else {
- GossipService.LOGGER.debug("Send sendMembershipList() is called to " + member.toString());
+ LOGGER.debug("Send sendMembershipList() is called to " + member.toString());
}
try (DatagramSocket socket = new DatagramSocket()) {
@@ -180,17 +149,15 @@
LOGGER.warn("Message "+ message + " generated response "+ r);
}
} else {
- GossipService.LOGGER.error("The length of the to be send message is too large ("
+ LOGGER.error("The length of the to be send message is too large ("
+ packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
}
} catch (IOException e1) {
- GossipService.LOGGER.warn(e1);
+ LOGGER.warn(e1);
}
}
-
+
/**
- * Abstract method which should be implemented by a subclass. This method should return a member
- * of the list to gossip with.
*
* @param memberList
* The list of members which are stored in the local list of members.
@@ -202,8 +169,7 @@
int randomNeighborIndex = random.nextInt(memberList.size());
member = memberList.get(randomNeighborIndex);
} else {
- GossipService.LOGGER.debug("I am alone in this world.");
-
+ LOGGER.debug("I am alone in this world.");
}
return member;
}
diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java
index 8bcba46..46d855a 100644
--- a/src/main/java/org/apache/gossip/manager/GossipCore.java
+++ b/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -47,6 +47,10 @@
perNodeData = new ConcurrentHashMap<>();
}
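+ /**
+ * Adds a gossiped data message to the per-node data, indexed by the message's key.
+ * @param message the data message to record
+ */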
public void addPerNodeData(GossipDataMessage message){
ConcurrentHashMap<String,GossipDataMessage> m = new ConcurrentHashMap<>();
m.put(message.getKey(), message);
@@ -70,7 +74,6 @@
}
public void recieve(Base base){
- System.out.println(base);
if (base instanceof Response){
if (base instanceof Trackable){
Trackable t = (Trackable) base;
@@ -80,11 +83,6 @@
if (base instanceof GossipDataMessage) {
UdpGossipDataMessage message = (UdpGossipDataMessage) base;
addPerNodeData(message);
- /*
- UdpActiveGossipOk o = new UdpActiveGossipOk();
- o.setUriFrom(message.getUriFrom());
- o.setUuid(message.getUuid());
- sendOneWay(o, senderMember.getUri());*/
}
if (base instanceof ActiveGossipMessage){
List<GossipMember> remoteGossipMembers = new ArrayList<>();
@@ -178,11 +176,11 @@
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
- LOGGER.error(e.getMessage(), e);
+ LOGGER.debug(e.getMessage(), e);
return null;
} catch (TimeoutException e) {
boolean cancelled = response.cancel(true);
- LOGGER.error(String.format("Threadpool timeout attempting to contact %s, cancelled ? %b", uri.toString(), cancelled));
+ LOGGER.debug(String.format("Threadpool timeout attempting to contact %s, cancelled ? %b", uri.toString(), cancelled));
return null;
} finally {
if (t != null){
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
index 36bc10a..94b57d1 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -41,9 +41,11 @@
import org.apache.gossip.event.GossipListener;
import org.apache.gossip.event.GossipState;
import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
+
import org.apache.gossip.model.GossipDataMessage;
-public abstract class GossipManager extends Thread implements NotificationListener {
+
+public abstract class GossipManager implements NotificationListener {
public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
@@ -180,7 +182,7 @@
* Starts the client. Specifically, start the various cycles for this protocol. Start the gossip
* thread and start the receiver thread.
*/
- public void run() {
+ public void init() {
for (LocalGossipMember member : members.keySet()) {
if (member != me) {
member.startTimeoutTimer();
@@ -191,14 +193,6 @@
activeGossipThread = new ActiveGossipThread(this, this.gossipCore);
activeGossipThread.init();
GossipService.LOGGER.debug("The GossipService is started.");
- while (gossipServiceRunning.get()) {
- try {
- // TODO
- TimeUnit.MILLISECONDS.sleep(1);
- } catch (InterruptedException e) {
- GossipService.LOGGER.warn("The GossipClient was interrupted.");
- }
- }
}
/**
@@ -227,7 +221,6 @@
public void gossipData(GossipDataMessage message){
message.setNodeId(me.getId());
gossipCore.addPerNodeData(message);
- System.out.println(this.getMyself() + " " + gossipCore.getPerNodeData());
}
public GossipDataMessage findGossipData(String nodeId, String key){
diff --git a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
index 6d440de..11c371e 100644
--- a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
@@ -121,17 +121,4 @@
}
}
- /**
- * Abstract method for merging the local and remote list.
- *
- * @param gossipManager
- * The GossipManager for retrieving the local members and dead members list.
- * @param senderMember
- * The member who is sending this list, this could be used to send a response if the
- * remote list contains out-dated information.
- * @param remoteList
- * The list of members known at the remote side.
- */
- abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
- List<GossipMember> remoteList);
}
\ No newline at end of file
diff --git a/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
index 0b51573..dff5056 100644
--- a/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
@@ -17,11 +17,6 @@
*/
package org.apache.gossip.manager.impl;
-import java.util.List;
-
-import org.apache.gossip.GossipMember;
-import org.apache.gossip.LocalGossipMember;
-import org.apache.gossip.RemoteGossipMember;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.PassiveGossipThread;
@@ -35,79 +30,4 @@
super(gossipManager, gossipCore);
}
- /**
- * Merge remote list (received from peer), and our local member list. Simply, we must update the
- * heartbeats that the remote list has with our list. Also, some additional logic is needed to
- * make sure we have not timed out a member and then immediately received a list with that member.
- *
- * @param gossipManager
- * @param senderMember
- * @param remoteList
- */
- protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
- List<GossipMember> remoteList) {
-
- // if the person sending to us is in the dead list consider them up
- for (LocalGossipMember i : gossipManager.getDeadList()) {
- if (i.getId().equals(senderMember.getId())) {
- LOGGER.info(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri());
- LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getClusterName(),
- senderMember.getUri(), senderMember.getId(),
- senderMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
- .getCleanupInterval());
- gossipManager.reviveMember(newLocalMember);
- newLocalMember.startTimeoutTimer();
- }
- }
- for (GossipMember remoteMember : remoteList) {
- if (remoteMember.getId().equals(gossipManager.getMyself().getId())) {
- continue;
- }
- if (gossipManager.getLiveMembers().contains(remoteMember)) {
- LocalGossipMember localMember = gossipManager.getLiveMembers().get(
- gossipManager.getLiveMembers().indexOf(remoteMember));
- if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) {
- localMember.setHeartbeat(remoteMember.getHeartbeat());
- localMember.resetTimeoutTimer();
- }
- } else if (!gossipManager.getLiveMembers().contains(remoteMember)
- && !gossipManager.getDeadList().contains(remoteMember)) {
- LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
- remoteMember.getUri(), remoteMember.getId(),
- remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
- .getCleanupInterval());
- gossipManager.createOrReviveMember(newLocalMember);
- newLocalMember.startTimeoutTimer();
- } else {
- if (gossipManager.getDeadList().contains(remoteMember)) {
- LocalGossipMember localDeadMember = gossipManager.getDeadList().get(
- gossipManager.getDeadList().indexOf(remoteMember));
- if (remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) {
- LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
- remoteMember.getUri(), remoteMember.getId(),
- remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
- .getCleanupInterval());
- gossipManager.reviveMember(newLocalMember);
- newLocalMember.startTimeoutTimer();
- LOGGER.debug("Removed remote member " + remoteMember.getAddress()
- + " from dead list and added to local member list.");
- } else {
- LOGGER.debug("me " + gossipManager.getMyself());
- LOGGER.debug("sender " + senderMember);
- LOGGER.debug("remote " + remoteList);
- LOGGER.debug("live " + gossipManager.getLiveMembers());
- LOGGER.debug("dead " + gossipManager.getDeadList());
- }
- } else {
- LOGGER.debug("me " + gossipManager.getMyself());
- LOGGER.debug("sender " + senderMember);
- LOGGER.debug("remote " + remoteList);
- LOGGER.debug("live " + gossipManager.getLiveMembers());
- LOGGER.debug("dead " + gossipManager.getDeadList());
- // throw new IllegalArgumentException("wtf");
- }
- }
- }
- }
-
}
diff --git a/src/main/java/org/apache/gossip/model/GossipDataMessage.java b/src/main/java/org/apache/gossip/model/GossipDataMessage.java
index 2128dfe..835c668 100644
--- a/src/main/java/org/apache/gossip/model/GossipDataMessage.java
+++ b/src/main/java/org/apache/gossip/model/GossipDataMessage.java
@@ -2,7 +2,6 @@
public class GossipDataMessage extends Base {
-
private String nodeId;
private String key;
private Object payload;
diff --git a/src/test/java/org/apache/gossip/DataTest.java b/src/test/java/org/apache/gossip/DataTest.java
index 3f42eeb..6260f9b 100644
--- a/src/test/java/org/apache/gossip/DataTest.java
+++ b/src/test/java/org/apache/gossip/DataTest.java
@@ -17,7 +17,7 @@
import io.teknek.tunit.TUnit;
public class DataTest {
-
+
@Test
public void abc() throws InterruptedException, UnknownHostException, URISyntaxException{
GossipSettings settings = new GossipSettings();
@@ -36,7 +36,7 @@
startupMembers, settings,
new GossipListener(){
public void gossipEvent(GossipMember member, GossipState state) {
- System.out.println(member + " " + state);
+
}
});
clients.add(gossipService);
diff --git a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
index 251550b..82cb625 100644
--- a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
+++ b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
@@ -29,107 +29,115 @@
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
-
import org.apache.log4j.Logger;
import org.apache.gossip.event.GossipListener;
import org.apache.gossip.event.GossipState;
+import org.junit.platform.runner.JUnitPlatform;
import org.junit.jupiter.api.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(JUnitPlatform.class)
public class ShutdownDeadtimeTest {
- private static final Logger log = Logger.getLogger(ShutdownDeadtimeTest.class );
+ private static final Logger log = Logger.getLogger(ShutdownDeadtimeTest.class);
+
@Test
- //@Ignore
- public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, UnknownHostException, URISyntaxException {
- GossipSettings settings = new GossipSettings(1000, 10000);
- String cluster = UUID.randomUUID().toString();
-
- log.info( "Adding seed nodes" );
- int seedNodes = 3;
- List<GossipMember> startupMembers = new ArrayList<>();
- for (int i = 1; i < seedNodes + 1; ++i) {
- URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
- startupMembers.add(new RemoteGossipMember(cluster, uri, i + ""));
- }
+ public void DeadNodesDoNotComeAliveAgain()
+ throws InterruptedException, UnknownHostException, URISyntaxException {
+ GossipSettings settings = new GossipSettings(1000, 10000);
+ String cluster = UUID.randomUUID().toString();
- log.info( "Adding clients" );
- final List<GossipService> clients = new ArrayList<>();
- final int clusterMembers = 5;
- for (int i = 1; i < clusterMembers+1; ++i) {
- final int j = i;
- URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
- GossipService gossipService = new GossipService(cluster, uri, i + "",
- startupMembers, settings,
- new GossipListener(){
- @Override
- public void gossipEvent(GossipMember member, GossipState state) {
- System.out.println(System.currentTimeMillis() + " Member "+j + " reports "+ member+" "+ state);
- }
- });
- clients.add(gossipService);
- gossipService.start();
- }
- TUnit.assertThat(new Callable<Integer> (){
- public Integer call() throws Exception {
- int total = 0;
- for (int i = 0; i < clusterMembers; ++i) {
- total += clients.get(i).get_gossipManager().getLiveMembers().size();
- }
- return total;
- }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20);
+ log.info("Adding seed nodes");
+ int seedNodes = 3;
+ List<GossipMember> startupMembers = new ArrayList<>();
+ for (int i = 1; i < seedNodes + 1; ++i) {
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+ startupMembers.add(new RemoteGossipMember(cluster, uri, i + ""));
+ }
- // shutdown one client and verify that one client is lost.
- Random r = new Random();
- int randomClientId = r.nextInt(clusterMembers);
- log.info( "shutting down " + randomClientId );
- final int shutdownPort = clients.get(randomClientId).get_gossipManager().getMyself().getUri().getPort();
- final String shutdownId = clients.get(randomClientId).get_gossipManager().getMyself().getId();
- clients.get(randomClientId).shutdown();
- TUnit.assertThat(new Callable<Integer> (){
- public Integer call() throws Exception {
- int total = 0;
- for (int i = 0; i < clusterMembers; ++i) {
- total += clients.get(i).get_gossipManager().getLiveMembers().size();
- }
- return total;
- }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(16);
- clients.remove(randomClientId);
-
- TUnit.assertThat(new Callable<Integer> (){
- public Integer call() throws Exception {
- int total = 0;
- for (int i = 0; i < clusterMembers - 1; ++i) {
- total += clients.get(i).get_gossipManager().getDeadList().size();
- }
- return total;
- }}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(4);
-
- URI uri = new URI("udp://" + "127.0.0.1" + ":" + shutdownPort);
- // start client again
- GossipService gossipService = new GossipService(cluster, uri, shutdownId + "",
- startupMembers, settings,
- new GossipListener(){
- @Override
- public void gossipEvent(GossipMember member, GossipState state) {
- //System.out.println("revived " + member+" "+ state);
- }
+ log.info("Adding clients");
+ final List<GossipService> clients = new ArrayList<>();
+ final int clusterMembers = 5;
+ for (int i = 1; i < clusterMembers + 1; ++i) {
+ final int j = i;
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+ GossipService gossipService = new GossipService(cluster, uri, i + "", startupMembers,
+ settings, new GossipListener() {
+ @Override
+ public void gossipEvent(GossipMember member, GossipState state) {
+ System.out.println(System.currentTimeMillis() + " Member " + j + " reports "
+ + member + " " + state);
+ }
});
clients.add(gossipService);
gossipService.start();
-
- // verify that the client is alive again for every node
- TUnit.assertThat(new Callable<Integer> (){
- public Integer call() throws Exception {
- int total = 0;
- for (int i = 0; i < clusterMembers; ++i) {
- total += clients.get(i).get_gossipManager().getLiveMembers().size();
- }
- return total;
- }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20);
-
- for (int i = 0; i < clusterMembers; ++i) {
- clients.get(i).shutdown();
+ }
+ TUnit.assertThat(new Callable<Integer>() {
+ public Integer call() throws Exception {
+ int total = 0;
+ for (int i = 0; i < clusterMembers; ++i) {
+ total += clients.get(i).get_gossipManager().getLiveMembers().size();
+ }
+ return total;
}
+ }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20);
+
+ // shutdown one client and verify that one client is lost.
+ Random r = new Random();
+ int randomClientId = r.nextInt(clusterMembers);
+ log.info("shutting down " + randomClientId);
+ final int shutdownPort = clients.get(randomClientId).get_gossipManager().getMyself().getUri()
+ .getPort();
+ final String shutdownId = clients.get(randomClientId).get_gossipManager().getMyself().getId();
+ clients.get(randomClientId).shutdown();
+ TUnit.assertThat(new Callable<Integer>() {
+ public Integer call() throws Exception {
+ int total = 0;
+ for (int i = 0; i < clusterMembers; ++i) {
+ total += clients.get(i).get_gossipManager().getLiveMembers().size();
+ }
+ return total;
+ }
+ }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(16);
+ clients.remove(randomClientId);
+
+ TUnit.assertThat(new Callable<Integer>() {
+ public Integer call() throws Exception {
+ int total = 0;
+ for (int i = 0; i < clusterMembers - 1; ++i) {
+ total += clients.get(i).get_gossipManager().getDeadList().size();
+ }
+ return total;
+ }
+ }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(4);
+
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + shutdownPort);
+ // start client again
+ GossipService gossipService = new GossipService(cluster, uri, shutdownId + "", startupMembers,
+ settings, new GossipListener() {
+ @Override
+ public void gossipEvent(GossipMember member, GossipState state) {
+ // System.out.println("revived " + member+" "+ state);
+ }
+ });
+ clients.add(gossipService);
+ gossipService.start();
+
+ // verify that the client is alive again for every node
+ TUnit.assertThat(new Callable<Integer>() {
+ public Integer call() throws Exception {
+ int total = 0;
+ for (int i = 0; i < clusterMembers; ++i) {
+ total += clients.get(i).get_gossipManager().getLiveMembers().size();
+ }
+ return total;
+ }
+ }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20);
+
+ for (int i = 0; i < clusterMembers; ++i) {
+ clients.get(i).shutdown();
+ }
}
}
diff --git a/src/test/java/org/apache/gossip/StartupSettingsTest.java b/src/test/java/org/apache/gossip/StartupSettingsTest.java
index ed069c3..3a52fc7 100644
--- a/src/test/java/org/apache/gossip/StartupSettingsTest.java
+++ b/src/test/java/org/apache/gossip/StartupSettingsTest.java
@@ -21,6 +21,8 @@
import org.json.JSONException;
import io.teknek.tunit.TUnit;
+
+import org.junit.After;
import org.junit.jupiter.api.Test;
import java.io.File;
@@ -32,15 +34,19 @@
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
+import org.junit.platform.runner.JUnitPlatform;
+import org.junit.runner.RunWith;
/**
* Tests support of using {@code StartupSettings} and thereby reading
* setup config from file.
*/
+@RunWith(JUnitPlatform.class)
public class StartupSettingsTest {
private static final Logger log = Logger.getLogger( StartupSettingsTest.class );
private static final String CLUSTER = UUID.randomUUID().toString();
+
@Test
public void testUsingSettingsFile() throws IOException, InterruptedException, JSONException, URISyntaxException {
File settingsFile = File.createTempFile("gossipTest",".json");
diff --git a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
index 350fc6f..0faa968 100644
--- a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
+++ b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gossip;
+package org.apache.gossip;
import io.teknek.tunit.TUnit;
@@ -27,14 +27,16 @@
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
-
-
+import org.junit.platform.runner.JUnitPlatform;
+import org.junit.runner.RunWith;
import org.apache.log4j.Logger;
import org.apache.gossip.event.GossipListener;
import org.apache.gossip.event.GossipState;
+import org.junit.After;
import org.junit.jupiter.api.Test;
+@RunWith(JUnitPlatform.class)
public class TenNodeThreeSeedTest {
private static final Logger log = Logger.getLogger( TenNodeThreeSeedTest.class );
diff --git a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
index 6c63516..875a7ab 100644
--- a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
+++ b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
@@ -24,6 +24,8 @@
import org.apache.gossip.event.GossipState;
import org.apache.gossip.manager.random.RandomGossipManager;
import org.junit.jupiter.api.Test;
+import org.junit.platform.runner.JUnitPlatform;
+import org.junit.runner.RunWith;
import javax.management.Notification;
import javax.management.NotificationListener;
@@ -37,20 +39,18 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.expectThrows;
-
+@RunWith(JUnitPlatform.class)
public class RandomGossipManagerBuilderTest {
public static class TestGossipListener implements GossipListener {
@Override
public void gossipEvent(GossipMember member, GossipState state) {
- System.out.println("Got gossip event");
}
}
public static class TestNotificationListener implements NotificationListener {
@Override
public void handleNotification(Notification notification, Object o) {
- System.out.println("Got notification event");
}
}
@@ -73,8 +73,8 @@
expectThrows(IllegalArgumentException.class,() -> {
RandomGossipManager.newBuilder().withId("id").cluster("aCluster").build();
});
-
}
+
@Test
public void createMembersListIfNull() throws URISyntaxException {
RandomGossipManager gossipManager = RandomGossipManager.newBuilder()