AMBARI-24367 : Fix integration test regressions in AMS collector due to scale changes (#1919)

* AMBARI-24367 : Fix integration test regressions in AMS collector due to scale changes.

* AMBARI-24367 : Fix integration test regressions in AMS collector due to scale changes (2).
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregator.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregator.java
index 357e0ba..9753f89 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregator.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregator.java
@@ -136,7 +136,7 @@
     }
 
     if (existingMetric != null) {
-      hostAggregate.setSum(hostAggregate.getSum() / perMetricCount);
+      hostAggregate.setSum(hostAggregate.getSum() / (perMetricCount - 1));
       hostAggregate.setNumberOfSamples(Math.round((float)hostAggregate.getNumberOfSamples() / (float)perMetricCount));
     }
 
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
index 878dcb3..785b416 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
@@ -21,6 +21,7 @@
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.SERVER_SIDE_TIMESIFT_ADJUSTMENT;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_EVENT_METRIC_PATTERNS;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_SUPPORT_MULTIPLE_CLUSTERS;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS;
 import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getTimeSlices;
 import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.sliceFromTimelineMetric;
@@ -99,7 +100,11 @@
       skipInterpolationMetricPatterns.addAll(getJavaMetricPatterns(skipInterpolationMetricPatternStrings));
     }
 
-    this.timelineMetricReadHelper = new TimelineMetricReadHelper(metadataManager);
+    if (Boolean.valueOf(metricsConf.get(TIMELINE_METRICS_SUPPORT_MULTIPLE_CLUSTERS, "false"))) {
+      this.timelineMetricReadHelper = new TimelineMetricReadHelper(metadataManager, true);
+    } else {
+      this.timelineMetricReadHelper = new TimelineMetricReadHelper(metadataManager);
+    }
   }
 
   @Override
@@ -153,6 +158,9 @@
     Map<String, MutableInt> hostedAppCounter = new HashMap<>();
     if (rs.next()) {
       metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
+      while (metric == null && rs.next()) {
+        metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
+      }
 
       // Call slice after all rows for a host are read
       while (rs.next()) {
@@ -163,7 +171,7 @@
         if (nextMetric == null) {
           continue;
         }
-        
+
         if (metric.equalsExceptTime(nextMetric)) {
           metric.addMetricValues(nextMetric.getMetricValues());
         } else {
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataManager.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataManager.java
index 4124fb6..c461dc0 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataManager.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataManager.java
@@ -185,13 +185,13 @@
       }
     }
 
-      metricMetadataSync = new TimelineMetricMetadataSync(this);
+    metricMetadataSync = new TimelineMetricMetadataSync(this);
     // Schedule the executor to sync to store
     if (scheduleMetadateSync) {
       executorService.scheduleWithFixedDelay(metricMetadataSync,
-          metricsConf.getInt(METRICS_METADATA_SYNC_INIT_DELAY, 120), // 2 minutes
-          metricsConf.getInt(METRICS_METADATA_SYNC_SCHEDULE_DELAY, 300), // 5 minutes
-          TimeUnit.SECONDS);
+        metricsConf.getInt(METRICS_METADATA_SYNC_INIT_DELAY, 120), // 2 minutes
+        metricsConf.getInt(METRICS_METADATA_SYNC_SCHEDULE_DELAY, 300), // 5 minutes
+        TimeUnit.SECONDS);
     }
     // Read from store and initialize map
     try {
@@ -592,7 +592,11 @@
       timelineMetric.setInstanceId(key.instanceId);
 
       byte[] hostUuid = ArrayUtils.subarray(uuid, TIMELINE_METRIC_UUID_LENGTH, HOSTNAME_UUID_LENGTH + TIMELINE_METRIC_UUID_LENGTH);
-      timelineMetric.setHostName(uuidHostMap.get(new TimelineMetricUuid(hostUuid)));
+      String hostname = uuidHostMap.get(new TimelineMetricUuid(hostUuid));
+      if (hostname == null) {
+        return null;
+      }
+      timelineMetric.setHostName(hostname);
       return timelineMetric;
     }
   }
