Make metrics cache client in healthMgr fetch location from statemgr (#2232)

* Make metrics cache client in healthMgr fetch location from statemgr

The metrics cache port is selected at runtime, so a user cannot provide
it as configuration. Instead, the cache client fetches the location
from the state manager.

* Fix checkstyle errors
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/HealthManager.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/HealthManager.java
index b8a9ee7..a3a7e56 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/HealthManager.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/HealthManager.java
@@ -100,6 +100,7 @@
 public class HealthManager {
   public static final String CONF_TOPOLOGY_NAME = "TOPOLOGY_NAME";
   public static final String CONF_METRICS_SOURCE_URL = "METRICS_SOURCE_URL";
+  private static final String CONF_METRICS_SOURCE_TYPE = "METRICS_SOURCE_TYPE";
 
   private static final Logger LOG = Logger.getLogger(HealthManager.class.getName());
   private final Config config;
@@ -203,10 +204,7 @@
     String metricsUrl = config.getStringValue(PolicyConfigKey.METRIC_SOURCE_URL.key());
     metricsUrl = getOptionValue(cmd, CliArgs.METRIC_SOURCE_URL, metricsUrl);
 
-    Class<? extends MetricsProvider> metricsProviderClass =
-        Class.forName(metricSourceClassName).asSubclass(MetricsProvider.class);
-    AbstractModule module =
-        buildMetricsProviderModule(config, metricsUrl, metricsProviderClass);
+    AbstractModule module = buildMetricsProviderModule(metricsUrl, metricSourceClassName);
     HealthManager healthManager = new HealthManager(config, module);
 
     LOG.info("Initializing health manager");
@@ -261,7 +259,9 @@
   }
 
   public void initialize() throws ReflectiveOperationException, FileNotFoundException {
-    this.stateMgrAdaptor = createStateMgrAdaptor();
+    injector = Guice.createInjector(baseModule);
+
+    stateMgrAdaptor = createStateMgrAdaptor();
 
     this.runtime = Config.newBuilder()
         .put(Key.SCHEDULER_STATE_MANAGER_ADAPTOR, stateMgrAdaptor)
@@ -272,7 +272,6 @@
 
     this.policyConfigReader = createPolicyConfigReader();
 
-    injector = Guice.createInjector(baseModule);
     AbstractModule commonModule = buildCommonConfigModule();
     injector = injector.createChildInjector(commonModule);
 
@@ -308,15 +307,31 @@
   }
 
   @VisibleForTesting
-  static AbstractModule buildMetricsProviderModule(
-      final Config config, final String metricsSourceUrl,
-      final Class<? extends MetricsProvider> metricsProviderClass) {
+  static AbstractModule buildMetricsProviderModule(final String sourceUrl, final String type) {
     return new AbstractModule() {
       @Override
       protected void configure() {
         bind(String.class)
             .annotatedWith(Names.named(CONF_METRICS_SOURCE_URL))
-            .toInstance(metricsSourceUrl);
+            .toInstance(sourceUrl);
+        bind(String.class)
+            .annotatedWith(Names.named(CONF_METRICS_SOURCE_TYPE))
+            .toInstance(type);
+      }
+    };
+  }
+
+  private AbstractModule buildCommonConfigModule() throws ReflectiveOperationException {
+    String metricSourceClassName
+        = injector.getInstance(
+        com.google.inject.Key.get(String.class, Names.named(CONF_METRICS_SOURCE_TYPE)));
+
+    Class<? extends MetricsProvider> metricsProviderClass =
+        Class.forName(metricSourceClassName).asSubclass(MetricsProvider.class);
+
+    return new AbstractModule() {
+      @Override
+      protected void configure() {
         bind(String.class)
             .annotatedWith(Names.named(CONF_TOPOLOGY_NAME))
             .toInstance(Context.topologyName(config));
@@ -326,20 +341,12 @@
         bind(String.class)
             .annotatedWith(Names.named(TrackerMetricsProvider.CONF_ENVIRON))
             .toInstance(Context.environ(config));
-        bind(MetricsProvider.class).to(metricsProviderClass).in(Singleton.class);
-      }
-    };
-  }
-
-  private AbstractModule buildCommonConfigModule() {
-    return new AbstractModule() {
-      @Override
-      protected void configure() {
         bind(Config.class).toInstance(config);
         bind(EventManager.class).in(Singleton.class);
         bind(ISchedulerClient.class).toInstance(schedulerClient);
         bind(SchedulerStateManagerAdaptor.class).toInstance(stateMgrAdaptor);
         bind(PackingPlanProvider.class).in(Singleton.class);
+        bind(MetricsProvider.class).to(metricsProviderClass).in(Singleton.class);
       }
     };
   }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProvider.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProvider.java
index 17348d3..931918b 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProvider.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProvider.java
@@ -27,6 +27,7 @@
 import javax.inject.Named;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.InvalidProtocolBufferException;
 import com.microsoft.dhalion.api.MetricsProvider;
 import com.microsoft.dhalion.metrics.ComponentMetrics;
 import com.microsoft.dhalion.metrics.InstanceMetrics;
@@ -37,23 +38,28 @@
 import com.twitter.heron.proto.tmaster.TopologyMaster.MetricResponse.IndividualMetric;
 import com.twitter.heron.proto.tmaster.TopologyMaster.MetricResponse.IndividualMetric.IntervalValue;
 import com.twitter.heron.proto.tmaster.TopologyMaster.MetricResponse.TaskMetric;
+import com.twitter.heron.proto.tmaster.TopologyMaster.MetricsCacheLocation;
+import com.twitter.heron.spi.statemgr.SchedulerStateManagerAdaptor;
 import com.twitter.heron.spi.utils.NetworkUtils;
 
-import static com.twitter.heron.healthmgr.HealthManager.CONF_METRICS_SOURCE_URL;
+import static com.twitter.heron.healthmgr.HealthManager.CONF_TOPOLOGY_NAME;
 
 public class MetricsCacheMetricsProvider implements MetricsProvider {
-  private static final String PATH_STATS = "/stats";
+  private static final String PATH_STATS = "stats";
   private static final Logger LOG = Logger.getLogger(MetricsCacheMetricsProvider.class.getName());
-  private HttpURLConnection con;
+  private SchedulerStateManagerAdaptor stateManagerAdaptor;
+  private String topologyName;
 
   private Clock clock = new Clock();
+  private String metricsCacheLocation;
 
   @Inject
-  public MetricsCacheMetricsProvider(@Named(CONF_METRICS_SOURCE_URL) String metricsCacheURL) {
-    LOG.info("Metrics will be provided by MetricsCache at :" + metricsCacheURL);
+  public MetricsCacheMetricsProvider(SchedulerStateManagerAdaptor stateManagerAdaptor,
+                                     @Named(CONF_TOPOLOGY_NAME) String topologyName) {
+    this.stateManagerAdaptor = stateManagerAdaptor;
+    this.topologyName = topologyName;
 
-    String url = metricsCacheURL + PATH_STATS;
-    con = NetworkUtils.getHttpConnection(url);
+    LOG.info("Metrics will be provided by MetricsCache at :" + getCacheLocation());
   }
 
   @Override
@@ -65,6 +71,7 @@
     for (String component : components) {
       TopologyMaster.MetricResponse response =
           getMetricsFromMetricsCache(metric, component, startTime, duration);
+
       Map<String, InstanceMetrics> metrics = parse(response, component, metric);
       ComponentMetrics componentMetric = new ComponentMetrics(component, metrics);
       result.put(component, componentMetric);
@@ -97,18 +104,18 @@
     }
 
     // convert heron.protobuf.taskMetrics to dhalion.InstanceMetrics
-    for (TaskMetric tm :response.getMetricList()) {
+    for (TaskMetric tm : response.getMetricList()) {
       String instanceId = tm.getInstanceId();
       InstanceMetrics instanceMetrics = new InstanceMetrics(instanceId);
 
-      for (IndividualMetric im: tm.getMetricList()) {
+      for (IndividualMetric im : tm.getMetricList()) {
         String metricName = im.getName();
         Map<Instant, Double> values = new HashMap<>();
 
-        for (IntervalValue iv: im.getIntervalValuesList()) {
+        for (IntervalValue iv : im.getIntervalValuesList()) {
           MetricInterval mi = iv.getInterval();
           String value = iv.getValue();
-          values.put(Instant.ofEpochSecond(mi.getStart()),  Double.parseDouble(value));
+          values.put(Instant.ofEpochSecond(mi.getStart()), Double.parseDouble(value));
         }
 
         if (!values.isEmpty()) {
@@ -129,24 +136,37 @@
         .setComponentName(component)
         .setExplicitInterval(
             MetricInterval.newBuilder()
-            .setStart(start.getEpochSecond())
-            .setEnd(start.plus(duration).getEpochSecond())
-            .build())
+                .setStart(start.getEpochSecond())
+                .setEnd(start.plus(duration).getEpochSecond())
+                .build())
         .addMetric(metric)
         .build();
     LOG.log(Level.FINE, "MetricsCache Query request: {0}", request);
 
-    NetworkUtils.sendHttpPostRequest(con, "X", request.toByteArray());
-    byte[] responseData = NetworkUtils.readHttpResponse(con);
-
+    HttpURLConnection connection = NetworkUtils.getHttpConnection(getCacheLocation());
     try {
-      TopologyMaster.MetricResponse response =
-          TopologyMaster.MetricResponse.parseFrom(responseData);
-      LOG.log(Level.FINE, "MetricsCache Query response: {0}", response);
-      return response;
-    } catch (com.google.protobuf.InvalidProtocolBufferException e) {
-      LOG.severe("protobuf cannot parse the reply from MetricsCache " + e);
-      return null;
+      boolean result = NetworkUtils.sendHttpPostRequest(connection, "X", request.toByteArray());
+      if (!result) {
+        LOG.warning("Failed to get response from metrics cache. Resetting connection...");
+        resetCacheLocation();
+        return null;
+      }
+
+      byte[] responseData = NetworkUtils.readHttpResponse(connection);
+
+      try {
+        TopologyMaster.MetricResponse response =
+            TopologyMaster.MetricResponse.parseFrom(responseData);
+        LOG.log(Level.FINE, "MetricsCache Query response: {0}", response);
+        return response;
+      } catch (InvalidProtocolBufferException e) {
+        LOG.log(Level.SEVERE, "protobuf cannot parse the reply from MetricsCache ", e);
+        return null;
+      }
+    } finally {
+      if (connection != null) {
+        connection.disconnect();
+      }
     }
   }
 
@@ -155,6 +175,23 @@
     this.clock = clock;
   }
 
+  /* Returns the last known location of the metrics cache, looking it up through the
+   * state manager when none is cached. */
+  private synchronized String getCacheLocation() {
+    if (metricsCacheLocation != null) {
+      return metricsCacheLocation;
+    }
+
+    MetricsCacheLocation cacheLocation = stateManagerAdaptor.getMetricsCacheLocation(topologyName);
+    metricsCacheLocation = String.format("http://%s:%s/%s", cacheLocation.getHost(),
+        cacheLocation.getStatsPort(), PATH_STATS);
+    return metricsCacheLocation;
+  }
+
+  private synchronized void resetCacheLocation() {
+    metricsCacheLocation = null;
+  }
+
   static class Clock {
     long currentTime() {
       return System.currentTimeMillis();
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/HealthManagerTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/HealthManagerTest.java
index ab98148..99956a6 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/HealthManagerTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/HealthManagerTest.java
@@ -64,7 +64,7 @@
     when(adaptor.getSchedulerLocation(anyString())).thenReturn(schedulerLocation);
 
     AbstractModule baseModule = HealthManager
-        .buildMetricsProviderModule(config, "127.0.0.1", TrackerMetricsProvider.class);
+        .buildMetricsProviderModule("127.0.0.1", TrackerMetricsProvider.class.getName());
 
     HealthManager healthManager = new HealthManager(config, baseModule);
 
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProviderTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProviderTest.java
index f689c78..ea087c3 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProviderTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProviderTest.java
@@ -29,13 +29,17 @@
 import com.twitter.heron.proto.tmaster.TopologyMaster.MetricResponse.IndividualMetric;
 import com.twitter.heron.proto.tmaster.TopologyMaster.MetricResponse.IndividualMetric.IntervalValue;
 import com.twitter.heron.proto.tmaster.TopologyMaster.MetricResponse.TaskMetric;
+import com.twitter.heron.proto.tmaster.TopologyMaster.MetricsCacheLocation;
+import com.twitter.heron.spi.statemgr.SchedulerStateManagerAdaptor;
 
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 public class MetricsCacheMetricsProviderTest {
   @Test
@@ -210,8 +214,19 @@
   }
 
   private MetricsCacheMetricsProvider createMetricsProviderSpy() {
+    MetricsCacheLocation location = MetricsCacheLocation.newBuilder()
+        .setTopologyName("testTopo")
+        .setTopologyId("topoId")
+        .setHost("localhost")
+        .setControllerPort(0)
+        .setMasterPort(0)
+        .build();
+
+    SchedulerStateManagerAdaptor stateMgr = Mockito.mock(SchedulerStateManagerAdaptor.class);
+    when(stateMgr.getMetricsCacheLocation("testTopo")).thenReturn(location);
+
     MetricsCacheMetricsProvider metricsProvider
-        = new MetricsCacheMetricsProvider("127.0.0.1");
+        = new MetricsCacheMetricsProvider(stateMgr, "testTopo");
 
     MetricsCacheMetricsProvider spyMetricsProvider = spy(metricsProvider);
     spyMetricsProvider.setClock(new TestClock(70000));
diff --git a/heron/spi/src/java/com/twitter/heron/spi/statemgr/SchedulerStateManagerAdaptor.java b/heron/spi/src/java/com/twitter/heron/spi/statemgr/SchedulerStateManagerAdaptor.java
index b91d59e..3d6a6bb 100644
--- a/heron/spi/src/java/com/twitter/heron/spi/statemgr/SchedulerStateManagerAdaptor.java
+++ b/heron/spi/src/java/com/twitter/heron/spi/statemgr/SchedulerStateManagerAdaptor.java
@@ -253,6 +253,16 @@
   }
 
   /**
+   * Get the metricscache location for the given topology
+   *
+   * @return MetricsCacheLocation
+   */
+  public TopologyMaster.MetricsCacheLocation getMetricsCacheLocation(String topologyName) {
+    return awaitResult(delegate.getMetricsCacheLocation(null, topologyName));
+  }
+
+
+  /**
    * Get the topology definition for the given topology
    *
    * @return Topology