Add a cap to metrics communicator in TMasterSink and MetricsCacheSink (#3355)

* Add a cap to metrics communicator in TMasterSink and MetricsCacheSink

* refactor while loop
diff --git a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheClient.java b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheClient.java
index b95ef2e..5e0a3e9 100644
--- a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheClient.java
+++ b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheClient.java
@@ -93,12 +93,21 @@
     Runnable task = new Runnable() {
       @Override
       public void run() {
-        while (!publishMetricsCommunicator.isEmpty()) {
-          TopologyMaster.PublishMetrics publishMetrics = publishMetricsCommunicator.poll();
+        TopologyMaster.PublishMetrics publishMetrics;
+        while (true) {
+          synchronized (publishMetricsCommunicator) {
+            publishMetrics = publishMetricsCommunicator.poll();
+          }
+          if (publishMetrics == null) {
+            break;  // No metrics left
+          }
+
           LOG.info(String.format("%d Metrics, %d Exceptions to send to MetricsCache",
               publishMetrics.getMetricsCount(), publishMetrics.getExceptionsCount()));
           LOG.fine("Publish Metrics sending to MetricsCache: " + publishMetrics.toString());
+
           sendMessage(publishMetrics);
+
         }
       }
     };
diff --git a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheSink.java b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheSink.java
index 4d2357f..0f049d1 100644
--- a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheSink.java
+++ b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheSink.java
@@ -83,6 +83,8 @@
 public class MetricsCacheSink implements IMetricsSink {
   private static final Logger LOG = Logger.getLogger(MetricsCacheSink.class.getName());
 
+  private static final int MAX_COMMUNICATOR_SIZE = 128;
+
   // These configs would be read from metrics-sink-configs.yaml
   private static final String KEY_TMASTER_LOCATION_CHECK_INTERVAL_SEC =
       "metricscache-location-check-interval-sec";
@@ -232,10 +234,27 @@
 
     metricsCommunicator.offer(publishMetrics.build());
 
+
+
     // Update metrics
     sinkContext.exportCountMetric(RECORD_PROCESS_COUNT, 1);
     sinkContext.exportCountMetric(METRICS_COUNT, publishMetrics.getMetricsCount());
     sinkContext.exportCountMetric(EXCEPTIONS_COUNT, publishMetrics.getExceptionsCount());
+
+    checkCommunicator(metricsCommunicator, MAX_COMMUNICATOR_SIZE);
+  }
+
+  // Check if the communicator is full/overflow. Poll and drop extra elements that
+  // are over the queue limit from the head.
+  public static void checkCommunicator(Communicator<TopologyMaster.PublishMetrics> communicator,
+                                        int maxSize) {
+    synchronized (communicator) {
+      int size = communicator.size();
+
+      for (int i = 0; i < size - maxSize; ++i) {
+        communicator.poll();
+      }
+    }
   }
 
   @Override
diff --git a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterClient.java b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterClient.java
index 7c7354f..48e0a32 100644
--- a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterClient.java
+++ b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterClient.java
@@ -88,11 +88,19 @@
     Runnable task = new Runnable() {
       @Override
       public void run() {
-        while (!publishMetricsCommunicator.isEmpty()) {
-          TopologyMaster.PublishMetrics publishMetrics = publishMetricsCommunicator.poll();
+        TopologyMaster.PublishMetrics publishMetrics;
+        while (true) {
+          synchronized (publishMetricsCommunicator) {
+            publishMetrics = publishMetricsCommunicator.poll();
+          }
+          if (publishMetrics == null) {
+            break;  // No metrics left
+          }
+
           LOG.info(String.format("%d Metrics, %d Exceptions to send to TMaster",
               publishMetrics.getMetricsCount(), publishMetrics.getExceptionsCount()));
           LOG.fine("Publish Metrics sending to TMaster: " + publishMetrics.toString());
+
           sendMessage(publishMetrics);
         }
       }
diff --git a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterSink.java b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterSink.java
index f044292..129d546 100644
--- a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterSink.java
+++ b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterSink.java
@@ -80,6 +80,8 @@
 public class TMasterSink implements IMetricsSink {
   private static final Logger LOG = Logger.getLogger(TMasterSink.class.getName());
 
+  private static final int MAX_COMMUNICATOR_SIZE = 128;
+
   // These configs would be read from metrics-sink-configs.yaml
   private static final String KEY_TMASTER_LOCATION_CHECK_INTERVAL_SEC =
       "tmaster-location-check-interval-sec";
@@ -234,6 +236,21 @@
     sinkContext.exportCountMetric(RECORD_PROCESS_COUNT, 1);
     sinkContext.exportCountMetric(METRICS_COUNT, publishMetrics.getMetricsCount());
     sinkContext.exportCountMetric(EXCEPTIONS_COUNT, publishMetrics.getExceptionsCount());
+
+    checkCommunicator(metricsCommunicator, MAX_COMMUNICATOR_SIZE);
+  }
+
+  // Check if the communicator is full/overflow. Poll and drop extra elements that
+  // are over the queue limit from the head.
+  public static void checkCommunicator(Communicator<TopologyMaster.PublishMetrics> communicator,
+                                        int maxSize) {
+    synchronized (communicator) {
+      int size = communicator.size();
+
+      for (int i = 0; i < size - maxSize; ++i) {
+        communicator.poll();
+      }
+    }
   }
 
   @Override
diff --git a/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheSinkTest.java b/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheSinkTest.java
index d425eb3..b02669e 100644
--- a/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheSinkTest.java
+++ b/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheSinkTest.java
@@ -31,6 +31,7 @@
 import org.junit.Test;
 
 import org.apache.heron.api.metric.MultiCountMetric;
+import org.apache.heron.common.basics.Communicator;
 import org.apache.heron.common.basics.SingletonRegistry;
 import org.apache.heron.common.basics.SysUtils;
 import org.apache.heron.common.config.SystemConfig;
@@ -188,4 +189,30 @@
 
     metricsCacheSink.close();
   }
+
+  @Test
+  public void testCheckCommunicator() {
+    Communicator<TopologyMaster.PublishMetrics> communicator = new Communicator<>();
+    int initSize = 16;
+    int capSize = 10;
+
+    TopologyMaster.PublishMetrics.Builder publishMetrics =
+        TopologyMaster.PublishMetrics.newBuilder();
+    for (int i = 0; i < initSize; ++i) {
+      communicator.offer(publishMetrics.build());
+    }
+    assertEquals(communicator.size(), initSize);
+
+    MetricsCacheSink.checkCommunicator(communicator, initSize + 1);
+    assertEquals(communicator.size(), initSize);
+
+    MetricsCacheSink.checkCommunicator(communicator, initSize);
+    assertEquals(communicator.size(), initSize);
+
+    MetricsCacheSink.checkCommunicator(communicator, initSize - 1);
+    assertEquals(communicator.size(), initSize - 1);
+
+    MetricsCacheSink.checkCommunicator(communicator, capSize);
+    assertEquals(communicator.size(), capSize);
+  }
 }
diff --git a/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterSinkTest.java b/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterSinkTest.java
index 5100270..3d57adf 100644
--- a/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterSinkTest.java
+++ b/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterSinkTest.java
@@ -31,6 +31,7 @@
 import org.junit.Test;
 
 import org.apache.heron.api.metric.MultiCountMetric;
+import org.apache.heron.common.basics.Communicator;
 import org.apache.heron.common.basics.SingletonRegistry;
 import org.apache.heron.common.basics.SysUtils;
 import org.apache.heron.common.config.SystemConfig;
@@ -186,5 +187,31 @@
 
     tMasterSink.close();
   }
+
+  @Test
+  public void testCheckCommunicator() {
+    Communicator<TopologyMaster.PublishMetrics> communicator = new Communicator<>();
+    int initSize = 16;
+    int capSize = 10;
+
+    TopologyMaster.PublishMetrics.Builder publishMetrics =
+        TopologyMaster.PublishMetrics.newBuilder();
+    for (int i = 0; i < initSize; ++i) {
+      communicator.offer(publishMetrics.build());
+    }
+    assertEquals(communicator.size(), initSize);
+
+    TMasterSink.checkCommunicator(communicator, initSize + 1);
+    assertEquals(communicator.size(), initSize);
+
+    TMasterSink.checkCommunicator(communicator, initSize);
+    assertEquals(communicator.size(), initSize);
+
+    TMasterSink.checkCommunicator(communicator, initSize - 1);
+    assertEquals(communicator.size(), initSize - 1);
+
+    TMasterSink.checkCommunicator(communicator, capSize);
+    assertEquals(communicator.size(), capSize);
+  }
 }