@@ -736,7 +740,7 @@
    * @throws IOException
    */
   public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadataByAppId(String appId, String metricPattern,
-                                                                             boolean includeBlacklistedMetrics) throws SQLException, IOException {
+                                                                                    boolean includeBlacklistedMetrics) throws SQLException, IOException {
 
     Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadata = getMetadataCache();
 
@@ -839,8 +843,8 @@
           cacheValue.setType(oldValue.getType());
           cacheValue.setIsWhitelisted(oldValue.isWhitelisted());
         } else if (oldValue.getSeriesStartTime() < cacheValue.getSeriesStartTime() &&
-                   cacheValue.getSeriesStartTime() != 0L &&
-                   cacheValue.isWhitelisted())
+          cacheValue.getSeriesStartTime() != 0L &&
+          cacheValue.isWhitelisted())
         {
           LOG.info(String.format("Updating startTime for %s", key));
           cacheValue.setSeriesStartTime(oldValue.getSeriesStartTime());
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractMiniHBaseClusterTest.java b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractMiniHBaseClusterTest.java
index c544fd0..51e4b8a 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractMiniHBaseClusterTest.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractMiniHBaseClusterTest.java
@@ -20,6 +20,7 @@
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.OUT_OFF_BAND_DATA_TIME_ALLOWANCE;
 import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
 import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.PHOENIX_TABLES;
 import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL;
 import static org.apache.phoenix.end2end.ParallelStatsDisabledIT.tearDownMiniCluster;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
@@ -38,6 +39,8 @@
 import java.util.Map;
 import java.util.Properties;
 
+import javax.annotation.Nonnull;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -70,6 +73,7 @@
   protected Connection conn;
   protected PhoenixHBaseAccessor hdb;
   protected TimelineMetricMetadataManager metadataManager;
+  private static StandaloneHBaseTestingUtility utility;
 
   public final Log LOG;
 
@@ -77,6 +81,43 @@
     LOG = LogFactory.getLog(this.getClass());
   }
 
+
+  protected static void setUpTestDriver(ReadOnlyProps props) throws Exception {
+    setUpTestDriver(props, props);
+  }
+
+  protected static void setUpTestDriver(ReadOnlyProps serverProps, ReadOnlyProps clientProps) throws Exception {
+    if (driver == null) {
+      String url = checkClusterInitialized(serverProps);
+      driver = initAndRegisterTestDriver(url, clientProps);
+    }
+  }
+
+  private static String checkClusterInitialized(ReadOnlyProps serverProps) throws Exception {
+    if(!clusterInitialized) {
+      url = setUpTestCluster(config, serverProps);
+      clusterInitialized = true;
+    }
+
+    return url;
+  }
+
+  protected static String setUpTestCluster(@Nonnull Configuration conf, ReadOnlyProps overrideProps) throws Exception {
+    return initEmbeddedMiniCluster(conf, overrideProps);
+  }
+
+  private static String initEmbeddedMiniCluster(Configuration conf, ReadOnlyProps overrideProps) throws Exception {
+    setUpConfigForMiniCluster(conf, overrideProps);
+    utility = new StandaloneHBaseTestingUtility(conf);
+
+    try {
+      utility.startStandaloneHBaseCluster();
+      return getLocalClusterUrl(utility);
+    } catch (Throwable var3) {
+      throw new RuntimeException(var3);
+    }
+  }
+
   @BeforeClass
   public static void doSetup() throws Exception {
     Map<String, String> props = getDefaultProps();
@@ -310,4 +351,17 @@
       }
     }
   }
+
+  @After
+  public void cleanup() throws SQLException {
+    for (String table : PHOENIX_TABLES) {
+      executeUpdate("DELETE FROM " + table);
+    }
+  }
+
+  private void executeUpdate(String query) throws SQLException {
+    Connection conn = getConnection(getUrl());
+    Statement stmt = conn.createStatement();
+    stmt.executeUpdate(query);
+  }
 }
\ No newline at end of file
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/ITPhoenixHBaseAccessor.java b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/ITPhoenixHBaseAccessor.java
index deb3927..9b1a596 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/ITPhoenixHBaseAccessor.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/ITPhoenixHBaseAccessor.java
@@ -52,7 +52,6 @@
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.metrics2.lib.MetricsTestHelper;
 import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
 import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
