[AMBARI-23780] Implement an upgrade helper in Ambari Metrics collector that copies data from the old schema to the new  (dsen) (#1196)

* [AMBARI-23780] Implement an upgrade helper in Ambari Metrics collector that copies data from the old schema to the new.  (dsen) - initial patch

* [AMBARI-23780] Implement an upgrade helper in Ambari Metrics collector that copies data from the old schema to the new.  (dsen)

* [AMBARI-23780] Implement an upgrade helper in Ambari Metrics collector that copies data from the old schema to the new (dsen)

* [AMBARI-23780] Implement an upgrade helper in Ambari Metrics collector that copies data from the old schema to the new (dsen) -  changes according to review

* [AMBARI-23780] Implement an upgrade helper in Ambari Metrics collector that copies data from the old schema to the new (dsen) -  UT fix

* [AMBARI-23780] Implement an upgrade helper in Ambari Metrics collector that copies data from the old schema to the new.  (dsen) - reverted UpgradeCatalog changes
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java
index c684b0a..f2a3bca 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java
@@ -1332,7 +1332,7 @@
         TimelineMetric metric = metricAggregate.getKey();
         MetricHostAggregate hostAggregate = metricAggregate.getValue();
-        byte[] uuid = metadataManagerInstance.getUuid(metric, false);
+        byte[] uuid = metadataManagerInstance.getUuid(metric, true);
         if (uuid == null) {
           LOG.error("Error computing UUID for metric. Cannot write aggregate metric : " + metric.toString());
@@ -1516,7 +1516,7 @@
             "aggregate = " + aggregate);
-        byte[] uuid = metadataManagerInstance.getUuid(clusterMetric, false);
+        byte[] uuid = metadataManagerInstance.getUuid(clusterMetric, true);
         if (uuid == null) {
           LOG.error("Error computing UUID for metric. Cannot write metric : " + clusterMetric.toString());
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/AbstractPhoenixMetricsCopier.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/AbstractPhoenixMetricsCopier.java
new file mode 100644
index 0000000..6782930
--- /dev/null
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/AbstractPhoenixMetricsCopier.java
@@ -0,0 +1,164 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.upgrade.core;
+import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Set;
+public abstract class AbstractPhoenixMetricsCopier implements Runnable {
+  private static final Log LOG = LogFactory.getLog(AbstractPhoenixMetricsCopier.class);
+  private static final Long DEFAULT_NATIVE_TIME_RANGE_DELAY = 120000L;
+  private final Long startTime;
+  protected final FileWriter processedMetricsFile;
+  protected String inputTable;
+  protected String outputTable;
+  protected Set<String> metricNames;
+  protected PhoenixHBaseAccessor hBaseAccessor;
+  public AbstractPhoenixMetricsCopier(String inputTableName, String outputTableName, PhoenixHBaseAccessor hBaseAccessor,
+                                      Set<String> metricNames, Long startTime, FileWriter outputStream) {
+    this.inputTable = inputTableName;
+    this.outputTable = outputTableName;
+    this.hBaseAccessor = hBaseAccessor;
+    this.metricNames = metricNames;
+    this.startTime = startTime;
+    this.processedMetricsFile = outputStream;
+  }
+  public void run(){
+    LOG.info(String.format("Copying %s metrics from %s to %s", metricNames, inputTable, outputTable));
+    long startTimer = System.currentTimeMillis();
+    String query = String.format("SELECT %s %s FROM %s WHERE %s AND SERVER_TIME > %s ORDER BY METRIC_NAME, SERVER_TIME",
+      getQueryHint(startTime), getColumnsClause(), inputTable, getMetricNamesLikeClause(), startTime);
+    runPhoenixQueryAndAddToResults(query);
+    try {
+      saveMetrics();
+    } catch (SQLException e) {
+      LOG.error(e);
+    }
+    long estimatedTime = System.currentTimeMillis() - startTimer;
+    LOG.debug(String.format("Copying took %s seconds from table %s to table %s for metric names %s", estimatedTime/ 1000.0, inputTable, outputTable, metricNames));
+    saveMetricsProgress();
+  }
+  private String getMetricNamesLikeClause() {
+    StringBuilder sb = new StringBuilder();
+    sb.append('(');
+    int i = 0;
+    for (String metricName : metricNames) {
+      sb.append("METRIC_NAME");
+      sb.append(" LIKE ");
+      sb.append("'");
+      sb.append(metricName);
+      sb.append("'");
+      if (i < metricNames.size() - 1) {
+          sb.append(" OR ");
+        }
+      i++;
+    }
+    sb.append(')');
+    return sb.toString();
+  }
+  protected abstract String getColumnsClause();
+  private void runPhoenixQueryAndAddToResults(String query) {
+    LOG.debug(String.format("Running query: %s", query));
+    Connection conn = null;
+    PreparedStatement stmt = null;
+    try {
+      conn = hBaseAccessor.getConnection();
+      stmt = conn.prepareStatement(query);
+      ResultSet rs = stmt.executeQuery();
+      while (rs.next()) {
+        addToResults(rs);
+      }
+    } catch (SQLException e) {
+      LOG.error(String.format("Exception during running phoenix query %s", query), e);
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+    }
+  }
+  /**
+   * Saves processed metric names info provided file in format TABLE_NAME:METRIC_NAME
+   */
+  private void saveMetricsProgress() {
+    if (processedMetricsFile == null) {
+      LOG.info("Skipping metrics progress save as the file is null");
+      return;
+    }
+    for (String metricName : metricNames) {
+      try {
+        processedMetricsFile.append(inputTable + ":" + metricName + System.lineSeparator());
+      } catch (IOException e) {
+        LOG.error(e);
+      }
+    }
+  }
+  protected String getQueryHint(Long startTime) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("/*+ ");
+    sb.append("NATIVE_TIME_RANGE(");
+    sb.append(startTime - DEFAULT_NATIVE_TIME_RANGE_DELAY);
+    sb.append(") ");
+    sb.append("*/");
+    return sb.toString();
+  }
+  /**
+   * Saves aggregated metrics to the Hbase
+   * @throws SQLException
+   */
+  protected abstract void saveMetrics() throws SQLException;
+  /**
+   * Parses result set into aggregates map
+   * @param rs
+   * @throws SQLException
+   */
+  protected abstract void addToResults(ResultSet rs) throws SQLException;
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java
new file mode 100644
index 0000000..7f4d93a
--- /dev/null
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java
@@ -0,0 +1,309 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.upgrade.core;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_V1_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_V1_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_V1_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_V1_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_V1_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_MINUTE_V1_TABLE_NAME;
+public class MetricsDataMigrationLauncher {
+  private static final Log LOG = LogFactory.getLog(MetricsDataMigrationLauncher.class);
+  private static final Long DEFAULT_TIMEOUT_MINUTES = 60*24L;
+  private static String patternPrefix = "._p_";
+  private static final int DEFAULT_BATCH_SIZE = 5;
+  public static final Map<String, String> CLUSTER_AGGREGATE_TABLES_MAPPING = new HashMap<>();
+  public static final Map<String, String> HOST_AGGREGATE_TABLES_MAPPING = new HashMap<>();
+  public static final String DEFAULT_PROCESSED_METRICS_FILE_LOCATION = "/var/log/ambari-metrics-collector/ambari-metrics-migration-state.txt";
+  public static final int DEFAULT_NUMBER_OF_THREADS = 3;
+  public static final long ONE_MONTH_MILLIS = 2592000000L;
+  public static final long DEFAULT_START_TIME = System.currentTimeMillis() - ONE_MONTH_MILLIS; //Last month
+  static {
+  }
+  private final Set<Set<String>> metricNamesBatches;
+  private final String processedMetricsFilePath;
+  private Set<String> metricNames;
+  private Long startTime;
+  private Integer batchSize;
+  private Integer numberOfThreads;
+  private TimelineMetricConfiguration timelineMetricConfiguration;
+  private PhoenixHBaseAccessor hBaseAccessor;
+  private Map<String, Set<String>> processedMetrics;
+  public MetricsDataMigrationLauncher(String whitelistedFilePath, String processedMetricsFilePath, Long startTime, Integer numberOfThreads, Integer batchSize) throws Exception {
+    this.startTime = startTime == null? DEFAULT_START_TIME : startTime;
+    this.numberOfThreads = numberOfThreads == null? DEFAULT_NUMBER_OF_THREADS : numberOfThreads;
+    this.batchSize = batchSize == null? DEFAULT_BATCH_SIZE : batchSize;
+    this.processedMetricsFilePath = processedMetricsFilePath == null? DEFAULT_PROCESSED_METRICS_FILE_LOCATION : processedMetricsFilePath;
+    initializeHbaseAccessor();
+    LOG.info("Looking for whitelisted metric names...");
+    if (whitelistedFilePath != null) {
+      this.metricNames = readMetricWhitelistFromFile(whitelistedFilePath);
+    } else if (timelineMetricConfiguration.isWhitelistingEnabled()) {
+      String whitelistFile = timelineMetricConfiguration.getMetricsConf().get(TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_FILE, TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_FILE_LOCATION_DEFAULT);
+      metricNames = readMetricWhitelistFromFile(whitelistFile);
+    } else {
+      LOG.error("No whitelisted metrics specified. Exiting...");
+      throw new Exception("List of whitelisted metrics must be provided");
+    }
+    readProcessedMetricsMap();
+    LOG.info("Setting up batches...");
+    this.metricNamesBatches = new HashSet<>();
+    Iterables.partition(metricNames, this.batchSize)
+      .forEach(batch -> metricNamesBatches.add(new HashSet<>(batch)));
+    LOG.info(String.format("Split metric names into %s batches with size of %s", metricNamesBatches.size(), this.batchSize));
+  }
+  private void readProcessedMetricsMap() {
+    Map<String, Set<String>> result = new HashMap<>();
+    if (!Files.exists(Paths.get(processedMetricsFilePath))) {
+      LOG.info(String.format("The processed metrics file %s is missing, assuming there were no metrics processed.", processedMetricsFilePath));
+      this.processedMetrics = new HashMap<>();
+    }
+    LOG.info(String.format("Reading the list of already copied metrics from %s", processedMetricsFilePath));
+    try {
+      try (Stream<String> stream = Files.lines(Paths.get(processedMetricsFilePath))) {
+        stream.forEach( line -> {
+          String [] lineSplit = line.split(":");
+          if (!result.containsKey(lineSplit[0])) {
+            result.put(lineSplit[0], new HashSet<>(Collections.singletonList(lineSplit[1])));
+          } else {
+            result.get(lineSplit[0]).add(lineSplit[1]);
+          }
+        });
+      }
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+    this.processedMetrics = result;
+  }
+  public void runMigration(Long timeoutInMinutes) throws IOException {
+    FileWriter processedMetricsFileWriter = new FileWriter(processedMetricsFilePath, true);
+    LOG.info("Setting up copiers...");
+    Set<AbstractPhoenixMetricsCopier> copiers = new HashSet<>();
+    for (Set<String> batch : metricNamesBatches) {
+      for (Map.Entry<String, String> entry : CLUSTER_AGGREGATE_TABLES_MAPPING.entrySet()) {
+        Set<String> filteredMetrics = filterProcessedMetrics(batch, this.processedMetrics, entry.getKey());
+        if (!filteredMetrics.isEmpty()) {
+          copiers.add(new PhoenixClusterMetricsCopier(entry.getKey(), entry.getValue(), hBaseAccessor,
+            filteredMetrics, startTime, processedMetricsFileWriter));
+        }
+      }
+      for (Map.Entry<String, String> entry : HOST_AGGREGATE_TABLES_MAPPING.entrySet()) {
+        Set<String> filteredMetrics = filterProcessedMetrics(batch, processedMetrics, entry.getKey());
+        if (!filteredMetrics.isEmpty()) {
+          copiers.add(new PhoenixHostMetricsCopier(entry.getKey(), entry.getValue(), hBaseAccessor,
+            filteredMetrics, startTime, processedMetricsFileWriter));
+        }
+      }
+    }
+    if (copiers.isEmpty()) {
+      LOG.info("No copy threads to run, looks like all metrics have been copied.");
+      processedMetricsFileWriter.close();
+      return;
+    }
+    LOG.info("Running the copy threads...");
+    long startTimer = System.currentTimeMillis();
+    ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads == null ? DEFAULT_NUMBER_OF_THREADS : numberOfThreads);
+    for (AbstractPhoenixMetricsCopier copier : copiers) {
+      executorService.submit(copier);
+    }
+    executorService.shutdown();
+    try {
+      executorService.awaitTermination(timeoutInMinutes, TimeUnit.MINUTES);
+    } catch (InterruptedException e) {
+      LOG.error(e);
+    }
+    long estimatedTime = System.currentTimeMillis() - startTimer;
+    LOG.info(String.format("Copying took %s seconds", estimatedTime/1000.0));
+    processedMetricsFileWriter.close();
+  }
+  private void initializeHbaseAccessor() throws MalformedURLException, URISyntaxException {
+    this.hBaseAccessor = new PhoenixHBaseAccessor(null);
+    this.timelineMetricConfiguration = TimelineMetricConfiguration.getInstance();
+    timelineMetricConfiguration.initialize();
+    TimelineMetricMetadataManager timelineMetricMetadataManager = new TimelineMetricMetadataManager(hBaseAccessor);
+    timelineMetricConfiguration.initialize();
+    hBaseAccessor.setMetadataInstance(timelineMetricMetadataManager);
+  }
+  private static Set<String> filterProcessedMetrics(Set<String> metricNames, Map<String, Set<String>> processedMetrics, String tableName) {
+    if (!processedMetrics.containsKey(tableName)) {
+      return metricNames;
+    }
+    return Sets.difference(metricNames, processedMetrics.get(tableName));
+  }
+  /**
+   * reads metric names from given file
+   * replaces all * with % and removes all ._p_
+   */
+  private static Set<String> readMetricWhitelistFromFile(String whitelistFile) {
+    LOG.info(String.format("Reading metric names from %s", whitelistFile));
+    Set<String> whitelistedMetrics = new HashSet<>();
+    BufferedReader br = null;
+    String strLine;
+    try(FileInputStream fstream = new FileInputStream(whitelistFile)) {
+      br = new BufferedReader(new InputStreamReader(fstream));
+      while ((strLine = br.readLine()) != null)   {
+        strLine = strLine.trim();
+        if (StringUtils.isEmpty(strLine)) {
+          continue;
+        }
+        if (strLine.startsWith(patternPrefix)) {
+          strLine = strLine.replace(patternPrefix, "");
+        }
+        if (strLine.contains("*")) {
+          strLine = strLine.replaceAll("\\*", "%");
+        }
+        whitelistedMetrics.add(strLine);
+      }
+    } catch (IOException ioEx) {
+      LOG.error(ioEx);
+    }
+    return whitelistedMetrics;
+  }
+  /**
+   *
+   * @param args
+   * REQUIRED args[0] - processedMetricsFilePath - full path to the file where processed metric are/will be stored
+   *
+   * OPTIONAL args[1] - whitelistedFilePath      - full path to the file with whitelisted metrics filenames
+   *                                               if not provided the default whitelist file location will be used if configured
+   *                                               if not configured - will result in error
+   *          args[2] - startTime                - default value is set to the last 30 days
+   *          args[3] - numberOfThreads          - default value is 3
+   *          args[4] - batchSize                - default value is 5
+   *          args[5] - timeoutInMinutes         - default value is set to the equivalent of 24 hours
+   */
+  public static void main(String[] args) {
+    String processedMetricsFilePath = null;
+    String whitelistedFilePath = null;
+    Long startTime = null;
+    Integer numberOfThreads = null;
+    Integer batchSize = null;
+    Long timeoutInMinutes = DEFAULT_TIMEOUT_MINUTES;
+    if (args.length>0) {
+      processedMetricsFilePath = args[0];
+    }
+    if (args.length>1) {
+      whitelistedFilePath = args[1];
+    }
+    if (args.length>2) {
+      startTime = Long.valueOf(args[2]);
+    }
+    if (args.length>3) {
+      numberOfThreads = Integer.valueOf(args[3]);
+    }
+    if (args.length>4) {
+      batchSize = Integer.valueOf(args[4]);
+    }
+    if (args.length>5) {
+      timeoutInMinutes = Long.valueOf(args[5]);
+    }
+    MetricsDataMigrationLauncher dataMigrationLauncher = null;
+    try {
+      LOG.info("Initializing system...");
+      dataMigrationLauncher = new MetricsDataMigrationLauncher(whitelistedFilePath, processedMetricsFilePath, startTime, numberOfThreads, batchSize);
+    } catch (Exception e) {
+      LOG.error("Exception during system setup, exiting...", e);
+      System.exit(1);
+    }
+    try {
+      dataMigrationLauncher.runMigration(timeoutInMinutes);
+    } catch (IOException e) {
+      LOG.error("Exception during data migration, exiting...", e);
+      System.exit(1);
+    }
+    System.exit(0);
+  }
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixClusterMetricsCopier.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixClusterMetricsCopier.java
new file mode 100644
index 0000000..ee65f00
--- /dev/null
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixClusterMetricsCopier.java
@@ -0,0 +1,74 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.upgrade.core;
+import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
+import java.io.FileWriter;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+public class PhoenixClusterMetricsCopier extends AbstractPhoenixMetricsCopier {
+  private static final Log LOG = LogFactory.getLog(PhoenixClusterMetricsCopier.class);
+  private Map<TimelineClusterMetric, MetricHostAggregate> aggregateMap = new HashMap<>();
+  PhoenixClusterMetricsCopier(String inputTableName, String outputTableName, PhoenixHBaseAccessor hBaseAccessor, Set<String> metricNames, Long startTime, FileWriter processedMetricsFileWriter) {
+    super(inputTableName, outputTableName, hBaseAccessor, metricNames, startTime, processedMetricsFileWriter);
+  }
+  @Override
+  protected String getColumnsClause() {
+    return "METRIC_NAME, " +
+      "APP_ID, " +
+      "INSTANCE_ID, " +
+      "SERVER_TIME, " +
+      "METRIC_SUM, " +
+      "METRIC_COUNT, " +
+      "METRIC_MAX, " +
+      "METRIC_MIN";
+  }
+  @Override
+  protected void saveMetrics() throws SQLException {
+    LOG.debug(String.format("Saving %s results read from %s into %s", aggregateMap.size(), inputTable, outputTable));
+    hBaseAccessor.saveClusterAggregateRecordsSecond(aggregateMap, outputTable);
+  }
+  @Override
+  protected void addToResults(ResultSet rs) throws SQLException {
+    TimelineClusterMetric timelineMetric = new TimelineClusterMetric(
+            rs.getString("METRIC_NAME"), rs.getString("APP_ID"),
+            rs.getString("INSTANCE_ID"), rs.getLong("SERVER_TIME"));
+    MetricHostAggregate metricHostAggregate = new MetricHostAggregate();
+    metricHostAggregate.setSum(rs.getDouble("METRIC_SUM"));
+    metricHostAggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT"));
+    metricHostAggregate.setMax(rs.getDouble("METRIC_MAX"));
+    metricHostAggregate.setMin(rs.getDouble("METRIC_MIN"));
+    aggregateMap.put(timelineMetric, metricHostAggregate);
+  }
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixHostMetricsCopier.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixHostMetricsCopier.java
new file mode 100644
index 0000000..a4f0c23
--- /dev/null
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixHostMetricsCopier.java
@@ -0,0 +1,77 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.upgrade.core;
+import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import java.io.FileWriter;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+public class PhoenixHostMetricsCopier extends AbstractPhoenixMetricsCopier {
+  private static final Log LOG = LogFactory.getLog(PhoenixHostMetricsCopier.class);
+  private Map<TimelineMetric, MetricHostAggregate> aggregateMap = new HashMap<>();
+  PhoenixHostMetricsCopier(String inputTableName, String outputTableName, PhoenixHBaseAccessor hBaseAccessor, Set<String> metricNames, Long startTime, FileWriter processedMetricsFileWriter) {
+    super(inputTableName, outputTableName, hBaseAccessor, metricNames, startTime, processedMetricsFileWriter);
+  }
+  @Override
+  protected String getColumnsClause() {
+    return "METRIC_NAME, " +
+      "HOSTNAME, " +
+      "APP_ID, " +
+      "INSTANCE_ID, " +
+      "SERVER_TIME, " +
+      "METRIC_SUM, " +
+      "METRIC_COUNT, " +
+      "METRIC_MAX, " +
+      "METRIC_MIN";
+  }
+  @Override
+  protected void saveMetrics() throws SQLException {
+    LOG.debug(String.format("Saving %s results read from %s into %s", aggregateMap.size(), inputTable, outputTable));
+    hBaseAccessor.saveHostAggregateRecords(aggregateMap, outputTable);
+  }
+  @Override
+  protected void addToResults(ResultSet rs) throws SQLException {
+    TimelineMetric timelineMetric = new TimelineMetric();
+    timelineMetric.setMetricName(rs.getString("METRIC_NAME"));
+    timelineMetric.setHostName(rs.getString("HOSTNAME"));
+    timelineMetric.setAppId(rs.getString("APP_ID"));
+    timelineMetric.setInstanceId(rs.getString("INSTANCE_ID"));
+    timelineMetric.setStartTime(rs.getLong("SERVER_TIME"));
+    MetricHostAggregate metricHostAggregate = new MetricHostAggregate();
+    metricHostAggregate.setSum(rs.getDouble("METRIC_SUM"));
+    metricHostAggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT"));
+    metricHostAggregate.setMax(rs.getDouble("METRIC_MAX"));
+    metricHostAggregate.setMin(rs.getDouble("METRIC_MIN"));
+    aggregateMap.put(timelineMetric, metricHostAggregate);
+  }