AMBARI-20820 Stack side changes to use instanceId for cluster based segregation of data (AMS) (dsen)
diff --git a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
index 46f32f9..2c6fae2 100644
--- a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
+++ b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
@@ -79,6 +79,8 @@
   public static final String SSL_KEYSTORE_TYPE_PROPERTY = "truststore.type";
   public static final String SSL_KEYSTORE_PASSWORD_PROPERTY = "truststore.password";
   public static final String COLLECTOR_LIVE_NODES_PATH = "/ws/v1/timeline/metrics/livenodes";
+  public static final String INSTANCE_ID_PROPERTY = "instanceId";
+  public static final String SET_INSTANCE_ID_PROPERTY = "set.instanceId";
 
   protected static final AtomicInteger failedCollectorConnectionsCounter = new AtomicInteger(0);
   public static int NUMBER_OF_SKIPPED_COLLECTOR_EXCEPTIONS = 100;
diff --git a/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java b/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
index 3fdf3f4..904c916 100644
--- a/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
+++ b/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
@@ -61,6 +61,8 @@
   private final static String COUNTER_METRICS_PROPERTY = "counters";
   private final Set<String> counterMetrics = new HashSet<String>();
   private int timeoutSeconds = 10;
+  private boolean setInstanceId;
+  private String instanceId;
 
   @Override
   public void start() {
@@ -106,6 +108,8 @@
     zookeeperQuorum = configuration.getProperty("zookeeper.quorum");
     protocol = configuration.getProperty(COLLECTOR_PROTOCOL, "http");
     port = configuration.getProperty(COLLECTOR_PORT, "6188");
+    setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false"));
+    instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY, "");
     // Initialize the collector write strategy
     super.init();
 
@@ -227,7 +231,11 @@
       TimelineMetric timelineMetric = new TimelineMetric();
       timelineMetric.setMetricName(attributeName);
       timelineMetric.setHostName(hostname);
-      timelineMetric.setInstanceId(component);
+      if (setInstanceId) {
+        timelineMetric.setInstanceId(instanceId + component);
+      } else {
+        timelineMetric.setInstanceId(component);
+      }
       timelineMetric.setAppId("FLUME_HANDLER");
       timelineMetric.setStartTime(currentTimeMillis);
       timelineMetric.getMetricValues().put(currentTimeMillis, Double.parseDouble(attributeValue));
diff --git a/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java b/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
index 8e0de03..11e16c2 100644
--- a/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
+++ b/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
@@ -52,6 +52,7 @@
   private TimelineMetricsCache metricsCache;
   private String hostName = "UNKNOWN.example.com";
   private String instanceId = null;
+  private boolean setInstanceId;
   private String serviceName = "";
   private Collection<String> collectorHosts;
   private String collectorUri;
@@ -95,8 +96,8 @@
     }
 
     serviceName = getServiceName(conf);
-    String inst = conf.getString("instanceId", "");
-    instanceId = StringUtils.isEmpty(inst) ? null : inst;
+    instanceId = conf.getString(INSTANCE_ID_PROPERTY);
+    setInstanceId = conf.getBoolean(SET_INSTANCE_ID_PROPERTY, false);
 
     LOG.info("Identified hostname = " + hostName + ", serviceName = " + serviceName);
     // Initialize the collector write strategy
@@ -321,7 +322,9 @@
         timelineMetric.setMetricName(name);
         timelineMetric.setHostName(hostName);
         timelineMetric.setAppId(serviceName);
-        timelineMetric.setInstanceId(instanceId);
+        if (setInstanceId) {
+          timelineMetric.setInstanceId(instanceId);
+        }
         timelineMetric.setStartTime(startTime);
         timelineMetric.setType(metric.type() != null ? metric.type().name() : null);
         timelineMetric.getMetricValues().put(startTime, value.doubleValue());
diff --git a/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java b/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
index 5777639..4a009dc 100644
--- a/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
+++ b/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
@@ -50,12 +50,15 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.TreeMap;
 
 import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.COLLECTOR_PORT;
 import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.COLLECTOR_HOSTS_PROPERTY;
 import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.COLLECTOR_PROTOCOL;
+import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.INSTANCE_ID_PROPERTY;
 import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.MAX_METRIC_ROW_CACHE_SIZE;
 import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.METRICS_SEND_INTERVAL;
+import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.SET_INSTANCE_ID_PROPERTY;
 import static org.easymock.EasyMock.anyInt;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.anyString;
@@ -81,13 +84,17 @@
   }
 
   @Test