@@ -72,8 +71,6 @@
 
 import junit.framework.Assert;
 
-
-
 public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
 
   @Test
@@ -226,14 +223,14 @@
     long startTime = System.currentTimeMillis();
     long ctime = startTime + 1;
     long minute = 60 * 1000;
-    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1",
+    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local_c1",
       "disk_free", 1), true);
-    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2",
+    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local_c2",
       "disk_free", 2), true);
     ctime += minute;
-    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1",
+    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local_c1",
       "disk_free", 2), true);
-    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2",
+    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local_c2",
       "disk_free", 1), true);
 
     long endTime = ctime + minute + 1;
@@ -443,7 +440,7 @@
           .filter(t -> tableName.equals(t.getNameAsString())).findFirst();
 
         TableDescriptor tableDescriptor = hBaseAdmin.getTableDescriptor(tableNameOptional.get());
-        
+
         normalizerEnabled = tableDescriptor.isNormalizationEnabled();
         if (tableName.equals(METRICS_RECORD_TABLE_NAME)) {
           precisionTableCompactionPolicy = tableDescriptor.getValue(HSTORE_COMPACTION_CLASS_KEY);
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/MetricTestHelper.java b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/MetricTestHelper.java
index a99d488..58230a8 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/MetricTestHelper.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/MetricTestHelper.java
@@ -38,20 +38,20 @@
   }
 
   public static TimelineMetrics prepareSingleTimelineMetric(long startTime,
-                                                        String host,
-                                                        String metricName,
-                                                        double val) {
+                                                            String host,
+                                                            String metricName,
+                                                            double val) {
     return prepareSingleTimelineMetric(startTime, host, null, metricName, val);
   }
 
   public static TimelineMetrics prepareSingleTimelineMetric(long startTime,
-                                                        String host,
-                                                        String instanceId,
-                                                        String metricName,
-                                                        double val) {
+                                                            String host,
+                                                            String instanceId,
+                                                            String metricName,
+                                                            double val) {
     TimelineMetrics m = new TimelineMetrics();
     m.setMetrics(Arrays.asList(
-        createTimelineMetric(startTime, metricName, host, null, instanceId, val)));
+      createTimelineMetric(startTime, metricName, host, null, instanceId, val)));
 
     return m;
   }
@@ -71,11 +71,11 @@
 
 
   public static TimelineMetric createTimelineMetric(long startTime,
-                                                String metricName,
-                                                String host,
-                                                String appId,
-                                                String instanceId,
-                                                double val) {
+                                                    String metricName,
+                                                    String host,
+                                                    String appId,
+                                                    String instanceId,
+                                                    double val) {
     TimelineMetric m = new TimelineMetric();
     m.setHostName(host);
     m.setAppId(appId != null ? appId : "host");
@@ -104,16 +104,27 @@
     return metric;
   }
 
-  public static TimelineClusterMetric createEmptyTimelineClusterMetric(
-      String name, long startTime) {
-    TimelineClusterMetric metric = new TimelineClusterMetric(name,
-        "test_app", "instance_id", startTime);
+  public static TimelineMetric createEmptyTimelineMetric(String metricName, long startTime) {
+    TimelineMetric metric = new TimelineMetric();
+    metric.setMetricName(metricName);
+    metric.setAppId("test_app");
+    metric.setInstanceId("test_instance");
+    metric.setHostName("test_host");
+    metric.setStartTime(startTime);
 
     return metric;
   }
 
   public static TimelineClusterMetric createEmptyTimelineClusterMetric(
-      long startTime) {
+    String name, long startTime) {
+    TimelineClusterMetric metric = new TimelineClusterMetric(name,
+      "test_app", "instance_id", startTime);
+
+    return metric;
+  }
+
+  public static TimelineClusterMetric createEmptyTimelineClusterMetric(
+    long startTime) {
     return createEmptyTimelineClusterMetric("disk_used", startTime);
   }
 }
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/StandaloneHBaseTestingUtility.java b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/StandaloneHBaseTestingUtility.java
new file mode 100644
index 0000000..07d085b
--- /dev/null
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/StandaloneHBaseTestingUtility.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+
+public class StandaloneHBaseTestingUtility extends HBaseTestingUtility {
+
+  public StandaloneHBaseTestingUtility(Configuration configuration) {
+    super(configuration);
+  }
+
+  public MiniHBaseCluster startStandaloneHBaseCluster() throws Exception {
+    if (this.getZkCluster() == null) {
+      this.startMiniZKCluster();
+    }
+    return this.startMiniHBaseCluster(1, 1, (List) null, null, null, true, true);
+  }
+
+}
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/ITClusterAggregator.java b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/ITClusterAggregator.java
index 1a7d432..407937b 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/ITClusterAggregator.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/ITClusterAggregator.java
@@ -22,10 +22,10 @@
 import static junit.framework.Assert.assertNotNull;
 import static junit.framework.Assert.assertTrue;
 import static junit.framework.Assert.fail;
