Make metrics cache client in healthMgr fetch location from statemgr (#2232)
* Make metrics cache client in healthMgr fetch location from statemgr
The metrics cache port is selected at runtime, so a user cannot provide
it as configuration. The metrics cache client therefore has to fetch the location from the state manager.
* Fix checkstyle errors
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/HealthManager.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/HealthManager.java
index b8a9ee7..a3a7e56 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/HealthManager.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/HealthManager.java
@@ -100,6 +100,7 @@
public class HealthManager {
public static final String CONF_TOPOLOGY_NAME = "TOPOLOGY_NAME";
public static final String CONF_METRICS_SOURCE_URL = "METRICS_SOURCE_URL";
+ private static final String CONF_METRICS_SOURCE_TYPE = "METRICS_SOURCE_TYPE";
private static final Logger LOG = Logger.getLogger(HealthManager.class.getName());
private final Config config;
@@ -203,10 +204,7 @@
String metricsUrl = config.getStringValue(PolicyConfigKey.METRIC_SOURCE_URL.key());
metricsUrl = getOptionValue(cmd, CliArgs.METRIC_SOURCE_URL, metricsUrl);
- Class<? extends MetricsProvider> metricsProviderClass =
- Class.forName(metricSourceClassName).asSubclass(MetricsProvider.class);
- AbstractModule module =
- buildMetricsProviderModule(config, metricsUrl, metricsProviderClass);
+ AbstractModule module = buildMetricsProviderModule(metricsUrl, metricSourceClassName);
HealthManager healthManager = new HealthManager(config, module);
LOG.info("Initializing health manager");
@@ -261,7 +259,9 @@
}
public void initialize() throws ReflectiveOperationException, FileNotFoundException {
- this.stateMgrAdaptor = createStateMgrAdaptor();
+ injector = Guice.createInjector(baseModule);
+
+ stateMgrAdaptor = createStateMgrAdaptor();
this.runtime = Config.newBuilder()
.put(Key.SCHEDULER_STATE_MANAGER_ADAPTOR, stateMgrAdaptor)
@@ -272,7 +272,6 @@
this.policyConfigReader = createPolicyConfigReader();
- injector = Guice.createInjector(baseModule);
AbstractModule commonModule = buildCommonConfigModule();
injector = injector.createChildInjector(commonModule);
@@ -308,15 +307,31 @@
}
@VisibleForTesting
- static AbstractModule buildMetricsProviderModule(
- final Config config, final String metricsSourceUrl,
- final Class<? extends MetricsProvider> metricsProviderClass) {
+ static AbstractModule buildMetricsProviderModule(final String sourceUrl, final String type) {
return new AbstractModule() {
@Override
protected void configure() {
bind(String.class)
.annotatedWith(Names.named(CONF_METRICS_SOURCE_URL))
- .toInstance(metricsSourceUrl);
+ .toInstance(sourceUrl); // URL of the metrics source, bound as a named String
+ bind(String.class)
+ .annotatedWith(Names.named(CONF_METRICS_SOURCE_TYPE))
+ .toInstance(type); /* class name of the MetricsProvider implementation; it is read
+ * back and loaded with Class.forName in buildCommonConfigModule(). */
+ }
+ };
+ }
+
+ private AbstractModule buildCommonConfigModule() throws ReflectiveOperationException {
+ String metricSourceClassName
+ = injector.getInstance(
+ com.google.inject.Key.get(String.class, Names.named(CONF_METRICS_SOURCE_TYPE)));
+
+ Class<? extends MetricsProvider> metricsProviderClass =
+ Class.forName(metricSourceClassName).asSubclass(MetricsProvider.class);
+
+ return new AbstractModule() {
+ @Override
+ protected void configure() {
bind(String.class)
.annotatedWith(Names.named(CONF_TOPOLOGY_NAME))
.toInstance(Context.topologyName(config));
@@ -326,20 +341,12 @@
bind(String.class)
.annotatedWith(Names.named(TrackerMetricsProvider.CONF_ENVIRON))
.toInstance(Context.environ(config));
- bind(MetricsProvider.class).to(metricsProviderClass).in(Singleton.class);
- }
- };
- }
-
- private AbstractModule buildCommonConfigModule() {
- return new AbstractModule() {
- @Override
- protected void configure() {
bind(Config.class).toInstance(config);
bind(EventManager.class).in(Singleton.class);
bind(ISchedulerClient.class).toInstance(schedulerClient);
bind(SchedulerStateManagerAdaptor.class).toInstance(stateMgrAdaptor);
bind(PackingPlanProvider.class).in(Singleton.class);
+ bind(MetricsProvider.class).to(metricsProviderClass).in(Singleton.class);
}
};
}
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProvider.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProvider.java
index 17348d3..931918b 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProvider.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProvider.java
@@ -27,6 +27,7 @@
import javax.inject.Named;
import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.InvalidProtocolBufferException;
import com.microsoft.dhalion.api.MetricsProvider;
import com.microsoft.dhalion.metrics.ComponentMetrics;
import com.microsoft.dhalion.metrics.InstanceMetrics;
@@ -37,23 +38,28 @@
import com.twitter.heron.proto.tmaster.TopologyMaster.MetricResponse.IndividualMetric;
import com.twitter.heron.proto.tmaster.TopologyMaster.MetricResponse.IndividualMetric.IntervalValue;
import com.twitter.heron.proto.tmaster.TopologyMaster.MetricResponse.TaskMetric;
+import com.twitter.heron.proto.tmaster.TopologyMaster.MetricsCacheLocation;
+import com.twitter.heron.spi.statemgr.SchedulerStateManagerAdaptor;
import com.twitter.heron.spi.utils.NetworkUtils;
-import static com.twitter.heron.healthmgr.HealthManager.CONF_METRICS_SOURCE_URL;
+import static com.twitter.heron.healthmgr.HealthManager.CONF_TOPOLOGY_NAME;
public class MetricsCacheMetricsProvider implements MetricsProvider {
- private static final String PATH_STATS = "/stats";
+ private static final String PATH_STATS = "stats";
private static final Logger LOG = Logger.getLogger(MetricsCacheMetricsProvider.class.getName());
- private HttpURLConnection con;
+ private SchedulerStateManagerAdaptor stateManagerAdaptor;
+ private String topologyName;
private Clock clock = new Clock();
+ private String metricsCacheLocation;
@Inject
- public MetricsCacheMetricsProvider(@Named(CONF_METRICS_SOURCE_URL) String metricsCacheURL) {
- LOG.info("Metrics will be provided by MetricsCache at :" + metricsCacheURL);
+ public MetricsCacheMetricsProvider(SchedulerStateManagerAdaptor stateManagerAdaptor,
+ @Named(CONF_TOPOLOGY_NAME) String topologyName) {
+ this.stateManagerAdaptor = stateManagerAdaptor;
+ this.topologyName = topologyName; /* both are kept so the cache URL can be looked up lazily
+ * and re-resolved after resetCacheLocation(). NOTE(review): the log call below invokes
+ * getCacheLocation(), so constructing the provider already queries the state manager. */
- String url = metricsCacheURL + PATH_STATS;
- con = NetworkUtils.getHttpConnection(url);
+ LOG.info("Metrics will be provided by MetricsCache at :" + getCacheLocation());
}
@Override
@@ -65,6 +71,7 @@
for (String component : components) {
TopologyMaster.MetricResponse response =
getMetricsFromMetricsCache(metric, component, startTime, duration);
+
Map<String, InstanceMetrics> metrics = parse(response, component, metric);
ComponentMetrics componentMetric = new ComponentMetrics(component, metrics);
result.put(component, componentMetric);
@@ -97,18 +104,18 @@
}
// convert heron.protobuf.taskMetrics to dhalion.InstanceMetrics
- for (TaskMetric tm :response.getMetricList()) {
+ for (TaskMetric tm : response.getMetricList()) {
String instanceId = tm.getInstanceId();
InstanceMetrics instanceMetrics = new InstanceMetrics(instanceId);
- for (IndividualMetric im: tm.getMetricList()) {
+ for (IndividualMetric im : tm.getMetricList()) {
String metricName = im.getName();
Map<Instant, Double> values = new HashMap<>();
- for (IntervalValue iv: im.getIntervalValuesList()) {
+ for (IntervalValue iv : im.getIntervalValuesList()) {
MetricInterval mi = iv.getInterval();
String value = iv.getValue();
- values.put(Instant.ofEpochSecond(mi.getStart()), Double.parseDouble(value));
+ values.put(Instant.ofEpochSecond(mi.getStart()), Double.parseDouble(value));
}
if (!values.isEmpty()) {
@@ -129,24 +136,37 @@
.setComponentName(component)
.setExplicitInterval(
MetricInterval.newBuilder()
- .setStart(start.getEpochSecond())
- .setEnd(start.plus(duration).getEpochSecond())
- .build())
+ .setStart(start.getEpochSecond())
+ .setEnd(start.plus(duration).getEpochSecond())
+ .build())
.addMetric(metric)
.build();
LOG.log(Level.FINE, "MetricsCache Query request: {0}", request);
- NetworkUtils.sendHttpPostRequest(con, "X", request.toByteArray());
- byte[] responseData = NetworkUtils.readHttpResponse(con);
-
+ HttpURLConnection connection = NetworkUtils.getHttpConnection(getCacheLocation());
try {
- TopologyMaster.MetricResponse response =
- TopologyMaster.MetricResponse.parseFrom(responseData);
- LOG.log(Level.FINE, "MetricsCache Query response: {0}", response);
- return response;
- } catch (com.google.protobuf.InvalidProtocolBufferException e) {
- LOG.severe("protobuf cannot parse the reply from MetricsCache " + e);
- return null;
+ boolean result = NetworkUtils.sendHttpPostRequest(connection, "X", request.toByteArray());
+ if (!result) {
+ LOG.warning("Failed to get response from metrics cache. Resetting connection...");
+ resetCacheLocation();
+ return null;
+ }
+
+ byte[] responseData = NetworkUtils.readHttpResponse(connection);
+
+ try {
+ TopologyMaster.MetricResponse response =
+ TopologyMaster.MetricResponse.parseFrom(responseData);
+ LOG.log(Level.FINE, "MetricsCache Query response: {0}", response);
+ return response;
+ } catch (InvalidProtocolBufferException e) {
+ LOG.log(Level.SEVERE, "protobuf cannot parse the reply from MetricsCache ", e);
+ return null;
+ }
+ } finally {
+ if (connection != null) {
+ connection.disconnect();
+ }
}
}
@@ -155,6 +175,23 @@
this.clock = clock;
}
+ /* Returns the last known location of the metrics cache as a URL string.
+ *
+ * If a location is already cached in metricsCacheLocation it is returned unchanged.
+ * Otherwise the MetricsCacheLocation for this topology is requested from the state manager
+ * adaptor and formatted as an http URL made of its host, its stats port and the PATH_STATS
+ * path; the result is cached until resetCacheLocation() clears it.
+ *
+ * NOTE(review): the adaptor's result is dereferenced without a null check - confirm that
+ * getMetricsCacheLocation() cannot return null, e.g. before the location is published.
+ */
+ private synchronized String getCacheLocation() {
+ if (metricsCacheLocation != null) {
+ return metricsCacheLocation;
+ }
+
+ MetricsCacheLocation cacheLocation = stateManagerAdaptor.getMetricsCacheLocation(topologyName);
+ metricsCacheLocation = String.format("http://%s:%s/%s", cacheLocation.getHost(),
+ cacheLocation.getStatsPort(), PATH_STATS);
+ return metricsCacheLocation;
+ }
+
+ private synchronized void resetCacheLocation() {
+ metricsCacheLocation = null; // next getCacheLocation() call re-queries the state manager
+ }
+
static class Clock {
long currentTime() {
return System.currentTimeMillis();
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/HealthManagerTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/HealthManagerTest.java
index ab98148..99956a6 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/HealthManagerTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/HealthManagerTest.java
@@ -64,7 +64,7 @@
when(adaptor.getSchedulerLocation(anyString())).thenReturn(schedulerLocation);
AbstractModule baseModule = HealthManager
- .buildMetricsProviderModule(config, "127.0.0.1", TrackerMetricsProvider.class);
+ .buildMetricsProviderModule("127.0.0.1", TrackerMetricsProvider.class.getName());
HealthManager healthManager = new HealthManager(config, baseModule);
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProviderTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProviderTest.java
index f689c78..ea087c3 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProviderTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProviderTest.java
@@ -29,13 +29,17 @@
import com.twitter.heron.proto.tmaster.TopologyMaster.MetricResponse.IndividualMetric;
import com.twitter.heron.proto.tmaster.TopologyMaster.MetricResponse.IndividualMetric.IntervalValue;
import com.twitter.heron.proto.tmaster.TopologyMaster.MetricResponse.TaskMetric;
+import com.twitter.heron.proto.tmaster.TopologyMaster.MetricsCacheLocation;
+import com.twitter.heron.spi.statemgr.SchedulerStateManagerAdaptor;
import org.junit.Test;
+import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
public class MetricsCacheMetricsProviderTest {
@Test
@@ -210,8 +214,19 @@
}
private MetricsCacheMetricsProvider createMetricsProviderSpy() {
+ MetricsCacheLocation location = MetricsCacheLocation.newBuilder()
+ .setTopologyName("testTopo")
+ .setTopologyId("topoId")
+ .setHost("localhost")
+ .setControllerPort(0)
+ .setMasterPort(0)
+ .build();
+
+ SchedulerStateManagerAdaptor stateMgr = Mockito.mock(SchedulerStateManagerAdaptor.class);
+ when(stateMgr.getMetricsCacheLocation("testTopo")).thenReturn(location);
+
MetricsCacheMetricsProvider metricsProvider
- = new MetricsCacheMetricsProvider("127.0.0.1");
+ = new MetricsCacheMetricsProvider(stateMgr, "testTopo");
MetricsCacheMetricsProvider spyMetricsProvider = spy(metricsProvider);
spyMetricsProvider.setClock(new TestClock(70000));
diff --git a/heron/spi/src/java/com/twitter/heron/spi/statemgr/SchedulerStateManagerAdaptor.java b/heron/spi/src/java/com/twitter/heron/spi/statemgr/SchedulerStateManagerAdaptor.java
index b91d59e..3d6a6bb 100644
--- a/heron/spi/src/java/com/twitter/heron/spi/statemgr/SchedulerStateManagerAdaptor.java
+++ b/heron/spi/src/java/com/twitter/heron/spi/statemgr/SchedulerStateManagerAdaptor.java
@@ -253,6 +253,16 @@
}
/**
+ * Get the metricscache location for the given topology.
+ * The request is forwarded to the delegate state manager and its result is passed
+ * through awaitResult before being returned.
+ *
+ * @param topologyName name of the topology whose metricscache location is requested
+ * @return MetricsCacheLocation as produced by awaitResult; NOTE(review): confirm whether
+ * this can be null when the lookup fails, since callers dereference it directly
+ */
+ public TopologyMaster.MetricsCacheLocation getMetricsCacheLocation(String topologyName) {
+ return awaitResult(delegate.getMetricsCacheLocation(null, topologyName));
+ }
+
+
+ /**
* Get the topology definition for the given topology
*
* @return Topology