GOSSIP-48 user shutdown message
diff --git a/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java b/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
index d58aeb9..9fea30b 100644
--- a/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
+++ b/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
@@ -18,6 +18,8 @@
 package org.apache.gossip.manager;
 
 import java.util.Map.Entry;
+import java.util.List;
+import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import com.codahale.metrics.Histogram;
@@ -28,6 +30,7 @@
 import org.apache.gossip.model.GossipMember;
 import org.apache.gossip.model.Response;
 import org.apache.gossip.model.SharedGossipDataMessage;
+import org.apache.gossip.model.ShutdownMessage;
 import org.apache.gossip.udp.UdpActiveGossipMessage;
 import org.apache.gossip.udp.UdpGossipDataMessage;
 import org.apache.gossip.udp.UdpSharedGossipDataMessage;
@@ -47,6 +50,7 @@
   private final Histogram sharedDataHistogram;
   private final Histogram sendPerNodeDataHistogram;
   private final Histogram sendMembershipHistorgram;
+  private final Random random;
 
   public AbstractActiveGossiper(GossipManager gossipManager, GossipCore gossipCore, MetricRegistry registry) {
     this.gossipManager = gossipManager;
@@ -54,6 +58,7 @@
     sharedDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sharedDataHistogram-time"));
     sendPerNodeDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendPerNodeDataHistogram-time"));
     sendMembershipHistorgram = registry.histogram(name(AbstractActiveGossiper.class, "sendMembershipHistorgram-time"));
+    random = new Random();
   }
 
   public void init() {
@@ -64,6 +69,16 @@
 
   }
 
+  public final void sendShutdownMessage(LocalGossipMember me, LocalGossipMember target){
+    if (target == null){
+      return;
+    }
+    ShutdownMessage m = new ShutdownMessage();
+    m.setNodeId(me.getId());
+    m.setShutdownAtNanos(gossipManager.getClock().nanoTime());
+    gossipCore.sendOneWay(m, target.getUri());
+  }
+  
   public final void sendSharedData(LocalGossipMember me, LocalGossipMember member){
     if (member == null){
       return;
@@ -138,4 +153,19 @@
     gm.setProperties(member.getProperties());
     return gm;
   }
+  
+  /**
+   * 
+   * @param memberList
+   *          An immutable list
+   * @return The chosen LocalGossipMember to gossip with.
+   */
+  protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
+    LocalGossipMember member = null;
+    if (memberList.size() > 0) {
+      int randomNeighborIndex = random.nextInt(memberList.size());
+      member = memberList.get(randomNeighborIndex);
+    }
+    return member;
+  }
 }
