AMBARI-23804: Refine AMS HBase region splitting calculation based on UUID work.
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java
index 56a28dc..d09d4bb 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java
@@ -17,7 +17,6 @@
  */
 package org.apache.ambari.metrics.core.timeline;
 
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
 import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
 
@@ -35,7 +34,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
@@ -111,15 +109,18 @@
   private synchronized void initializeSubsystem() {
     if (!isInitialized) {
       hBaseAccessor = new PhoenixHBaseAccessor(null);
-      // Initialize schema
-      hBaseAccessor.initMetricSchema();
-      // Initialize metadata from store
+
+      // Initialize metadata
       try {
         metricMetadataManager = new TimelineMetricMetadataManager(hBaseAccessor);
       } catch (MalformedURLException | URISyntaxException e) {
         throw new ExceptionInInitializerError("Unable to initialize metadata manager");
       }
       metricMetadataManager.initializeMetadata();
+
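+      // Note: schema creation now requires the metadata manager, since split
+      // point computation resolves metric UUIDs through it.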
+      // Initialize metric schema
+      hBaseAccessor.initMetricSchema();
+
       // Initialize policies before TTL update
       hBaseAccessor.initPoliciesAndTTL();
       // Start HA service
@@ -395,6 +396,10 @@
     return metricsFunctions;
   }
 
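+  /**
+   * Write metrics directly, bypassing the insert cache, so callers such as
+   * TimelineMetricStoreWatcher see the write without waiting for a cache commit.
+   */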
+  public void putMetricsSkipCache(TimelineMetrics metrics) throws SQLException, IOException {
+    hBaseAccessor.insertMetricRecordsWithMetadata(metricMetadataManager, metrics, true);
+  }
+
   @Override
   public TimelinePutResponse putMetrics(TimelineMetrics metrics) throws SQLException, IOException {
     // Error indicated by the Sql exception
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java
index 040df1b..dec7850 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java
@@ -20,6 +20,7 @@
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ambari.metrics.core.timeline.FunctionUtils.findMetricFunctions;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.AGGREGATORS_SKIP_BLOCK_CACHE;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.BLOCKING_STORE_FILES_KEY;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_DAILY_TABLE_TTL;
@@ -27,41 +28,32 @@
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_MINUTE_TABLE_TTL;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_SECOND_TABLE_TTL;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CONTAINER_METRICS_TTL;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.DATE_TIERED_COMPACTION_POLICY;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.FIFO_COMPACTION_POLICY_CLASS;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.GLOBAL_MAX_RETRIES;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.GLOBAL_RESULT_LIMIT;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.GLOBAL_RETRY_INTERVAL;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HBASE_BLOCKING_STORE_FILES;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HBASE_ENCODING_SCHEME;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HSTORE_COMPACTION_CLASS_KEY;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HSTORE_ENGINE_CLASS;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TRANSIENT_METRIC_PATTERNS;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_DAILY_TABLE_TTL;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_HOUR_TABLE_TTL;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_MINUTE_TABLE_TTL;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.METRICS_TRANSIENT_TABLE_TTL;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLE_HBASE_BLOCKING_STORE_FILES;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_ENABLED;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_SIZE;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_CLASS;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_KEY;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_CLASS;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_KEY;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_PRECISION_TABLE_DURABILITY;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_PRECISION_TABLE_HBASE_BLOCKING_STORE_FILES;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATOR_SINK_CLASS;
-import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.ALTER_METRICS_METADATA_TABLE;
 import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CONTAINER_METRICS_TABLE_NAME;
 import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_CONTAINER_METRICS_TABLE_SQL;
 import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_TRANSIENT_METRICS_TABLE_SQL;
-import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_HOSTED_APPS_METADATA_TABLE_SQL;
-import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_INSTANCE_HOST_TABLE_SQL;
 import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_TABLE_SQL;
 import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL;
 import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
-import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_METRICS_METADATA_TABLE_SQL;
 import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL;
 import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.DEFAULT_ENCODING;
 import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION;
@@ -132,6 +124,7 @@
 import org.apache.ambari.metrics.core.timeline.source.InternalMetricsSource;
 import org.apache.ambari.metrics.core.timeline.source.InternalSourceProvider;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -201,21 +194,8 @@
   private TimelineMetricsAggregatorSink aggregatorSink;
   private final int cacheCommitInterval;
   private final boolean skipBlockCacheForAggregatorsEnabled;
-  private final String timelineMetricsTablesDurability;
-  private final String timelineMetricsPrecisionTableDurability;
   private TimelineMetricMetadataManager metadataManagerInstance;
 
-  static final String HSTORE_COMPACTION_CLASS_KEY =
-    "hbase.hstore.defaultengine.compactionpolicy.class";
-  static final String HSTORE_ENGINE_CLASS =
-    "hbase.hstore.engine.class";
-  static final String FIFO_COMPACTION_POLICY_CLASS =
-    "org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy";
-  static final String DATE_TIERED_COMPACTION_POLICY =
-    "org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine";
-  static final String BLOCKING_STORE_FILES_KEY =
-    "hbase.hstore.blockingStoreFiles";
-
   private Map<String, Integer> tableTTL = new HashMap<>();
 
   private final TimelineMetricConfiguration configuration;
@@ -259,11 +239,9 @@
     this.cacheCommitInterval = Integer.valueOf(metricsConf.get(TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, "3"));
     this.insertCache = new ArrayBlockingQueue<TimelineMetrics>(cacheSize);
     this.skipBlockCacheForAggregatorsEnabled = metricsConf.getBoolean(AGGREGATORS_SKIP_BLOCK_CACHE, false);
-    this.timelineMetricsTablesDurability = metricsConf.get(TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY, "");
-    this.timelineMetricsPrecisionTableDurability = metricsConf.get(TIMELINE_METRICS_PRECISION_TABLE_DURABILITY, "");
 
     tableTTL.put(METRICS_RECORD_TABLE_NAME, metricsConf.getInt(PRECISION_TABLE_TTL, 1 * 86400));  // 1 day
-    tableTTL.put(CONTAINER_METRICS_TABLE_NAME, metricsConf.getInt(CONTAINER_METRICS_TTL, 30 * 86400));  // 30 days
+    tableTTL.put(CONTAINER_METRICS_TABLE_NAME, metricsConf.getInt(CONTAINER_METRICS_TTL, 14 * 86400));  // 14 days
     tableTTL.put(METRICS_AGGREGATE_MINUTE_TABLE_NAME, metricsConf.getInt(HOST_MINUTE_TABLE_TTL, 7 * 86400)); //7 days
     tableTTL.put(METRICS_AGGREGATE_HOURLY_TABLE_NAME, metricsConf.getInt(HOST_HOUR_TABLE_TTL, 30 * 86400)); //30 days
     tableTTL.put(METRICS_AGGREGATE_DAILY_TABLE_NAME, metricsConf.getInt(HOST_DAILY_TABLE_TTL, 365 * 86400)); //1 year
@@ -470,7 +448,7 @@
     return mapper.readValue(json, metricValuesTypeRef);
   }
 
-  private Connection getConnectionRetryingOnException() throws SQLException, InterruptedException {
+  public Connection getConnectionRetryingOnException() throws SQLException, InterruptedException {
     RetryCounter retryCounter = retryCounterFactory.create();
     while (true) {
       try{
@@ -511,6 +489,9 @@
   protected void initMetricSchema() {
     Connection conn = null;
     Statement stmt = null;
+    PreparedStatement pStmt = null;
+    TimelineMetricSplitPointComputer splitPointComputer = new TimelineMetricSplitPointComputer(
+      metricsConf, hbaseConf, metadataManagerInstance);
 
     String encoding = metricsConf.get(HBASE_ENCODING_SCHEME, DEFAULT_ENCODING);
     String compression = metricsConf.get(HBASE_COMPRESSION_SCHEME, DEFAULT_TABLE_COMPRESSION);
@@ -521,21 +502,6 @@
       conn = getConnectionRetryingOnException();
       stmt = conn.createStatement();
 
-      // Metadata
-      String metadataSql = String.format(CREATE_METRICS_METADATA_TABLE_SQL,
-        encoding, compression);
-      stmt.executeUpdate(metadataSql);
-      stmt.executeUpdate(ALTER_METRICS_METADATA_TABLE);
-
-      String hostedAppSql = String.format(CREATE_HOSTED_APPS_METADATA_TABLE_SQL,
-        encoding, compression);
-      stmt.executeUpdate(hostedAppSql);
-
-      //Host Instances table
-      String hostedInstancesSql = String.format(CREATE_INSTANCE_HOST_TABLE_SQL,
-        encoding, compression);
-      stmt.executeUpdate(hostedInstancesSql);
-
       // Container Metrics
       stmt.executeUpdate( String.format(CREATE_CONTAINER_METRICS_TABLE_SQL,
         encoding, tableTTL.get(CONTAINER_METRICS_TABLE_NAME), compression));
@@ -543,13 +509,15 @@
       // Host level
       String precisionSql = String.format(CREATE_METRICS_TABLE_SQL,
         encoding, tableTTL.get(METRICS_RECORD_TABLE_NAME), compression);
-      stmt.executeUpdate(precisionSql);
+      pStmt = prepareCreateMetricsTableStatement(conn, precisionSql, splitPointComputer.getPrecisionSplitPoints());
+      pStmt.executeUpdate();
 
       String hostMinuteAggregrateSql = String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL,
         METRICS_AGGREGATE_MINUTE_TABLE_NAME, encoding,
         tableTTL.get(METRICS_AGGREGATE_MINUTE_TABLE_NAME),
         compression);
-      stmt.executeUpdate(hostMinuteAggregrateSql);
+      pStmt = prepareCreateMetricsTableStatement(conn, hostMinuteAggregrateSql, splitPointComputer.getHostAggregateSplitPoints());
+      pStmt.executeUpdate();
 
       stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL,
         METRICS_AGGREGATE_HOURLY_TABLE_NAME, encoding,
@@ -565,8 +533,9 @@
         METRICS_CLUSTER_AGGREGATE_TABLE_NAME, encoding,
         tableTTL.get(METRICS_CLUSTER_AGGREGATE_TABLE_NAME),
         compression);
+      pStmt = prepareCreateMetricsTableStatement(conn, aggregateSql, splitPointComputer.getClusterAggregateSplitPoints());
+      pStmt.executeUpdate();
 
-      stmt.executeUpdate(aggregateSql);
       stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL,
         METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME, encoding,
         tableTTL.get(METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME),
@@ -603,6 +572,13 @@
           // Ignore
         }
       }
+      if (pStmt != null) {
+        try {
+          pStmt.close();
+        } catch (Exception e) {
+          // Ignore
+        }
+      }
       if (conn != null) {
         try {
           conn.close();
@@ -613,7 +589,7 @@
     }
   }
 
-  protected void initPoliciesAndTTL() {
+  void initPoliciesAndTTL() {
     Admin hBaseAdmin = null;
     try {
       hBaseAdmin = dataSource.getHBaseAdmin();
@@ -622,9 +598,13 @@
     }
 
     TableName[] tableNames = null;
+    TableName[] containerMetricsTableName = null;
+
     if (hBaseAdmin != null) {
       try {
         tableNames = hBaseAdmin.listTableNames(PHOENIX_TABLES_REGEX_PATTERN, false);
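+        // The container metrics table does not match PHOENIX_TABLES_REGEX_PATTERN, so look it up explicitly.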
+        containerMetricsTableName = hBaseAdmin.listTableNames(CONTAINER_METRICS_TABLE_NAME, false);
+        tableNames = (TableName[]) ArrayUtils.addAll(tableNames, containerMetricsTableName);
       } catch (IOException e) {
         LOG.warn("Unable to get table names from HBaseAdmin for setting policies.", e);
         return;
@@ -708,72 +688,44 @@
   }
 
   private boolean setDurabilityForTable(String tableName, TableDescriptorBuilder tableDescriptor) {
-
-    boolean modifyTable = false;
-
-    if (METRIC_TRANSIENT_TABLE_NAME.equalsIgnoreCase(tableName)) {
-      tableDescriptor.setDurability(Durability.SKIP_WAL);
-      modifyTable = true;
-    } else if (METRICS_RECORD_TABLE_NAME.equals(tableName)) {
-      if (!timelineMetricsPrecisionTableDurability.isEmpty()) {
-        LOG.info("Setting WAL option " + timelineMetricsPrecisionTableDurability + " for table : " + tableName);
-        boolean validDurability = true;
-        if ("SKIP_WAL".equals(timelineMetricsPrecisionTableDurability)) {
-          tableDescriptor.setDurability(Durability.SKIP_WAL);
-        } else if ("SYNC_WAL".equals(timelineMetricsPrecisionTableDurability)) {
-          tableDescriptor.setDurability(Durability.SYNC_WAL);
-        } else if ("ASYNC_WAL".equals(timelineMetricsPrecisionTableDurability)) {
-          tableDescriptor.setDurability(Durability.ASYNC_WAL);
-        } else if ("FSYNC_WAL".equals(timelineMetricsPrecisionTableDurability)) {
-          tableDescriptor.setDurability(Durability.FSYNC_WAL);
-        } else {
-          LOG.info("Unknown value for " + TIMELINE_METRICS_PRECISION_TABLE_DURABILITY + " : " + timelineMetricsPrecisionTableDurability);
-          validDurability = false;
-        }
-        if (validDurability) {
-          modifyTable = true;
-        }
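+    // Durability is now configured uniformly per table via
+    // "timeline.metrics.<tableName>.durability", replacing the separate
+    // precision/aggregate table settings.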
+    String tableDurability = metricsConf.get("timeline.metrics." + tableName + ".durability", "");
+    if (StringUtils.isNotEmpty(tableDurability)) {
+      LOG.info("Setting WAL option " + tableDurability + " for table : " + tableName);
+      boolean validDurability = true;
+      if ("SKIP_WAL".equals(tableDurability)) {
+        tableDescriptor.setDurability(Durability.SKIP_WAL);
+      } else if ("SYNC_WAL".equals(tableDurability)) {
+        tableDescriptor.setDurability(Durability.SYNC_WAL);
+      } else if ("ASYNC_WAL".equals(tableDurability)) {
+        tableDescriptor.setDurability(Durability.ASYNC_WAL);
+      } else if ("FSYNC_WAL".equals(tableDurability)) {
+        tableDescriptor.setDurability(Durability.FSYNC_WAL);
+      } else {
+        LOG.info("Unknown durability value " + tableDurability + " for table : " + tableName);
+        validDurability = false;
       }
-    } else {
-      if (!timelineMetricsTablesDurability.isEmpty()) {
-        LOG.info("Setting WAL option " + timelineMetricsTablesDurability + " for table : " + tableName);
-        boolean validDurability = true;
-        if ("SKIP_WAL".equals(timelineMetricsTablesDurability)) {
-          tableDescriptor.setDurability(Durability.SKIP_WAL);
-        } else if ("SYNC_WAL".equals(timelineMetricsTablesDurability)) {
-          tableDescriptor.setDurability(Durability.SYNC_WAL);
-        } else if ("ASYNC_WAL".equals(timelineMetricsTablesDurability)) {
-          tableDescriptor.setDurability(Durability.ASYNC_WAL);
-        } else if ("FSYNC_WAL".equals(timelineMetricsTablesDurability)) {
-          tableDescriptor.setDurability(Durability.FSYNC_WAL);
-        } else {
-          LOG.info("Unknown value for " + TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY + " : " + timelineMetricsTablesDurability);
-          validDurability = false;
-        }
-        if (validDurability) {
-          modifyTable = true;
-        }
-      }
+      return validDurability;
     }
-    return modifyTable;
+    return false;
   }
 
   private boolean setCompactionPolicyForTable(String tableName, TableDescriptorBuilder tableDescriptorBuilder) {
 
     boolean modifyTable = false;
 
-    String compactionPolicyKey = metricsConf.get(TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_KEY,
-      HSTORE_ENGINE_CLASS);
-    String compactionPolicyClass = metricsConf.get(TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_CLASS,
-      DATE_TIERED_COMPACTION_POLICY);
-    int blockingStoreFiles = hbaseConf.getInt(TIMELINE_METRICS_AGGREGATE_TABLE_HBASE_BLOCKING_STORE_FILES, 60);
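+    // Compaction policy and blocking store files are likewise configured per table,
+    // replacing the global aggregate/precision properties.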
+    String keyConfig = "timeline.metrics." + tableName + ".compaction.policy.key";
+    String policyConfig = "timeline.metrics." + tableName + ".compaction.policy";
+    String storeFilesConfig = "timeline.metrics." + tableName + ".blocking.store.files";
 
-    if (tableName.equals(METRICS_RECORD_TABLE_NAME) || tableName.equalsIgnoreCase(METRIC_TRANSIENT_TABLE_NAME)) {
-      compactionPolicyKey = metricsConf.get(TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_KEY,
-        HSTORE_COMPACTION_CLASS_KEY);
-      compactionPolicyClass = metricsConf.get(TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_CLASS,
-        FIFO_COMPACTION_POLICY_CLASS);
-      blockingStoreFiles = hbaseConf.getInt(TIMELINE_METRICS_PRECISION_TABLE_HBASE_BLOCKING_STORE_FILES, 1000);
+    String compactionPolicyKey = metricsConf.get(keyConfig, HSTORE_ENGINE_CLASS);
+    String compactionPolicyClass = metricsConf.get(policyConfig, DATE_TIERED_COMPACTION_POLICY);
+    int blockingStoreFiles = hbaseConf.getInt(storeFilesConfig, 60);
+
+    if (tableName.equals(METRICS_RECORD_TABLE_NAME)) {
+      compactionPolicyKey = metricsConf.get(keyConfig, HSTORE_COMPACTION_CLASS_KEY);
+      compactionPolicyClass = metricsConf.get(policyConfig, FIFO_COMPACTION_POLICY_CLASS);
+      blockingStoreFiles = hbaseConf.getInt(storeFilesConfig, 1000);
     }
 
     if (StringUtils.isEmpty(compactionPolicyKey) || StringUtils.isEmpty(compactionPolicyClass)) {
@@ -781,46 +733,54 @@
       modifyTable = setHbaseBlockingStoreFiles(tableDescriptorBuilder, tableName, 300);
     } else {
       tableDescriptorBuilder.setValue(compactionPolicyKey, compactionPolicyClass);
-      tableDescriptorBuilder.removeValue(HSTORE_ENGINE_CLASS.getBytes());
-      tableDescriptorBuilder.removeValue(HSTORE_COMPACTION_CLASS_KEY.getBytes());
       setHbaseBlockingStoreFiles(tableDescriptorBuilder, tableName, blockingStoreFiles);
       modifyTable = true;
     }
 
+    if (!compactionPolicyKey.equals(HSTORE_ENGINE_CLASS)) {
+      tableDescriptorBuilder.removeValue(HSTORE_ENGINE_CLASS.getBytes());
+    }
+    if (!compactionPolicyKey.equals(HSTORE_COMPACTION_CLASS_KEY)) {
+      tableDescriptorBuilder.removeValue(HSTORE_COMPACTION_CLASS_KEY.getBytes());
+    }
+
     return modifyTable;
   }
 
   private boolean setHbaseBlockingStoreFiles(TableDescriptorBuilder tableDescriptor, String tableName, int value) {
-    int blockingStoreFiles = hbaseConf.getInt(HBASE_BLOCKING_STORE_FILES, value);
-    if (blockingStoreFiles != value) {
-      blockingStoreFiles = value;
-      tableDescriptor.setValue(BLOCKING_STORE_FILES_KEY, String.valueOf(value));
-      LOG.info("Setting config property " + BLOCKING_STORE_FILES_KEY +
-        " = " + blockingStoreFiles + " for " + tableName);
-      return true;
-    }
-    return false;
+    tableDescriptor.setValue(BLOCKING_STORE_FILES_KEY, String.valueOf(value));
+    LOG.info("Setting config property " + BLOCKING_STORE_FILES_KEY +
+      " = " + value + " for " + tableName);
+    return true;
   }
 
-  protected String getSplitPointsStr(String splitPoints) {
-    if (StringUtils.isEmpty(splitPoints.trim())) {
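+  // Appends a parameterized SPLIT ON clause to the CREATE TABLE statement and binds
+  // each split point UUID as a byte[] (Phoenix bind parameters are 1-indexed).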
+  private PreparedStatement prepareCreateMetricsTableStatement(Connection connection,
+                                                               String sql,
+                                                               List<byte[]> splitPoints) throws SQLException {
+
+    String createTableWithSplitPointsSql = sql + getSplitPointsStr(splitPoints.size());
+    LOG.info(createTableWithSplitPointsSql);
+    PreparedStatement statement = connection.prepareStatement(createTableWithSplitPointsSql);
+    for (int i = 1; i <= splitPoints.size(); i++) {
+      statement.setBytes(i, splitPoints.get(i - 1));
+    }
+    return statement;
+  }
+
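+  // e.g. numSplits = 3 yields " SPLIT ON (?,?,?)"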
+  private String getSplitPointsStr(int numSplits) {
+    if (numSplits <= 0) {
       return "";
     }
-    String[] points = splitPoints.split(",");
-    if (points.length > 0) {
-      StringBuilder sb = new StringBuilder(" SPLIT ON ");
-      sb.append("(");
-      for (String point : points) {
-        sb.append("'");
-        sb.append(point.trim());
-        sb.append("'");
-        sb.append(",");
-      }
-      sb.deleteCharAt(sb.length() - 1);
-      sb.append(")");
-      return sb.toString();
+    StringBuilder sb = new StringBuilder(" SPLIT ON ");
+    sb.append("(");
+    for (int i = 0; i < numSplits; i++) {
+      sb.append("?");
+      sb.append(",");
     }
-    return "";
+    sb.deleteCharAt(sb.length() - 1);
+    sb.append(")");
+    return sb.toString();
   }
 
   /**
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricConfiguration.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricConfiguration.java
index 6ec2c6b..393d4a3 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricConfiguration.java
@@ -237,12 +237,6 @@
   public static final String WATCHER_MAX_FAILURES =
     "timeline.metrics.service.watcher.max.failures";
 
-  public static final String PRECISION_TABLE_SPLIT_POINTS =
-    "timeline.metrics.host.aggregate.splitpoints";
-
-  public static final String AGGREGATE_TABLE_SPLIT_POINTS =
-    "timeline.metrics.cluster.aggregate.splitpoints";
-
   public static final String AGGREGATORS_SKIP_BLOCK_CACHE =
     "timeline.metrics.aggregators.skip.blockcache.enabled";
 
@@ -261,12 +255,6 @@
   public static final String TIMELINE_METRICS_SINK_COLLECTION_PERIOD =
     "timeline.metrics.sink.collection.period";
 
-  public static final String TIMELINE_METRICS_PRECISION_TABLE_DURABILITY =
-    "timeline.metrics.precision.table.durability";
-
-  public static final String TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY =
-      "timeline.metrics.aggregate.tables.durability";
-
   public static final String TIMELINE_METRICS_WHITELIST_ENABLED =
     "timeline.metrics.whitelisting.enabled";
 
@@ -285,33 +273,9 @@
   public static final String TIMELINE_METRICS_APPS_WHITELIST =
     "timeline.metrics.apps.whitelist";
 
-  public static final String HBASE_BLOCKING_STORE_FILES =
-    "hbase.hstore.blockingStoreFiles";
-
-  public static final String DEFAULT_TOPN_HOSTS_LIMIT =
-    "timeline.metrics.default.topn.hosts.limit";
-
   public static final String TIMELINE_METRIC_AGGREGATION_SQL_FILTERS =
     "timeline.metrics.cluster.aggregation.sql.filters";
 
-  public static final String TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_KEY =
-    "timeline.metrics.hbase.aggregate.table.compaction.policy.key";
-
-  public static final String TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_CLASS =
-    "timeline.metrics.hbase.aggregate.table.compaction.policy.class";
-
-  public static final String TIMELINE_METRICS_AGGREGATE_TABLE_HBASE_BLOCKING_STORE_FILES =
-    "timeline.metrics.aggregate.table.hbase.hstore.blockingStoreFiles";
-
-  public static final String TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_KEY =
-    "timeline.metrics.hbase.precision.table.compaction.policy.key";
-
-  public static final String TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_CLASS =
-    "timeline.metrics.hbase.precision.table.compaction.policy.class";
-
-  public static final String TIMELINE_METRICS_PRECISION_TABLE_HBASE_BLOCKING_STORE_FILES =
-    "timeline.metrics.precision.table.hbase.hstore.blockingStoreFiles";
-
   public static final String TIMELINE_METRICS_SUPPORT_MULTIPLE_CLUSTERS =
     "timeline.metrics.support.multiple.clusters";
 
@@ -346,6 +310,9 @@
 
   public static final String TRANSIENT_METRIC_PATTERNS = "timeline.metrics.transient.metric.patterns";
 
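+  // Component lists used by TimelineMetricSplitPointComputer to seed initial region split points.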
+  public static final String TIMELINE_METRIC_INITIAL_CONFIGURED_MASTER_COMPONENTS = "timeline.metrics.initial.configured.master.components";
+  public static final String TIMELINE_METRIC_INITIAL_CONFIGURED_SLAVE_COMPONENTS = "timeline.metrics.initial.configured.slave.components";
+
   public static final String KAFKA_SERVERS = "timeline.metrics.external.sink.kafka.bootstrap.servers";
   public static final String KAFKA_ACKS = "timeline.metrics.external.sink.kafka.acks";
   public static final String KAFKA_RETRIES = "timeline.metrics.external.sink.kafka.bootstrap.retries";
@@ -353,7 +320,13 @@
   public static final String KAFKA_LINGER_MS = "timeline.metrics.external.sink.kafka.linger.ms";
   public static final String KAFKA_BUFFER_MEM = "timeline.metrics.external.sink.kafka.buffer.memory";
   public static final String KAFKA_SINK_TIMEOUT_SECONDS = "timeline.metrics.external.sink.kafka.timeout.seconds";
-  
+
+  public static final String HSTORE_COMPACTION_CLASS_KEY = "hbase.hstore.defaultengine.compactionpolicy.class";
+  public static final String HSTORE_ENGINE_CLASS = "hbase.hstore.engine.class";
+  public static final String FIFO_COMPACTION_POLICY_CLASS = "org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy";
+  public static final String DATE_TIERED_COMPACTION_POLICY = "org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine";
+  public static final String BLOCKING_STORE_FILES_KEY = "hbase.hstore.blockingStoreFiles";
+
   private Configuration hbaseConf;
   private Configuration metricsConf;
   private Configuration metricsSslConf;
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricSplitPointComputer.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricSplitPointComputer.java
new file mode 100644
index 0000000..89bb843
--- /dev/null
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricSplitPointComputer.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.core.timeline;
+
+import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_INITIAL_CONFIGURED_MASTER_COMPONENTS;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_INITIAL_CONFIGURED_SLAVE_COMPONENTS;
+
+public class TimelineMetricSplitPointComputer {
+
+  private static final Log LOG = LogFactory.getLog(TimelineMetricSplitPointComputer.class);
+  private Set<String> masterComponents = new HashSet<>();
+  private Set<String> slaveComponents = new HashSet<>();
+
+  private static final int MINIMUM_PRECISION_TABLE_REGIONS = 4;
+  private static final int MINIMUM_AGGREGATE_TABLE_REGIONS = 2;
+  private static final int OTHER_TABLE_STATIC_REGIONS = 8;
+  private static final int SLAVE_EQUIDISTANT_POINTS = 50;
+  private static final int MASTER_EQUIDISTANT_POINTS = 5;
+
+  private List<byte[]> precisionSplitPoints = new ArrayList<>();
+  private List<byte[]> aggregateSplitPoints = new ArrayList<>();
+
+  public TimelineMetricSplitPointComputer(Configuration metricsConf,
+                                          Configuration hbaseConf,
+                                          TimelineMetricMetadataManager timelineMetricMetadataManager) {
+
+    String componentsString = metricsConf.get(TIMELINE_METRIC_INITIAL_CONFIGURED_MASTER_COMPONENTS, "");
+    if (StringUtils.isNotEmpty(componentsString)) {
+      masterComponents.addAll(Arrays.asList(componentsString.split(",")));
+    }
+
+    componentsString = metricsConf.get(TIMELINE_METRIC_INITIAL_CONFIGURED_SLAVE_COMPONENTS, "");
+    if (StringUtils.isNotEmpty(componentsString)) {
+      slaveComponents.addAll(Arrays.asList(componentsString.split(",")));
+    }
+
+    double hbaseTotalHeapsize = metricsConf.getDouble("hbase_total_heapsize", 1024*1024*1024);
+    double hbaseMemstoreUpperLimit = hbaseConf.getDouble("hbase.regionserver.global.memstore.upperLimit", 0.5);
+    double hbaseMemstoreFlushSize = hbaseConf.getDouble("hbase.hregion.memstore.flush.size", 134217728);
+
+    computeSplitPoints(hbaseTotalHeapsize, hbaseMemstoreUpperLimit, hbaseMemstoreFlushSize, timelineMetricMetadataManager);
+  }
+
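+  /**
+   * Heuristic: the number of regions whose memstores fit in memory is
+   * (memstore upper limit * total heap) / flush size, minus OTHER_TABLE_STATIC_REGIONS
+   * reserved for the remaining tables. Roughly 70% of that budget goes to the precision
+   * table and 15% to the aggregate tables, floored at 4 and 2 regions respectively.
+   */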
+  private void computeSplitPoints(double hbaseTotalHeapsize,
+                                  double hbaseMemstoreUpperLimit,
+                                  double hbaseMemstoreFlushSize,
+                                  TimelineMetricMetadataManager timelineMetricMetadataManager) {
+
+    double memstoreMaxMemory = hbaseMemstoreUpperLimit * hbaseTotalHeapsize;
+    int maxInMemoryRegions = (int) ((memstoreMaxMemory / hbaseMemstoreFlushSize) - OTHER_TABLE_STATIC_REGIONS);
+
+    int targetPrecisionTableRegionCount = MINIMUM_PRECISION_TABLE_REGIONS;
+    int targetAggregateTableRegionCount = MINIMUM_AGGREGATE_TABLE_REGIONS;
+
+    if (maxInMemoryRegions > 2) {
+      targetPrecisionTableRegionCount = Math.max(MINIMUM_PRECISION_TABLE_REGIONS, (int) (0.70 * maxInMemoryRegions));
+      targetAggregateTableRegionCount = Math.max(MINIMUM_AGGREGATE_TABLE_REGIONS, (int) (0.15 * maxInMemoryRegions));
+    }
+
+    List<MetricApp> metricList = new ArrayList<>();
+
+    for (String component : masterComponents) {
+      metricList.addAll(getSortedMetricListForSplitPoint(component, false));
+    }
+
+    for (String component : slaveComponents) {
+      metricList.addAll(getSortedMetricListForSplitPoint(component, true));
+    }
+
+    int totalMetricLength = metricList.size();
+
+    if (targetPrecisionTableRegionCount > 1) {
+      int idx = (int) Math.ceil((double) totalMetricLength / targetPrecisionTableRegionCount);
+      int index = idx;
+      for (int i = 0; i < targetPrecisionTableRegionCount; i++) {
+        if (index < totalMetricLength - 1) {
+          MetricApp metricAppService = metricList.get(index);
+          byte[] uuid = timelineMetricMetadataManager.getUuid(
+            new TimelineClusterMetric(metricAppService.metricName, metricAppService.appId, null, -1),
+            true);
+          precisionSplitPoints.add(uuid);
+          index += idx;
+        }
+      }
+    }
+
+    if (targetAggregateTableRegionCount > 1) {
+      int idx = (int) Math.ceil((double) totalMetricLength / targetAggregateTableRegionCount);
+      int index = idx;
+      for (int i = 0; i < targetAggregateTableRegionCount; i++) {
+        if (index < totalMetricLength - 1) {
+          MetricApp metricAppService = metricList.get(index);
+          byte[] uuid = timelineMetricMetadataManager.getUuid(
+            new TimelineClusterMetric(metricAppService.metricName, metricAppService.appId, null, -1),
+            true);
+          aggregateSplitPoints.add(uuid);
+          index += idx;
+        }
+      }
+    }
+  }
+
+  private List<MetricApp> getSortedMetricListForSplitPoint(String component, boolean isSlave) {
+
+    String appId = getAppId(component);
+    List<MetricApp> metricList = new ArrayList<>();
+
+    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+    if (classLoader == null) {
+      classLoader = getClass().getClassLoader();
+    }
+
+    String strLine;
+    BufferedReader bufferedReader;
+
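+    // Split point candidates ship as classpath resources: metrics_def/<APP_ID>.dat, one metric name per line.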
+    try (InputStream inputStream = classLoader.getResourceAsStream("metrics_def/" + appId.toUpperCase() + ".dat")) {
+
+      if (inputStream != null) {
+        bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
+        LOG.info("Found split point candidate metrics for : " + appId);
+
+        while ((strLine = bufferedReader.readLine()) != null) {
+          metricList.add(new MetricApp(strLine.trim(), appId));
+        }
+      } else {
+        LOG.info("Split point candidate metrics not found for : " + appId);
+      }
+    } catch (Exception e) {
+      LOG.error("Error reading split point candidate metrics for component : " + component, e);
+    }
+
+    if (isSlave) {
+      return getEquidistantMetrics(metricList, SLAVE_EQUIDISTANT_POINTS);
+    } else {
+      return getEquidistantMetrics(metricList, MASTER_EQUIDISTANT_POINTS);
+    }
+  }
+
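+  // Down-samples the list to at most 'distance' evenly spaced metrics; shorter lists are returned whole.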
+  private List<MetricApp> getEquidistantMetrics(List<MetricApp> metrics, int distance) {
+    List<MetricApp> selectedMetricApps = new ArrayList<>();
+
+    int idx = metrics.size() / distance;
+    if (idx == 0) {
+      return metrics;
+    }
+
+    int index = idx;
+    for (int i = 0; i < distance; i++) {
+      selectedMetricApps.add(metrics.get(index - 1));
+      index += idx;
+    }
+    return selectedMetricApps;
+  }
+
+  public List<byte[]> getPrecisionSplitPoints() {
+    return precisionSplitPoints;
+  }
+
+  public List<byte[]> getClusterAggregateSplitPoints() {
+    return aggregateSplitPoints;
+  }
+
+  public List<byte[]> getHostAggregateSplitPoints() {
+    return aggregateSplitPoints;
+  }
+
+  private String getAppId(String component) {
+
+    if (component.equalsIgnoreCase("METRICS_COLLECTOR")) {
+      return "ams-hbase";
+    }
+
+    if (component.equalsIgnoreCase("METRICS_MONITOR")) {
+      return "HOST";
+    }
+    return component;
+  }
+}
+
+class MetricApp implements Comparable<MetricApp> {
+  String metricName;
+  String appId;
+
+  MetricApp(String metricName, String appId) {
+    this.metricName = metricName;
+    if (appId.startsWith("hbase")) {
+      this.appId = "hbase";
+    } else {
+      this.appId = appId;
+    }
+  }
+
+  @Override
+  public int compareTo(MetricApp that) {
+    int metricCompare = metricName.compareTo(that.metricName);
+    if (metricCompare != 0) {
+      return metricCompare;
+    }
+
+    return appId.compareTo(that.appId);
+  }
+}
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricStoreWatcher.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricStoreWatcher.java
index 0ab7929..ba7ce44 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricStoreWatcher.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricStoreWatcher.java
@@ -45,13 +45,13 @@
   private static int failures = 0;
   private final TimelineMetricConfiguration configuration;
 
-  private TimelineMetricStore timelineMetricStore;
+  private HBaseTimelineMetricsService timelineMetricStore;
 
   //used to call timelineMetricStore blocking methods with timeout
   private ExecutorService executor = Executors.newSingleThreadExecutor();
 
 
-  public TimelineMetricStoreWatcher(TimelineMetricStore timelineMetricStore,
+  public TimelineMetricStoreWatcher(HBaseTimelineMetricsService timelineMetricStore,
                                     TimelineMetricConfiguration configuration) {
     this.timelineMetricStore = timelineMetricStore;
     this.configuration = configuration;
@@ -100,7 +100,7 @@
 
     Callable<TimelineMetric> task = new Callable<TimelineMetric>() {
       public TimelineMetric call() throws Exception {
-        timelineMetricStore.putMetrics(metrics);
+        timelineMetricStore.putMetricsSkipCache(metrics);
         TimelineMetrics timelineMetrics = timelineMetricStore.getTimelineMetrics(
           Collections.singletonList(FAKE_METRIC_NAME), Collections.singletonList(FAKE_HOSTNAME),
           FAKE_APP_ID, null, startTime - delay * 2 * 1000,
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataManager.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataManager.java
index 1ca5bc0..737c2ff 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataManager.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataManager.java
@@ -20,7 +20,9 @@
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URISyntaxException;
+import java.sql.Connection;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -38,9 +40,12 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.ambari.metrics.core.timeline.MetricsSystemInitializationException;
 import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
 import org.apache.ambari.metrics.core.timeline.uuid.MetricUuidGenStrategy;
 import org.apache.ambari.metrics.core.timeline.uuid.MD5UuidGenStrategy;
+import org.apache.ambari.metrics.core.timeline.uuid.Murmur3HashUuidGenStrategy;
+import org.apache.ambari.metrics.core.timeline.uuid.TimelineMetricUuid;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.ArrayUtils;
@@ -53,13 +58,19 @@
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
 import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
 import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
-import org.apache.ambari.metrics.core.timeline.uuid.HashBasedUuidGenStrategy;
 
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HBASE_ENCODING_SCHEME;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TRANSIENT_METRIC_PATTERNS;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.METRICS_METADATA_SYNC_INIT_DELAY;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.METRICS_METADATA_SYNC_SCHEDULE_DELAY;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_UUID_GEN_STRATEGY;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_METADATA_FILTERS;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_HOSTED_APPS_METADATA_TABLE_SQL;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_INSTANCE_HOST_TABLE_SQL;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_METRICS_METADATA_TABLE_SQL;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.DEFAULT_ENCODING;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION;
 import static org.apache.hadoop.metrics2.sink.timeline.TimelineMetricUtils.getJavaMetricPatterns;
 import static org.apache.hadoop.metrics2.sink.timeline.TimelineMetricUtils.getJavaRegexFromSqlRegex;
 
@@ -67,18 +78,18 @@
   private static final Log LOG = LogFactory.getLog(TimelineMetricMetadataManager.class);
   // Cache all metadata on retrieval
   private final Map<TimelineMetricMetadataKey, TimelineMetricMetadata> METADATA_CACHE = new ConcurrentHashMap<>();
-  private final Map<String, TimelineMetricMetadataKey> uuidKeyMap = new ConcurrentHashMap<>();
+  private final Map<TimelineMetricUuid, TimelineMetricMetadataKey> uuidKeyMap = new ConcurrentHashMap<>();
   // Map to lookup apps on a host
   private final Map<String, TimelineMetricHostMetadata> HOSTED_APPS_MAP = new ConcurrentHashMap<>();
-  private final Map<String, String> uuidHostMap = new ConcurrentHashMap<>();
+  private final Map<TimelineMetricUuid, String> uuidHostMap = new ConcurrentHashMap<>();
   private final Map<String, Set<String>> INSTANCE_HOST_MAP = new ConcurrentHashMap<>();
   // Sync only when needed
   AtomicBoolean SYNC_HOSTED_APPS_METADATA = new AtomicBoolean(false);
   AtomicBoolean SYNC_HOSTED_INSTANCES_METADATA = new AtomicBoolean(false);
 
-  private MetricUuidGenStrategy uuidGenStrategy = new HashBasedUuidGenStrategy();
+  private MetricUuidGenStrategy uuidGenStrategy = new Murmur3HashUuidGenStrategy();
   public static final int TIMELINE_METRIC_UUID_LENGTH = 16;
-  public static final int HOSTNAME_UUID_LENGTH = 16;
+  public static int HOSTNAME_UUID_LENGTH = 4;
 
   //Transient metric patterns. No UUID management and aggregation for such metrics.
   private List<String> transientMetricPatterns = new ArrayList<>();
@@ -120,7 +131,54 @@
    * Initialize Metadata from the store
    */
   public void initializeMetadata() {
-    metricMetadataSync = new TimelineMetricMetadataSync(this);
+
+    //Create metadata schema
+    Connection conn = null;
+    Statement stmt = null;
+
+    String encoding = metricsConf.get(HBASE_ENCODING_SCHEME, DEFAULT_ENCODING);
+    String compression = metricsConf.get(HBASE_COMPRESSION_SCHEME, DEFAULT_TABLE_COMPRESSION);
+
+    try {
+      LOG.info("Initializing metrics metadata schema...");
+      conn = hBaseAccessor.getConnectionRetryingOnException();
+      stmt = conn.createStatement();
+
+      // Metadata
+      String metadataSql = String.format(CREATE_METRICS_METADATA_TABLE_SQL,
+        encoding, compression);
+      stmt.executeUpdate(metadataSql);
+
+      String hostedAppSql = String.format(CREATE_HOSTED_APPS_METADATA_TABLE_SQL,
+        encoding, compression);
+      stmt.executeUpdate(hostedAppSql);
+
+      //Host Instances table
+      String hostedInstancesSql = String.format(CREATE_INSTANCE_HOST_TABLE_SQL,
+        encoding, compression);
+      stmt.executeUpdate(hostedInstancesSql);
+    } catch (SQLException | InterruptedException sql) {
+      LOG.error("Error creating Metrics Schema in HBase using Phoenix.", sql);
+      throw new MetricsSystemInitializationException(
+        "Error creating Metrics Metadata Schema in HBase using Phoenix.", sql);
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+    }
+
+    metricMetadataSync = new TimelineMetricMetadataSync(this);
     // Schedule the executor to sync to store
     executorService.scheduleWithFixedDelay(metricMetadataSync,
       metricsConf.getInt(METRICS_METADATA_SYNC_INIT_DELAY, 120), // 2 minutes
@@ -335,14 +393,26 @@
     for (TimelineMetricMetadataKey key : METADATA_CACHE.keySet()) {
       TimelineMetricMetadata timelineMetricMetadata = METADATA_CACHE.get(key);
       if (timelineMetricMetadata != null && timelineMetricMetadata.getUuid() != null) {
-        uuidKeyMap.put(new String(timelineMetricMetadata.getUuid()), key);
+        uuidKeyMap.put(new TimelineMetricUuid(timelineMetricMetadata.getUuid()), key);
+      }
+    }
+
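+    // Backward compatibility: a pre-existing 16-byte host UUID implies the data was
+    // written with the MD5 strategy, so keep using it; otherwise use 4-byte Murmur3 UUIDs.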
+    if (!HOSTED_APPS_MAP.isEmpty()) {
+      Map.Entry<String, TimelineMetricHostMetadata> entry = HOSTED_APPS_MAP.entrySet().iterator().next();
+      TimelineMetricHostMetadata timelineMetricHostMetadata = entry.getValue();
+      if (timelineMetricHostMetadata.getUuid() != null && timelineMetricHostMetadata.getUuid().length == 16) {
+        HOSTNAME_UUID_LENGTH = 16;
+        uuidGenStrategy = new MD5UuidGenStrategy();
+      } else {
+        HOSTNAME_UUID_LENGTH = 4;
+        uuidGenStrategy = new Murmur3HashUuidGenStrategy();
       }
     }
 
     for (String host : HOSTED_APPS_MAP.keySet()) {
       TimelineMetricHostMetadata timelineMetricHostMetadata = HOSTED_APPS_MAP.get(host);
       if (timelineMetricHostMetadata != null && timelineMetricHostMetadata.getUuid() != null) {
-        uuidHostMap.put(new String(timelineMetricHostMetadata.getUuid()), host);
+        uuidHostMap.put(new TimelineMetricUuid(timelineMetricHostMetadata.getUuid()), host);
       }
     }
   }
@@ -354,11 +424,11 @@
    */
   private MetricUuidGenStrategy getUuidStrategy(Configuration configuration) {
     String strategy = configuration.get(TIMELINE_METRICS_UUID_GEN_STRATEGY, "");
-    if ("hash".equalsIgnoreCase(strategy)) {
-      return new HashBasedUuidGenStrategy();
+    if ("md5".equalsIgnoreCase(strategy)){
+      return new MD5UuidGenStrategy();
     } else {
       //Default
-      return new MD5UuidGenStrategy();
+      return new Murmur3HashUuidGenStrategy();
     }
   }
 
@@ -379,14 +449,13 @@
     }
 
     if (!createIfNotPresent) {
-      LOG.warn("UUID not found for " + hostname + ", createIfNotPresent is false");
+      LOG.debug("UUID not found for " + hostname + ", createIfNotPresent is false");
       return null;
     }
 
     byte[] uuid = uuidGenStrategy.computeUuid(hostname, HOSTNAME_UUID_LENGTH);
-    String uuidStr = new String(uuid);
-    if (uuidHostMap.containsKey(uuidStr)) {
-      LOG.error("Duplicate key computed for " + hostname +", Collides with  " + uuidHostMap.get(uuidStr));
+    if (uuidHostMap.containsKey(new TimelineMetricUuid(uuid))) {
+      LOG.error("Duplicate key computed for " + hostname + ", Collides with " + uuidHostMap.get(new TimelineMetricUuid(uuid)));
       return null;
     }
 
@@ -395,7 +464,7 @@
       HOSTED_APPS_MAP.put(hostname, timelineMetricHostMetadata);
     }
     timelineMetricHostMetadata.setUuid(uuid);
-    uuidHostMap.put(uuidStr, hostname);
+    uuidHostMap.put(new TimelineMetricUuid(uuid), hostname);
 
     return uuid;
   }
@@ -420,17 +489,16 @@
     }
 
     if (!createIfNotPresent) {
-      LOG.warn("UUID not found for " + key + ", createIfNotPresent is false");
+      LOG.debug("UUID not found for " + key + ", createIfNotPresent is false");
       return null;
     }
 
-    byte[] uuid = uuidGenStrategy.computeUuid(timelineClusterMetric, TIMELINE_METRIC_UUID_LENGTH);
+    byte[] uuidBytes = uuidGenStrategy.computeUuid(timelineClusterMetric, TIMELINE_METRIC_UUID_LENGTH);
 
-    String uuidStr = new String(uuid);
-    if (uuidKeyMap.containsKey(uuidStr) && !uuidKeyMap.get(uuidStr).equals(key)) {
-      TimelineMetricMetadataKey collidingKey = (TimelineMetricMetadataKey)uuidKeyMap.get(uuidStr);
-      LOG.error("Duplicate key " + Arrays.toString(uuid) + "(" + uuid +  ") computed for " + timelineClusterMetric.toString()
-        + ", Collides with  " + collidingKey.toString());
+    TimelineMetricUuid uuid = new TimelineMetricUuid(uuidBytes);
+    if (uuidKeyMap.containsKey(uuid) && !uuidKeyMap.get(uuid).equals(key)) {
+      TimelineMetricMetadataKey collidingKey = uuidKeyMap.get(uuid);
+      LOG.error("Duplicate key " + uuid + " computed for " + timelineClusterMetric + ", Collides with " + collidingKey);
       return null;
     }
 
@@ -442,10 +510,10 @@
       METADATA_CACHE.put(key, timelineMetricMetadata);
     }
 
-    timelineMetricMetadata.setUuid(uuid);
+    timelineMetricMetadata.setUuid(uuid.uuid);
     timelineMetricMetadata.setIsPersisted(false);
-    uuidKeyMap.put(uuidStr, key);
-    return uuid;
+    uuidKeyMap.put(uuid, key);
+    return uuid.uuid;
   }
 
   /**
@@ -484,14 +552,14 @@
     return metricUuid;
   }
 
-  public String getMetricNameFromUuid(byte[]  uuid) {
+  public String getMetricNameFromUuid(byte[] uuid) {
 
     byte[] metricUuid = uuid;
     if (uuid.length == TIMELINE_METRIC_UUID_LENGTH + HOSTNAME_UUID_LENGTH) {
       metricUuid = ArrayUtils.subarray(uuid, 0, TIMELINE_METRIC_UUID_LENGTH);
     }
 
-    TimelineMetricMetadataKey key = uuidKeyMap.get(new String(metricUuid));
+    TimelineMetricMetadataKey key = uuidKeyMap.get(new TimelineMetricUuid(metricUuid));
     return key != null ? key.getMetricName() : null;
   }
 
@@ -506,11 +574,11 @@
     }
 
     if (uuid.length == TIMELINE_METRIC_UUID_LENGTH) {
-      TimelineMetricMetadataKey key = uuidKeyMap.get(new String(uuid));
+      TimelineMetricMetadataKey key = uuidKeyMap.get(new TimelineMetricUuid(uuid));
       return key != null ? new TimelineMetric(key.metricName, null, key.appId, key.instanceId) : null;
     } else {
       byte[] metricUuid = ArrayUtils.subarray(uuid, 0, TIMELINE_METRIC_UUID_LENGTH);
-      TimelineMetricMetadataKey key = uuidKeyMap.get(new String(metricUuid));
+      TimelineMetricMetadataKey key = uuidKeyMap.get(new TimelineMetricUuid(metricUuid));
       if (key == null) {
         LOG.error("TimelineMetricMetadataKey is null for : " + Arrays.toString(uuid));
         return null;
@@ -521,7 +589,7 @@
       timelineMetric.setInstanceId(key.instanceId);
 
       byte[] hostUuid = ArrayUtils.subarray(uuid, TIMELINE_METRIC_UUID_LENGTH, HOSTNAME_UUID_LENGTH + TIMELINE_METRIC_UUID_LENGTH);
-      timelineMetric.setHostName(uuidHostMap.get(new String(hostUuid)));
+      timelineMetric.setHostName(uuidHostMap.get(new TimelineMetricUuid(hostUuid)));
       return timelineMetric;
     }
   }
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java
index f76933a..e0cc642 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java
@@ -43,8 +43,11 @@
   /**
    * Create table to store individual metric records.
    */
+
+  public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD_UUID";
+
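+  // The 20-byte UUID key = 16-byte metric UUID (Murmur3 128-bit) + 4-byte host UUID (Murmur3 32-bit).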
   public static final String CREATE_METRICS_TABLE_SQL = "CREATE TABLE IF NOT " +
-    "EXISTS METRIC_RECORD_UUID (UUID BINARY(32) NOT NULL, " +
+    "EXISTS " + METRICS_RECORD_TABLE_NAME + " (UUID BINARY(20) NOT NULL, " +
     "SERVER_TIME BIGINT NOT NULL, " +
     "METRIC_SUM DOUBLE, " +
     "METRIC_COUNT UNSIGNED_INT, " +
@@ -83,7 +86,7 @@
 
   public static final String CREATE_METRICS_AGGREGATE_TABLE_SQL =
     "CREATE TABLE IF NOT EXISTS %s " +
-      "(UUID BINARY(32) NOT NULL, " +
+      "(UUID BINARY(20) NOT NULL, " +
       "SERVER_TIME BIGINT NOT NULL, " +
       "METRIC_SUM DOUBLE," +
       "METRIC_COUNT UNSIGNED_INT, " +
@@ -155,7 +158,7 @@
 
   public static final String CREATE_HOSTED_APPS_METADATA_TABLE_SQL =
     "CREATE TABLE IF NOT EXISTS HOSTED_APPS_METADATA_UUID " +
-      "(HOSTNAME VARCHAR, UUID BINARY(16), APP_IDS VARCHAR, " +
+      "(HOSTNAME VARCHAR, UUID BINARY(4), APP_IDS VARCHAR, " +
       "CONSTRAINT pk PRIMARY KEY (HOSTNAME))" +
       "DATA_BLOCK_ENCODING='%s', COMPRESSION='%s'";
 
@@ -165,9 +168,6 @@
       "CONSTRAINT pk PRIMARY KEY (INSTANCE_ID, HOSTNAME))" +
       "DATA_BLOCK_ENCODING='%s', COMPRESSION='%s'";
 
-  public static final String ALTER_METRICS_METADATA_TABLE =
-    "ALTER TABLE METRICS_METADATA_UUID ADD IF NOT EXISTS IS_WHITELISTED BOOLEAN";
-
   ////////////////////////////////
 
   /**
@@ -442,8 +442,6 @@
   public static final String METRICS_CLUSTER_AGGREGATE_DAILY_V1_TABLE_NAME =
     "METRIC_AGGREGATE_DAILY";
 
-  public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD_UUID";
-
   public static final String METRICS_AGGREGATE_MINUTE_TABLE_NAME =
     "METRIC_RECORD_MINUTE_UUID";
   public static final String METRICS_AGGREGATE_HOURLY_TABLE_NAME =
@@ -459,7 +457,7 @@
   public static final String METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME =
     "METRIC_AGGREGATE_DAILY_UUID";
 
-  public static final Pattern PHOENIX_TABLES_REGEX_PATTERN = Pattern.compile("METRIC_.*_UUID");
+  public static final Pattern PHOENIX_TABLES_REGEX_PATTERN = Pattern.compile("METRIC_.*");
 
   public static final String[] PHOENIX_TABLES = {
     METRICS_RECORD_TABLE_NAME,
@@ -469,7 +467,9 @@
     METRICS_CLUSTER_AGGREGATE_TABLE_NAME,
     METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME,
     METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME,
-    METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME
+    METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME,
+    METRIC_TRANSIENT_TABLE_NAME,
+    CONTAINER_METRICS_TABLE_NAME
   };
 
   public static final String DEFAULT_TABLE_COMPRESSION = "SNAPPY";
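
Note: widening the pattern from METRIC_.*_UUID to METRIC_.* also covers the transient metrics table, but CONTAINER_METRICS carries no METRIC_ prefix and still has to be handled explicitly, which is why ITPhoenixHBaseAccessor below appends it to the listed tables. A quick check, assuming the transient table name keeps the METRIC_ prefix:

    PHOENIX_TABLES_REGEX_PATTERN.matcher("METRIC_TRANSIENT_TABLE").matches(); // true
    PHOENIX_TABLES_REGEX_PATTERN.matcher("CONTAINER_METRICS").matches();      // false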
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/uuid/Murmur3HashUuidGenStrategy.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/uuid/Murmur3HashUuidGenStrategy.java
new file mode 100644
index 0000000..9418aa4
--- /dev/null
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/uuid/Murmur3HashUuidGenStrategy.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.core.timeline.uuid;
+
+import com.google.common.hash.Hashing;
+import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
+import org.apache.commons.lang.StringUtils;
+
+public class Murmur3HashUuidGenStrategy implements MetricUuidGenStrategy {
+
+  @Override
+  public byte[] computeUuid(TimelineClusterMetric timelineClusterMetric, int maxLength) {
+
+    String metricString = timelineClusterMetric.getMetricName() + timelineClusterMetric.getAppId();
+    if (StringUtils.isNotEmpty(timelineClusterMetric.getInstanceId())) {
+      metricString += timelineClusterMetric.getInstanceId();
+    }
+    byte[] metricBytes = metricString.getBytes();
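+    // Note: murmur3_128 always yields 16 bytes; the maxLength argument is not applied here.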
+    return Hashing.murmur3_128().hashBytes(metricBytes).asBytes();
+  }
+
+  @Override
+  public byte[] computeUuid(String value, int maxLength) {
+    byte[] valueBytes = value.getBytes();
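+    // murmur3_32 always yields 4 bytes, matching the BINARY(4) hostname UUID column.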
+    return Hashing.murmur3_32().hashBytes(valueBytes).asBytes();
+  }
+}
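
A minimal usage fragment (not part of the patch) showing how the two hash widths line up with the new key sizes; the metric name and hostname are illustrative:

    MetricUuidGenStrategy strategy = new Murmur3HashUuidGenStrategy();
    byte[] metricUuid = strategy.computeUuid(
        new TimelineClusterMetric("regionserver.Server.readRequestCount", "hbase", null, -1L), 16);
    byte[] hostUuid = strategy.computeUuid("host1.example.com", 4);
    // metricUuid.length == 16, hostUuid.length == 4, together the BINARY(20) row key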
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/uuid/TimelineMetricUuid.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/uuid/TimelineMetricUuid.java
new file mode 100644
index 0000000..7907ff6
--- /dev/null
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/uuid/TimelineMetricUuid.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.core.timeline.uuid;
+
+import java.util.Arrays;
+
+public class TimelineMetricUuid {
+  public byte[] uuid;
+
+  public TimelineMetricUuid(byte[] uuid) {
+    this.uuid = uuid;
+  }
+
+  @Override
+  public int hashCode() {
+    return Arrays.hashCode(uuid);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    TimelineMetricUuid that = (TimelineMetricUuid) o;
+
+    return Arrays.equals(this.uuid, that.uuid);
+  }
+
+  @Override
+  public String toString() {
+    return Arrays.toString(uuid);
+  }
+}
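
The wrapper exists so UUID bytes can key a HashMap (see uuidKeyMap and uuidHostMap above): raw byte[] keys fall back to identity hashCode/equals, so two arrays with equal content never match. An illustrative fragment, not from the patch:

    Map<byte[], String> raw = new HashMap<>();
    raw.put(new byte[] {1, 2}, "x");
    raw.get(new byte[] {1, 2});                             // null: identity semantics
    Map<TimelineMetricUuid, String> wrapped = new HashMap<>();
    wrapped.put(new TimelineMetricUuid(new byte[] {1, 2}), "x");
    wrapped.get(new TimelineMetricUuid(new byte[] {1, 2})); // "x": Arrays.equals/hashCode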
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/webapp/TimelineWebServices.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/webapp/TimelineWebServices.java
index 9c88b1a..3bcbaf6 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/webapp/TimelineWebServices.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/webapp/TimelineWebServices.java
@@ -144,8 +144,8 @@
       // TODO: Check ACLs for MetricEntity using the TimelineACLManager.
       // TODO: Save owner of the MetricEntity.
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Storing metrics: " +
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Storing metrics: " +
           TimelineUtils.dumpTimelineRecordtoJSON(metrics, true));
       }
 
@@ -175,8 +175,8 @@
     }
 
     try {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Storing aggregated metrics: " +
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Storing aggregated metrics: " +
                 TimelineUtils.dumpTimelineRecordtoJSON(metrics, true));
       }
 
@@ -200,8 +200,8 @@
     }
 
     try {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Storing container metrics: " + TimelineUtils
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Storing container metrics: " + TimelineUtils
             .dumpTimelineRecordtoJSON(metrics, true));
       }
 
@@ -250,8 +250,8 @@
   ) {
     init(res);
     try {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Request for metrics => metricNames: " + metricNames + ", " +
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Request for metrics => metricNames: " + metricNames + ", " +
           "appId: " + appId + ", instanceId: " + instanceId + ", " +
           "hostname: " + hostname + ", startTime: " + startTime + ", " +
           "endTime: " + endTime + ", " +
diff --git a/ambari-metrics-timelineservice/src/main/resources/metrics_def/AMSSMOKETESTFAKE.dat b/ambari-metrics-timelineservice/src/main/resources/metrics_def/AMSSMOKETESTFAKE.dat
new file mode 100644
index 0000000..f5c181a
--- /dev/null
+++ b/ambari-metrics-timelineservice/src/main/resources/metrics_def/AMSSMOKETESTFAKE.dat
@@ -0,0 +1 @@
+AMBARI_METRICS.SmokeTest.FakeMetric
\ No newline at end of file
diff --git a/ambari-metrics-timelineservice/src/main/resources/metrics_def/MASTER_HBASE.dat b/ambari-metrics-timelineservice/src/main/resources/metrics_def/HBASE_MASTER.dat
similarity index 100%
rename from ambari-metrics-timelineservice/src/main/resources/metrics_def/MASTER_HBASE.dat
rename to ambari-metrics-timelineservice/src/main/resources/metrics_def/HBASE_MASTER.dat
diff --git a/ambari-metrics-timelineservice/src/main/resources/metrics_def/SLAVE_HBASE.dat b/ambari-metrics-timelineservice/src/main/resources/metrics_def/HBASE_REGIONSERVER.dat
similarity index 100%
rename from ambari-metrics-timelineservice/src/main/resources/metrics_def/SLAVE_HBASE.dat
rename to ambari-metrics-timelineservice/src/main/resources/metrics_def/HBASE_REGIONSERVER.dat
diff --git a/ambari-metrics-timelineservice/src/main/resources/metrics_def/TIMELINE_METRIC_STORE_WATCHER.dat b/ambari-metrics-timelineservice/src/main/resources/metrics_def/TIMELINE_METRIC_STORE_WATCHER.dat
new file mode 100644
index 0000000..af73d02
--- /dev/null
+++ b/ambari-metrics-timelineservice/src/main/resources/metrics_def/TIMELINE_METRIC_STORE_WATCHER.dat
@@ -0,0 +1 @@
+TimelineMetricStoreWatcher.FakeMetric
\ No newline at end of file
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractMiniHBaseClusterTest.java b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractMiniHBaseClusterTest.java
index 258054c..26078cb 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractMiniHBaseClusterTest.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractMiniHBaseClusterTest.java
@@ -18,6 +18,7 @@
 package org.apache.ambari.metrics.core.timeline;
 
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.OUT_OFF_BAND_DATA_TIME_ALLOWANCE;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
 import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
 import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL;
 import static org.apache.phoenix.end2end.ParallelStatsDisabledIT.tearDownMiniCluster;
@@ -100,8 +101,12 @@
     // inits connection, starts mini cluster
     conn = getConnection(getUrl());
 
+    Configuration metricsConf = new Configuration();
+    metricsConf.set(TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME, "NONE");
+
+    metadataManager = new TimelineMetricMetadataManager(metricsConf, hdb);
+    metadataManager.initializeMetadata();
     hdb.initMetricSchema();
-    metadataManager = new TimelineMetricMetadataManager(new Configuration(), hdb);
     hdb.setMetadataInstance(metadataManager);
   }
 
@@ -206,6 +211,8 @@
     metricsConf.set("timeline.metrics.transient.metric.patterns", "topology%");
     // Unit tests insert values into the future
     metricsConf.setLong(OUT_OFF_BAND_DATA_TIME_ALLOWANCE, 600000);
+    metricsConf.set("timeline.metrics." + METRICS_RECORD_TABLE_NAME + ".durability", "SKIP_WAL");
+    metricsConf.set("timeline.metrics." + METRICS_CLUSTER_AGGREGATE_TABLE_NAME + ".durability", "ASYNC_WAL");
 
     return
       new PhoenixHBaseAccessor(new TimelineMetricConfiguration(new Configuration(), metricsConf),
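
The two durability settings exercise per-table WAL behavior: SKIP_WAL for the precision table, ASYNC_WAL for the cluster aggregate table. One plausible sketch of how such a key could be applied when initializing policies (the actual PhoenixHBaseAccessor logic is not shown in this hunk; tableDescriptorBuilder is assumed):

    String durability = metricsConf.get(
        "timeline.metrics." + METRICS_RECORD_TABLE_NAME + ".durability", "");
    if (!durability.isEmpty()) {
      // e.g. Durability.SKIP_WAL; Durability.valueOf throws on unknown names
      tableDescriptorBuilder.setDurability(Durability.valueOf(durability));
    }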
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/ITPhoenixHBaseAccessor.java b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/ITPhoenixHBaseAccessor.java
index 65b5a1b..20fbc58 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/ITPhoenixHBaseAccessor.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/ITPhoenixHBaseAccessor.java
@@ -19,11 +19,13 @@
 
 import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertTrue;
-import static org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor.DATE_TIERED_COMPACTION_POLICY;
-import static org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor.FIFO_COMPACTION_POLICY_CLASS;
-import static org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor.HSTORE_COMPACTION_CLASS_KEY;
-import static org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor.HSTORE_ENGINE_CLASS;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.DATE_TIERED_COMPACTION_POLICY;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.FIFO_COMPACTION_POLICY_CLASS;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HSTORE_COMPACTION_CLASS_KEY;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HSTORE_ENGINE_CLASS;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CONTAINER_METRICS_TABLE_NAME;
 import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
 import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
 import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.PHOENIX_TABLES;
 import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.PHOENIX_TABLES_REGEX_PATTERN;
@@ -42,6 +44,7 @@
 import java.util.Map;
 import java.util.Optional;
 
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
@@ -367,28 +370,30 @@
     precisionValues.put(METRICS_RECORD_TABLE_NAME, precisionTtl);
     f.set(hdb, precisionValues);
 
-    Field f2 = PhoenixHBaseAccessor.class.getDeclaredField("timelineMetricsTablesDurability");
-    f2.setAccessible(true);
-    f2.set(hdb, "ASYNC_WAL");
-
     hdb.initPoliciesAndTTL();
 
     // Verify expected policies are set
     boolean normalizerEnabled = false;
     String precisionTableCompactionPolicy = null;
     String aggregateTableCompactionPolicy = null;
-    boolean tableDurabilitySet  = false;
-    for (int i = 0; i < 10; i++) {
+    boolean precisionTableDurabilitySet = false;
+    boolean aggregateTableDurabilitySet = false;
+
+    boolean isComplete = false;
+
+    for (int i = 0; i < 10 && !isComplete; i++) {
       LOG.warn("Policy check retry : " + i);
       for (String tableName : PHOENIX_TABLES) {
         TableName[] tableNames = hBaseAdmin.listTableNames(PHOENIX_TABLES_REGEX_PATTERN, false);
+        TableName[] containerMetricsTableName = hBaseAdmin.listTableNames(CONTAINER_METRICS_TABLE_NAME, false);
+        tableNames = (TableName[]) ArrayUtils.addAll(tableNames, containerMetricsTableName);
+
         Optional<TableName> tableNameOptional = Arrays.stream(tableNames)
           .filter(t -> tableName.equals(t.getNameAsString())).findFirst();
 
         TableDescriptor tableDescriptor = hBaseAdmin.getTableDescriptor(tableNameOptional.get());
         
         normalizerEnabled = tableDescriptor.isNormalizationEnabled();
-        tableDurabilitySet = (Durability.ASYNC_WAL.equals(tableDescriptor.getDurability()));
         if (tableName.equals(METRICS_RECORD_TABLE_NAME)) {
           precisionTableCompactionPolicy = tableDescriptor.getValue(HSTORE_COMPACTION_CLASS_KEY);
         } else {
@@ -398,17 +403,25 @@
         // Best effort for 20 seconds
         if (normalizerEnabled || (precisionTableCompactionPolicy == null && aggregateTableCompactionPolicy == null)) {
           Thread.sleep(2000l);
+        } else {
+          isComplete = true;
         }
         if (tableName.equals(METRICS_RECORD_TABLE_NAME)) {
+          precisionTableDurabilitySet = (Durability.SKIP_WAL.equals(tableDescriptor.getDurability()));
           for (ColumnFamilyDescriptor family : tableDescriptor.getColumnFamilies()) {
             precisionTtl = family.getTimeToLive();
           }
         }
+
+        if (tableName.equals(METRICS_CLUSTER_AGGREGATE_TABLE_NAME)) {
+          aggregateTableDurabilitySet = (Durability.ASYNC_WAL.equals(tableDescriptor.getDurability()));
+        }
       }
     }
 
     Assert.assertFalse("Normalizer disabled.", normalizerEnabled);
-    Assert.assertTrue("Durability Set.", tableDurabilitySet);
+    Assert.assertTrue("METRIC_RECORD_UUID Durability Set.", precisionTableDurabilitySet);
+    Assert.assertTrue("METRIC_AGGREGATE_UUID Durability Set.", aggregateTableDurabilitySet);
     Assert.assertEquals("FIFO compaction policy is set for METRIC_RECORD_UUID.", FIFO_COMPACTION_POLICY_CLASS, precisionTableCompactionPolicy);
     Assert.assertEquals("FIFO compaction policy is set for aggregate tables", DATE_TIERED_COMPACTION_POLICY, aggregateTableCompactionPolicy);
     Assert.assertEquals("Precision TTL value as expected.", 86400, precisionTtl);
@@ -441,7 +454,7 @@
     metric.setExitCode(0);
     List<ContainerMetric> list = Arrays.asList(metric);
     hdb.insertContainerMetrics(list);
-    PreparedStatement stmt = conn.prepareStatement("SELECT * FROM CONTAINER_METRICS_UUID");
+    PreparedStatement stmt = conn.prepareStatement("SELECT * FROM CONTAINER_METRICS");
     ResultSet set = stmt.executeQuery();
     // check each filed is set properly when read back.
     boolean foundRecord = false;
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessorTest.java b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessorTest.java
index 9d1b2a4..63ec59e 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessorTest.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessorTest.java
@@ -109,7 +109,7 @@
 
     mockStatic(PhoenixTransactSQL.class);
     PreparedStatement preparedStatementMock = EasyMock.createNiceMock(PreparedStatement.class);
-    Condition condition = new DefaultCondition(Collections.singletonList(new byte[32]), metricNames, hostnames, "appid", "instanceid", 123L, 234L, Precision.SECONDS, 10, true);
+    Condition condition = new DefaultCondition(Collections.singletonList(new byte[20]), metricNames, hostnames, "appid", "instanceid", 123L, 234L, Precision.SECONDS, 10, true);
     expect(PhoenixTransactSQL.prepareGetMetricsSqlStmt(null, condition)).andReturn(preparedStatementMock).once();
     ResultSet rsMock = EasyMock.createNiceMock(ResultSet.class);
     expect(preparedStatementMock.executeQuery()).andReturn(rsMock);
@@ -138,7 +138,7 @@
 
     mockStatic(PhoenixTransactSQL.class);
     PreparedStatement preparedStatementMock = EasyMock.createNiceMock(PreparedStatement.class);
-    Condition condition = new DefaultCondition(Collections.singletonList(new byte[32]), metricNames, hostnames, "appid", "instanceid", 123L, 234L, Precision.SECONDS, 10, true);
+    Condition condition = new DefaultCondition(Collections.singletonList(new byte[20]), metricNames, hostnames, "appid", "instanceid", 123L, 234L, Precision.SECONDS, 10, true);
     expect(PhoenixTransactSQL.prepareGetMetricsSqlStmt(null, condition)).andReturn(preparedStatementMock).once();
     ResultSet rsMock = EasyMock.createNiceMock(ResultSet.class);
     RuntimeException runtimeException = EasyMock.createNiceMock(RuntimeException.class);
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricSplitPointComputerTest.java b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricSplitPointComputerTest.java
new file mode 100644
index 0000000..4d663cc
--- /dev/null
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricSplitPointComputerTest.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.core.timeline;
+
+import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+import org.apache.hadoop.conf.Configuration;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_INITIAL_CONFIGURED_MASTER_COMPONENTS;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_INITIAL_CONFIGURED_SLAVE_COMPONENTS;
+import static org.easymock.EasyMock.anyBoolean;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+
+
+public class TimelineMetricSplitPointComputerTest {
+
+  @Test
+  public void testSplitPointComputationForBasicCluster() {
+
+    /**
+     *  HBase Total heap = 1G.
+     *  HDFS,HBASE,YARN services deployed.
+     */
+    Configuration metricsConfMock = EasyMock.createMock(Configuration.class);
+
+    expect(metricsConfMock.get(TIMELINE_METRIC_INITIAL_CONFIGURED_MASTER_COMPONENTS, "")).
+      andReturn("METRICS_COLLECTOR,AMBARI_SERVER,NAMENODE,RESOURCEMANAGER").once();
+    expect(metricsConfMock.get(TIMELINE_METRIC_INITIAL_CONFIGURED_SLAVE_COMPONENTS, "")).
+      andReturn("METRICS_MONITOR,DATANODE,NODEMANAGER,HBASE_REGIONSERVER").once();
+    expect(metricsConfMock.getDouble("hbase_total_heapsize", 1024*1024*1024)).andReturn(1024 * 1024 * 1024.0).once();
+
+    Configuration hbaseConfMock = EasyMock.createMock(Configuration.class);
+    expect(hbaseConfMock.getDouble("hbase.regionserver.global.memstore.upperLimit", 0.5)).andReturn(0.5).once();
+    expect(hbaseConfMock.getDouble("hbase.hregion.memstore.flush.size", 134217728)).andReturn(134217728.0).once();
+
+    TimelineMetricMetadataManager metricMetadataManagerMock = EasyMock.createNiceMock(TimelineMetricMetadataManager.class);
+    expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class), anyBoolean())).andReturn(new byte[16]);
+
+    replay(metricsConfMock, hbaseConfMock, metricMetadataManagerMock);
+
+    TimelineMetricSplitPointComputer timelineMetricSplitPointComputer = new TimelineMetricSplitPointComputer(metricsConfMock,
+      hbaseConfMock,
+      metricMetadataManagerMock);
+
+    Assert.assertEquals(timelineMetricSplitPointComputer.getPrecisionSplitPoints().size(), 3);
+    Assert.assertEquals(timelineMetricSplitPointComputer.getClusterAggregateSplitPoints().size(), 1);
+    Assert.assertEquals(timelineMetricSplitPointComputer.getHostAggregateSplitPoints().size(), 1);
+  }
+
+  @Test
+  public void testSplitPointComputationForMediumCluster() {
+
+    /**
+     *  HBase Total heap = 8G.
+     *  All services deployed.
+     */
+    Configuration metricsConfMock = EasyMock.createMock(Configuration.class);
+
+    expect(metricsConfMock.get(TIMELINE_METRIC_INITIAL_CONFIGURED_MASTER_COMPONENTS, "")).
+      andReturn("METRICS_COLLECTOR,AMBARI_SERVER,NAMENODE,RESOURCEMANAGER," +
+        "NIMBUS,HIVESERVER2,HIVEMETASTORE,HBASE_MASTER,KAFKA_BROKER").once();
+    expect(metricsConfMock.get(TIMELINE_METRIC_INITIAL_CONFIGURED_SLAVE_COMPONENTS, "")).
+      andReturn("METRICS_MONITOR,DATANODE,NODEMANAGER,HBASE_REGIONSERVER").once();
+    expect(metricsConfMock.getDouble("hbase_total_heapsize", 1024*1024*1024)).andReturn(8589934592.0).once();
+
+    Configuration hbaseConfMock = EasyMock.createMock(Configuration.class);
+    expect(hbaseConfMock.getDouble("hbase.regionserver.global.memstore.upperLimit", 0.5)).andReturn(0.5).once();
+    expect(hbaseConfMock.getDouble("hbase.hregion.memstore.flush.size", 134217728)).andReturn(134217728.0).once();
+
+    TimelineMetricMetadataManager metricMetadataManagerMock = EasyMock.createNiceMock(TimelineMetricMetadataManager.class);
+    expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class), anyBoolean())).andReturn(new byte[16]);
+
+    replay(metricsConfMock, hbaseConfMock, metricMetadataManagerMock);
+
+    TimelineMetricSplitPointComputer timelineMetricSplitPointComputer = new TimelineMetricSplitPointComputer(metricsConfMock,
+      hbaseConfMock,
+      metricMetadataManagerMock);
+
+    Assert.assertEquals(timelineMetricSplitPointComputer.getPrecisionSplitPoints().size(), 16);
+    Assert.assertEquals(timelineMetricSplitPointComputer.getClusterAggregateSplitPoints().size(), 3);
+    Assert.assertEquals(timelineMetricSplitPointComputer.getHostAggregateSplitPoints().size(), 3);
+  }
+
+  @Test
+  public void testSplitPointComputationForLargeCluster() {
+
+    /**
+     *  HBase Total heap = 24G.
+     *  All services deployed.
+     */
+    Configuration metricsConfMock = EasyMock.createMock(Configuration.class);
+
+    expect(metricsConfMock.get(TIMELINE_METRIC_INITIAL_CONFIGURED_MASTER_COMPONENTS, "")).
+      andReturn("METRICS_COLLECTOR,AMBARI_SERVER,NAMENODE,RESOURCEMANAGER," +
+        "NIMBUS,HIVESERVER2,HIVEMETASTORE,HBASE_MASTER,KAFKA_BROKER").once();
+    expect(metricsConfMock.get(TIMELINE_METRIC_INITIAL_CONFIGURED_SLAVE_COMPONENTS, "")).
+      andReturn("METRICS_MONITOR,DATANODE,NODEMANAGER,HBASE_REGIONSERVER").once();
+    expect(metricsConfMock.getDouble("hbase_total_heapsize", 1024*1024*1024)).andReturn(24 * 1024 * 1024 * 1024.0).once();
+
+    Configuration hbaseConfMock = EasyMock.createMock(Configuration.class);
+    expect(hbaseConfMock.getDouble("hbase.regionserver.global.memstore.upperLimit", 0.5)).andReturn(0.5).once();
+    expect(hbaseConfMock.getDouble("hbase.hregion.memstore.flush.size", 134217728)).andReturn(2 * 134217728.0).once();
+
+    TimelineMetricMetadataManager metricMetadataManagerMock = EasyMock.createNiceMock(TimelineMetricMetadataManager.class);
+    expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class), anyBoolean())).andReturn(new byte[16]);
+
+    replay(metricsConfMock, hbaseConfMock, metricMetadataManagerMock);
+
+    TimelineMetricSplitPointComputer timelineMetricSplitPointComputer = new TimelineMetricSplitPointComputer(metricsConfMock,
+      hbaseConfMock,
+      metricMetadataManagerMock);
+
+    Assert.assertEquals(timelineMetricSplitPointComputer.getPrecisionSplitPoints().size(), 28);
+    Assert.assertEquals(timelineMetricSplitPointComputer.getClusterAggregateSplitPoints().size(), 6);
+    Assert.assertEquals(timelineMetricSplitPointComputer.getHostAggregateSplitPoints().size(), 6);
+  }
+}
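
The expected counts follow from the mocked sizing inputs. One plausible reading of the arithmetic (an assumption about TimelineMetricSplitPointComputer, whose implementation is not part of this test file): heap size times the memstore fraction bounds how many flush-size regions a region server can sustain, and N regions require N - 1 split points. For the basic cluster:

    double heapBytes      = 1024 * 1024 * 1024.0; // hbase_total_heapsize
    double memstoreFrac   = 0.5;                  // hbase.regionserver.global.memstore.upperLimit
    double flushSizeBytes = 134217728.0;          // hbase.hregion.memstore.flush.size
    int regions = (int) (heapBytes * memstoreFrac / flushSizeBytes); // 4
    int precisionSplitPoints = regions - 1;                          // 3, as asserted above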
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricStoreWatcherTest.java b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricStoreWatcherTest.java
index de0236c..eb64198 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricStoreWatcherTest.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricStoreWatcherTest.java
@@ -48,10 +48,10 @@
 
   @Test
   public void testRunPositive() throws Exception {
-    TimelineMetricStore metricStore = createNiceMock(TimelineMetricStore.class);
+    HBaseTimelineMetricsService metricStore = createNiceMock(HBaseTimelineMetricsService.class);
 
-    expect(metricStore.putMetrics(anyObject(TimelineMetrics.class)))
-      .andReturn(new TimelinePutResponse());
+    metricStore.putMetricsSkipCache(anyObject(TimelineMetrics.class));
+    expectLastCall().once();
 
     // metric found
     expect(metricStore.getTimelineMetrics(EasyMock.<List<String>>anyObject(),
@@ -75,7 +75,7 @@
 
   @Test
   public void testRunNegative() throws Exception {
-    TimelineMetricStore metricStore = createNiceMock(TimelineMetricStore.class);
+    HBaseTimelineMetricsService metricStore = createNiceMock(HBaseTimelineMetricsService.class);
 
     expect(metricStore.putMetrics(anyObject(TimelineMetrics.class)))
       .andReturn(new TimelinePutResponse());
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/discovery/TestMetadataManager.java b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/discovery/TestMetadataManager.java
index 2f2b0b5..28bb75e 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/discovery/TestMetadataManager.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/discovery/TestMetadataManager.java
@@ -160,7 +160,7 @@
 
     byte[] uuid = metadataManager.getUuid(timelineMetric, true);
     Assert.assertNotNull(uuid);
-    Assert.assertEquals(uuid.length, 32);
+    Assert.assertEquals(uuid.length, 20);
 
     byte[] uuidWithoutHost = metadataManager.getUuid(new TimelineClusterMetric(timelineMetric.getMetricName(), timelineMetric.getAppId(), timelineMetric.getInstanceId(), -1), true);
     Assert.assertNotNull(uuidWithoutHost);
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/uuid/MetricUuidGenStrategyTest.java b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/uuid/MetricUuidGenStrategyTest.java
new file mode 100644
index 0000000..a25310b
--- /dev/null
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/uuid/MetricUuidGenStrategyTest.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.core.timeline.uuid;
+
+import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class MetricUuidGenStrategyTest {
+
+
+  private static List<String> apps = Arrays.asList("namenode",
+    "datanode", "hbase_master", "hbase_regionserver", "kafka_broker", "nimbus", "ams-hbase",
+    "accumulo", "nodemanager", "resourcemanager", "ambari_server", "HOST", "timeline_metric_store_watcher",
+    "jobhistoryserver", "hiveserver2", "hivemetastore", "applicationhistoryserver", "amssmoketestfake", "llapdaemon");
+
+  private static Map<String, Set<String>> metricSet = new HashMap<>();
+
+  @BeforeClass
+  public static void init() {
+    metricSet = new HashMap<>(populateMetricWhitelistFromFile());
+  }
+
+  @Test
+  @Ignore
+  public void testHashBasedUuid() throws SQLException {
+    testMetricCollisionsForUuidGenStrategy(new HashBasedUuidGenStrategy(), 16);
+  }
+
+  @Test
+  @Ignore
+  public void testHashBasedUuidForAppIds() throws SQLException {
+    MetricUuidGenStrategy strategy = new HashBasedUuidGenStrategy();
+    Map<String, TimelineClusterMetric> uuids = new HashMap<>();
+    for (String app : metricSet.keySet()) {
+      TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric("TestMetric", app, null, -1l);
+      byte[] uuid = strategy.computeUuid(timelineClusterMetric, 16);
+      String uuidStr = new String(uuid);
+      Assert.assertFalse(uuids.containsKey(uuidStr) && !uuids.containsValue(timelineClusterMetric));
+      uuids.put(uuidStr, timelineClusterMetric);
+    }
+  }
+
+  @Test
+  @Ignore
+  public void testHashBasedUuidForHostnames() throws SQLException {
+    testHostCollisionsForUuidGenStrategy(new HashBasedUuidGenStrategy(), 16);
+  }
+
+
+  @Test
+  public void testMD5BasedUuid() throws SQLException {
+    testMetricCollisionsForUuidGenStrategy(new MD5UuidGenStrategy(), 16);
+
+  }
+
+  @Test
+  public void testMD5BasedUuidForHostnames() throws SQLException {
+    testHostCollisionsForUuidGenStrategy(new MD5UuidGenStrategy(), 16);
+  }
+
+
+  @Test
+  public void testMD5ConsistentHashing() throws SQLException, InterruptedException {
+    testConsistencyForUuidGenStrategy(new MD5UuidGenStrategy(), 16);
+  }
+
+
+  @Test
+  public void testMurmur3HashUuid() throws SQLException {
+    testMetricCollisionsForUuidGenStrategy(new Murmur3HashUuidGenStrategy(), 16);
+  }
+
+  @Test
+  public void testMurmur3HashingBasedUuidForHostnames() throws SQLException {
+    testHostCollisionsForUuidGenStrategy(new Murmur3HashUuidGenStrategy(), 4);
+  }
+
+  @Test
+  public void testMurmur3ConsistentHashing() throws SQLException, InterruptedException {
+    testConsistencyForUuidGenStrategy(new Murmur3HashUuidGenStrategy(), 4);
+  }
+
+  private void testMetricCollisionsForUuidGenStrategy(MetricUuidGenStrategy strategy, int uuidLength) {
+    Map<TimelineMetricUuid, TimelineClusterMetric> uuids = new HashMap<>();
+    for (String app : metricSet.keySet()) {
+      Set<String> metrics = metricSet.get(app);
+      for (String m : metrics) {
+        TimelineClusterMetric metric = new TimelineClusterMetric(m, app, null, -1l);
+        byte[] uuid = strategy.computeUuid(metric, uuidLength);
+        Assert.assertNotNull(uuid);
+        Assert.assertTrue(uuid.length == uuidLength);
+        TimelineMetricUuid uuidKey = new TimelineMetricUuid(uuid);
+        Assert.assertFalse(uuids.containsKey(uuidKey) && !uuids.containsValue(metric));
+        uuids.put(uuidKey, metric);
+      }
+    }
+  }
+
+
+  private void testHostCollisionsForUuidGenStrategy(MetricUuidGenStrategy strategy, int uuidLength) {
+    Map<TimelineMetricUuid, String> uuids = new HashMap<>();
+
+    List<String> hosts = new ArrayList<>();
+    String hostPrefix = "TestHost.";
+    String hostSuffix = ".ambari.apache.org";
+
+    for (int i=0; i<=2000; i++) {
+      hosts.add(hostPrefix + i + hostSuffix);
+    }
+
+    for (String host : hosts) {
+      byte[] uuid = strategy.computeUuid(host, uuidLength);
+      Assert.assertNotNull(uuid);
+      Assert.assertTrue(uuid.length == uuidLength);
+      TimelineMetricUuid uuidKey = new TimelineMetricUuid(uuid);
+      Assert.assertFalse(uuids.containsKey(uuidKey));
+      uuids.put(uuidKey, host);
+    }
+  }
+
+  private void testConsistencyForUuidGenStrategy(MetricUuidGenStrategy strategy, int length) throws InterruptedException {
+    String key = "TestString";
+
+    byte[] uuid = strategy.computeUuid(key, length);
+    Assert.assertNotNull(uuid);
+    Assert.assertTrue(uuid.length == length);
+
+    for (int i = 0; i<100; i++) {
+      byte[] uuid2 = strategy.computeUuid(key, length);
+      Assert.assertNotNull(uuid2);
+      Assert.assertTrue(uuid2.length == length);
+      Assert.assertArrayEquals(uuid, uuid2);
+      Thread.sleep(10);
+    }
+  }
+
+  private static Map<String, Set<String>> populateMetricWhitelistFromFile() {
+
+    Map<String, Set<String>> metricSet = new HashMap<String, Set<String>>();
+    FileInputStream fstream = null;
+    Set<String> hbaseMetrics = new HashSet<>();
+    BufferedReader br = null;
+    String strLine;
+    for (String appId : apps) {
+      URL fileUrl = ClassLoader.getSystemResource("metrics_def/" + appId.toUpperCase() + ".dat");
+
+      Set<String> metricsForApp = new HashSet<>();
+      try {
+        fstream = new FileInputStream(fileUrl.getPath());
+        br = new BufferedReader(new InputStreamReader(fstream));
+        while ((strLine = br.readLine()) != null)   {
+          strLine = strLine.trim();
+          metricsForApp.add(strLine);
+        }
+      } catch (Exception ioEx) {
+        System.out.println("Metrics for AppId " + appId + " not found.");
+      } finally {
+        if (br != null) {
+          try {
+            br.close();
+          } catch (IOException e) {
+          }
+        }
+
+        if (fstream != null) {
+          try {
+            fstream.close();
+          } catch (IOException e) {
+          }
+        }
+      }
+      metricsForApp.add("live_hosts");
+      if (appId.startsWith("hbase")) {
+        hbaseMetrics.addAll(metricsForApp);
+      } else {
+        metricSet.put(appId, metricsForApp);
+      }
+      System.out.println("Found " + metricsForApp.size() + " metrics for appId = " + appId);
+    }
+    metricSet.put("hbase", hbaseMetrics);
+    return metricSet;
+  }
+}
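
populateMetricWhitelistFromFile closes its streams through nested finally blocks with empty catch clauses; a try-with-resources form of the read loop (a sketch, not part of the patch) closes both streams automatically:

    try (BufferedReader reader = new BufferedReader(
           new InputStreamReader(new FileInputStream(fileUrl.getPath())))) {
      String line;
      while ((line = reader.readLine()) != null) {
        metricsForApp.add(line.trim());
      }
    } catch (Exception ex) {
      System.out.println("Metrics for AppId " + appId + " not found.");
    }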
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/uuid/TimelineMetricUuidManagerTest.java b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/uuid/TimelineMetricUuidManagerTest.java
deleted file mode 100644
index e4018bb..0000000
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/uuid/TimelineMetricUuidManagerTest.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.metrics.core.timeline.uuid;
-
-import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.io.BufferedReader;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.URL;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class TimelineMetricUuidManagerTest {
-
-
-  private List<String> apps = Arrays.asList("namenode",
-    "datanode", "master_hbase", "slave_hbase", "kafka_broker", "nimbus", "ams-hbase",
-    "accumulo", "nodemanager", "resourcemanager", "ambari_server", "HOST", "timeline_metric_store_watcher",
-    "jobhistoryserver", "hiveserver2", "hivemetastore", "applicationhistoryserver", "amssmoketestfake");
-
-  private Map<String, Set<String>> metricSet  = new HashMap<>(populateMetricWhitelistFromFile());
-
-  @Test
-  @Ignore("Collisions possible")
-  public void testHashBasedUuidForMetricName() throws SQLException {
-
-    MetricUuidGenStrategy strategy = new HashBasedUuidGenStrategy();
-    Map<String, TimelineClusterMetric> uuids = new HashMap<>();
-    for (String app : metricSet.keySet()) {
-      Set<String> metrics = metricSet.get(app);
-      for (String metric : metrics) {
-        TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(metric, app, null, -1l);
-        byte[] uuid = strategy.computeUuid(timelineClusterMetric, 16);
-        Assert.assertNotNull(uuid);
-        Assert.assertTrue(uuid.length == 16);
-        String uuidStr = new String(uuid);
-        Assert.assertFalse(uuids.containsKey(uuidStr) && !uuids.containsValue(timelineClusterMetric));
-        uuids.put(uuidStr, timelineClusterMetric);
-      }
-    }
-  }
-
-  @Test
-  public void testHaseBasedUuidForAppIds() throws SQLException {
-
-    MetricUuidGenStrategy strategy = new HashBasedUuidGenStrategy();
-    Map<String, TimelineClusterMetric> uuids = new HashMap<>();
-    for (String app : metricSet.keySet()) {
-      TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric("TestMetric", app, null, -1l);
-      byte[] uuid = strategy.computeUuid(timelineClusterMetric, 16);
-      String uuidStr = new String(uuid);
-      Assert.assertFalse(uuids.containsKey(uuidStr) && !uuids.containsValue(timelineClusterMetric));
-      uuids.put(uuidStr, timelineClusterMetric);
-    }
-  }
-
-  @Test
-  public void testHashBasedUuidForHostnames() throws SQLException {
-
-    MetricUuidGenStrategy strategy = new HashBasedUuidGenStrategy();
-    Map<String, String> uuids = new HashMap<>();
-
-    List<String> hosts = new ArrayList<>();
-    String hostPrefix = "TestHost.";
-    String hostSuffix = ".ambari.apache.org";
-
-    for (int i=0; i<=2000; i++) {
-      hosts.add(hostPrefix + i + hostSuffix);
-    }
-
-    for (String host : hosts) {
-      byte[] uuid = strategy.computeUuid(host, 16);
-      Assert.assertNotNull(uuid);
-      Assert.assertTrue(uuid.length == 16);
-      String uuidStr = new String(uuid);
-      Assert.assertFalse(uuids.containsKey(uuidStr));
-      uuids.put(uuidStr, host);
-    }
-  }
-
-
-  @Test
-  public void testRandomUuidForWhitelistedMetrics() throws SQLException {
-
-    MetricUuidGenStrategy strategy = new MD5UuidGenStrategy();
-    Map<String, String> uuids = new HashMap<>();
-    for (String app : metricSet.keySet()) {
-      Set<String> metrics = metricSet.get(app);
-      for (String metric : metrics) {
-        byte[] uuid = strategy.computeUuid(new TimelineClusterMetric(metric, app, null, -1l), 16);
-        Assert.assertNotNull(uuid);
-        Assert.assertTrue(uuid.length == 16);
-        String uuidStr = new String(uuid);
-        Assert.assertFalse(uuids.containsKey(uuidStr) && !uuids.containsValue(metric));
-        uuids.put(uuidStr, metric);
-      }
-    }
-  }
-
-  @Test
-  public void testRandomUuidForHostnames() throws SQLException {
-
-    MetricUuidGenStrategy strategy = new MD5UuidGenStrategy();
-    Map<String, String> uuids = new HashMap<>();
-
-    List<String> hosts = new ArrayList<>();
-    String hostPrefix = "TestHost.";
-    String hostSuffix = ".ambari.apache.org";
-
-    for (int i=0; i<=2000; i++) {
-      hosts.add(hostPrefix + i + hostSuffix);
-    }
-
-    int numC = 0;
-    for (String host : hosts) {
-      byte[] uuid = strategy.computeUuid(host, 16);
-      Assert.assertNotNull(uuid);
-      Assert.assertTrue(uuid.length == 16);
-      String uuidStr = new String(uuid);
-      Assert.assertFalse(uuids.containsKey(uuidStr));
-      uuids.put(uuidStr, host);
-    }
-  }
-
-
-  @Test
-  public void testConsistentHashing() throws SQLException, InterruptedException {
-
-    MetricUuidGenStrategy strategy = new MD5UuidGenStrategy();
-    String key = "TestString";
-
-    byte[] uuid = strategy.computeUuid(key, 16);
-    Assert.assertNotNull(uuid);
-    Assert.assertTrue(uuid.length == 16);
-
-    for (int i = 0; i<100; i++) {
-      byte[] uuid2 = strategy.computeUuid(key, 16);
-      Assert.assertNotNull(uuid2);
-      Assert.assertTrue(uuid2.length == 16);
-      Assert.assertArrayEquals(uuid, uuid2);
-      Thread.sleep(10);
-    }
-  }
-
-
-  public Map<String, Set<String>> populateMetricWhitelistFromFile() {
-
-
-    Map<String, Set<String>> metricSet = new HashMap<String, Set<String>>();
-    FileInputStream fstream = null;
-    Set<String> hbaseMetrics = new HashSet<>();
-    BufferedReader br = null;
-    String strLine;
-    for (String appId : apps) {
-      URL fileUrl = ClassLoader.getSystemResource("metrics_def/" + appId.toUpperCase() + ".dat");
-
-      Set<String> metricsForApp = new HashSet<>();
-      try {
-        fstream = new FileInputStream(fileUrl.getPath());
-        br = new BufferedReader(new InputStreamReader(fstream));
-        while ((strLine = br.readLine()) != null)   {
-          strLine = strLine.trim();
-          metricsForApp.add(strLine);
-        }
-      } catch (Exception ioEx) {
-        System.out.println("Metrics for AppId " + appId + " not found.");
-      } finally {
-        if (br != null) {
-          try {
-            br.close();
-          } catch (IOException e) {
-          }
-        }
-
-        if (fstream != null) {
-          try {
-            fstream.close();
-          } catch (IOException e) {
-          }
-        }
-      }
-      metricsForApp.add("live_hosts");
-      if (appId.equals("master_hbase") || appId.equals("slave_hbase")) {
-        hbaseMetrics.addAll(metricsForApp);
-      } else {
-        metricSet.put(appId, metricsForApp);
-      }
-      System.out.println("Found " + metricsForApp.size() + " metrics for appId = " + appId);
-    }
-    metricSet.put("hbase", hbaseMetrics);
-    return metricSet;
-  }
-}