Add metrics for communicator size in metrics manager (#3351)
diff --git a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/MetricsManager.java b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/MetricsManager.java
index 8d4ab5e..076a504 100644
--- a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/MetricsManager.java
+++ b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/MetricsManager.java
@@ -199,7 +199,7 @@
: TypeUtils.getInteger(restartAttempts));
// Update the list of Communicator in Metrics Manager Server
- metricsManagerServer.addSinkCommunicator(sinkExecutor.getCommunicator());
+ metricsManagerServer.addSinkCommunicator(sinkId, sinkExecutor.getCommunicator());
}
}
@@ -537,12 +537,11 @@
// If the thread name is a key of SinkExecutors, then it is a thread running IMetricsSink
if (sinkExecutors.containsKey(thread.getName())) {
sinkId = thread.getName();
- // Remove the old sink executor
- SinkExecutor oldSinkExecutor = sinkExecutors.remove(sinkId);
// Remove the unneeded Communicator bind with Metrics Manager Server
- metricsManagerServer.removeSinkCommunicator(oldSinkExecutor.getCommunicator());
+ metricsManagerServer.removeSinkCommunicator(sinkId);
- // Close the sink
+ // Remove the old sink executor and close the sink
+ SinkExecutor oldSinkExecutor = sinkExecutors.remove(sinkId);
SysUtils.closeIgnoringExceptions(oldSinkExecutor);
thisSinkRetryAttempts = sinksRetryAttempts.remove(sinkId);
@@ -565,7 +564,7 @@
sinksRetryAttempts.put(sinkId, thisSinkRetryAttempts);
// Update the list of Communicator in Metrics Manager Server
- metricsManagerServer.addSinkCommunicator(newSinkExecutor.getCommunicator());
+ metricsManagerServer.addSinkCommunicator(sinkId, newSinkExecutor.getCommunicator());
// Restart it
executors.execute(newSinkExecutor);
diff --git a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/MetricsManagerServer.java b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/MetricsManagerServer.java
index a9ff54f..8b9e6ab 100644
--- a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/MetricsManagerServer.java
+++ b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/MetricsManagerServer.java
@@ -22,10 +22,11 @@
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -61,8 +62,10 @@
private static final String SERVER_EXCEPTIONS_RECEIVED = "exceptions-received";
private static final String SERVER_NEW_TMASTER_LOCATION = "new-tmaster-location";
private static final String SERVER_TMASTER_LOCATION_RECEIVED = "tmaster-location-received";
+ private static final String SERVER_COMMUNICATOR_OFFER = "communicator-offer";
+ private static final String SERVER_COMMUNICATOR_SIZE = "communicator-size";
- private final List<Communicator<MetricsRecord>> metricsSinkCommunicators;
+ private final Map<String, Communicator<MetricsRecord>> metricsSinkCommunicators;
// A map from MetricPublisher's immutable SocketAddress to the MetricPublisher
// We would fetch SocketAddress by using SocketChannel.socket().getRemoteSocketAddress,
@@ -96,7 +99,7 @@
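- // Since we might mutate the list while iterating it
- // Consider that the iteration vastly outnumbers mutation,
- // it would barely hurt any performance
+ // Sink communicators may be added or removed while records are being offered to them,
+ // so the map is wrapped with Collections.synchronizedMap. Any code that iterates the map
+ // must additionally hold the map's monitor for the whole iteration.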
- this.metricsSinkCommunicators = new CopyOnWriteArrayList<Communicator<MetricsRecord>>();
+ this.metricsSinkCommunicators = Collections.synchronizedMap(new HashMap<>());
this.publisherMap = new HashMap<SocketAddress, Metrics.MetricPublisher>();
@@ -119,14 +122,24 @@
registerOnMessage(Metrics.MetricsCacheLocationRefreshMessage.newBuilder());
}
- public void addSinkCommunicator(Communicator<MetricsRecord> communicator) {
- LOG.info("Communicator is added: " + communicator);
- this.metricsSinkCommunicators.add(communicator);
+ /**
+  * Registers a sink's communicator under the given id.
+  * The entry is stored with {@code Map.put}, so a communicator already
+  * registered under the same id is replaced.
+  *
+  * @param id key the communicator is stored under
+  * @param communicator the communicator to register
+  */
+ public void addSinkCommunicator(String id, Communicator<MetricsRecord> communicator) {
+   final String message = "Communicator " + id + " is added: " + communicator;
+   LOG.info(message);
+   synchronized (metricsSinkCommunicators) {
+     metricsSinkCommunicators.put(id, communicator);
+   }
}
- public boolean removeSinkCommunicator(Communicator<MetricsRecord> communicator) {
- LOG.info("Communicator is removed: " + communicator);
- return this.metricsSinkCommunicators.remove(communicator);
+ /**
+  * Unregisters the communicator stored under the given id.
+  * The lookup and the removal happen under the map's monitor.
+  *
+  * @param id key of the communicator to remove
+  * @return {@code true} if an entry for {@code id} existed and was removed,
+  *     {@code false} if no entry was registered under {@code id}
+  */
+ public boolean removeSinkCommunicator(String id) {
+   LOG.info("Remove communicator: " + id);
+   synchronized (metricsSinkCommunicators) {
+     // Nothing registered under this id: report it and leave the map untouched.
+     if (!metricsSinkCommunicators.containsKey(id)) {
+       return false;
+     }
+     metricsSinkCommunicators.remove(id);
+     LOG.info("Communicator " + id + " is removed");
+     return true;
+   }
}
@Override
@@ -279,8 +292,15 @@
MetricsRecord record = new MetricsRecord(source, metricsInfos, exceptionInfos);
// Push MetricsRecord to Communicator, which would wake up SlaveLooper bind with IMetricsSink
- for (Communicator<MetricsRecord> c : metricsSinkCommunicators) {
- c.offer(record);
+ synchronized (metricsSinkCommunicators) {
+ Iterator<String> itr = metricsSinkCommunicators.keySet().iterator();
+ while (itr.hasNext()) {
+ String key = itr.next();
+ Communicator<MetricsRecord> c = metricsSinkCommunicators.get(key);
+ c.offer(record);
+ serverMetricsCounters.scope(SERVER_COMMUNICATOR_OFFER).incr();
+ serverMetricsCounters.scope(SERVER_COMMUNICATOR_SIZE + "-" + key).incrBy(c.size());
+ }
}
}
diff --git a/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/MetricsManagerServerTest.java b/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/MetricsManagerServerTest.java
index 994f1ea..49b5d90 100644
--- a/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/MetricsManagerServerTest.java
+++ b/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/MetricsManagerServerTest.java
@@ -83,9 +83,10 @@
*/
@Test
public void testAddSinkCommunicator() {
+ String name = "test_communicator";
Communicator<MetricsRecord> sinkCommunicator = new Communicator<>();
- metricsManagerServer.addSinkCommunicator(sinkCommunicator);
- Assert.assertTrue(metricsManagerServer.removeSinkCommunicator(sinkCommunicator));
+ metricsManagerServer.addSinkCommunicator(name, sinkCommunicator);
+ Assert.assertTrue(metricsManagerServer.removeSinkCommunicator(name));
}
/**
@@ -93,9 +94,10 @@
*/
@Test
public void testRemoveSinkCommunicator() {
+ String name = "test_communicator";
Communicator<MetricsRecord> sinkCommunicator = new Communicator<>();
- metricsManagerServer.addSinkCommunicator(sinkCommunicator);
- Assert.assertTrue(metricsManagerServer.removeSinkCommunicator(sinkCommunicator));
+ metricsManagerServer.addSinkCommunicator(name, sinkCommunicator);
+ Assert.assertTrue(metricsManagerServer.removeSinkCommunicator(name));
}
/**
@@ -103,10 +105,11 @@
*/
@Test
public void testMetricsManagerServer() throws InterruptedException {
+ String name = "test_communicator";
CountDownLatch offersLatch = new CountDownLatch(MESSAGE_SIZE);
Communicator<MetricsRecord> sinkCommunicator =
CommunicatorTestHelper.spyCommunicator(new Communicator<MetricsRecord>(), offersLatch);
- metricsManagerServer.addSinkCommunicator(sinkCommunicator);
+ metricsManagerServer.addSinkCommunicator(name, sinkCommunicator);
serverTester.start();