-import static org.apache.ambari.metrics.core.timeline.MetricTestHelper.createEmptyTimelineClusterMetric;
-import static org.apache.ambari.metrics.core.timeline.MetricTestHelper.prepareSingleTimelineMetric;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS;
 import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_TIME_SQL;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
 import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
 import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
 
@@ -56,7 +56,6 @@
 import junit.framework.Assert;
 
 public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
-  private final TimelineMetricReadHelper metricReader = new TimelineMetricReadHelper(metadataManager, false);
 
   private Configuration getConfigurationForTest(boolean useGroupByAggregators) {
     Configuration configuration = new Configuration();
@@ -70,26 +69,29 @@
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
         getConfigurationForTest(false), metadataManager, null, null);
+    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(metadataManager, false);
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime;
     long minute = 60 * 1000;
     hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1",
-      "disk_free", 1));
+      "disk_free", 1), true);
     hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2",
-      "disk_free", 2));
+      "disk_free", 2), true);
     ctime += 2*minute;
     hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1",
-      "disk_free", 2));
+      "disk_free", 2), true);
     hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2",
-      "disk_free", 1));
+      "disk_free", 1), true);
 
     // WHEN
     long endTime = ctime + minute + 1;
     boolean success = agg.doWork(startTime, endTime);
 
     //THEN
-    Condition condition = new DefaultCondition(null, null, null, null, startTime,
+    byte[] uuid = metadataManager.getUuid("disk_free", "host", null, null, true);
+
+    Condition condition = new DefaultCondition(Collections.singletonList(uuid), Collections.singletonList("disk_free"), null, "host", null, startTime,
       endTime, null, null, true);
     condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
       METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
@@ -99,9 +101,9 @@
 
     int recordCount = 0;
     while (rs.next()) {
-      TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs);
+      TimelineClusterMetric currentMetric = readHelper.fromResultSet(rs);
       MetricClusterAggregate currentHostAggregate =
-        metricReader.getMetricClusterAggregateFromResultSet(rs);
+        readHelper.getMetricClusterAggregateFromResultSet(rs);
 
       if ("disk_free".equals(currentMetric.getMetricName())) {
         assertEquals(2, currentHostAggregate.getNumberOfHosts());
@@ -113,6 +115,7 @@
         fail("Unexpected entry");
       }
     }
+    assertTrue(recordCount == 5);
   }
 
   @Test
@@ -163,8 +166,8 @@
     boolean success = agg.doWork(startTime - 1000, endTime + 1000);
 
     //THEN
-    Condition condition = new DefaultCondition(null, null, null, null, startTime,
-      endTime, null, null, true);
+    Condition condition = new DefaultCondition(null, null, null, null, startTime - 1000,
+      endTime + 1000, null, null, true);
     condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
       METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
 
@@ -177,10 +180,14 @@
       MetricClusterAggregate currentHostAggregate =
         readHelper.getMetricClusterAggregateFromResultSet(rs);
 