-  @PrepareForTest({URL.class, OutputStream.class, AbstractTimelineMetricsSink.class, HttpURLConnection.class})
+  @PrepareForTest({URL.class, OutputStream.class, AbstractTimelineMetricsSink.class, HttpURLConnection.class, TimelineMetric.class, HadoopTimelineMetricsSink.class})
   public void testPutMetrics() throws Exception {
     HadoopTimelineMetricsSink sink = new HadoopTimelineMetricsSink();
 
     HttpURLConnection connection = PowerMock.createNiceMock(HttpURLConnection.class);
     URL url = PowerMock.createNiceMock(URL.class);
     InputStream is = IOUtils.toInputStream(gson.toJson(Collections.singletonList("localhost")));
+    TimelineMetric timelineMetric = PowerMock.createNiceMock(TimelineMetric.class);
+    expectNew(TimelineMetric.class).andReturn(timelineMetric).times(2);
+    expect(timelineMetric.getMetricValues()).andReturn(new TreeMap<Long, Double>()).anyTimes();
+    expect(timelineMetric.getMetricName()).andReturn("metricName").anyTimes();
     expectNew(URL.class, anyString()).andReturn(url).anyTimes();
     expect(url.openConnection()).andReturn(connection).anyTimes();
     expect(connection.getInputStream()).andReturn(is).anyTimes();
@@ -106,6 +113,8 @@
 
     expect(conf.getInt(eq(MAX_METRIC_ROW_CACHE_SIZE), anyInt())).andReturn(10).anyTimes();
     expect(conf.getInt(eq(METRICS_SEND_INTERVAL), anyInt())).andReturn(1000).anyTimes();
+    expect(conf.getBoolean(eq(SET_INSTANCE_ID_PROPERTY), eq(false))).andReturn(true).anyTimes();
+    expect(conf.getString(eq(INSTANCE_ID_PROPERTY))).andReturn("instanceId").anyTimes();
 
     conf.setListDelimiter(eq(','));
     expectLastCall().anyTimes();
@@ -145,6 +154,9 @@
 
     expect(record.metrics()).andReturn(Arrays.asList(metric)).anyTimes();
 
+    timelineMetric.setInstanceId(eq("instanceId"));
+    EasyMock.expectLastCall();
+
     replay(conf, record, metric);
     replayAll();
 
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py b/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py
index 0052808..34a6787 100644
--- a/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py
+++ b/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py
@@ -66,7 +66,7 @@
     del self.app_metric_map[ app_id ]
   pass
 
-  def flatten(self, application_id = None, clear_once_flattened = False):
+  def flatten(self, application_id = None, clear_once_flattened = False, set_instanceid = False, instanceid = None):
     """
     Return flatten dict to caller in json format.
     Json format:
@@ -89,11 +89,14 @@
       for appId, metrics in local_metric_map.iteritems():
         for metricId, metricData in dict(metrics).iteritems():
           # Create a timeline metric object
+          result_instanceid = ""
+          if set_instanceid:
+            result_instanceid = instanceid
           timeline_metric = {
             "hostname" : self.hostname,
             "metricname" : metricId,
             "appid" : "HOST",
-            "instanceid" : "",
+            "instanceid" : result_instanceid,
             "starttime" : self.get_start_time(appId, metricId),
             "metrics" : metricData
           }
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py b/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
index 5686c50..2670e76 100644
--- a/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
+++ b/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
@@ -239,9 +239,15 @@
   def is_server_https_enabled(self):
     return "true" == str(self.get("collector", "https_enabled")).lower()
 
+  def is_set_instanceid(self):
+    return "true" == str(self.get("default", "set.instanceId", 'false')).lower()
+
   def get_server_host(self):
     return self.get("collector", "host")
 
+  def get_instanceid(self):
+    return self.get("default", "instanceid")
+
   def get_server_port(self):
     try:
       return int(self.get("collector", "port"))
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/emitter.py b/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
index ba3f18e..e2a7f0d 100644
--- a/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
+++ b/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
@@ -46,6 +46,8 @@
     self.collector_port = config.get_server_port()
     self.all_metrics_collector_hosts = config.get_metrics_collector_hosts()
     self.is_server_https_enabled = config.is_server_https_enabled()
+    self.set_instanceid = config.is_set_instanceid()
+    self.instanceid = config.get_instanceid()
 
     if self.is_server_https_enabled:
       self.ca_certs = config.get_ca_certs()
@@ -74,7 +76,7 @@
     # This call will acquire lock on the map and clear contents before returning
     # After configured number of retries the data will not be sent to the
     # collector
-    json_data = self.application_metric_map.flatten(None, True)
+    json_data = self.application_metric_map.flatten(None, True, set_instanceid=self.set_instanceid, instanceid=self.instanceid)
     if json_data is None:
       logger.info("Nothing to emit, resume waiting.")
       return
diff --git a/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java b/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
index 9d492cb..211e9cd 100644
--- a/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
+++ b/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
@@ -70,6 +70,8 @@
   private static final String TIMELINE_METRICS_SSL_KEYSTORE_PATH_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SSL_KEYSTORE_PATH_PROPERTY;
   private static final String TIMELINE_METRICS_SSL_KEYSTORE_TYPE_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SSL_KEYSTORE_TYPE_PROPERTY;
   private static final String TIMELINE_METRICS_SSL_KEYSTORE_PASSWORD_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SSL_KEYSTORE_PASSWORD_PROPERTY;
+  private static final String TIMELINE_METRICS_KAFKA_INSTANCE_ID_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + INSTANCE_ID_PROPERTY;
+  private static final String TIMELINE_METRICS_KAFKA_SET_INSTANCE_ID_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SET_INSTANCE_ID_PROPERTY;
   private static final String TIMELINE_DEFAULT_HOST = "localhost";
   private static final String TIMELINE_DEFAULT_PORT = "6188";
   private static final String TIMELINE_DEFAULT_PROTOCOL = "http";
@@ -87,6 +89,8 @@
   private TimelineMetricsCache metricsCache;
   private int timeoutSeconds = 10;
   private String zookeeperQuorum = null;
+  private boolean setInstanceId;
+  private String instanceId;
 
   private String[] excludedMetricsPrefixes;
   private String[] includedMetricsPrefixes;
@@ -162,6 +166,9 @@
         collectorHosts = parseHostsStringIntoCollection(props.getString(TIMELINE_HOSTS_PROPERTY, TIMELINE_DEFAULT_HOST));
         metricCollectorProtocol = props.getString(TIMELINE_PROTOCOL_PROPERTY, TIMELINE_DEFAULT_PROTOCOL);
 
+        instanceId = props.getString(TIMELINE_METRICS_KAFKA_INSTANCE_ID_PROPERTY);
+        setInstanceId = props.getBoolean(TIMELINE_METRICS_KAFKA_SET_INSTANCE_ID_PROPERTY);
+
         setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval));
 
         if (metricCollectorProtocol.contains("https")) {
@@ -315,6 +322,9 @@
       TimelineMetric timelineMetric = new TimelineMetric();
       timelineMetric.setMetricName(attributeName);
       timelineMetric.setHostName(hostname);
+      if (setInstanceId) {
+        timelineMetric.setInstanceId(instanceId);
+      }
       timelineMetric.setAppId(component);
       timelineMetric.setStartTime(currentTimeMillis);
       timelineMetric.setType(ClassUtils.getShortCanonicalName(attributeValue, "Number"));
diff --git a/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java b/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
index e1ac48c..b05190c 100644
--- a/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
+++ b/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
@@ -84,6 +84,8 @@
     properties.setProperty("kafka.timeline.metrics.reporter.enabled", "true");
     properties.setProperty("external.kafka.metrics.exclude.prefix", "a.b.c");
     properties.setProperty("external.kafka.metrics.include.prefix", "a.b.c.d");
+    properties.setProperty("kafka.timeline.metrics.instanceId", "cluster");
+    properties.setProperty("kafka.timeline.metrics.set.instanceId", "false");
     props = new VerifiableProperties(properties);
   }
 
@@ -120,6 +122,8 @@
     properties.setProperty("kafka.timeline.metrics.truststore.path", "");
     properties.setProperty("kafka.timeline.metrics.truststore.type", "");
     properties.setProperty("kafka.timeline.metrics.truststore.password", "");
+    properties.setProperty("kafka.timeline.metrics.instanceId", "cluster");
+    properties.setProperty("kafka.timeline.metrics.set.instanceId", "false");
     kafkaTimelineMetricsReporter.init(new VerifiableProperties(properties));
     kafkaTimelineMetricsReporter.stopReporter();
     verifyAll();
diff --git a/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
index 9a55f10..08f0598 100644
--- a/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
+++ b/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
@@ -50,6 +50,8 @@
   private Collection<String> collectorHosts;
   private String zkQuorum;
   private String protocol;
+  private boolean setInstanceId;
+  private String instanceId;
   private NimbusClient nimbusClient;
   private String applicationId;
   private int timeoutSeconds;
@@ -126,6 +128,8 @@
           Integer.parseInt(cf.get(METRICS_POST_TIMEOUT_SECONDS).toString()) :
           DEFAULT_POST_TIMEOUT_SECONDS;
       applicationId = cf.get(APP_ID).toString();
+      setInstanceId = Boolean.getBoolean(cf.get(SET_INSTANCE_ID_PROPERTY).toString());
+      instanceId = cf.get(INSTANCE_ID_PROPERTY).toString();
 
       collectorUri = constructTimelineMetricUri(protocol, findPreferredCollectHost(), port);
       if (protocol.contains("https")) {
@@ -196,6 +200,9 @@
     TimelineMetric timelineMetric = new TimelineMetric();
     timelineMetric.setMetricName(attributeName);
     timelineMetric.setHostName(hostname);
+    if (setInstanceId) {
+      timelineMetric.setInstanceId(instanceId);
+    }
     timelineMetric.setAppId(component);
     timelineMetric.setStartTime(currentTimeMillis);
     timelineMetric.getMetricValues().put(currentTimeMillis, Double.parseDouble(attributeValue));
diff --git a/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index 60c1427..20f60e1 100644
--- a/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -59,6 +59,8 @@
   private String port;
   private String topologyName;
   private String applicationId;
+  private boolean setInstanceId;
+  private String instanceId;
 
   @Override
   protected String getCollectorUri(String host) {
@@ -122,6 +124,8 @@
     protocol = configuration.getProperty(COLLECTOR_PROTOCOL, "http");
     port = configuration.getProperty(COLLECTOR_PORT, "6188");
 
+    instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY);
+    setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false"));
     // Initialize the collector write strategy
     super.init();
 
@@ -243,6 +247,9 @@
     TimelineMetric timelineMetric = new TimelineMetric();
     timelineMetric.setMetricName(attributeName);
     timelineMetric.setHostName(hostName);
+    if (setInstanceId) {
+      timelineMetric.setInstanceId(instanceId);
+    }
     timelineMetric.setAppId(applicationId);
     timelineMetric.setStartTime(currentTimeMillis);
     timelineMetric.setType(ClassUtils.getShortCanonicalName(
diff --git a/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
index 535fae0..14f160b 100644
--- a/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
+++ b/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
@@ -46,6 +46,8 @@
   private Collection<String> collectorHosts;
   private String zkQuorum;
   private String protocol;
+  private boolean setInstanceId;
+  private String instanceId;
   private String applicationId;
   private int timeoutSeconds;
 
@@ -115,7 +117,8 @@
           Integer.parseInt(configuration.getProperty(METRICS_POST_TIMEOUT_SECONDS)) :
           DEFAULT_POST_TIMEOUT_SECONDS;
       applicationId = configuration.getProperty(CLUSTER_REPORTER_APP_ID, DEFAULT_CLUSTER_REPORTER_APP_ID);
-
+      setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY));
+      instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY);
       if (protocol.contains("https")) {
         String trustStorePath = configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).trim();
         String trustStoreType = configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).trim();
@@ -226,6 +229,9 @@
     TimelineMetric timelineMetric = new TimelineMetric();
     timelineMetric.setMetricName(attributeName);
     timelineMetric.setHostName(hostname);
+    if (setInstanceId) {
+      timelineMetric.setInstanceId(instanceId);
+    }
     timelineMetric.setAppId(component);
     timelineMetric.setStartTime(currentTimeMillis);
     timelineMetric.setType(ClassUtils.getShortCanonicalName(attributeValue, "Number"));
diff --git a/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index f58f549..425201c 100644
--- a/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -68,6 +68,8 @@
   private String port;
   private String topologyName;
   private String applicationId;
+  private String instanceId;
+  private boolean setInstanceId;
 
   @Override
   protected String getCollectorUri(String host) {
@@ -133,7 +135,8 @@
 
     protocol = configuration.getProperty(COLLECTOR_PROTOCOL, "http");
     port = configuration.getProperty(COLLECTOR_PORT, "6188");
-
+    instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY);
+    setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false"));
     // Initialize the collector write strategy
     super.init();
 
@@ -332,6 +335,9 @@
     TimelineMetric timelineMetric = new TimelineMetric();
     timelineMetric.setMetricName(attributeName);
     timelineMetric.setHostName(hostName);
+    if (setInstanceId) {
+      timelineMetric.setInstanceId(instanceId);
+    }
     timelineMetric.setAppId(applicationId);
     timelineMetric.setStartTime(currentTimeMillis);
     timelineMetric.setType(ClassUtils.getShortCanonicalName(