GOSSIP-48 user shutdown message
diff --git a/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java b/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
index d58aeb9..9fea30b 100644
--- a/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
+++ b/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
@@ -18,6 +18,8 @@
package org.apache.gossip.manager;
import java.util.Map.Entry;
+import java.util.List;
+import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import com.codahale.metrics.Histogram;
@@ -28,6 +30,7 @@
import org.apache.gossip.model.GossipMember;
import org.apache.gossip.model.Response;
import org.apache.gossip.model.SharedGossipDataMessage;
+import org.apache.gossip.model.ShutdownMessage;
import org.apache.gossip.udp.UdpActiveGossipMessage;
import org.apache.gossip.udp.UdpGossipDataMessage;
import org.apache.gossip.udp.UdpSharedGossipDataMessage;
@@ -47,6 +50,7 @@
private final Histogram sharedDataHistogram;
private final Histogram sendPerNodeDataHistogram;
private final Histogram sendMembershipHistorgram;
+ private final Random random;
public AbstractActiveGossiper(GossipManager gossipManager, GossipCore gossipCore, MetricRegistry registry) {
this.gossipManager = gossipManager;
@@ -54,6 +58,7 @@
sharedDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sharedDataHistogram-time"));
sendPerNodeDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendPerNodeDataHistogram-time"));
sendMembershipHistorgram = registry.histogram(name(AbstractActiveGossiper.class, "sendMembershipHistorgram-time"));
+ random = new Random();
}
public void init() {
@@ -64,6 +69,16 @@
}
+ public final void sendShutdownMessage(LocalGossipMember me, LocalGossipMember target){ // announce to target that me is shutting down
+ if (target == null){ // nothing to send when no partner was supplied
+ return;
+ }
+ ShutdownMessage m = new ShutdownMessage();
+ m.setNodeId(me.getId());
+ m.setShutdownAtNanos(gossipManager.getClock().nanoTime()); // stamp with the manager clock's current nano reading
+ gossipCore.sendOneWay(m, target.getUri()); // the result of the send is not inspected here
+ }
+
public final void sendSharedData(LocalGossipMember me, LocalGossipMember member){
if (member == null){
return;
@@ -138,4 +153,19 @@
gm.setProperties(member.getProperties());
return gm;
}
+
+ /**
+ * Picks a random member from the supplied list.
+ * @param memberList
+ * An immutable list of candidate members
+ * @return The chosen LocalGossipMember to gossip with, or null when the list is empty.
+ */
+ protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
+ LocalGossipMember member = null;
+ if (memberList.size() > 0) {
+ int randomNeighborIndex = random.nextInt(memberList.size()); // index in [0, size)
+ member = memberList.get(randomNeighborIndex);
+ }
+ return member;
+ }
}
diff --git a/src/main/java/org/apache/gossip/manager/DataReaper.java b/src/main/java/org/apache/gossip/manager/DataReaper.java
index 6760685..f165239 100644
--- a/src/main/java/org/apache/gossip/manager/DataReaper.java
+++ b/src/main/java/org/apache/gossip/manager/DataReaper.java
@@ -77,7 +77,7 @@
public void close(){
scheduledExecutor.shutdown();
try {
- scheduledExecutor.awaitTermination(5, TimeUnit.SECONDS);
+ scheduledExecutor.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
}
diff --git a/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java b/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java
index 4f5dfdc..c66e332 100644
--- a/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java
+++ b/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java
@@ -18,7 +18,6 @@
package org.apache.gossip.manager;
import java.util.List;
-import java.util.Random;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.ArrayBlockingQueue;
@@ -52,7 +51,6 @@
private ScheduledExecutorService scheduledExecutorService;
private final BlockingQueue<Runnable> workQueue;
private ThreadPoolExecutor threadService;
- private final Random random;
public DatacenterRackAwareActiveGossiper(GossipManager gossipManager, GossipCore gossipCore,
MetricRegistry registry) {
@@ -61,7 +59,6 @@
workQueue = new ArrayBlockingQueue<Runnable>(1024);
threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue,
new ThreadPoolExecutor.DiscardOldestPolicy());
- random = new Random();
try {
sameRackGossipIntervalMs = Integer.parseInt(gossipManager.getSettings()
.getActiveGossipProperties().get("sameRackGossipIntervalMs"));
@@ -216,19 +213,32 @@
sendSharedData(gossipManager.getMyself(), selectPartner(sameDatacenterDifferentRack()));
}
-
@Override
public void shutdown() {
super.shutdown();
- }
-
- protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
- LocalGossipMember member = null;
- if (memberList.size() > 0) {
- int randomNeighborIndex = random.nextInt(memberList.size());
- member = memberList.get(randomNeighborIndex);
+ scheduledExecutorService.shutdown(); // stop the scheduler before announcing the shutdown
+ try {
+ scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS); // wait at most five seconds
+ } catch (InterruptedException e) {
+ LOGGER.debug("Issue during shutdown", e); // NOTE(review): interrupt status is not restored here
}
- return member;
+ sendShutdownMessage(); // queue the shutdown notices on threadService while it still accepts work
+ threadService.shutdown(); // no new tasks accepted from here on
+ try {
+ threadService.awaitTermination(5, TimeUnit.SECONDS); // wait at most five seconds for the queued notices
+ } catch (InterruptedException e) {
+ LOGGER.debug("Issue during shutdown", e); // NOTE(review): interrupt status is not restored here
+ }
}
+ /**
+ * Sends an optimistic shutdown message to several cluster nodes.
+ */
+ protected void sendShutdownMessage(){
+ List<LocalGossipMember> l = gossipManager.getLiveMembers();
+ int sendTo = l.size() < 3 ? 1 : l.size() / 3; // one message for fewer than three live members, otherwise a third of them
+ for (int i = 0; i < sendTo; i++) {
+ threadService.execute(() -> sendShutdownMessage(gossipManager.getMyself(), selectPartner(l))); // each task picks its own random partner, so a node may be chosen more than once
+ }
+ }
}
diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java
index 31bd447..82d65fe 100644
--- a/src/main/java/org/apache/gossip/manager/GossipCore.java
+++ b/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -44,6 +44,7 @@
import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.Response;
import org.apache.gossip.model.SharedGossipDataMessage;
+import org.apache.gossip.model.ShutdownMessage;
import org.apache.gossip.udp.Trackable;
import org.apache.gossip.udp.UdpActiveGossipMessage;
import org.apache.gossip.udp.UdpActiveGossipOk;
@@ -72,6 +73,10 @@
private final Meter tranmissionException;
private final Meter tranmissionSuccess;
+ { // NOTE(review): instance initializer, runs on every GossipCore construction; if MAPPER is a static field a static initializer would configure it once - confirm
+ MAPPER.enableDefaultTyping(); // NOTE(review): default typing lets the JSON payload name the class to instantiate; confirm the inputs are trusted
+ }
+
public GossipCore(GossipManager manager, MetricRegistry metrics){
this.gossipManager = manager;
requests = new ConcurrentHashMap<>();
@@ -128,10 +133,11 @@
public void shutdown(){
service.shutdown();
try {
- service.awaitTermination(5, TimeUnit.SECONDS);
+ service.awaitTermination(1, TimeUnit.SECONDS); // grace period of at most one second
} catch (InterruptedException e) {
LOGGER.warn(e);
}
+ service.shutdownNow(); // after the grace period, stop whatever is still outstanding
}
public void receive(Base base){
@@ -141,6 +147,16 @@
requests.put(t.getUuid() + "/" + t.getUriFrom(), (Base) t);
}
}
+ if (base instanceof ShutdownMessage){
+ ShutdownMessage s = (ShutdownMessage) base;
+ GossipDataMessage m = new GossipDataMessage();
+ m.setKey(ShutdownMessage.PER_NODE_KEY);
+ m.setNodeId(s.getNodeId());
+ m.setPayload(base);
+ m.setTimestamp(System.currentTimeMillis());
+ m.setExpireAt(System.currentTimeMillis() + 30L * 1000L);
+ addPerNodeData(m);
+ }
if (base instanceof GossipDataMessage) {
UdpGossipDataMessage message = (UdpGossipDataMessage) base;
addPerNodeData(message);
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
index 04afc28..53ed8c7 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -47,6 +47,7 @@
import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.SharedGossipDataMessage;
+import org.apache.gossip.model.ShutdownMessage;
public abstract class GossipManager {
@@ -159,6 +160,9 @@
scheduledServiced.scheduleAtFixedRate(() -> {
try {
for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()) {
+ boolean userDown = processOptomisticShutdown(entry);
+ if (userDown)
+ continue;
Double result = null;
try {
result = entry.getKey().detect(clock.nanoTime());
@@ -190,6 +194,30 @@
LOGGER.debug("The GossipManager is started.");
}
+ /**
+ * If the per-node data holds the special shutdown key for a member, that member has sent us
+ * a pre-emptive shutdown message. We process it so the node is seen as down sooner.
+ * @param l member to consider
+ * @return true if node forced down
+ */
+ public boolean processOptomisticShutdown(Entry<LocalGossipMember, GossipState> l){
+ GossipDataMessage m = findPerNodeGossipData(l.getKey().getId(), ShutdownMessage.PER_NODE_KEY);
+ if (m == null){ // no shutdown notice stored for this member
+ return false;
+ }
+ ShutdownMessage s = (ShutdownMessage) m.getPayload();
+ if (s.getShutdownAtNanos() > l.getKey().getHeartbeat()){ // NOTE(review): compares a sender-supplied nano timestamp with the locally stored heartbeat - confirm both use the same time base
+ if (l.getValue() == GossipState.UP){ // the listener is notified only on an UP to DOWN transition
+ members.put(l.getKey(), GossipState.DOWN);
+ listener.gossipEvent(l.getKey(), GossipState.DOWN);
+ } else {
+ members.put(l.getKey(), GossipState.DOWN);
+ }
+ return true;
+ }
+ return false; // the notice is not newer than the member's heartbeat, so it is ignored
+ }
+
private void readSavedRingState() {
for (LocalGossipMember l : ringState.readFromDisk()){
LocalGossipMember member = new LocalGossipMember(l.getClusterName(),
@@ -226,7 +254,7 @@
activeGossipThread.shutdown();
}
try {
- boolean result = gossipThreadExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS);
+ boolean result = gossipThreadExecutor.awaitTermination(10, TimeUnit.MILLISECONDS);
if (!result) {
LOGGER.error("executor shutdown timed out");
}
@@ -298,5 +326,9 @@
public UserDataPersister getUserDataState() {
return userDataState;
}
+ /** @return the clock this manager reads nanoTime() from */
+ public Clock getClock() {
+ return clock;
+ }
}
diff --git a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
index ebda513..47b8a8f 100644
--- a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
@@ -47,9 +47,13 @@
private final String cluster;
- private final ObjectMapper MAPPER = new ObjectMapper();
+ private final static ObjectMapper MAPPER = new ObjectMapper();
private final GossipCore gossipCore;
+ // Configure the shared static MAPPER once, at class initialization, instead of again on every instance construction.
+ static {
+ MAPPER.enableDefaultTyping(); // NOTE(review): default typing lets the JSON payload name the class to instantiate; confirm the inputs are trusted or restrict the allowed types
+ }
public PassiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) {
this.gossipCore = gossipCore;
diff --git a/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java b/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java
index 43237fb..839d796 100644
--- a/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java
+++ b/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java
@@ -18,7 +18,6 @@
package org.apache.gossip.manager;
import java.util.List;
-import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
@@ -39,7 +38,6 @@
private ScheduledExecutorService scheduledExecutorService;
private final BlockingQueue<Runnable> workQueue;
private ThreadPoolExecutor threadService;
- private final Random random;
public SimpleActiveGossipper(GossipManager gossipManager, GossipCore gossipCore,
MetricRegistry registry) {
@@ -48,7 +46,6 @@
workQueue = new ArrayBlockingQueue<Runnable>(1024);
threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue,
new ThreadPoolExecutor.DiscardOldestPolicy());
- random = new Random();
}
@Override
@@ -71,7 +68,7 @@
selectPartner(gossipManager.getLiveMembers())),
0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
}
-
+
@Override
public void shutdown() {
super.shutdown();
@@ -81,6 +78,13 @@
} catch (InterruptedException e) {
LOGGER.debug("Issue during shutdown", e);
}
+ sendShutdownMessage();
+ threadService.shutdown();
+ try {
+ threadService.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOGGER.debug("Issue during shutdown", e);
+ }
}
protected void sendToALiveMember(){
@@ -94,18 +98,13 @@
}
/**
- *
- * @param memberList
- * The list of members which are stored in the local list of members.
- * @return The chosen LocalGossipMember to gossip with.
+ * Sends an optimistic shutdown message to several cluster nodes.
*/
- protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
- //TODO this selection is racey what if the list size changes?
- LocalGossipMember member = null;
- if (memberList.size() > 0) {
- int randomNeighborIndex = random.nextInt(memberList.size());
- member = memberList.get(randomNeighborIndex);
+ protected void sendShutdownMessage(){
+ List<LocalGossipMember> l = gossipManager.getLiveMembers();
+ int sendTo = l.size() < 3 ? 1 : l.size() / 2; // one message for fewer than three live members, otherwise half of them
+ for (int i = 0; i < sendTo; i++) {
+ threadService.execute(() -> sendShutdownMessage(gossipManager.getMyself(), selectPartner(l))); // each task picks its own random partner, so a node may be chosen more than once
}
- return member;
}
}
diff --git a/src/main/java/org/apache/gossip/model/ShutdownMessage.java b/src/main/java/org/apache/gossip/model/ShutdownMessage.java
new file mode 100644
index 0000000..4bca508
--- /dev/null
+++ b/src/main/java/org/apache/gossip/model/ShutdownMessage.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.model;
+/** Message announcing that a node is shutting down; carries the node id and a nano timestamp. */
+public class ShutdownMessage extends Message {
+ // NOTE(review): the key literal below is spelled shutdowmessage (no n after the w); it is used as a per-node data key, so rename only deliberately
+ public static final String PER_NODE_KEY = "gossipcore.shutdowmessage";
+ private long shutdownAtNanos;
+ private String nodeId;
+ /** Creates an empty message; the fields are filled in through the setters. */
+ public ShutdownMessage(){
+
+ }
+ /** @return the id stored in this message */
+ public String getNodeId() {
+ return nodeId;
+ }
+ /** @param nodeId the id to store in this message */
+ public void setNodeId(String nodeId) {
+ this.nodeId = nodeId;
+ }
+ /** @return the shutdown time stored in this message, in nanoseconds */
+ public long getShutdownAtNanos() {
+ return shutdownAtNanos;
+ }
+ /** @param shutdownAtNanos the shutdown time to store, in nanoseconds */
+ public void setShutdownAtNanos(long shutdownAtNanos) {
+ this.shutdownAtNanos = shutdownAtNanos;
+ }
+
+ @Override
+ public String toString() {
+ return "ShutdownMessage [shutdownAtNanos=" + shutdownAtNanos + ", nodeId=" + nodeId + "]";
+ }
+
+}
diff --git a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
index 2386084..6a0765b 100644
--- a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
+++ b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
@@ -47,7 +47,7 @@
@Test
public void DeadNodesDoNotComeAliveAgain()
throws InterruptedException, UnknownHostException, URISyntaxException {
- GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 2.0, "exponential");
+ GossipSettings settings = new GossipSettings(100, 10000, 1000, 1, 2.0, "exponential");
settings.setPersistRingState(false);
settings.setPersistDataState(false);
String cluster = UUID.randomUUID().toString();
@@ -75,7 +75,6 @@
return total;
}
}).afterWaitingAtMost(40, TimeUnit.SECONDS).isEqualTo(20);
-
// shutdown one client and verify that one client is lost.
Random r = new Random();
int randomClientId = r.nextInt(clusterMembers);
@@ -124,7 +123,12 @@
}).afterWaitingAtMost(60, TimeUnit.SECONDS).isEqualTo(20);
for (int i = 0; i < clusterMembers; ++i) {
- clients.get(i).shutdown();
+ final int j = i;
+ new Thread() {
+ public void run(){
+ clients.get(j).shutdown();
+ }
+ }.start();
}
}
}
diff --git a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
index aa797f5..8a9a9ab 100644
--- a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
+++ b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
@@ -47,7 +47,7 @@
}
public void abc(int base) throws InterruptedException, UnknownHostException, URISyntaxException{
- GossipSettings settings = new GossipSettings();
+ GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 2.0, "exponential");
settings.setPersistRingState(false);
settings.setPersistDataState(false);
String cluster = UUID.randomUUID().toString();
@@ -76,7 +76,12 @@
}}).afterWaitingAtMost(40, TimeUnit.SECONDS).isEqualTo(20);
for (int i = 0; i < clusterMembers; ++i) {
- clients.get(i).shutdown();
+ int j = i;
+ new Thread(){
+ public void run(){
+ clients.get(j).shutdown();
+ }
+ }.start();
}
}
}