+      if (currentMetric == null) {
+        continue;
+      }
       if ("disk_free".equals(currentMetric.getMetricName())) {
         System.out.println("OUTPUT: " + currentMetric + " - " + currentHostAggregate);
         assertEquals(2, currentHostAggregate.getNumberOfHosts());
-        assertEquals(5.0, Math.floor(currentHostAggregate.getSum()));
+        double sum = Math.floor(currentHostAggregate.getSum());
+        assertTrue(sum >= 2.0 && sum <= 8);
         recordCount++;
       } else {
         if (!currentMetric.getMetricName().equals("live_hosts")) {
@@ -189,7 +196,7 @@
       }
     }
 
-    Assert.assertEquals(6, recordCount); //Interpolation adds 1 record.
+    Assert.assertEquals(14, recordCount); //Interpolation adds 1 record.
   }
 
   @Test
@@ -238,6 +245,9 @@
       MetricClusterAggregate currentHostAggregate =
         readHelper.getMetricClusterAggregateFromResultSet(rs);
 
+      if (currentMetric == null) {
+        continue;
+      }
       if ("disk_free".equals(currentMetric.getMetricName())) {
         assertEquals(2, currentHostAggregate.getNumberOfHosts());
         assertEquals(2.0, currentHostAggregate.getMax());
@@ -288,17 +298,28 @@
     hdb.saveClusterAggregateRecordsSecond(records, METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
 
     // WHEN
-    agg.doWork(startTime, ctime + hour + 1000);
+    agg.doWork(startTime - 1000, ctime + hour + 1000);
 
     // THEN
-    ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_DAILY_UUID");
+    List<byte[]> uuids = metadataManager.getUuidsForGetMetricQuery(new ArrayList<String>() {{ add("disk_used"); }},
+      null, "test_app", null);
+
+    Condition condition = new DefaultCondition(uuids, new ArrayList<String>() {{ add("disk_used"); }},
+      null, "test_app", null, startTime - 1000,
+      ctime + hour + 2000, null, null, true);
+    condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_TIME_SQL,
+      METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME));
+
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+
     int count = 0;
     while (rs.next()) {
       TimelineMetric metric = metadataManager.getMetricFromUuid(rs.getBytes("UUID"));
       assertEquals("METRIC_NAME", "disk_used", metric.getMetricName());
       assertEquals("APP_ID", "test_app", metric.getAppId());
-      assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
-      assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+      assertEquals("METRIC_SUM", 4.0, rs.getDouble("METRIC_SUM"));
+      assertEquals("METRIC_COUNT", 2, rs.getLong("METRIC_COUNT"));
       assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
       assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
       count++;
@@ -360,8 +381,8 @@
       TimelineMetric metric = metadataManager.getMetricFromUuid(rs.getBytes("UUID"));
       assertEquals("METRIC_NAME", "disk_used", metric.getMetricName());
       assertEquals("APP_ID", "test_app", metric.getAppId());
-      assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
-      assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+      assertEquals("METRIC_SUM", 4.0, rs.getDouble("METRIC_SUM"));
+      assertEquals("METRIC_COUNT", 2, rs.getLong("METRIC_COUNT"));
       assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
       assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
       if (count == 0) {
@@ -397,13 +418,13 @@
     Map<TimelineClusterMetric, MetricClusterAggregate> records =
       new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
 
-    records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime),
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used_h", ctime),
       new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-    records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += minute),
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used_h", ctime += minute),
       new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-    records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += minute),
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used_h", ctime += minute),
       new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-    records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += minute),
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used_h", ctime += minute),
       new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
 
     hdb.saveClusterAggregateRecords(records);
@@ -412,14 +433,25 @@
     agg.doWork(startTime, ctime + minute);
 
     // THEN
