GOSSIP-39 User Defined Active Gossip (review by Dorian Ellerbe)
diff --git a/src/main/java/org/apache/gossip/GossipMember.java b/src/main/java/org/apache/gossip/GossipMember.java
index a2f416e..f2834bd 100644
--- a/src/main/java/org/apache/gossip/GossipMember.java
+++ b/src/main/java/org/apache/gossip/GossipMember.java
@@ -19,6 +19,7 @@
import java.net.InetSocketAddress;
import java.net.URI;
+import java.util.Map;
/**
* A abstract class representing a gossip member.
@@ -40,6 +41,9 @@
*/
protected String id;
+ /* properties provided at startup time */
+ protected Map<String,String> properties;
+
/**
* Constructor.
*
@@ -52,11 +56,12 @@
* @param id
* An id that may be replaced after contact
*/
- public GossipMember(String clusterName, URI uri, String id, long heartbeat) {
+ public GossipMember(String clusterName, URI uri, String id, long heartbeat, Map<String,String> properties) {
this.clusterName = clusterName;
this.id = id;
this.heartbeat = heartbeat;
this.uri = uri;
+ this.properties = properties;
}
/**
@@ -104,6 +109,14 @@
this.id = _id;
}
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ public void setProperties(Map<String, String> properties) {
+ this.properties = properties;
+ }
+
public String toString() {
return "Member [address=" + getAddress() + ", id=" + id + ", heartbeat=" + heartbeat + "]";
}
diff --git a/src/main/java/org/apache/gossip/GossipRunner.java b/src/main/java/org/apache/gossip/GossipRunner.java
deleted file mode 100644
index c8a1f13..0000000
--- a/src/main/java/org/apache/gossip/GossipRunner.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URISyntaxException;
-
-public class GossipRunner {
-
- public static void main(String[] args) throws URISyntaxException {
- File configFile;
- if (args.length == 1) {
- configFile = new File("./" + args[0]);
- } else {
- configFile = new File("gossip.conf");
- }
- new GossipRunner(configFile);
- }
-
- public GossipRunner(File configFile) throws URISyntaxException {
- if (configFile != null && configFile.exists()) {
- try {
- System.out.println("Parsing the configuration file...");
- StartupSettings _settings = StartupSettings.fromJSONFile(configFile);
- GossipService gossipService = new GossipService(_settings);
- System.out.println("Gossip service successfully initialized, let's start it...");
- gossipService.start();
- } catch (FileNotFoundException e) {
- System.err.println("The given file is not found!");
- } catch (IOException e) {
- System.err.println("Could not read the configuration file: " + e.getMessage());
- } catch (InterruptedException e) {
- System.err.println("Error while starting the gossip service: " + e.getMessage());
- }
- } else {
- System.out
- .println("The gossip.conf file is not found.\n\nEither specify the path to the startup settings file or place the gossip.json file in the same folder as the JAR file.");
- }
- }
-}
diff --git a/src/main/java/org/apache/gossip/GossipService.java b/src/main/java/org/apache/gossip/GossipService.java
index fca9f28..f32eb35 100644
--- a/src/main/java/org/apache/gossip/GossipService.java
+++ b/src/main/java/org/apache/gossip/GossipService.java
@@ -20,7 +20,9 @@
import com.codahale.metrics.MetricRegistry;
import java.net.URI;
import java.net.UnknownHostException;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import com.codahale.metrics.JmxReporter;
import org.apache.gossip.event.GossipListener;
@@ -50,7 +52,7 @@
public GossipService(StartupSettings startupSettings) throws InterruptedException,
UnknownHostException {
this(startupSettings.getCluster(), startupSettings.getUri()
- , startupSettings.getId(), startupSettings.getGossipMembers(),
+ , startupSettings.getId(), new HashMap<String,String> (),startupSettings.getGossipMembers(),
startupSettings.getGossipSettings(), null, new MetricRegistry());
}
@@ -60,7 +62,7 @@
* @throws InterruptedException
* @throws UnknownHostException
*/
- public GossipService(String cluster, URI uri, String id,
+ public GossipService(String cluster, URI uri, String id, Map<String,String> properties,
List<GossipMember> gossipMembers, GossipSettings settings, GossipListener listener, MetricRegistry registry)
throws InterruptedException, UnknownHostException {
jmxReporter = JmxReporter.forRegistry(registry).build();
@@ -73,6 +75,7 @@
.gossipMembers(gossipMembers)
.listener(listener)
.registry(registry)
+ .properties(properties)
.build();
}
diff --git a/src/main/java/org/apache/gossip/GossipSettings.java b/src/main/java/org/apache/gossip/GossipSettings.java
index 36fabb6..1fed914 100644
--- a/src/main/java/org/apache/gossip/GossipSettings.java
+++ b/src/main/java/org/apache/gossip/GossipSettings.java
@@ -17,6 +17,9 @@
*/
package org.apache.gossip;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* In this object the settings used by the GossipService are held.
*
@@ -41,6 +44,10 @@
private String distribution = "exponential";
+ private String activeGossipClass = "org.apache.gossip.manager.SimpleActiveGossipper";
+
+ private Map<String,String> activeGossipProperties = new HashMap<>();
+
/**
* Construct GossipSettings with default settings.
*/
@@ -139,5 +146,21 @@
public void setDistribution(String distribution) {
this.distribution = distribution;
}
+
+ public String getActiveGossipClass() {
+ return activeGossipClass;
+ }
+
+ public void setActiveGossipClass(String activeGossipClass) {
+ this.activeGossipClass = activeGossipClass;
+ }
+
+ public Map<String, String> getActiveGossipProperties() {
+ return activeGossipProperties;
+ }
+
+ public void setActiveGossipProperties(Map<String, String> activeGossipProperties) {
+ this.activeGossipProperties = activeGossipProperties;
+ }
}
diff --git a/src/main/java/org/apache/gossip/LocalGossipMember.java b/src/main/java/org/apache/gossip/LocalGossipMember.java
index 83a13df..557ffcb 100644
--- a/src/main/java/org/apache/gossip/LocalGossipMember.java
+++ b/src/main/java/org/apache/gossip/LocalGossipMember.java
@@ -18,6 +18,7 @@
package org.apache.gossip;
import java.net.URI;
+import java.util.Map;
import org.apache.gossip.accrual.FailureDetector;
@@ -40,8 +41,8 @@
* The current heartbeat
*/
public LocalGossipMember(String clusterName, URI uri, String id,
- long heartbeat, int windowSize, int minSamples, String distribution) {
- super(clusterName, uri, id, heartbeat );
+ long heartbeat, Map<String,String> properties, int windowSize, int minSamples, String distribution) {
+ super(clusterName, uri, id, heartbeat, properties );
detector = new FailureDetector(this, minSamples, windowSize, distribution);
}
diff --git a/src/main/java/org/apache/gossip/RemoteGossipMember.java b/src/main/java/org/apache/gossip/RemoteGossipMember.java
index a9e6a76..e3f6620 100644
--- a/src/main/java/org/apache/gossip/RemoteGossipMember.java
+++ b/src/main/java/org/apache/gossip/RemoteGossipMember.java
@@ -18,12 +18,13 @@
package org.apache.gossip;
import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
/**
* The object represents a gossip member with the properties as received from a remote gossip
* member.
*
- * @author harmenw
*/
public class RemoteGossipMember extends GossipMember {
@@ -35,12 +36,12 @@
* @param heartbeat
* The current heartbeat
*/
- public RemoteGossipMember(String clusterName, URI uri, String id, long heartbeat) {
- super(clusterName, uri, id, heartbeat);
+ public RemoteGossipMember(String clusterName, URI uri, String id, long heartbeat, Map<String,String> properties) {
+ super(clusterName, uri, id, heartbeat, properties);
}
public RemoteGossipMember(String clusterName, URI uri, String id) {
- super(clusterName, uri, id, System.currentTimeMillis());
+ super(clusterName, uri, id, System.nanoTime(), new HashMap<String,String>());
}
}
diff --git a/src/main/java/org/apache/gossip/StartupSettings.java b/src/main/java/org/apache/gossip/StartupSettings.java
index de63c66..0117be7 100644
--- a/src/main/java/org/apache/gossip/StartupSettings.java
+++ b/src/main/java/org/apache/gossip/StartupSettings.java
@@ -23,8 +23,11 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
import org.apache.log4j.Logger;
@@ -164,6 +167,13 @@
JsonNode jsonObject = root.get(0);
String uri = jsonObject.get("uri").textValue();
String id = jsonObject.get("id").textValue();
+ Map<String,String> properties = new HashMap<String,String>();
+ JsonNode n = jsonObject.get("properties");
+ Iterator<Entry<String, JsonNode>> l = n.fields();
+ while (l.hasNext()){
+ Entry<String, JsonNode> i = l.next();
+ properties.put(i.getKey(), i.getValue().asText());
+ }
//TODO constants as defaults?
int gossipInterval = jsonObject.get("gossip_interval").intValue();
int cleanupInterval = jsonObject.get("cleanup_interval").intValue();
@@ -186,7 +196,7 @@
JsonNode child = it.next();
URI uri3 = new URI(child.get("uri").textValue());
RemoteGossipMember member = new RemoteGossipMember(child.get("cluster").asText(),
- uri3, "", 0);
+ uri3, "", 0, new HashMap<String,String>());
settings.addGossipMember(member);
configMembersDetails += member.getAddress();
configMembersDetails += ", ";
diff --git a/src/main/java/org/apache/gossip/accrual/FailureDetector.java b/src/main/java/org/apache/gossip/accrual/FailureDetector.java
index b6a12b6..10d66a9 100644
--- a/src/main/java/org/apache/gossip/accrual/FailureDetector.java
+++ b/src/main/java/org/apache/gossip/accrual/FailureDetector.java
@@ -48,6 +48,9 @@
public void recordHeartbeat(long now){
if (now < latestHeartbeatMs)
return;
+ if (now - latestHeartbeatMs == 0){
+ return;
+ }
synchronized (descriptiveStatistics) {
if (latestHeartbeatMs != -1){
descriptiveStatistics.addValue(now - latestHeartbeatMs);
@@ -77,7 +80,11 @@
}
return -1.0d * Math.log10(probability);
} catch (MathException | IllegalArgumentException e) {
- e.printStackTrace();
+ StringBuilder sb = new StringBuilder();
+ for ( double d: descriptiveStatistics.getSortedValues()){
+ sb.append(d + " ");
+ }
+ LOGGER.debug(e.getMessage() +" "+ sb.toString() +" "+ descriptiveStatistics);
throw new IllegalArgumentException(e);
}
}
diff --git a/src/main/java/org/apache/gossip/examples/GossipExample.java b/src/main/java/org/apache/gossip/examples/GossipExample.java
index 01cd3e3..8236d46 100644
--- a/src/main/java/org/apache/gossip/examples/GossipExample.java
+++ b/src/main/java/org/apache/gossip/examples/GossipExample.java
@@ -23,6 +23,7 @@
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import org.apache.gossip.GossipMember;
@@ -74,14 +75,14 @@
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
- startupMembers.add(new RemoteGossipMember(cluster, u, "", 0 ));
+ startupMembers.add(new RemoteGossipMember(cluster, u, "", 0, new HashMap<String,String>()));
}
// Lets start the gossip clients.
// Start the clients, waiting cleaning-interval + 1 second between them which will show the
// dead list handling.
for (GossipMember member : startupMembers) {
- GossipService gossipService = new GossipService(cluster, member.getUri(), "",
+ GossipService gossipService = new GossipService(cluster, member.getUri(), "", new HashMap<String,String>(),
startupMembers, settings, null, new MetricRegistry());
clients.add(gossipService);
gossipService.start();
diff --git a/src/main/java/org/apache/gossip/examples/StandAloneNode.java b/src/main/java/org/apache/gossip/examples/StandAloneNode.java
index d24c0fa..3564943 100644
--- a/src/main/java/org/apache/gossip/examples/StandAloneNode.java
+++ b/src/main/java/org/apache/gossip/examples/StandAloneNode.java
@@ -21,6 +21,7 @@
import java.net.URI;
import java.net.UnknownHostException;
import java.util.Arrays;
+import java.util.HashMap;
import org.apache.gossip.GossipService;
import org.apache.gossip.GossipSettings;
@@ -31,8 +32,8 @@
GossipSettings s = new GossipSettings();
s.setWindowSize(10);
s.setConvictThreshold(1.0);
- s.setGossipInterval(1000);
- GossipService gossipService = new GossipService("mycluster", URI.create(args[0]), args[1],
+ s.setGossipInterval(10);
+ GossipService gossipService = new GossipService("mycluster", URI.create(args[0]), args[1], new HashMap<String, String>(),
Arrays.asList( new RemoteGossipMember("mycluster", URI.create(args[2]), args[3])), s, (a,b) -> {}, new MetricRegistry());
gossipService.start();
while (true){
diff --git a/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java b/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
new file mode 100644
index 0000000..d58aeb9
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+import java.util.Map.Entry;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.gossip.LocalGossipMember;
+import org.apache.gossip.model.ActiveGossipOk;
+import org.apache.gossip.model.GossipDataMessage;
+import org.apache.gossip.model.GossipMember;
+import org.apache.gossip.model.Response;
+import org.apache.gossip.model.SharedGossipDataMessage;
+import org.apache.gossip.udp.UdpActiveGossipMessage;
+import org.apache.gossip.udp.UdpGossipDataMessage;
+import org.apache.gossip.udp.UdpSharedGossipDataMessage;
+import org.apache.log4j.Logger;
+
+import static com.codahale.metrics.MetricRegistry.name;
+
+/**
+ * The ActiveGossipThread is sends information. Pick a random partner and send the membership list to that partner
+ */
+public abstract class AbstractActiveGossiper {
+
+ protected static final Logger LOGGER = Logger.getLogger(AbstractActiveGossiper.class);
+
+ protected final GossipManager gossipManager;
+ protected final GossipCore gossipCore;
+ private final Histogram sharedDataHistogram;
+ private final Histogram sendPerNodeDataHistogram;
+ private final Histogram sendMembershipHistorgram;
+
+ public AbstractActiveGossiper(GossipManager gossipManager, GossipCore gossipCore, MetricRegistry registry) {
+ this.gossipManager = gossipManager;
+ this.gossipCore = gossipCore;
+ sharedDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sharedDataHistogram-time"));
+ sendPerNodeDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendPerNodeDataHistogram-time"));
+ sendMembershipHistorgram = registry.histogram(name(AbstractActiveGossiper.class, "sendMembershipHistorgram-time"));
+ }
+
+ public void init() {
+
+ }
+
+ public void shutdown() {
+
+ }
+
+ public final void sendSharedData(LocalGossipMember me, LocalGossipMember member){
+ if (member == null){
+ return;
+ }
+ long startTime = System.currentTimeMillis();
+ for (Entry<String, SharedGossipDataMessage> innerEntry : gossipCore.getSharedData().entrySet()){
+ UdpSharedGossipDataMessage message = new UdpSharedGossipDataMessage();
+ message.setUuid(UUID.randomUUID().toString());
+ message.setUriFrom(me.getId());
+ message.setExpireAt(innerEntry.getValue().getExpireAt());
+ message.setKey(innerEntry.getValue().getKey());
+ message.setNodeId(innerEntry.getValue().getNodeId());
+ message.setTimestamp(innerEntry.getValue().getTimestamp());
+ message.setPayload(innerEntry.getValue().getPayload());
+ gossipCore.sendOneWay(message, member.getUri());
+ }
+ sharedDataHistogram.update(System.currentTimeMillis() - startTime);
+ }
+
+ public final void sendPerNodeData(LocalGossipMember me, LocalGossipMember member){
+ if (member == null){
+ return;
+ }
+ long startTime = System.currentTimeMillis();
+ for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){
+ for (Entry<String, GossipDataMessage> innerEntry : entry.getValue().entrySet()){
+ UdpGossipDataMessage message = new UdpGossipDataMessage();
+ message.setUuid(UUID.randomUUID().toString());
+ message.setUriFrom(me.getId());
+ message.setExpireAt(innerEntry.getValue().getExpireAt());
+ message.setKey(innerEntry.getValue().getKey());
+ message.setNodeId(innerEntry.getValue().getNodeId());
+ message.setTimestamp(innerEntry.getValue().getTimestamp());
+ message.setPayload(innerEntry.getValue().getPayload());
+ gossipCore.sendOneWay(message, member.getUri());
+ }
+ }
+ sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime);
+ }
+
+ /**
+ * Performs the sending of the membership list, after we have incremented our own heartbeat.
+ */
+ protected void sendMembershipList(LocalGossipMember me, LocalGossipMember member) {
+ if (member == null){
+ return;
+ }
+ long startTime = System.currentTimeMillis();
+ me.setHeartbeat(System.nanoTime());
+ UdpActiveGossipMessage message = new UdpActiveGossipMessage();
+ message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
+ message.setUuid(UUID.randomUUID().toString());
+ message.getMembers().add(convert(me));
+ for (LocalGossipMember other : gossipManager.getMembers().keySet()) {
+ message.getMembers().add(convert(other));
+ }
+ Response r = gossipCore.send(message, member.getUri());
+ if (r instanceof ActiveGossipOk){
+ //maybe count metrics here
+ } else {
+ LOGGER.debug("Message " + message + " generated response " + r);
+ }
+ sendMembershipHistorgram.update(System.currentTimeMillis() - startTime);
+ }
+
+ protected final GossipMember convert(LocalGossipMember member){
+ GossipMember gm = new GossipMember();
+ gm.setCluster(member.getClusterName());
+ gm.setHeartbeat(member.getHeartbeat());
+ gm.setUri(member.getUri().toASCIIString());
+ gm.setId(member.getId());
+ gm.setProperties(member.getProperties());
+ return gm;
+ }
+}
diff --git a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
deleted file mode 100644
index f81565b..0000000
--- a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.manager;
-
-import java.util.List;
-
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.MetricRegistry;
-import org.apache.gossip.LocalGossipMember;
-import org.apache.gossip.model.ActiveGossipOk;
-import org.apache.gossip.model.GossipDataMessage;
-import org.apache.gossip.model.GossipMember;
-import org.apache.gossip.model.Response;
-import org.apache.gossip.model.SharedGossipDataMessage;
-import org.apache.gossip.udp.UdpActiveGossipMessage;
-import org.apache.gossip.udp.UdpGossipDataMessage;
-import org.apache.gossip.udp.UdpSharedGossipDataMessage;
-import org.apache.log4j.Logger;
-
-import static com.codahale.metrics.MetricRegistry.name;
-
-/**
- * The ActiveGossipThread is sends information. Pick a random partner and send the membership list to that partner
- */
-public class ActiveGossipThread {
-
- private static final Logger LOGGER = Logger.getLogger(ActiveGossipThread.class);
-
- private final GossipManager gossipManager;
- private final Random random;
- private final GossipCore gossipCore;
- private ScheduledExecutorService scheduledExecutorService;
- private final BlockingQueue<Runnable> workQueue;
- private ThreadPoolExecutor threadService;
-
- private final Histogram sharedDataHistogram;
- private final Histogram sendPerNodeDataHistogram;
- private final Histogram sendMembershipHistorgram;
-
- public ActiveGossipThread(GossipManager gossipManager, GossipCore gossipCore, MetricRegistry registry) {
- this.gossipManager = gossipManager;
- random = new Random();
- this.gossipCore = gossipCore;
- scheduledExecutorService = Executors.newScheduledThreadPool(2);
- workQueue = new ArrayBlockingQueue<Runnable>(1024);
- threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue, new ThreadPoolExecutor.DiscardOldestPolicy());
- sharedDataHistogram = registry.histogram(name(ActiveGossipThread.class, "sharedDataHistogram-time"));
- sendPerNodeDataHistogram = registry.histogram(name(ActiveGossipThread.class, "sendPerNodeDataHistogram-time"));
- sendMembershipHistorgram = registry.histogram(name(ActiveGossipThread.class, "sendMembershipHistorgram-time"));
- }
-
-
- public void init() {
- scheduledExecutorService.scheduleAtFixedRate(
- () -> {
- threadService.execute( () -> { sendToALiveMember(); });
- }, 0,
- gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
- scheduledExecutorService.scheduleAtFixedRate(
- () -> { this.sendToDeadMember(); }, 0,
- gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
- scheduledExecutorService.scheduleAtFixedRate(
- () -> sendPerNodeData(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0,
- gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
- scheduledExecutorService.scheduleAtFixedRate(
- () -> sendSharedData(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0,
- gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
- }
-
- public void shutdown() {
- scheduledExecutorService.shutdown();
- try {
- scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- LOGGER.debug("Issue during shurdown" + e);
- }
- }
-
- public void sendSharedData(LocalGossipMember me, List<LocalGossipMember> memberList){
- long startTime = System.currentTimeMillis();
-
- LocalGossipMember member = selectPartner(memberList);
- if (member == null) {
- LOGGER.debug("Send sendMembershipList() is called without action");
- sharedDataHistogram.update(System.currentTimeMillis() - startTime);
- return;
- }
- for (Entry<String, SharedGossipDataMessage> innerEntry : gossipCore.getSharedData().entrySet()){
- UdpSharedGossipDataMessage message = new UdpSharedGossipDataMessage();
- message.setUuid(UUID.randomUUID().toString());
- message.setUriFrom(me.getId());
- message.setExpireAt(innerEntry.getValue().getExpireAt());
- message.setKey(innerEntry.getValue().getKey());
- message.setNodeId(innerEntry.getValue().getNodeId());
- message.setTimestamp(innerEntry.getValue().getTimestamp());
- message.setPayload(innerEntry.getValue().getPayload());
- gossipCore.sendOneWay(message, member.getUri());
- }
- sharedDataHistogram.update(System.currentTimeMillis() - startTime);
- }
-
- public void sendPerNodeData(LocalGossipMember me, List<LocalGossipMember> memberList){
- long startTime = System.currentTimeMillis();
-
- LocalGossipMember member = selectPartner(memberList);
- if (member == null) {
- LOGGER.debug("Send sendMembershipList() is called without action");
- sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime);
- return;
- }
- for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){
- for (Entry<String, GossipDataMessage> innerEntry : entry.getValue().entrySet()){
- UdpGossipDataMessage message = new UdpGossipDataMessage();
- message.setUuid(UUID.randomUUID().toString());
- message.setUriFrom(me.getId());
- message.setExpireAt(innerEntry.getValue().getExpireAt());
- message.setKey(innerEntry.getValue().getKey());
- message.setNodeId(innerEntry.getValue().getNodeId());
- message.setTimestamp(innerEntry.getValue().getTimestamp());
- message.setPayload(innerEntry.getValue().getPayload());
- gossipCore.sendOneWay(message, member.getUri());
- }
- }
- sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime);
- }
-
- protected void sendToALiveMember(){
- LocalGossipMember member = selectPartner(gossipManager.getLiveMembers());
- System.out.println("send" );
- sendMembershipList(gossipManager.getMyself(), member);
- }
-
- protected void sendToDeadMember(){
- LocalGossipMember member = selectPartner(gossipManager.getDeadMembers());
- sendMembershipList(gossipManager.getMyself(), member);
- }
-
- /**
- * Performs the sending of the membership list, after we have incremented our own heartbeat.
- */
- protected void sendMembershipList(LocalGossipMember me, LocalGossipMember member) {
- long startTime = System.currentTimeMillis();
- me.setHeartbeat(System.nanoTime());
- if (member == null) {
- LOGGER.debug("Send sendMembershipList() is called without action");
- sendMembershipHistorgram.update(System.currentTimeMillis() - startTime);
- return;
- } else {
- LOGGER.debug("Send sendMembershipList() is called to " + member.toString());
- }
- UdpActiveGossipMessage message = new UdpActiveGossipMessage();
- message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
- message.setUuid(UUID.randomUUID().toString());
- message.getMembers().add(convert(me));
- for (LocalGossipMember other : gossipManager.getMembers().keySet()) {
- message.getMembers().add(convert(other));
- }
- Response r = gossipCore.send(message, member.getUri());
- if (r instanceof ActiveGossipOk){
- //maybe count metrics here
- } else {
- LOGGER.debug("Message " + message + " generated response " + r);
- }
- sendMembershipHistorgram.update(System.currentTimeMillis() - startTime);
- }
-
- /**
- *
- * @param memberList
- * The list of members which are stored in the local list of members.
- * @return The chosen LocalGossipMember to gossip with.
- */
- protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
- LocalGossipMember member = null;
- if (memberList.size() > 0) {
- int randomNeighborIndex = random.nextInt(memberList.size());
- member = memberList.get(randomNeighborIndex);
- } else {
- LOGGER.debug("I am alone in this world.");
- }
- return member;
- }
-
- private GossipMember convert(LocalGossipMember member){
- GossipMember gm = new GossipMember();
- gm.setCluster(member.getClusterName());
- gm.setHeartbeat(member.getHeartbeat());
- gm.setUri(member.getUri().toASCIIString());
- gm.setId(member.getId());
- return gm;
- }
-}
diff --git a/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java b/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java
new file mode 100644
index 0000000..40b9c28
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+import java.util.List;
+import java.util.Random;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.gossip.LocalGossipMember;
+
+import com.codahale.metrics.MetricRegistry;
+
+/**
+ * Sends gossip traffic at different rates to other racks and data-centers.
+ * This implementation controls the rate at which gossip traffic is shared.
+ * There are two constructs Datacenter and Rack. It is assumed that bandwidth and latency is higher
+ * in the rack than in the the datacenter. We can adjust the rate at which we send messages to each group.
+ *
+ */
+public class DatacenterRackAwareActiveGossiper extends AbstractActiveGossiper {
+
+ public static final String DATACENTER = "datacenter";
+ public static final String RACK = "rack";
+
+ private int sameRackGossipIntervalMs = 100;
+ private int sameDcGossipIntervalMs = 500;
+ private int differentDatacenterGossipIntervalMs = 1000;
+ private int randomDeadMemberSendIntervalMs = 250;
+
+ private ScheduledExecutorService scheduledExecutorService;
+ private final BlockingQueue<Runnable> workQueue;
+ private ThreadPoolExecutor threadService;
+ private final Random random;
+
+ public DatacenterRackAwareActiveGossiper(GossipManager gossipManager, GossipCore gossipCore,
+ MetricRegistry registry) {
+ super(gossipManager, gossipCore, registry);
+ scheduledExecutorService = Executors.newScheduledThreadPool(2);
+ workQueue = new ArrayBlockingQueue<Runnable>(1024);
+ threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue,
+ new ThreadPoolExecutor.DiscardOldestPolicy());
+ random = new Random();
+ try {
+ sameRackGossipIntervalMs = Integer.parseInt(gossipManager.getSettings()
+ .getActiveGossipProperties().get("sameRackGossipIntervalMs"));
+ } catch (RuntimeException ex) { }
+ try {
+ sameDcGossipIntervalMs = Integer.parseInt(gossipManager.getSettings()
+ .getActiveGossipProperties().get("sameDcGossipIntervalMs"));
+ } catch (RuntimeException ex) { }
+ try {
+ differentDatacenterGossipIntervalMs = Integer.parseInt(gossipManager.getSettings()
+ .getActiveGossipProperties().get("differentDatacenterGossipIntervalMs"));
+ } catch (RuntimeException ex) { }
+ try {
+ randomDeadMemberSendIntervalMs = Integer.parseInt(gossipManager.getSettings()
+ .getActiveGossipProperties().get("randomDeadMemberSendIntervalMs"));
+ } catch (RuntimeException ex) { }
+ }
+
+ @Override
+ public void init() {
+ super.init();
+ //same rack
+ scheduledExecutorService.scheduleAtFixedRate(() ->
+ threadService.execute(() -> sendToSameRackMember()),
+ 0, sameRackGossipIntervalMs, TimeUnit.MILLISECONDS);
+
+ scheduledExecutorService.scheduleAtFixedRate(() ->
+ threadService.execute(() -> sendToSameRackMemberPerNode()),
+ 0, sameRackGossipIntervalMs, TimeUnit.MILLISECONDS);
+
+ scheduledExecutorService.scheduleAtFixedRate(() ->
+ threadService.execute(() -> sendToSameRackShared()),
+ 0, sameRackGossipIntervalMs, TimeUnit.MILLISECONDS);
+
+ //same dc different rack
+ scheduledExecutorService.scheduleAtFixedRate(() ->
+ threadService.execute(() -> sameDcDiffernetRackMember()),
+ 0, sameDcGossipIntervalMs, TimeUnit.MILLISECONDS);
+
+ scheduledExecutorService.scheduleAtFixedRate(() ->
+ threadService.execute(() -> sameDcDiffernetRackPerNode()),
+ 0, sameDcGossipIntervalMs, TimeUnit.MILLISECONDS);
+
+ scheduledExecutorService.scheduleAtFixedRate(() ->
+ threadService.execute(() -> sameDcDiffernetRackShared()),
+ 0, sameDcGossipIntervalMs, TimeUnit.MILLISECONDS);
+
+ //different dc
+ scheduledExecutorService.scheduleAtFixedRate(() ->
+ threadService.execute(() -> differentDcMember()),
+ 0, differentDatacenterGossipIntervalMs, TimeUnit.MILLISECONDS);
+
+ scheduledExecutorService.scheduleAtFixedRate(() ->
+ threadService.execute(() -> differentDcPerNode()),
+ 0, differentDatacenterGossipIntervalMs, TimeUnit.MILLISECONDS);
+
+ scheduledExecutorService.scheduleAtFixedRate(() ->
+ threadService.execute(() -> differentDcShared()),
+ 0, differentDatacenterGossipIntervalMs, TimeUnit.MILLISECONDS);
+
+ //the dead
+ scheduledExecutorService.scheduleAtFixedRate(() ->
+ threadService.execute(() -> sendToDeadMember()),
+ 0, randomDeadMemberSendIntervalMs, TimeUnit.MILLISECONDS);
+
+ }
+
+ private void sendToDeadMember() {
+ sendMembershipList(gossipManager.getMyself(), selectPartner(gossipManager.getDeadMembers()));
+ }
+
+ private List<LocalGossipMember> differentDataCenter(){
+ String myDc = gossipManager.getMyself().getProperties().get(DATACENTER);
+ String rack = gossipManager.getMyself().getProperties().get(RACK);
+ if (myDc == null|| rack == null){
+ return Collections.emptyList();
+ }
+ List<LocalGossipMember> notMyDc = new ArrayList<LocalGossipMember>(10);
+ for (LocalGossipMember i : gossipManager.getLiveMembers()){
+ if (!myDc.equals(i.getProperties().get(DATACENTER))){
+ notMyDc.add(i);
+ }
+ }
+ return notMyDc;
+ }
+
+ private List<LocalGossipMember> sameDatacenterDifferentRack(){
+ String myDc = gossipManager.getMyself().getProperties().get(DATACENTER);
+ String rack = gossipManager.getMyself().getProperties().get(RACK);
+ if (myDc == null|| rack == null){
+ return Collections.emptyList();
+ }
+ List<LocalGossipMember> notMyDc = new ArrayList<LocalGossipMember>(10);
+ for (LocalGossipMember i : gossipManager.getLiveMembers()){
+ if (myDc.equals(i.getProperties().get(DATACENTER)) && !rack.equals(i.getProperties().get(RACK))){
+ notMyDc.add(i);
+ }
+ }
+ return notMyDc;
+ }
+
+ private List<LocalGossipMember> sameRackNodes(){
+ String myDc = gossipManager.getMyself().getProperties().get(DATACENTER);
+ String rack = gossipManager.getMyself().getProperties().get(RACK);
+ if (myDc == null|| rack == null){
+ return Collections.emptyList();
+ }
+ List<LocalGossipMember> sameDcAndRack = new ArrayList<LocalGossipMember>(10);
+ for (LocalGossipMember i : gossipManager.getLiveMembers()){
+ if (myDc.equals(i.getProperties().get(DATACENTER))
+ && rack.equals(i.getProperties().get(RACK))){
+ sameDcAndRack.add(i);
+ }
+ }
+ return sameDcAndRack;
+ }
+
+ private void sendToSameRackMember() {
+ sendMembershipList(gossipManager.getMyself(), selectPartner(sameRackNodes()));
+ }
+
+ private void sendToSameRackMemberPerNode() {
+ sendPerNodeData(gossipManager.getMyself(), selectPartner(sameRackNodes()));
+ }
+
+ private void sendToSameRackShared() {
+ sendSharedData(gossipManager.getMyself(), selectPartner(sameRackNodes()));
+ }
+
+ private void differentDcMember() {
+ sendMembershipList(gossipManager.getMyself(), selectPartner(differentDataCenter()));
+ }
+
+ private void differentDcPerNode() {
+ sendPerNodeData(gossipManager.getMyself(), selectPartner(differentDataCenter()));
+ }
+
+ private void differentDcShared() {
+ sendSharedData(gossipManager.getMyself(), selectPartner(differentDataCenter()));
+ }
+
+ private void sameDcDiffernetRackMember() {
+ sendMembershipList(gossipManager.getMyself(), selectPartner(sameDatacenterDifferentRack()));
+ }
+
+ private void sameDcDiffernetRackPerNode() {
+ sendPerNodeData(gossipManager.getMyself(), selectPartner(sameDatacenterDifferentRack()));
+ }
+
+ private void sameDcDiffernetRackShared() {
+ sendSharedData(gossipManager.getMyself(), selectPartner(sameDatacenterDifferentRack()));
+ }
+
+
+ @Override
+ public void shutdown() {
+ super.shutdown();
+ }
+
+ protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
+ LocalGossipMember member = null;
+ if (memberList.size() > 0) {
+ int randomNeighborIndex = random.nextInt(memberList.size());
+ member = memberList.get(randomNeighborIndex);
+ }
+ return member;
+ }
+
+}
diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java
index 5d561c3..31bd447 100644
--- a/src/main/java/org/apache/gossip/manager/GossipCore.java
+++ b/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -165,7 +165,8 @@
activeGossipMessage.getMembers().get(i).getCluster(),
u,
activeGossipMessage.getMembers().get(i).getId(),
- activeGossipMessage.getMembers().get(i).getHeartbeat());
+ activeGossipMessage.getMembers().get(i).getHeartbeat(),
+ activeGossipMessage.getMembers().get(i).getProperties());
if (i == 0) {
senderMember = member;
}
@@ -321,6 +322,7 @@
remoteMember.getUri(),
remoteMember.getId(),
remoteMember.getHeartbeat(),
+ remoteMember.getProperties(),
gossipManager.getSettings().getWindowSize(),
gossipManager.getSettings().getMinimumSamples(),
gossipManager.getSettings().getDistribution());
@@ -331,6 +333,7 @@
if (localMember.getKey().getId().equals(remoteMember.getId())){
localMember.getKey().recordHeartbeat(remoteMember.getHeartbeat());
localMember.getKey().setHeartbeat(remoteMember.getHeartbeat());
+ localMember.getKey().setProperties(remoteMember.getProperties());
}
}
}
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
index cf67c9c..840efb9 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -18,9 +18,13 @@
package org.apache.gossip.manager;
import com.codahale.metrics.MetricRegistry;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@@ -59,7 +63,7 @@
private final GossipListener listener;
- private ActiveGossipThread activeGossipThread;
+ private AbstractActiveGossiper activeGossipThread;
private PassiveGossipThread passiveGossipThread;
@@ -76,21 +80,22 @@
private MetricRegistry registry;
public GossipManager(String cluster,
- URI uri, String id, GossipSettings settings,
+ URI uri, String id, Map<String,String> properties, GossipSettings settings,
List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry) {
this.settings = settings;
gossipCore = new GossipCore(this, registry);
clock = new SystemClock();
dataReaper = new DataReaper(gossipCore, clock);
- me = new LocalGossipMember(cluster, uri, id, clock.nanoTime(),
+ me = new LocalGossipMember(cluster, uri, id, clock.nanoTime(), properties,
settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution());
members = new ConcurrentSkipListMap<>();
for (GossipMember startupMember : gossipMembers) {
if (!startupMember.equals(me)) {
LocalGossipMember member = new LocalGossipMember(startupMember.getClusterName(),
startupMember.getUri(), startupMember.getId(),
- clock.nanoTime(), settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution());
+ clock.nanoTime(), startupMember.getProperties(), settings.getWindowSize(),
+ settings.getMinimumSamples(), settings.getDistribution());
//TODO should members start in down state?
members.put(member, GossipState.DOWN);
}
@@ -137,6 +142,14 @@
return me;
}
+ private AbstractActiveGossiper constructActiveGossiper(){
+ try {
+ Constructor<?> c = Class.forName(settings.getActiveGossipClass()).getConstructor(GossipManager.class, GossipCore.class, MetricRegistry.class);
+ return (AbstractActiveGossiper) c.newInstance(this, gossipCore, registry);
+ } catch (NoSuchMethodException | SecurityException | ClassNotFoundException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+ }
/**
* Starts the client. Specifically, start the various cycles for this protocol. Start the gossip
* thread and start the receiver thread.
@@ -144,7 +157,7 @@
public void init() {
passiveGossipThread = new OnlyProcessReceivedPassiveGossipThread(this, gossipCore);
gossipThreadExecutor.execute(passiveGossipThread);
- activeGossipThread = new ActiveGossipThread(this, this.gossipCore, registry);
+ activeGossipThread = constructActiveGossiper();
activeGossipThread.init();
dataReaper.init();
scheduledServiced.scheduleAtFixedRate(() -> {
diff --git a/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java b/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java
new file mode 100644
index 0000000..43237fb
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.gossip.LocalGossipMember;
+
+import com.codahale.metrics.MetricRegistry;
+
+/**
+ * Base implementation gossips randomly to live nodes periodically gossips to dead ones
+ *
+ */
+public class SimpleActiveGossipper extends AbstractActiveGossiper {
+
+ private ScheduledExecutorService scheduledExecutorService;
+ private final BlockingQueue<Runnable> workQueue;
+ private ThreadPoolExecutor threadService;
+ private final Random random;
+
+ public SimpleActiveGossipper(GossipManager gossipManager, GossipCore gossipCore,
+ MetricRegistry registry) {
+ super(gossipManager, gossipCore, registry);
+ scheduledExecutorService = Executors.newScheduledThreadPool(2);
+ workQueue = new ArrayBlockingQueue<Runnable>(1024);
+ threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue,
+ new ThreadPoolExecutor.DiscardOldestPolicy());
+ random = new Random();
+ }
+
+ @Override
+ public void init() {
+ super.init();
+ scheduledExecutorService.scheduleAtFixedRate(() -> {
+ threadService.execute(() -> {
+ sendToALiveMember();
+ });
+ }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
+ scheduledExecutorService.scheduleAtFixedRate(() -> {
+ sendToDeadMember();
+ }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
+ scheduledExecutorService.scheduleAtFixedRate(
+ () -> sendPerNodeData(gossipManager.getMyself(),
+ selectPartner(gossipManager.getLiveMembers())),
+ 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
+ scheduledExecutorService.scheduleAtFixedRate(
+ () -> sendSharedData(gossipManager.getMyself(),
+ selectPartner(gossipManager.getLiveMembers())),
+ 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void shutdown() {
+ super.shutdown();
+ scheduledExecutorService.shutdown();
+ try {
+ scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOGGER.debug("Issue during shutdown", e);
+ }
+ }
+
+ protected void sendToALiveMember(){
+ LocalGossipMember member = selectPartner(gossipManager.getLiveMembers());
+ sendMembershipList(gossipManager.getMyself(), member);
+ }
+
+ protected void sendToDeadMember(){
+ LocalGossipMember member = selectPartner(gossipManager.getDeadMembers());
+ sendMembershipList(gossipManager.getMyself(), member);
+ }
+
+ /**
+ *
+ * @param memberList
+ * The list of members which are stored in the local list of members.
+ * @return The chosen LocalGossipMember to gossip with.
+ */
+ protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
+ //TODO this selection is racey what if the list size changes?
+ LocalGossipMember member = null;
+ if (memberList.size() > 0) {
+ int randomNeighborIndex = random.nextInt(memberList.size());
+ member = memberList.get(randomNeighborIndex);
+ }
+ return member;
+ }
+}
diff --git a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
index fd936f1..4a150be 100644
--- a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
@@ -25,8 +25,9 @@
import java.net.URI;
import java.util.List;
-
+import java.util.Map;
import java.util.ArrayList;
+import java.util.HashMap;
public class RandomGossipManager extends GossipManager {
@@ -42,6 +43,7 @@
private List<GossipMember> gossipMembers;
private GossipListener listener;
private MetricRegistry registry;
+ private Map<String,String> properties;
private ManagerBuilder() {}
@@ -55,6 +57,11 @@
this.cluster = cluster;
return this;
}
+
+ public ManagerBuilder properties(Map<String,String> properties) {
+ this.properties = properties;
+ return this;
+ }
public ManagerBuilder withId(String id) {
this.id = id;
@@ -75,6 +82,7 @@
this.listener = listener;
return this;
}
+
public ManagerBuilder registry(MetricRegistry registry) {
this.registry = registry;
return this;
@@ -91,18 +99,21 @@
checkArgument(settings != null, "You must specify gossip settings");
checkArgument(uri != null, "You must specify a uri");
checkArgument(registry != null, "You must specify a MetricRegistry");
+ if (properties == null){
+ properties = new HashMap<String,String>();
+ }
if (listener == null){
listener((a,b) -> {});
}
if (gossipMembers == null) {
gossipMembers = new ArrayList<>();
}
- return new RandomGossipManager(cluster, uri, id, settings, gossipMembers, listener, registry);
+ return new RandomGossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry);
}
}
- private RandomGossipManager(String cluster, URI uri, String id, GossipSettings settings,
+ private RandomGossipManager(String cluster, URI uri, String id, Map<String,String> properties, GossipSettings settings,
List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry) {
- super(cluster, uri, id, settings, gossipMembers, listener, registry);
+ super(cluster, uri, id, properties, settings, gossipMembers, listener, registry);
}
}
diff --git a/src/main/java/org/apache/gossip/model/GossipMember.java b/src/main/java/org/apache/gossip/model/GossipMember.java
index 03ec13d..a318776 100644
--- a/src/main/java/org/apache/gossip/model/GossipMember.java
+++ b/src/main/java/org/apache/gossip/model/GossipMember.java
@@ -17,12 +17,15 @@
*/
package org.apache.gossip.model;
+import java.util.Map;
+
public class GossipMember {
private String cluster;
private String uri;
private String id;
private Long heartbeat;
+ private Map<String,String> properties;
public GossipMember(){
@@ -66,5 +69,19 @@
public void setHeartbeat(Long heartbeat) {
this.heartbeat = heartbeat;
}
+
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ public void setProperties(Map<String, String> properties) {
+ this.properties = properties;
+ }
+
+ @Override
+ public String toString() {
+ return "GossipMember [cluster=" + cluster + ", uri=" + uri + ", id=" + id + ", heartbeat="
+ + heartbeat + ", properties=" + properties + "]";
+ }
}
diff --git a/src/main/java/org/apache/gossip/udp/UdpActiveGossipMessage.java b/src/main/java/org/apache/gossip/udp/UdpActiveGossipMessage.java
index 424d1ca..b6e8101 100644
--- a/src/main/java/org/apache/gossip/udp/UdpActiveGossipMessage.java
+++ b/src/main/java/org/apache/gossip/udp/UdpActiveGossipMessage.java
@@ -42,7 +42,8 @@
@Override
public String toString() {
- return "UdpActiveGossipMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + "]";
+ return "UdpActiveGossipMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + ", getMembers()="
+ + getMembers() + "]";
}
-
+
}
diff --git a/src/test/java/org/apache/gossip/DataTest.java b/src/test/java/org/apache/gossip/DataTest.java
index 766d72b..83879f9 100644
--- a/src/test/java/org/apache/gossip/DataTest.java
+++ b/src/test/java/org/apache/gossip/DataTest.java
@@ -22,6 +22,7 @@
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
@@ -50,7 +51,7 @@
for (int i = 1; i < clusterMembers+1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
GossipService gossipService = new GossipService(cluster, uri, i + "",
- startupMembers, settings,
+ new HashMap<String,String>(), startupMembers, settings,
(a,b) -> {}, new MetricRegistry());
clients.add(gossipService);
gossipService.start();
@@ -65,29 +66,27 @@
}}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
clients.get(0).gossipPerNodeData(msg());
clients.get(0).gossipSharedData(sharedMsg());
- Thread.sleep(10000);
- TUnit.assertThat(
- new Callable<Object>() {
- public Object call() throws Exception {
- GossipDataMessage x = clients.get(1).findPerNodeData(1 + "", "a");
- if (x == null)
- return "";
- else
- return x.getPayload();
- }
- }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("b");
+
+ TUnit.assertThat(new Callable<Object>() {
+ public Object call() throws Exception {
+ GossipDataMessage x = clients.get(1).findPerNodeData(1 + "", "a");
+ if (x == null)
+ return "";
+ else
+ return x.getPayload();
+ }
+ }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("b");
- TUnit.assertThat(
- new Callable<Object>() {
- public Object call() throws Exception {
- SharedGossipDataMessage x = clients.get(1).findSharedData("a");
- if (x == null)
- return "";
- else
- return x.getPayload();
- }
- }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("c");
+ TUnit.assertThat(new Callable<Object>() {
+ public Object call() throws Exception {
+ SharedGossipDataMessage x = clients.get(1).findSharedData("a");
+ if (x == null)
+ return "";
+ else
+ return x.getPayload();
+ }
+ }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("c");
for (int i = 0; i < clusterMembers; ++i) {
diff --git a/src/test/java/org/apache/gossip/GossipMemberTest.java b/src/test/java/org/apache/gossip/GossipMemberTest.java
index e15259e..272c7fb 100644
--- a/src/test/java/org/apache/gossip/GossipMemberTest.java
+++ b/src/test/java/org/apache/gossip/GossipMemberTest.java
@@ -19,6 +19,7 @@
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.HashMap;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
@@ -31,9 +32,9 @@
@Test
public void testHashCodeFromGossip40() throws URISyntaxException {
Assert.assertNotEquals(
- new LocalGossipMember("mycluster", new URI("udp://4.4.4.4:1000"), "myid", 1, 10, 5, "exponential")
+ new LocalGossipMember("mycluster", new URI("udp://4.4.4.4:1000"), "myid", 1, new HashMap<String,String>(), 10, 5, "exponential")
.hashCode(),
- new LocalGossipMember("mycluster", new URI("udp://4.4.4.5:1005"), "yourid", 11, 11, 6, "exponential")
+ new LocalGossipMember("mycluster", new URI("udp://4.4.4.5:1005"), "yourid", 11, new HashMap<String,String>(), 11, 6, "exponential")
.hashCode());
}
}
diff --git a/src/test/java/org/apache/gossip/IdAndPropertyTest.java b/src/test/java/org/apache/gossip/IdAndPropertyTest.java
new file mode 100644
index 0000000..2a98f01
--- /dev/null
+++ b/src/test/java/org/apache/gossip/IdAndPropertyTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper;
+import org.junit.jupiter.api.Test;
+import org.junit.platform.runner.JUnitPlatform;
+import org.junit.runner.RunWith;
+
+import com.codahale.metrics.MetricRegistry;
+
+import io.teknek.tunit.TUnit;
+
+@RunWith(JUnitPlatform.class)
+public class IdAndPropertyTest {
+
+ @Test
+ public void testDatacenterRackGossiper() throws URISyntaxException, UnknownHostException, InterruptedException{
+ GossipSettings settings = new GossipSettings();
+ settings.setActiveGossipClass(DatacenterRackAwareActiveGossiper.class.getName());
+ List<GossipMember> startupMembers = new ArrayList<>();
+ Map<String, String> x = new HashMap<>();
+ x.put("a", "b");
+ x.put("datacenter", "dc1");
+ x.put("rack", "rack1");
+ GossipService gossipService1 = new GossipService("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0)), "0", x, startupMembers, settings,
+ (a, b) -> {}, new MetricRegistry());
+ gossipService1.start();
+
+ Map<String, String> y = new HashMap<>();
+ y.put("a", "c");
+ y.put("datacenter", "dc2");
+ y.put("rack", "rack2");
+ GossipService gossipService2 = new GossipService("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)), "1", y,
+ Arrays.asList(new RemoteGossipMember("a",
+ new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0)), "0")),
+ settings, (a, b) -> { }, new MetricRegistry());
+ gossipService2.start();
+ TUnit.assertThat(() -> {
+ String value = "";
+ try {
+ value = gossipService1.getGossipManager().getLiveMembers().get(0).getProperties().get("a");
+ } catch (RuntimeException e){ }
+ return value;
+ }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo("c");
+
+ TUnit.assertThat(() -> {
+ String value = "";
+ try {
+ value = gossipService2.getGossipManager().getLiveMembers().get(0).getProperties().get("a");
+ } catch (RuntimeException e){ }
+ return value;
+ }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo("b");
+
+ gossipService1.shutdown();
+ gossipService2.shutdown();
+
+ }
+}
diff --git a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
index f5e34ba..9d02556 100644
--- a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
+++ b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
@@ -25,6 +25,7 @@
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Random;
import java.util.UUID;
@@ -58,7 +59,7 @@
final int clusterMembers = 5;
for (int i = 1; i < clusterMembers + 1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30300 + i));
- GossipService gossipService = new GossipService(cluster, uri, i + "", startupMembers,
+ GossipService gossipService = new GossipService(cluster, uri, i + "", new HashMap<String,String>(), startupMembers,
settings, (a,b) -> {}, new MetricRegistry());
clients.add(gossipService);
gossipService.start();
@@ -100,11 +101,11 @@
}
return total;
}
- }).afterWaitingAtMost(30, TimeUnit.SECONDS).isEqualTo(4);
+ }).afterWaitingAtMost(50, TimeUnit.SECONDS).isEqualTo(4);
URI uri = new URI("udp://" + "127.0.0.1" + ":" + shutdownPort);
// start client again
- GossipService gossipService = new GossipService(cluster, uri, shutdownId + "", startupMembers,
+ GossipService gossipService = new GossipService(cluster, uri, shutdownId + "", new HashMap<String,String>(), startupMembers,
settings, (a,b) -> {}, new MetricRegistry());
clients.add(gossipService);
gossipService.start();
diff --git a/src/test/java/org/apache/gossip/StartupSettingsTest.java b/src/test/java/org/apache/gossip/StartupSettingsTest.java
index f0acabf..aa4d255 100644
--- a/src/test/java/org/apache/gossip/StartupSettingsTest.java
+++ b/src/test/java/org/apache/gossip/StartupSettingsTest.java
@@ -28,6 +28,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.UUID;
import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith;
@@ -48,7 +49,7 @@
writeSettingsFile(settingsFile);
URI uri = new URI("udp://" + "127.0.0.1" + ":" + 50000);
final GossipService firstService = new GossipService(
- CLUSTER, uri, "1",
+ CLUSTER, uri, "1", new HashMap<String, String>(),
new ArrayList<GossipMember>(), new GossipSettings(), null, new MetricRegistry());
firstService.start();
final GossipService serviceUnderTest = new GossipService(
@@ -70,6 +71,7 @@
" \"cleanup_interval\":10000,\n" +
" \"convict_threshold\":2.6,\n" +
" \"distribution\":\"exponential\",\n" +
+ " \"properties\":{},\n" +
" \"members\":[\n" +
" {\"cluster\": \"" + CLUSTER + "\",\"uri\":\"udp://127.0.0.1:5000\"}\n" +
" ]\n" +
@@ -77,7 +79,7 @@
log.info( "Using settings file with contents of:\n---\n" + settings + "\n---" );
FileOutputStream output = new FileOutputStream(target);
- output.write( settings.getBytes() );
+ output.write(settings.getBytes());
output.close();
}
}
diff --git a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
index af7a117..bc4004d 100644
--- a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
+++ b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
@@ -24,6 +24,7 @@
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
@@ -58,7 +59,7 @@
final int clusterMembers = 5;
for (int i = 1; i < clusterMembers+1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
- GossipService gossipService = new GossipService(cluster, uri, i + "",
+ GossipService gossipService = new GossipService(cluster, uri, i + "", new HashMap<String,String>(),
startupMembers, settings, (a,b) -> {}, new MetricRegistry());
clients.add(gossipService);
gossipService.start();
diff --git a/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java b/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java
index 99cf9c8..69d46b8 100644
--- a/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java
+++ b/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java
@@ -34,7 +34,7 @@
int samples = 1;
int windowSize = 1000;
LocalGossipMember member = new LocalGossipMember("", URI.create("udp://127.0.0.1:1000"),
- "", 0L, windowSize, samples, "normal");
+ "", 0L, null, windowSize, samples, "normal");
member.recordHeartbeat(5);
member.recordHeartbeat(10);
Assert.assertEquals(new Double(0.3010299956639812), member.detect(10));
@@ -45,7 +45,7 @@
int samples = 1;
int windowSize = 1000;
LocalGossipMember member = new LocalGossipMember("", URI.create("udp://127.0.0.1:1000"),
- "", 0L, windowSize, samples, "exponential");
+ "", 0L, null, windowSize, samples, "exponential");
member.recordHeartbeat(5);
member.recordHeartbeat(10);
Assert.assertEquals(new Double(0.4342944819032518), member.detect(10));
@@ -65,7 +65,7 @@
int samples = 1;
int windowSize = 1000;
LocalGossipMember member = new LocalGossipMember("", URI.create("udp://127.0.0.1:1000"),
- "", 0L, windowSize, samples, "exponential");
+ "", 0L, null, windowSize, samples, "exponential");
member.recordHeartbeat(5);
member.recordHeartbeat(5);
member.recordHeartbeat(5);
diff --git a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
index 1ef3a5b..cf38492 100644
--- a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
+++ b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
@@ -21,19 +21,15 @@
import org.apache.gossip.GossipMember;
import org.apache.gossip.GossipSettings;
import org.apache.gossip.LocalGossipMember;
-import org.apache.gossip.event.GossipListener;
-import org.apache.gossip.event.GossipState;
import org.apache.gossip.manager.random.RandomGossipManager;
import org.junit.jupiter.api.Test;
import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith;
-import javax.management.Notification;
-import javax.management.NotificationListener;
-
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -43,18 +39,6 @@
@RunWith(JUnitPlatform.class)
public class RandomGossipManagerBuilderTest {
- public static class TestGossipListener implements GossipListener {
- @Override
- public void gossipEvent(GossipMember member, GossipState state) {
- }
- }
-
- public static class TestNotificationListener implements NotificationListener {
- @Override
- public void handleNotification(Notification notification, Object o) {
- }
- }
-
@Test
public void idShouldNotBeNull() {
expectThrows(IllegalArgumentException.class,() -> {
@@ -91,7 +75,7 @@
public void useMemberListIfProvided() throws URISyntaxException {
LocalGossipMember member = new LocalGossipMember(
"aCluster", new URI("udp://localhost:2000"), "aGossipMember",
- System.nanoTime(), 1000, 1, "exponential");
+ System.nanoTime(), new HashMap<String, String>(), 1000, 1, "exponential");
List<GossipMember> memberList = new ArrayList<>();
memberList.add(member);
RandomGossipManager gossipManager = RandomGossipManager.newBuilder()