diff --git a/src/main/java/org/apache/gossip/manager/DataReaper.java b/src/main/java/org/apache/gossip/manager/DataReaper.java
index 6760685..f165239 100644
--- a/src/main/java/org/apache/gossip/manager/DataReaper.java
+++ b/src/main/java/org/apache/gossip/manager/DataReaper.java
@@ -77,7 +77,7 @@
   public void close(){
     scheduledExecutor.shutdown();
     try {
-      scheduledExecutor.awaitTermination(5, TimeUnit.SECONDS);
+      scheduledExecutor.awaitTermination(1, TimeUnit.SECONDS);
     } catch (InterruptedException e) {
       
     }
diff --git a/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java b/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java
index 4f5dfdc..c66e332 100644
--- a/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java
+++ b/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java
@@ -18,7 +18,6 @@
 package org.apache.gossip.manager;
 
 import java.util.List;
-import java.util.Random;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -52,7 +51,6 @@
   private ScheduledExecutorService scheduledExecutorService;
   private final BlockingQueue<Runnable> workQueue;
   private ThreadPoolExecutor threadService;
-  private final Random random;
   
   public DatacenterRackAwareActiveGossiper(GossipManager gossipManager, GossipCore gossipCore,
           MetricRegistry registry) {
@@ -61,7 +59,6 @@
     workQueue = new ArrayBlockingQueue<Runnable>(1024);
     threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue,
             new ThreadPoolExecutor.DiscardOldestPolicy());
-    random = new Random();
     try {
       sameRackGossipIntervalMs = Integer.parseInt(gossipManager.getSettings()
               .getActiveGossipProperties().get("sameRackGossipIntervalMs"));
@@ -216,19 +213,32 @@
     sendSharedData(gossipManager.getMyself(), selectPartner(sameDatacenterDifferentRack()));
   }
   
-
   @Override
   public void shutdown() {
     super.shutdown();
-  }
-
-  protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
-    LocalGossipMember member = null;
-    if (memberList.size() > 0) {
-      int randomNeighborIndex = random.nextInt(memberList.size());
-      member = memberList.get(randomNeighborIndex);
+    scheduledExecutorService.shutdown();
+    try {
+      scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOGGER.debug("Issue during shutdown", e);
     }
-    return member;
+    sendShutdownMessage();
+    threadService.shutdown();
+    try {
+      threadService.awaitTermination(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOGGER.debug("Issue during shutdown", e);
+    }
   }
   
+  /**
+   * sends an optimistic shutdown message to several clusters nodes
+   */
+  protected void sendShutdownMessage(){
+    List<LocalGossipMember> l = gossipManager.getLiveMembers();
+    int sendTo = l.size() < 3 ? 1 : l.size() / 3;
+    for (int i = 0; i < sendTo; i++) {
+      threadService.execute(() -> sendShutdownMessage(gossipManager.getMyself(), selectPartner(l)));
+    }
+  }
 }
diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java
index 31bd447..82d65fe 100644
--- a/src/main/java/org/apache/gossip/manager/GossipCore.java
+++ b/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -44,6 +44,7 @@
 import org.apache.gossip.model.GossipDataMessage;
 import org.apache.gossip.model.Response;
 import org.apache.gossip.model.SharedGossipDataMessage;
+import org.apache.gossip.model.ShutdownMessage;
 import org.apache.gossip.udp.Trackable;
 import org.apache.gossip.udp.UdpActiveGossipMessage;
 import org.apache.gossip.udp.UdpActiveGossipOk;
@@ -72,6 +73,10 @@
   private final Meter tranmissionException;
   private final Meter tranmissionSuccess;
   
+  {
+    MAPPER.enableDefaultTyping();
+  }
+  
   public GossipCore(GossipManager manager, MetricRegistry metrics){
     this.gossipManager = manager;
     requests = new ConcurrentHashMap<>();
@@ -128,10 +133,11 @@
   public void shutdown(){
     service.shutdown();
     try {
-      service.awaitTermination(5, TimeUnit.SECONDS);
+      service.awaitTermination(1, TimeUnit.SECONDS);
     } catch (InterruptedException e) {
       LOGGER.warn(e);
     }
+    service.shutdownNow();
   }
   
   public void receive(Base base){
@@ -141,6 +147,16 @@
         requests.put(t.getUuid() + "/" + t.getUriFrom(), (Base) t);
       }
     }
+    if (base instanceof ShutdownMessage){
+      ShutdownMessage s = (ShutdownMessage) base;
+      GossipDataMessage m = new GossipDataMessage();
+      m.setKey(ShutdownMessage.PER_NODE_KEY);
+      m.setNodeId(s.getNodeId());
+      m.setPayload(base);
+      m.setTimestamp(System.currentTimeMillis());
+      m.setExpireAt(System.currentTimeMillis() + 30L * 1000L);
+      addPerNodeData(m);
+    }
     if (base instanceof GossipDataMessage) {
       UdpGossipDataMessage message = (UdpGossipDataMessage) base;
       addPerNodeData(message);
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
index 04afc28..53ed8c7 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -47,6 +47,7 @@
 
 import org.apache.gossip.model.GossipDataMessage;
 import org.apache.gossip.model.SharedGossipDataMessage;
+import org.apache.gossip.model.ShutdownMessage;
 
 
 public abstract class GossipManager {
@@ -159,6 +160,9 @@
     scheduledServiced.scheduleAtFixedRate(() -> {
       try {
         for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()) {
+          boolean userDown = processOptomisticShutdown(entry);
+          if (userDown)
+            continue;
           Double result = null;
           try {
             result = entry.getKey().detect(clock.nanoTime());
@@ -190,6 +194,30 @@
     LOGGER.debug("The GossipManager is started.");
   }
 
+  /**
+   * If we have a special key the per-node data that means that the node has sent us 
+   * a pre-emptive shutdown message. We process this so node is seen down sooner
+   * @param l member to consider
+   * @return true if node forced down
+   */
+  public boolean processOptomisticShutdown(Entry<LocalGossipMember, GossipState> l){
+    GossipDataMessage m = findPerNodeGossipData(l.getKey().getId(), ShutdownMessage.PER_NODE_KEY);
+    if (m == null){
+      return false;
+    }
+    ShutdownMessage s = (ShutdownMessage) m.getPayload();
+    if (s.getShutdownAtNanos() > l.getKey().getHeartbeat()){
+      if (l.getValue() == GossipState.UP){
+        members.put(l.getKey(), GossipState.DOWN);
+        listener.gossipEvent(l.getKey(), GossipState.DOWN);
+      } else {
+        members.put(l.getKey(), GossipState.DOWN);
+      }
+      return true;
+    }
+    return false;
+  }
+  
   private void readSavedRingState() {
     for (LocalGossipMember l : ringState.readFromDisk()){
       LocalGossipMember member = new LocalGossipMember(l.getClusterName(),
@@ -226,7 +254,7 @@
       activeGossipThread.shutdown();
     }
     try {
-      boolean result = gossipThreadExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS);
+      boolean result = gossipThreadExecutor.awaitTermination(10, TimeUnit.MILLISECONDS);
       if (!result) {
         LOGGER.error("executor shutdown timed out");
       }
@@ -298,5 +326,9 @@
   public UserDataPersister getUserDataState() {
     return userDataState;
   }
+
+  public Clock getClock() {
+    return clock;
+  }
   
 }
diff --git a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
index ebda513..47b8a8f 100644
--- a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
@@ -47,9 +47,13 @@
 
   private final String cluster;
   
-  private final ObjectMapper MAPPER = new ObjectMapper();
+  private final static ObjectMapper MAPPER = new ObjectMapper();
   
   private final GossipCore gossipCore;
+  
+  {
+    MAPPER.enableDefaultTyping();
+  }
 
   public PassiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) {
     this.gossipCore = gossipCore;
diff --git a/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java b/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java
index 43237fb..839d796 100644
--- a/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java
+++ b/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java
@@ -18,7 +18,6 @@
 package org.apache.gossip.manager;
 
 import java.util.List;
-import java.util.Random;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executors;
@@ -39,7 +38,6 @@
   private ScheduledExecutorService scheduledExecutorService;
   private final BlockingQueue<Runnable> workQueue;
   private ThreadPoolExecutor threadService;
-  private final Random random;
   
   public SimpleActiveGossipper(GossipManager gossipManager, GossipCore gossipCore,
           MetricRegistry registry) {
@@ -48,7 +46,6 @@
     workQueue = new ArrayBlockingQueue<Runnable>(1024);
     threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue,
             new ThreadPoolExecutor.DiscardOldestPolicy());
-    random = new Random();
   }
 
   @Override
@@ -71,7 +68,7 @@
                     selectPartner(gossipManager.getLiveMembers())),
             0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
   }
-
+  
   @Override
   public void shutdown() {
     super.shutdown();
@@ -81,6 +78,13 @@
     } catch (InterruptedException e) {
       LOGGER.debug("Issue during shutdown", e);
     }
+    sendShutdownMessage();
+    threadService.shutdown();
+    try {
+      threadService.awaitTermination(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOGGER.debug("Issue during shutdown", e);
+    }
   }
 
   protected void sendToALiveMember(){
@@ -94,18 +98,13 @@
   }
   
   /**
-   * 
-   * @param memberList
-   *          The list of members which are stored in the local list of members.
-   * @return The chosen LocalGossipMember to gossip with.
+   * sends an optimistic shutdown message to several clusters nodes
    */
-  protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
-    //TODO this selection is racey what if the list size changes?
-    LocalGossipMember member = null;
-    if (memberList.size() > 0) {
-      int randomNeighborIndex = random.nextInt(memberList.size());
-      member = memberList.get(randomNeighborIndex);
+  protected void sendShutdownMessage(){
+    List<LocalGossipMember> l = gossipManager.getLiveMembers();
+    int sendTo = l.size() < 3 ? 1 : l.size() / 2;
+    for (int i = 0; i < sendTo; i++) {
+      threadService.execute(() -> sendShutdownMessage(gossipManager.getMyself(), selectPartner(l)));
     }
-    return member;
   }
 }
diff --git a/src/main/java/org/apache/gossip/model/ShutdownMessage.java b/src/main/java/org/apache/gossip/model/ShutdownMessage.java
new file mode 100644
index 0000000..4bca508
--- /dev/null
+++ b/src/main/java/org/apache/gossip/model/ShutdownMessage.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.model;
+
+public class ShutdownMessage extends Message {
+
+  public static final String PER_NODE_KEY = "gossipcore.shutdowmessage";
+  private long shutdownAtNanos;
+  private String nodeId;
+  
+  public ShutdownMessage(){
+    
+  }
+
+  public String getNodeId() {
+    return nodeId;
+  }
+
+  public void setNodeId(String nodeId) {
+    this.nodeId = nodeId;
+  }
+
+  public long getShutdownAtNanos() {
+    return shutdownAtNanos;
+  }
+
+  public void setShutdownAtNanos(long shutdownAtNanos) {
+    this.shutdownAtNanos = shutdownAtNanos;
+  }
+
+  @Override
+  public String toString() {
+    return "ShutdownMessage [shutdownAtNanos=" + shutdownAtNanos + ", nodeId=" + nodeId + "]";
+  }
+  
+}
diff --git a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
index 2386084..6a0765b 100644
--- a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
+++ b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
@@ -47,7 +47,7 @@
   @Test
   public void DeadNodesDoNotComeAliveAgain()
           throws InterruptedException, UnknownHostException, URISyntaxException {
-    GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 2.0, "exponential");
+    GossipSettings settings = new GossipSettings(100, 10000, 1000, 1, 2.0, "exponential");
     settings.setPersistRingState(false);
     settings.setPersistDataState(false);
     String cluster = UUID.randomUUID().toString();
@@ -75,7 +75,6 @@
         return total;
       }
     }).afterWaitingAtMost(40, TimeUnit.SECONDS).isEqualTo(20);
-
     // shutdown one client and verify that one client is lost.
     Random r = new Random();
     int randomClientId = r.nextInt(clusterMembers);
@@ -124,7 +123,12 @@
     }).afterWaitingAtMost(60, TimeUnit.SECONDS).isEqualTo(20);
 
     for (int i = 0; i < clusterMembers; ++i) {
-      clients.get(i).shutdown();
+      final int j = i;
+      new Thread() {
+        public void run(){
+          clients.get(j).shutdown();
+        }
+      }.start();
     }
   }
 }
diff --git a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
index aa797f5..8a9a9ab 100644
--- a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
+++ b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
@@ -47,7 +47,7 @@
   }
 
   public void abc(int base) throws InterruptedException, UnknownHostException, URISyntaxException{
-    GossipSettings settings = new GossipSettings();
+    GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 2.0, "exponential");
     settings.setPersistRingState(false);
     settings.setPersistDataState(false);
     String cluster = UUID.randomUUID().toString();
@@ -76,7 +76,12 @@
       }}).afterWaitingAtMost(40, TimeUnit.SECONDS).isEqualTo(20);
           
     for (int i = 0; i < clusterMembers; ++i) {
-      clients.get(i).shutdown();
+      int j = i;
+      new Thread(){
+        public void run(){
+          clients.get(j).shutdown();
+        }
+      }.start();
     }
   }
 }