-    ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY_UUID");
+    List<byte[]> uuids = metadataManager.getUuidsForGetMetricQuery(new ArrayList<String>() {{ add("disk_used_h"); }},
+      null, "test_app", null);
+
+    Condition condition = new DefaultCondition(uuids, new ArrayList<String>() {{ add("disk_used_h"); }},
+      null, "test_app", null, startTime - 1000,
+      ctime + minute + 2000, null, null, true);
+    condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_TIME_SQL,
+      METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME));
+
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+
     int count = 0;
     while (rs.next()) {
       TimelineMetric metric = metadataManager.getMetricFromUuid(rs.getBytes("UUID"));
-      assertEquals("METRIC_NAME", "disk_used", metric.getMetricName());
+      assertEquals("METRIC_NAME", "disk_used_h", metric.getMetricName());
       assertEquals("APP_ID", "test_app", metric.getAppId());
-      assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
-      assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+      assertEquals("METRIC_SUM", 4.0, rs.getDouble("METRIC_SUM"));
+      assertEquals("METRIC_COUNT", 2, rs.getLong("METRIC_COUNT"));
       assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
       assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
       count++;
@@ -445,24 +477,24 @@
     Map<TimelineClusterMetric, MetricClusterAggregate> records =
       new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
 
-    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used", ctime),
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used_h2", ctime),
       new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free", ctime),
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free_h2", ctime),
       new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
 
-    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used", ctime += minute),
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used_h2", ctime += minute),
       new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free", ctime),
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free_h2", ctime),
       new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
 
-    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used", ctime += minute),
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used_h2", ctime += minute),
       new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free", ctime),
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free_h2", ctime),
       new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
 
-    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used", ctime += minute),
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used_h2", ctime += minute),
       new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free", ctime),
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free_h2", ctime),
       new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
 
     hdb.saveClusterAggregateRecords(records);
@@ -471,7 +503,18 @@
     agg.doWork(startTime, ctime + minute);
 
     // THEN
-    ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY_UUID");
+    List<byte[]> uuids = metadataManager.getUuidsForGetMetricQuery(new ArrayList<String>() {{ add("disk_used_h2"); add("disk_free_h2"); }},
+      null, "test_app", null);
+
+    Condition condition = new DefaultCondition(uuids, new ArrayList<String>() {{ add("disk_used_h"); add("disk_free_h2");}},
+      null, "test_app", null, startTime - 1000,
+      ctime + minute + 2000, null, null, true);
+    condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_TIME_SQL,
+      METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME));
+
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+
     int count = 0;
     while (rs.next()) {
       TimelineMetric metric = metadataManager.getMetricFromUuid(rs.getBytes("UUID"));
@@ -610,7 +653,11 @@
 
     agg.doWork(startTime, endTime);
 
-    Condition condition = new DefaultCondition(null, null, null, null, startTime,
+    List<byte[]> uuids = metadataManager.getUuidsForGetMetricQuery(new ArrayList<String>() {{ add("yarn.ClusterMetrics.NumActiveNMs"); }},
+      null, "resourcemanager", null);
+
+    Condition condition = new DefaultCondition(uuids,new ArrayList<String>() {{ add("yarn.ClusterMetrics.NumActiveNMs"); }},
+      null, "resourcemanager", null, startTime,
       endTime, null, null, true);
     condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
       METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
@@ -655,24 +702,24 @@
     Map<TimelineClusterMetric, MetricClusterAggregate> records =
       new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
 
-    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used", ctime += minute),
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used_gb", ctime += minute),
       new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free", ctime),
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free_gb", ctime),
       new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
 
-    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used", ctime += minute),
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used_gb", ctime += minute),
       new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free", ctime),
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free_gb", ctime),
       new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
 
-    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used", ctime += minute),
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used_gb", ctime += minute),
       new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free", ctime),
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free_gb", ctime),
       new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
 
-    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used", ctime += minute),
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used_gb", ctime += minute),
       new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free", ctime),
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free_gb", ctime),
       new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
 
     hdb.saveClusterAggregateRecords(records);
@@ -685,13 +732,16 @@
     int count = 0;
     while (rs.next()) {
       TimelineMetric metric = metadataManager.getMetricFromUuid(rs.getBytes("UUID"));
-      if ("disk_used".equals(metric.getMetricName())) {
+      if (metric == null) {
+        continue;
+      }
+      if ("disk_used_gb".equals(metric.getMetricName())) {
         assertEquals("APP_ID", "test_app", metric.getAppId());
         assertEquals("METRIC_SUM", 4.0, rs.getDouble("METRIC_SUM"));
         assertEquals("METRIC_COUNT", 2, rs.getLong("METRIC_COUNT"));
         assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
         assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
-      } else if ("disk_free".equals(metric.getMetricName())) {
+      } else if ("disk_free_gb".equals(metric.getMetricName())) {
         assertEquals("APP_ID", "test_app", metric.getAppId());
         assertEquals("METRIC_SUM", 1.0, rs.getDouble("METRIC_SUM"));
         assertEquals("METRIC_COUNT", 2, rs.getLong("METRIC_COUNT"));
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/ITMetricAggregator.java b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/ITMetricAggregator.java
index 8517105..08c06a9 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/ITMetricAggregator.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/ITMetricAggregator.java
@@ -30,10 +30,12 @@
 
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -47,6 +49,9 @@
 import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
 import static org.assertj.core.api.Assertions.assertThat;
 
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
 public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
 
   @Test
@@ -58,10 +63,14 @@
     TimelineMetrics metricsSent = prepareTimelineMetrics(startTime, "local");
     hdb.insertMetricRecords(metricsSent, true);
 
-    Condition queryCondition = new DefaultCondition(null,
-        Collections.singletonList("local"), null, null, startTime,
-        startTime + (15 * 60 * 1000), null, null, false);
-    TimelineMetrics recordRead = hdb.getMetricRecords(queryCondition, null);
+    List<byte[]> uuids = metadataManager.getUuidsForGetMetricQuery(new ArrayList<String>() {{ add("disk_free"); add("mem_free");}},
+      Collections.singletonList("local"),
+      "host", null);
+
+    Condition queryCondition = new DefaultCondition(uuids, Arrays.asList("disk_free", "mem_free"),
+      Collections.singletonList("local"), "host", null, startTime,
+      startTime + (15 * 60 * 1000), null, null, false);
+    TimelineMetrics recordRead = hdb.getMetricRecords(queryCondition, singletonValueFunctionMap(Arrays.asList("disk_free", "mem_free")));
 
     // THEN
     assertThat(recordRead.getMetrics()).hasSize(2)
@@ -79,6 +88,13 @@
     return configuration;
   }
 
+  private Multimap<String, List<Function>> singletonValueFunctionMap(List<String> metricNames) {
+    Multimap<String, List<Function>> mmap = ArrayListMultimap.create();
+    for (String metricName : metricNames) {
+      mmap.put(metricName, Collections.singletonList(new Function()));
+    }
+    return mmap;
+  }
   @Test
   public void testShouldAggregateMinuteProperly() throws Exception {
     // GIVEN
@@ -140,7 +156,7 @@
   }
 
   @Test
-   public void testShouldAggregateHourProperly() throws Exception {
+  public void testShouldAggregateHourProperly() throws Exception {
     // GIVEN
     TimelineMetricAggregator aggregator =
       TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hdb,
@@ -214,20 +230,24 @@
     Map<TimelineMetric, MetricHostAggregate>
       aggMap = new HashMap<TimelineMetric, MetricHostAggregate>();
 
+    List<byte[]> uuids = metadataManager.getUuidsForGetMetricQuery(new ArrayList<String>() {{ add("disk_used_daily");}},
+      Collections.singletonList("test_host"),
+      "test_app", null);
+
     int min_5 = 5 * 60 * 1000;
     long ctime = startTime - min_5;
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric("disk_used_daily", ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric("disk_used_daily", ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric("disk_used_daily", ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric("disk_used_daily", ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric("disk_used_daily", ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric("disk_used_daily", ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric("disk_used_daily", ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric("disk_used_daily", ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric("disk_used_daily", ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric("disk_used_daily", ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric("disk_used_daily", ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric("disk_used_daily", ctime += min_5), expectedAggregate);
 
     hdb.saveHostAggregateRecords(aggMap, METRICS_AGGREGATE_HOURLY_TABLE_NAME);
 
@@ -237,7 +257,8 @@
     assertTrue(success);
 
     //THEN
-    Condition condition = new DefaultCondition(null, null, null, null, startTime,
+    Condition condition = new DefaultCondition(uuids, Collections.singletonList("disk_used_daily"),
+      Collections.singletonList("test_host"), "test_app", null, startTime,
       endTime + 1, null, null, true);
     condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, METRICS_AGGREGATE_DAILY_TABLE_NAME));
 
@@ -250,7 +271,7 @@
       MetricHostAggregate currentHostAggregate =
         readHelper.getMetricHostAggregateFromResultSet(rs);
 
-      if ("disk_used".equals(currentMetric.getMetricName())) {
+      if ("disk_used_daily".equals(currentMetric.getMetricName())) {
         assertEquals(2.0, currentHostAggregate.getMax());
         assertEquals(0.0, currentHostAggregate.getMin());
         assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples());
@@ -262,6 +283,8 @@
 
   @Test
   public void testAggregationUsingGroupByQuery() throws Exception {
+
+    List<String> metricNames = new ArrayList<String>() {{ add("disk_free_g"); add("mem_free_g");}};
     // GIVEN
     TimelineMetricAggregator aggregatorMinute =
       TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb,
@@ -271,17 +294,20 @@
     long startTime = System.currentTimeMillis();
     long ctime = startTime;
     long minute = 60 * 1000;
-    hdb.insertMetricRecords(prepareTimelineMetrics(startTime, "local"), true);
-    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"), true);
-    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"), true);
-    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"), true);
-    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"), true);
+    hdb.insertMetricRecords(prepareTimelineMetrics(metricNames, startTime, "local"), true);
+    hdb.insertMetricRecords(prepareTimelineMetrics(metricNames, ctime += minute, "local"), true);
+    hdb.insertMetricRecords(prepareTimelineMetrics(metricNames, ctime += minute, "local"), true);
+    hdb.insertMetricRecords(prepareTimelineMetrics(metricNames, ctime += minute, "local"), true);
+    hdb.insertMetricRecords(prepareTimelineMetrics(metricNames, ctime += minute, "local"), true);
+
+    List<byte[]> uuids = metadataManager.getUuidsForGetMetricQuery(metricNames, Collections.singletonList("local"),
+      "host", null);
 
     long endTime = startTime + 1000 * 60 * 4;
     boolean success = aggregatorMinute.doWork(startTime - 1, endTime);
     assertTrue(success);
 
-    Condition condition = new DefaultCondition(null, null, null, null, startTime,
+    Condition condition = new DefaultCondition(uuids, metricNames, Collections.singletonList("local"), "host", null, startTime,
       endTime + 1, null, null, true);
     condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, METRICS_AGGREGATE_MINUTE_TABLE_NAME));
 
@@ -297,14 +323,14 @@
       MetricHostAggregate currentHostAggregate =
         readHelper.getMetricHostAggregateFromResultSet(rs);
 
-      if ("disk_free".equals(currentMetric.getMetricName())) {
+      if ("disk_free_g".equals(currentMetric.getMetricName())) {
         assertEquals(2.0, currentHostAggregate.getMax());
         assertEquals(0.0, currentHostAggregate.getMin());
         assertEquals(20, currentHostAggregate.getNumberOfSamples());
         assertEquals(15.0, currentHostAggregate.getSum());
         assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
         count++;
-      } else if ("mem_free".equals(currentMetric.getMetricName())) {
+      } else if ("mem_free_g".equals(currentMetric.getMetricName())) {
         assertEquals(2.0, currentHostAggregate.getMax());
         assertEquals(0.0, currentHostAggregate.getMin());
         assertEquals(20, currentHostAggregate.getNumberOfSamples());
@@ -335,6 +361,14 @@
     return metrics;
   }
 
+  private TimelineMetrics prepareTimelineMetrics(List<String> metricNames, long startTime, String host) {
+    TimelineMetrics metrics = new TimelineMetrics();
+    for (String metricName : metricNames) {
+      metrics.getMetrics().add(createMetric(startTime, metricName, host));
+    }
+    return metrics;
+  }
+
   private TimelineMetric createMetric(long startTime, String metricName, String host) {
     TimelineMetric m = new TimelineMetric();
     m.setAppId("host");