GOSSIP-17 Add metrics (Chandresh Pancholi via egc)
diff --git a/pom.xml b/pom.xml
index 71db8ed..a104451 100644
--- a/pom.xml
+++ b/pom.xml
@@ -39,6 +39,7 @@
<!-- dependecy versions -->
<jackson.version>2.8.5</jackson.version>
+ <metrics.version>3.1.2</metrics.version>
<commons-math.version>1.2</commons-math.version>
<junit.jupiter.version>5.0.0-M2</junit.jupiter.version>
<junit.platform.version>1.0.0-M2</junit.platform.version>
@@ -91,6 +92,10 @@
<version>${jackson.version}</version>
</dependency>
<dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>${metrics.version}</version></dependency>
+ <dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${junit.jupiter.version}</version>
diff --git a/src/main/java/org/apache/gossip/GossipRunner.java b/src/main/java/org/apache/gossip/GossipRunner.java
index 1ae609c..c8a1f13 100644
--- a/src/main/java/org/apache/gossip/GossipRunner.java
+++ b/src/main/java/org/apache/gossip/GossipRunner.java
@@ -22,8 +22,6 @@
import java.io.IOException;
import java.net.URISyntaxException;
-
-
public class GossipRunner {
public static void main(String[] args) throws URISyntaxException {
diff --git a/src/main/java/org/apache/gossip/GossipService.java b/src/main/java/org/apache/gossip/GossipService.java
index 80c01ca..fca9f28 100644
--- a/src/main/java/org/apache/gossip/GossipService.java
+++ b/src/main/java/org/apache/gossip/GossipService.java
@@ -17,10 +17,12 @@
*/
package org.apache.gossip;
+import com.codahale.metrics.MetricRegistry;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.List;
+import com.codahale.metrics.JmxReporter;
import org.apache.gossip.event.GossipListener;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.random.RandomGossipManager;
@@ -35,7 +37,8 @@
public class GossipService {
public static final Logger LOGGER = Logger.getLogger(GossipService.class);
-
+ private final JmxReporter jmxReporter;
+
private final GossipManager gossipManager;
/**
@@ -48,7 +51,7 @@
UnknownHostException {
this(startupSettings.getCluster(), startupSettings.getUri()
, startupSettings.getId(), startupSettings.getGossipMembers(),
- startupSettings.getGossipSettings(), null);
+ startupSettings.getGossipSettings(), null, new MetricRegistry());
}
/**
@@ -58,8 +61,10 @@
* @throws UnknownHostException
*/
public GossipService(String cluster, URI uri, String id,
- List<GossipMember> gossipMembers, GossipSettings settings, GossipListener listener)
+ List<GossipMember> gossipMembers, GossipSettings settings, GossipListener listener, MetricRegistry registry)
throws InterruptedException, UnknownHostException {
+ jmxReporter = JmxReporter.forRegistry(registry).build();
+ jmxReporter.start();
gossipManager = RandomGossipManager.newBuilder()
.withId(id)
.cluster(cluster)
@@ -67,11 +72,11 @@
.settings(settings)
.gossipMembers(gossipMembers)
.listener(listener)
+ .registry(registry)
.build();
}
public void start() {
- LOGGER.debug("Starting: " + getGossipManager().getMyself().getUri());
gossipManager.init();
}
diff --git a/src/main/java/org/apache/gossip/examples/GossipExample.java b/src/main/java/org/apache/gossip/examples/GossipExample.java
index cea59f4..01cd3e3 100644
--- a/src/main/java/org/apache/gossip/examples/GossipExample.java
+++ b/src/main/java/org/apache/gossip/examples/GossipExample.java
@@ -17,6 +17,7 @@
*/
package org.apache.gossip.examples;
+import com.codahale.metrics.MetricRegistry;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
@@ -81,7 +82,7 @@
// dead list handling.
for (GossipMember member : startupMembers) {
GossipService gossipService = new GossipService(cluster, member.getUri(), "",
- startupMembers, settings, null);
+ startupMembers, settings, null, new MetricRegistry());
clients.add(gossipService);
gossipService.start();
sleep(settings.getCleanupInterval() + 1000);
diff --git a/src/main/java/org/apache/gossip/examples/StandAloneNode.java b/src/main/java/org/apache/gossip/examples/StandAloneNode.java
index b6784cc..c12f946 100644
--- a/src/main/java/org/apache/gossip/examples/StandAloneNode.java
+++ b/src/main/java/org/apache/gossip/examples/StandAloneNode.java
@@ -17,6 +17,7 @@
*/
package org.apache.gossip.examples;
+import com.codahale.metrics.MetricRegistry;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.Arrays;
@@ -32,7 +33,7 @@
s.setConvictThreshold(1.0);
s.setGossipInterval(10);
GossipService gossipService = new GossipService("mycluster", URI.create(args[0]), args[1],
- Arrays.asList( new RemoteGossipMember("mycluster", URI.create(args[2]), args[3])), s, (a,b) -> {} );
+ Arrays.asList( new RemoteGossipMember("mycluster", URI.create(args[2]), args[3])), s, (a,b) -> {}, new MetricRegistry());
gossipService.start();
while (true){
System.out.println( "Live: " + gossipService.getGossipManager().getLiveMembers());
diff --git a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
index e6248dc..c09cfe9 100644
--- a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
@@ -32,6 +32,8 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.MetricRegistry;
import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.model.ActiveGossipOk;
import org.apache.gossip.model.GossipDataMessage;
@@ -45,11 +47,10 @@
import com.fasterxml.jackson.databind.ObjectMapper;
+import static com.codahale.metrics.MetricRegistry.name;
/**
- * [The active thread: periodically send gossip request.] The class handles gossiping the membership
- * list. This information is important to maintaining a common state among all the nodes, and is
- * important for detecting failures.
+ * The ActiveGossipThread is sends information. Pick a random partner and send the membership list to that partner
*/
public class ActiveGossipThread {
@@ -63,15 +64,23 @@
private ThreadPoolExecutor threadService;
private ObjectMapper MAPPER = new ObjectMapper();
- public ActiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) {
+ private final Histogram sharedDataHistogram;
+ private final Histogram sendPerNodeDataHistogram;
+ private final Histogram sendMembershipHistorgram;
+
+ public ActiveGossipThread(GossipManager gossipManager, GossipCore gossipCore, MetricRegistry registry) {
this.gossipManager = gossipManager;
random = new Random();
this.gossipCore = gossipCore;
- this.scheduledExecutorService = Executors.newScheduledThreadPool(2);
+ scheduledExecutorService = Executors.newScheduledThreadPool(2);
workQueue = new ArrayBlockingQueue<Runnable>(1024);
threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue, new ThreadPoolExecutor.DiscardOldestPolicy());
+ sharedDataHistogram = registry.histogram(name(ActiveGossipThread.class, "sharedDataHistogram-time"));
+ sendPerNodeDataHistogram = registry.histogram(name(ActiveGossipThread.class, "sendPerNodeDataHistogram-time"));
+ sendMembershipHistorgram = registry.histogram(name(ActiveGossipThread.class, "sendMembershipHistorgram-time"));
}
-
+
+
public void init() {
scheduledExecutorService.scheduleAtFixedRate(
() -> {
@@ -99,9 +108,12 @@
}
public void sendSharedData(LocalGossipMember me, List<LocalGossipMember> memberList){
+ long startTime = System.currentTimeMillis();
+
LocalGossipMember member = selectPartner(memberList);
if (member == null) {
LOGGER.debug("Send sendMembershipList() is called without action");
+ sharedDataHistogram.update(System.currentTimeMillis() - startTime);
return;
}
try (DatagramSocket socket = new DatagramSocket()) {
@@ -128,12 +140,16 @@
} catch (IOException e1) {
LOGGER.warn(e1);
}
+ sharedDataHistogram.update(System.currentTimeMillis() - startTime);
}
public void sendPerNodeData(LocalGossipMember me, List<LocalGossipMember> memberList){
+ long startTime = System.currentTimeMillis();
+
LocalGossipMember member = selectPartner(memberList);
if (member == null) {
LOGGER.debug("Send sendMembershipList() is called without action");
+ sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime);
return;
}
try (DatagramSocket socket = new DatagramSocket()) {
@@ -162,6 +178,7 @@
} catch (IOException e1) {
LOGGER.warn(e1);
}
+ sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime);
}
protected void sendToALiveMember(){
@@ -176,10 +193,13 @@
/**
* Performs the sending of the membership list, after we have incremented our own heartbeat.
*/
- protected void sendMembershipList(LocalGossipMember me, LocalGossipMember member) {
+ protected void sendMembershipList(LocalGossipMember me, LocalGossipMember member) {
+ long startTime = System.currentTimeMillis();
+
me.setHeartbeat(System.nanoTime());
if (member == null) {
LOGGER.debug("Send sendMembershipList() is called without action");
+ sendMembershipHistorgram.update(System.currentTimeMillis() - startTime);
return;
} else {
LOGGER.debug("Send sendMembershipList() is called to " + member.toString());
@@ -209,6 +229,7 @@
} catch (IOException e1) {
LOGGER.warn(e1);
}
+ sendMembershipHistorgram.update(System.currentTimeMillis() - startTime);
}
/**
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
index 2b081d0..a5d57f5 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -17,6 +17,7 @@
*/
package org.apache.gossip.manager;
+import com.codahale.metrics.MetricRegistry;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
@@ -74,9 +75,11 @@
private final ScheduledExecutorService scheduledServiced;
+ private MetricRegistry registry;
+
public GossipManager(String cluster,
URI uri, String id, GossipSettings settings,
- List<GossipMember> gossipMembers, GossipListener listener) {
+ List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry) {
this.settings = settings;
gossipCore = new GossipCore(this);
@@ -98,6 +101,7 @@
gossipServiceRunning = new AtomicBoolean(true);
this.listener = listener;
this.scheduledServiced = Executors.newScheduledThreadPool(1);
+ this.registry = registry;
}
public ConcurrentSkipListMap<LocalGossipMember, GossipState> getMembers() {
@@ -148,7 +152,7 @@
public void init() {
passiveGossipThread = new OnlyProcessReceivedPassiveGossipThread(this, gossipCore);
gossipThreadExecutor.execute(passiveGossipThread);
- activeGossipThread = new ActiveGossipThread(this, this.gossipCore);
+ activeGossipThread = new ActiveGossipThread(this, this.gossipCore, registry);
activeGossipThread.init();
dataReaper.init();
scheduledServiced.scheduleAtFixedRate(() -> {
diff --git a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
index 1444181..fd936f1 100644
--- a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
@@ -17,6 +17,7 @@
*/
package org.apache.gossip.manager.random;
+import com.codahale.metrics.MetricRegistry;
import org.apache.gossip.GossipMember;
import org.apache.gossip.GossipSettings;
import org.apache.gossip.event.GossipListener;
@@ -40,6 +41,7 @@
private GossipSettings settings;
private List<GossipMember> gossipMembers;
private GossipListener listener;
+ private MetricRegistry registry;
private ManagerBuilder() {}
@@ -73,6 +75,10 @@
this.listener = listener;
return this;
}
+ public ManagerBuilder registry(MetricRegistry registry) {
+ this.registry = registry;
+ return this;
+ }
public ManagerBuilder uri(URI uri){
this.uri = uri;
@@ -84,15 +90,19 @@
checkArgument(cluster != null, "You must specify a cluster name");
checkArgument(settings != null, "You must specify gossip settings");
checkArgument(uri != null, "You must specify a uri");
+ checkArgument(registry != null, "You must specify a MetricRegistry");
+ if (listener == null){
+ listener((a,b) -> {});
+ }
if (gossipMembers == null) {
gossipMembers = new ArrayList<>();
}
- return new RandomGossipManager(cluster, uri, id, settings, gossipMembers, listener);
+ return new RandomGossipManager(cluster, uri, id, settings, gossipMembers, listener, registry);
}
}
private RandomGossipManager(String cluster, URI uri, String id, GossipSettings settings,
- List<GossipMember> gossipMembers, GossipListener listener) {
- super(cluster, uri, id, settings, gossipMembers, listener);
+ List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry) {
+ super(cluster, uri, id, settings, gossipMembers, listener, registry);
}
}
diff --git a/src/test/java/org/apache/gossip/DataTest.java b/src/test/java/org/apache/gossip/DataTest.java
index d3babf6..766d72b 100644
--- a/src/test/java/org/apache/gossip/DataTest.java
+++ b/src/test/java/org/apache/gossip/DataTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.gossip;
+import com.codahale.metrics.MetricRegistry;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
@@ -26,8 +27,6 @@
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
-import org.apache.gossip.event.GossipListener;
-import org.apache.gossip.event.GossipState;
import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.SharedGossipDataMessage;
import org.junit.Test;
@@ -52,11 +51,7 @@
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
GossipService gossipService = new GossipService(cluster, uri, i + "",
startupMembers, settings,
- new GossipListener(){
- public void gossipEvent(GossipMember member, GossipState state) {
-
- }
- });
+ (a,b) -> {}, new MetricRegistry());
clients.add(gossipService);
gossipService.start();
}
diff --git a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
index 1cdb9ac..7b66fbc 100644
--- a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
+++ b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.gossip;
+import com.codahale.metrics.MetricRegistry;
import io.teknek.tunit.TUnit;
import java.net.URI;
@@ -58,7 +59,7 @@
for (int i = 1; i < clusterMembers + 1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30300 + i));
GossipService gossipService = new GossipService(cluster, uri, i + "", startupMembers,
- settings, (a,b) -> {});
+ settings, (a,b) -> {}, new MetricRegistry());
clients.add(gossipService);
gossipService.start();
}
@@ -104,7 +105,7 @@
URI uri = new URI("udp://" + "127.0.0.1" + ":" + shutdownPort);
// start client again
GossipService gossipService = new GossipService(cluster, uri, shutdownId + "", startupMembers,
- settings, (a,b) -> {});
+ settings, (a,b) -> {}, new MetricRegistry());
clients.add(gossipService);
gossipService.start();
diff --git a/src/test/java/org/apache/gossip/StartupSettingsTest.java b/src/test/java/org/apache/gossip/StartupSettingsTest.java
index a798594..61bd4f4 100644
--- a/src/test/java/org/apache/gossip/StartupSettingsTest.java
+++ b/src/test/java/org/apache/gossip/StartupSettingsTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.gossip;
+import com.codahale.metrics.MetricRegistry;
import org.apache.log4j.Logger;
import org.junit.jupiter.api.Test;
@@ -40,7 +41,6 @@
private static final Logger log = Logger.getLogger( StartupSettingsTest.class );
private static final String CLUSTER = UUID.randomUUID().toString();
-
@Test
public void testUsingSettingsFile() throws IOException, InterruptedException, URISyntaxException {
File settingsFile = File.createTempFile("gossipTest",".json");
@@ -49,7 +49,7 @@
URI uri = new URI("udp://" + "127.0.0.1" + ":" + 50000);
final GossipService firstService = new GossipService(
CLUSTER, uri, "1",
- new ArrayList<GossipMember>(), new GossipSettings(), null);
+ new ArrayList<GossipMember>(), new GossipSettings(), null, new MetricRegistry());
firstService.start();
final GossipService serviceUnderTest = new GossipService(
StartupSettings.fromJSONFile(settingsFile));
diff --git a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
index aa9e2e8..af7a117 100644
--- a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
+++ b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.gossip;
+import com.codahale.metrics.MetricRegistry;
import io.teknek.tunit.TUnit;
import java.net.URI;
@@ -58,7 +59,7 @@
for (int i = 1; i < clusterMembers+1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
GossipService gossipService = new GossipService(cluster, uri, i + "",
- startupMembers, settings, (a,b) -> {});
+ startupMembers, settings, (a,b) -> {}, new MetricRegistry());
clients.add(gossipService);
gossipService.start();
}
diff --git a/src/test/java/org/apache/gossip/manager/DataReaperTest.java b/src/test/java/org/apache/gossip/manager/DataReaperTest.java
index 1b5c35e..5388bb3 100644
--- a/src/test/java/org/apache/gossip/manager/DataReaperTest.java
+++ b/src/test/java/org/apache/gossip/manager/DataReaperTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.gossip.manager;
+import com.codahale.metrics.MetricRegistry;
import java.net.URI;
import org.apache.gossip.GossipSettings;
@@ -30,6 +31,8 @@
public class DataReaperTest {
+ private final MetricRegistry registry = new MetricRegistry();
+
@Test
public void testReaperOneShot() {
String myId = "4";
@@ -37,7 +40,7 @@
String value = "a";
GossipSettings settings = new GossipSettings();
GossipManager gm = RandomGossipManager.newBuilder().cluster("abc").settings(settings)
- .withId(myId).uri(URI.create("udp://localhost:6000")).build();
+ .withId(myId).uri(URI.create("udp://localhost:6000")).registry(registry).build();
gm.init();
gm.gossipPerNodeData(perNodeDatum(key, value));
gm.gossipSharedData(sharedDatum(key, value));
@@ -68,7 +71,6 @@
return m;
}
-
@Test
public void testHigherTimestampWins() {
String myId = "4";
@@ -76,7 +78,7 @@
String value = "a";
GossipSettings settings = new GossipSettings();
GossipManager gm = RandomGossipManager.newBuilder().cluster("abc").settings(settings)
- .withId(myId).uri(URI.create("udp://localhost:7000")).build();
+ .withId(myId).uri(URI.create("udp://localhost:7000")).registry(registry).build();
gm.init();
GossipDataMessage before = perNodeDatum(key, value);
GossipDataMessage after = perNodeDatum(key, "b");
diff --git a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
index d9635af..0c5aa88 100644
--- a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
+++ b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.gossip.manager;
+import com.codahale.metrics.MetricRegistry;
import org.apache.gossip.GossipMember;
import org.apache.gossip.GossipSettings;
import org.apache.gossip.LocalGossipMember;
@@ -82,7 +83,7 @@
.cluster("aCluster")
.uri(new URI("udp://localhost:2000"))
.settings(new GossipSettings())
- .gossipMembers(null).build();
+ .gossipMembers(null).registry(new MetricRegistry()).build();
assertNotNull(gossipManager.getLiveMembers());
}
@@ -98,7 +99,7 @@
.cluster("aCluster")
.settings(new GossipSettings())
.uri(new URI("udp://localhost:8000"))
- .gossipMembers(memberList).build();
+ .gossipMembers(memberList).registry(new MetricRegistry()).build();
assertEquals(1, gossipManager.getDeadMembers().size());
assertEquals(member.getId(), gossipManager.getDeadMembers().get(0).getId());
}