AMBARI-20777 : AMS changes to use instanceId for cluster based segregation of data. (avijayan)
diff --git a/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java b/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
index a112ef2..8e0de03 100644
--- a/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
+++ b/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
@@ -51,6 +51,7 @@
private Map<String, Set<String>> useTagsMap = new HashMap<String, Set<String>>();
private TimelineMetricsCache metricsCache;
private String hostName = "UNKNOWN.example.com";
+ private String instanceId = null;
private String serviceName = "";
private Collection<String> collectorHosts;
private String collectorUri;
@@ -94,6 +95,8 @@
}
serviceName = getServiceName(conf);
+ String inst = conf.getString("instanceId", "");
+ instanceId = StringUtils.isEmpty(inst) ? null : inst;
LOG.info("Identified hostname = " + hostName + ", serviceName = " + serviceName);
// Initialize the collector write strategy
@@ -318,6 +321,7 @@
timelineMetric.setMetricName(name);
timelineMetric.setHostName(hostName);
timelineMetric.setAppId(serviceName);
+ timelineMetric.setInstanceId(instanceId);
timelineMetric.setStartTime(startTime);
timelineMetric.setType(metric.type() != null ? metric.type().name() : null);
timelineMetric.getMetricValues().put(startTime, value.doubleValue());
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index 17c58f0..fa095a0 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -388,6 +388,11 @@
}
@Override
+ public Map<String, Set<String>> getInstanceHostsMetadata() throws SQLException, IOException {
+ return metricMetadataManager.getHostedInstanceCache();
+ }
+
+ @Override
public List<String> getLiveInstances() {
List<String> instances = null;
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 8b0d84b..65bbc4c 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -115,6 +115,7 @@
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CONTAINER_METRICS_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_CONTAINER_METRICS_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_HOSTED_APPS_METADATA_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_INSTANCE_HOST_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
@@ -124,6 +125,7 @@
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_ENCODING;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_HOSTED_APPS_METADATA_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_INSTANCE_HOST_METADATA_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_METADATA_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
@@ -138,6 +140,7 @@
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_HOSTED_APPS_METADATA_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_INSTANCE_HOST_METADATA_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METADATA_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CONTAINER_METRICS_SQL;
@@ -430,6 +433,11 @@
encoding, compression);
stmt.executeUpdate(hostedAppSql);
+ //Host Instances table
+ String hostedInstancesSql = String.format(CREATE_INSTANCE_HOST_TABLE_SQL,
+ encoding, compression);
+ stmt.executeUpdate(hostedInstancesSql);
+
// Container Metrics
stmt.executeUpdate( String.format(CREATE_CONTAINER_METRICS_TABLE_SQL,
encoding, tableTTL.get(CONTAINER_METRICS_TABLE_NAME), compression));
@@ -778,6 +786,8 @@
metadataManager.putIfModifiedHostedAppsMetadata(
tm.getHostName(), tm.getAppId());
+
+ metadataManager.putIfModifiedHostedInstanceMetadata(tm.getInstanceId(), tm.getHostName());
}
if (!acceptMetric) {
iterator.remove();
@@ -1552,6 +1562,55 @@
}
}
+ public void saveInstanceHostsMetadata(Map<String, Set<String>> instanceHostsMap) throws SQLException {
+ Connection conn = getConnection();
+ PreparedStatement stmt = null;
+ try {
+ stmt = conn.prepareStatement(UPSERT_INSTANCE_HOST_METADATA_SQL);
+ int rowCount = 0;
+
+ for (Map.Entry<String, Set<String>> hostInstancesEntry : instanceHostsMap.entrySet()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Host Instances Entry: " + hostInstancesEntry);
+ }
+
+ String instanceId = hostInstancesEntry.getKey();
+
+ for(String hostname : hostInstancesEntry.getValue()) {
+ stmt.clearParameters();
+ stmt.setString(1, instanceId);
+ stmt.setString(2, hostname);
+ try {
+ stmt.executeUpdate();
+ rowCount++;
+ } catch (SQLException sql) {
+ LOG.error("Error saving host instances metadata.", sql);
+ }
+ }
+
+ }
+
+ conn.commit();
+ LOG.info("Saved " + rowCount + " host instances metadata records.");
+
+ } finally {
+ if (stmt != null) {
+ try {
+ stmt.close();
+ } catch (SQLException e) {
+ // Ignore
+ }
+ }
+ if (conn != null) {
+ try {
+ conn.close();
+ } catch (SQLException sql) {
+ // Ignore
+ }
+ }
+ }
+ }
+
/**
* Save metdata on updates.
* @param metricMetadata @Collection<@TimelineMetricMetadata>
@@ -1658,6 +1717,53 @@
return hostedAppMap;
}
+ public Map<String, Set<String>> getInstanceHostsMetdata() throws SQLException {
+ Map<String, Set<String>> instanceHostsMap = new HashMap<>();
+ Connection conn = getConnection();
+ PreparedStatement stmt = null;
+ ResultSet rs = null;
+
+ try {
+ stmt = conn.prepareStatement(GET_INSTANCE_HOST_METADATA_SQL);
+ rs = stmt.executeQuery();
+
+ while (rs.next()) {
+ String instanceId = rs.getString("INSTANCE_ID");
+ String hostname = rs.getString("HOSTNAME");
+
+ if (!instanceHostsMap.containsKey(instanceId)) {
+ instanceHostsMap.put(instanceId, new HashSet<String>());
+ }
+ instanceHostsMap.get(instanceId).add(hostname);
+ }
+
+ } finally {
+ if (rs != null) {
+ try {
+ rs.close();
+ } catch (SQLException e) {
+ // Ignore
+ }
+ }
+ if (stmt != null) {
+ try {
+ stmt.close();
+ } catch (SQLException e) {
+ // Ignore
+ }
+ }
+ if (conn != null) {
+ try {
+ conn.close();
+ } catch (SQLException sql) {
+ // Ignore
+ }
+ }
+ }
+
+ return instanceHostsMap;
+ }
+
// No filter criteria support for now.
public Map<TimelineMetricMetadataKey, TimelineMetricMetadata> getTimelineMetricMetadata() throws SQLException {
Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadataMap = new HashMap<>();
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
index d049e33..121a8ae 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
@@ -89,6 +89,14 @@
Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException;
/**
+ * Returns all instances and the set of hosts each instance is present on
+ * @return { instanceId : [ hosts ] }
+ * @throws SQLException
+ * @throws IOException
+ */
+ Map<String, Set<String>> getInstanceHostsMetadata() throws SQLException, IOException;
+
+ /**
* Return a list of known live collector nodes
* @return [ hostname ]
*/
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
index 5310906..a5a3499 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
@@ -129,6 +129,7 @@
condition.addOrderByColumn("METRIC_NAME");
condition.addOrderByColumn("HOSTNAME");
condition.addOrderByColumn("APP_ID");
+ condition.addOrderByColumn("INSTANCE_ID");
condition.addOrderByColumn("SERVER_TIME");
return condition;
}
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java
index 7eb2457..f904ebe 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java
@@ -54,8 +54,10 @@
private final Map<TimelineMetricMetadataKey, TimelineMetricMetadata> METADATA_CACHE = new ConcurrentHashMap<>();
// Map to lookup apps on a host
private final Map<String, Set<String>> HOSTED_APPS_MAP = new ConcurrentHashMap<>();
+ private final Map<String, Set<String>> INSTANCE_HOST_MAP = new ConcurrentHashMap<>();
// Sync only when needed
AtomicBoolean SYNC_HOSTED_APPS_METADATA = new AtomicBoolean(false);
+ AtomicBoolean SYNC_HOSTED_INSTANCES_METADATA = new AtomicBoolean(false);
// Single thread to sync back new writes to the store
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
@@ -122,14 +124,25 @@
return HOSTED_APPS_MAP;
}
+ public Map<String, Set<String>> getHostedInstanceCache() {
+ return INSTANCE_HOST_MAP;
+ }
+
public boolean syncHostedAppsMetadata() {
return SYNC_HOSTED_APPS_METADATA.get();
}
+ public boolean syncHostedInstanceMetadata() {
+ return SYNC_HOSTED_INSTANCES_METADATA.get();
+ }
+
public void markSuccessOnSyncHostedAppsMetadata() {
SYNC_HOSTED_APPS_METADATA.set(false);
}
+ public void markSuccessOnSyncHostedInstanceMetadata() {
+ SYNC_HOSTED_INSTANCES_METADATA.set(false);
+ }
/**
* Test metric name for valid patterns and return true/false
*/
@@ -189,6 +202,23 @@
}
}
+ public void putIfModifiedHostedInstanceMetadata(String instanceId, String hostname) {
+ if (StringUtils.isEmpty(instanceId)) {
+ return;
+ }
+
+ Set<String> hosts = INSTANCE_HOST_MAP.get(instanceId);
+ if (hosts == null) {
+ hosts = new HashSet<>();
+ INSTANCE_HOST_MAP.put(instanceId, hosts);
+ }
+
+ if (!hosts.contains(hostname)) {
+ hosts.add(hostname);
+ SYNC_HOSTED_INSTANCES_METADATA.set(true);
+ }
+ }
+
public void persistMetadata(Collection<TimelineMetricMetadata> metadata) throws SQLException {
hBaseAccessor.saveMetricMetadata(metadata);
}
@@ -197,6 +227,10 @@
hBaseAccessor.saveHostAppsMetadata(hostedApps);
}
+ public void persistHostedInstanceMetadata(Map<String, Set<String>> hostedInstancesMetadata) throws SQLException {
+ hBaseAccessor.saveInstanceHostsMetadata(hostedInstancesMetadata);
+ }
+
public TimelineMetricMetadata getTimelineMetricMetadata(TimelineMetric timelineMetric, boolean isWhitelisted) {
return new TimelineMetricMetadata(
timelineMetric.getMetricName(),
@@ -233,6 +267,10 @@
return hBaseAccessor.getHostedAppsMetadata();
}
+ Map<String, Set<String>> getHostedInstancesFromStore() throws SQLException {
+ return hBaseAccessor.getInstanceHostsMetdata();
+ }
+
private boolean supportAggregates(TimelineMetric metric) {
return MapUtils.isEmpty(metric.getMetadata()) ||
!(String.valueOf(true).equals(metric.getMetadata().get("skipAggregation")));
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java
index 25b525a..6d519f6 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java
@@ -45,11 +45,15 @@
persistMetricMetadata();
LOG.debug("Persisting hosted apps metadata...");
persistHostAppsMetadata();
+ LOG.debug("Persisting hosted instance metadata...");
+ persistHostInstancesMetadata();
if (cacheManager.isDistributedModeEnabled()) {
LOG.debug("Refreshing metric metadata...");
refreshMetricMetadata();
LOG.debug("Refreshing hosted apps metadata...");
refreshHostAppsMetadata();
+ LOG.debug("Refreshing hosted instances metadata...");
+ refreshHostedInstancesMetadata();
}
}
@@ -147,6 +151,41 @@
}
/**
+ * Sync apps instances data if needed
+ */
+ private void persistHostInstancesMetadata() {
+ if (cacheManager.syncHostedInstanceMetadata()) {
+ Map<String, Set<String>> persistedData = null;
+ try {
+ persistedData = cacheManager.getHostedInstancesFromStore();
+ } catch (SQLException e) {
+ LOG.warn("Failed on fetching hosted instances data from store.", e);
+ return; // Something wrong with store
+ }
+
+ Map<String, Set<String>> cachedData = cacheManager.getHostedInstanceCache();
+ Map<String, Set<String>> dataToSync = new HashMap<>();
+ if (cachedData != null && !cachedData.isEmpty()) {
+ for (Map.Entry<String, Set<String>> cacheEntry : cachedData.entrySet()) {
+ // No persistence / stale data in store
+ if (persistedData == null || persistedData.isEmpty() ||
+ !persistedData.containsKey(cacheEntry.getKey()) ||
+ !persistedData.get(cacheEntry.getKey()).containsAll(cacheEntry.getValue())) {
+ dataToSync.put(cacheEntry.getKey(), cacheEntry.getValue());
+ }
+ }
+ try {
+ cacheManager.persistHostedInstanceMetadata(dataToSync);
+ cacheManager.markSuccessOnSyncHostedInstanceMetadata();
+
+ } catch (SQLException e) {
+ LOG.warn("Error persisting hosted apps metadata.", e);
+ }
+ }
+
+ }
+ }
+ /**
* Read all hosted apps metadata and update cached values - HA
*/
private void refreshHostAppsMetadata() {
@@ -166,4 +205,22 @@
}
}
}
+
+ private void refreshHostedInstancesMetadata() {
+ Map<String, Set<String>> hostedInstancesFromStore = null;
+ try {
+ hostedInstancesFromStore = cacheManager.getHostedInstancesFromStore();
+ } catch (SQLException e) {
+ LOG.warn("Error refreshing metadata from store.", e);
+ }
+ if (hostedInstancesFromStore != null) {
+ Map<String, Set<String>> cachedData = cacheManager.getHostedInstanceCache();
+
+ for (Map.Entry<String, Set<String>> storeEntry : hostedInstancesFromStore.entrySet()) {
+ if (!cachedData.containsKey(storeEntry.getKey())) {
+ cachedData.put(storeEntry.getKey(), storeEntry.getValue());
+ }
+ }
+ }
+ }
}
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
index 0c8e5a7..d39230d 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
@@ -148,6 +148,12 @@
"CONSTRAINT pk PRIMARY KEY (HOSTNAME))" +
"DATA_BLOCK_ENCODING='%s', COMPRESSION='%s'";
+ public static final String CREATE_INSTANCE_HOST_TABLE_SQL =
+ "CREATE TABLE IF NOT EXISTS INSTANCE_HOST_METADATA " +
+ "(INSTANCE_ID VARCHAR, HOSTNAME VARCHAR, " +
+ "CONSTRAINT pk PRIMARY KEY (INSTANCE_ID, HOSTNAME))" +
+ "DATA_BLOCK_ENCODING='%s', COMPRESSION='%s'";
+
public static final String ALTER_METRICS_METADATA_TABLE =
"ALTER TABLE METRICS_METADATA ADD IF NOT EXISTS IS_WHITELISTED BOOLEAN";
@@ -230,6 +236,9 @@
public static final String UPSERT_HOSTED_APPS_METADATA_SQL =
"UPSERT INTO HOSTED_APPS_METADATA (HOSTNAME, APP_IDS) VALUES (?, ?)";
+ public static final String UPSERT_INSTANCE_HOST_METADATA_SQL =
+ "UPSERT INTO INSTANCE_HOST_METADATA (INSTANCE_ID, HOSTNAME) VALUES (?, ?)";
+
/**
* Retrieve a set of rows from metrics records table.
*/
@@ -309,6 +318,9 @@
public static final String GET_HOSTED_APPS_METADATA_SQL = "SELECT " +
"HOSTNAME, APP_IDS FROM HOSTED_APPS_METADATA";
+ public static final String GET_INSTANCE_HOST_METADATA_SQL = "SELECT " +
+ "INSTANCE_ID, HOSTNAME FROM INSTANCE_HOST_METADATA";
+
/**
* Aggregate host metrics using a GROUP BY clause to take advantage of
* N - way parallel scan where N = number of regions.
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
index 304a8e0..6278c59 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
@@ -412,6 +412,22 @@
}
}
+ @GET
+ @Path("/metrics/instances")
+ @Produces({ MediaType.APPLICATION_JSON })
+ public Map<String, Set<String>> getClusterHostsMetadata(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res
+ ) {
+ init(res);
+
+ try {
+ return timelineMetricStore.getInstanceHostsMetadata();
+ } catch (Exception e) {
+ throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ }
+
/**
* This is a discovery endpoint that advertises known live collector
* instances. Note: It will always answer with current instance as live.
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
index b2e8cac..b40481d 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
@@ -97,6 +97,11 @@
}
@Override
+ public Map<String, Set<String>> getInstanceHostsMetadata() throws SQLException, IOException {
+ return Collections.emptyMap();
+ }
+
+ @Override
public List<String> getLiveInstances() {
return Collections.emptyList();
}
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java
index b243e0b..c62fd34 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java
@@ -69,6 +69,7 @@
metric2.setStartTime(now - 1000);
metric2.setAppId("dummy_app2");
metric2.setType("Integer");
+ metric2.setInstanceId("instance2");
metric2.setMetricValues(new TreeMap<Long, Double>() {{
put(now - 100, 1.0);
put(now - 200, 2.0);
@@ -144,5 +145,12 @@
Assert.assertEquals("dummy_app1", savedHostData.get("dummy_host1").iterator().next());
Assert.assertEquals("dummy_app2", savedHostData.get("dummy_host2").iterator().next());
Assert.assertEquals("dummy_app3", cachedHostData.get("dummy_host3").iterator().next());
+
+
+ Map<String, Set<String>> cachedHostInstanceData = metadataManager.getHostedInstanceCache();
+ Map<String, Set<String>> savedHostInstanceData = metadataManager.getHostedInstancesFromStore();
+ Assert.assertEquals(cachedHostInstanceData.size(), savedHostInstanceData.size());
+ Assert.assertEquals("dummy_host2", cachedHostInstanceData.get("instance2").iterator().next());
+
}
}
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataSync.java b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataSync.java
index 5eab903..181abca 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataSync.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataSync.java
@@ -56,9 +56,15 @@
put("h2", new HashSet<>(Arrays.asList("a1", "a2")));
}};
+ Map<String, Set<String>> hostedInstances = new HashMap<String, Set<String>>() {{
+ put("i1", new HashSet<>(Arrays.asList("h1")));
+ put("i2", new HashSet<>(Arrays.asList("h1", "h2")));
+ }};
+
expect(configuration.get("timeline.metrics.service.operation.mode", "")).andReturn("distributed");
expect(hBaseAccessor.getTimelineMetricMetadata()).andReturn(metadata);
expect(hBaseAccessor.getHostedAppsMetadata()).andReturn(hostedApps);
+ expect(hBaseAccessor.getInstanceHostsMetdata()).andReturn(hostedInstances);
replay(configuration, hBaseAccessor);
@@ -80,6 +86,12 @@
Assert.assertEquals(2, hostedApps.size());
Assert.assertEquals(1, hostedApps.get("h1").size());
Assert.assertEquals(2, hostedApps.get("h2").size());
+
+ hostedInstances = metadataManager.getHostedInstanceCache();
+ Assert.assertEquals(2, hostedInstances.size());
+ Assert.assertEquals(1, hostedInstances.get("i1").size());
+ Assert.assertEquals(2, hostedInstances.get("i2").size());
+
}
@Test