Better socket cleanup for ActiveThread
diff --git a/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java
index 1dd8837..cbc6051 100644
--- a/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java
+++ b/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java
@@ -14,7 +14,7 @@
*/
abstract public class ActiveGossipThread implements Runnable {
- private final GossipManager _gossipManager;
+ protected final GossipManager _gossipManager;
private final AtomicBoolean _keepRunning;
diff --git a/src/main/java/com/google/code/gossip/manager/GossipManager.java b/src/main/java/com/google/code/gossip/manager/GossipManager.java
index 377188d..81afaa3 100644
--- a/src/main/java/com/google/code/gossip/manager/GossipManager.java
+++ b/src/main/java/com/google/code/gossip/manager/GossipManager.java
@@ -92,6 +92,10 @@
return _settings;
}
+ /**
+ *
+ * @return a read only list of members found in the UP state
+ */
public List<LocalGossipMember> getMemberList() {
List<LocalGossipMember> up = new ArrayList<>();
for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()){
diff --git a/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java
index d788f79..88159f2 100644
--- a/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java
+++ b/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java
@@ -26,51 +26,46 @@
protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) {
GossipService.LOGGER.debug("Send sendMembershipList() is called.");
me.setHeartbeat(me.getHeartbeat() + 1);
- synchronized (memberList) {
- try {
- LocalGossipMember member = selectPartner(memberList);
- if (member != null) {
- InetAddress dest = InetAddress.getByName(member.getHost());
- JSONArray jsonArray = new JSONArray();
- GossipService.LOGGER.debug("Sending memberlist to " + dest + ":" + member.getPort());
- jsonArray.put(me.toJSONObject());
- GossipService.LOGGER.debug(me);
- for (LocalGossipMember other : memberList) {
- jsonArray.put(other.toJSONObject());
- GossipService.LOGGER.debug(other);
- }
- byte[] json_bytes = jsonArray.toString().getBytes();
- int packet_length = json_bytes.length;
- if (packet_length < GossipManager.MAX_PACKET_SIZE) {
- // Convert the packet length to the byte representation of the int.
- byte[] length_bytes = new byte[4];
- length_bytes[0] = (byte) (packet_length >> 24);
- length_bytes[1] = (byte) ((packet_length << 8) >> 24);
- length_bytes[2] = (byte) ((packet_length << 16) >> 24);
- length_bytes[3] = (byte) ((packet_length << 24) >> 24);
-
- GossipService.LOGGER.debug("Sending message (" + packet_length + " bytes): "
- + jsonArray.toString());
-
- ByteBuffer byteBuffer = ByteBuffer.allocate(4 + json_bytes.length);
- byteBuffer.put(length_bytes);
- byteBuffer.put(json_bytes);
- byte[] buf = byteBuffer.array();
-
- DatagramSocket socket = new DatagramSocket();
- DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest,
- member.getPort());
- socket.send(datagramPacket);
- socket.close();
- } else {
- GossipService.LOGGER.error("The length of the to be send message is too large ("
- + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
- }
- }
-
- } catch (IOException e1) {
- e1.printStackTrace();
+ LocalGossipMember member = selectPartner(memberList);
+ if (member == null) {
+ return;
+ }
+ try (DatagramSocket socket = new DatagramSocket()){
+ socket.setSoTimeout(_gossipManager.getSettings().getGossipInterval());
+ InetAddress dest = InetAddress.getByName(member.getHost());
+ JSONArray jsonArray = new JSONArray();
+ jsonArray.put(me.toJSONObject());
+ for (LocalGossipMember other : memberList) {
+ jsonArray.put(other.toJSONObject());
+ GossipService.LOGGER.debug(other);
}
+ byte[] json_bytes = jsonArray.toString().getBytes();
+ int packet_length = json_bytes.length;
+ if (packet_length < GossipManager.MAX_PACKET_SIZE) {
+ byte[] buf = createBuffer(packet_length, json_bytes);
+ DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest,
+ member.getPort());
+ socket.send(datagramPacket);
+ } else {
+ GossipService.LOGGER.error("The length of the to be send message is too large ("
+ + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
+ }
+ } catch (IOException e1) {
+ GossipService.LOGGER.warn(e1);
}
}
+
+ private byte[] createBuffer(int packetLength, byte [] jsonBytes){
+ byte[] lengthBytes = new byte[4];
+ lengthBytes[0] = (byte) (packetLength >> 24);
+ lengthBytes[1] = (byte) ((packetLength << 8) >> 24);
+ lengthBytes[2] = (byte) ((packetLength << 16) >> 24);
+ lengthBytes[3] = (byte) ((packetLength << 24) >> 24);
+ ByteBuffer byteBuffer = ByteBuffer.allocate(4 + jsonBytes.length);
+ byteBuffer.put(lengthBytes);
+ byteBuffer.put(jsonBytes);
+ byte[] buf = byteBuffer.array();
+ return buf;
+ }
+
}
diff --git a/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java b/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java
new file mode 100644
index 0000000..27e34ba
--- /dev/null
+++ b/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java
@@ -0,0 +1,103 @@
+package io.teknek.gossip;
+
+import io.teknek.tunit.TUnit;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+
+import org.apache.log4j.Logger;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.google.code.gossip.GossipMember;
+import com.google.code.gossip.GossipService;
+import com.google.code.gossip.GossipSettings;
+import com.google.code.gossip.RemoteGossipMember;
+import com.google.code.gossip.event.GossipListener;
+import com.google.code.gossip.event.GossipState;
+
+public class ShutdownDeadtimeTest {
+
+ private static final Logger log = Logger.getLogger(ShutdownDeadtimeTest.class );
+ //@Test
+ @Ignore
+ public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, UnknownHostException {
+ GossipSettings settings = new GossipSettings(10,10000);
+
+ log.info( "Adding seed nodes" );
+ int seedNodes = 3;
+ List<GossipMember> startupMembers = new ArrayList<>();
+ for (int i = 1; i < seedNodes + 1; ++i) {
+ startupMembers.add(new RemoteGossipMember("127.0.0.1", 50000 + i, i + ""));
+ }
+
+ log.info( "Adding clients" );
+ final List<GossipService> clients = new ArrayList<>();
+ final int clusterMembers = 5;
+ for (int i = 1; i < clusterMembers+1; ++i) {
+ final int j = i;
+ GossipService gossipService = new GossipService("127.0.0.1", 50000 + i, i + "",
+ startupMembers, settings,
+ new GossipListener(){
+ @Override
+ public void gossipEvent(GossipMember member, GossipState state) {
+ System.out.println(System.currentTimeMillis() + " Member "+j + " reports "+ member+" "+ state);
+ }
+ });
+ clients.add(gossipService);
+ gossipService.start();
+ }
+ TUnit.assertThat(new Callable<Integer> (){
+ public Integer call() throws Exception {
+ int total = 0;
+ for (int i = 0; i < clusterMembers; ++i) {
+ total += clients.get(i).get_gossipManager().getMemberList().size();
+ }
+ return total;
+ }}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(20);
+
+ // shutdown one client and verify that one client is lost.
+ Random r = new Random();
+ int randomClientId = r.nextInt(clusterMembers);
+ log.info( "shutting down " + randomClientId );
+ final int shutdownPort = clients.get(randomClientId).get_gossipManager().getMyself().getPort();
+ final String shutdownId = clients.get(randomClientId).get_gossipManager().getMyself().getId();
+ clients.get(randomClientId).shutdown();
+ TUnit.assertThat(new Callable<Integer> (){
+ public Integer call() throws Exception {
+ int total = 0;
+ for (int i = 0; i < clusterMembers; ++i) {
+ total += clients.get(i).get_gossipManager().getMemberList().size();
+ }
+ return total;
+ }}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(16);
+
+ // start client again
+ GossipService gossipService = new GossipService("127.0.0.1", shutdownPort, shutdownId + "",
+ startupMembers, settings,
+ new GossipListener(){
+ @Override
+ public void gossipEvent(GossipMember member, GossipState state) {
+ //System.out.println("revived " + member+" "+ state);
+ }
+ });
+ clients.add(gossipService);
+ gossipService.start();
+
+ // verify that the client is alive again for every node
+ TUnit.assertThat(new Callable<Integer> (){
+ public Integer call() throws Exception {
+ int total = 0;
+ for (int i = 0; i < clusterMembers; ++i) {
+ total += clients.get(i).get_gossipManager().getMemberList().size();
+ }
+ return total;
+ }}).afterWaitingAtMost(70, TimeUnit.SECONDS).isEqualTo(20);
+
+ }
+}