[AMBARI-23780] Implement an upgrade helper in Ambari Metrics collector that copies data from the old schema to the new (dsen) (#1196)
* [AMBARI-23780] Implement an upgrade helper in Ambari Metrics collector that copies data from the old schema to the new. (dsen) - initial patch
* [AMBARI-23780] Implement an upgrade helper in Ambari Metrics collector that copies data from the old schema to the new. (dsen)
* [AMBARI-23780] Implement an upgrade helper in Ambari Metrics collector that copies data from the old schema to the new (dsen)
* [AMBARI-23780] Implement an upgrade helper in Ambari Metrics collector that copies data from the old schema to the new (dsen) - changes according to review
* [AMBARI-23780] Implement an upgrade helper in Ambari Metrics collector that copies data from the old schema to the new (dsen) - UT fix
* [AMBARI-23780] Implement an upgrade helper in Ambari Metrics collector that copies data from the old schema to the new. (dsen) - reverted UpgradeCatalog changes
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java
index c684b0a..f2a3bca 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java
@@ -1332,7 +1332,7 @@
TimelineMetric metric = metricAggregate.getKey();
MetricHostAggregate hostAggregate = metricAggregate.getValue();
- byte[] uuid = metadataManagerInstance.getUuid(metric, false);
+ byte[] uuid = metadataManagerInstance.getUuid(metric, true);
if (uuid == null) {
LOG.error("Error computing UUID for metric. Cannot write aggregate metric : " + metric.toString());
continue;
@@ -1516,7 +1516,7 @@
"aggregate = " + aggregate);
}
- byte[] uuid = metadataManagerInstance.getUuid(clusterMetric, false);
+ byte[] uuid = metadataManagerInstance.getUuid(clusterMetric, true);
if (uuid == null) {
LOG.error("Error computing UUID for metric. Cannot write metric : " + clusterMetric.toString());
continue;
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/AbstractPhoenixMetricsCopier.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/AbstractPhoenixMetricsCopier.java
new file mode 100644
index 0000000..6782930
--- /dev/null
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/AbstractPhoenixMetricsCopier.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.upgrade.core;
+
+import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Set;
+
+public abstract class AbstractPhoenixMetricsCopier implements Runnable {
+ private static final Log LOG = LogFactory.getLog(AbstractPhoenixMetricsCopier.class);
+ private static final Long DEFAULT_NATIVE_TIME_RANGE_DELAY = 120000L;
+ private final Long startTime;
+ protected final FileWriter processedMetricsFile;
+ protected String inputTable;
+ protected String outputTable;
+ protected Set<String> metricNames;
+ protected PhoenixHBaseAccessor hBaseAccessor;
+
+ public AbstractPhoenixMetricsCopier(String inputTableName, String outputTableName, PhoenixHBaseAccessor hBaseAccessor,
+ Set<String> metricNames, Long startTime, FileWriter outputStream) {
+ this.inputTable = inputTableName;
+ this.outputTable = outputTableName;
+ this.hBaseAccessor = hBaseAccessor;
+ this.metricNames = metricNames;
+ this.startTime = startTime;
+ this.processedMetricsFile = outputStream;
+ }
+
+ public void run(){
+ LOG.info(String.format("Copying %s metrics from %s to %s", metricNames, inputTable, outputTable));
+ long startTimer = System.currentTimeMillis();
+ String query = String.format("SELECT %s %s FROM %s WHERE %s AND SERVER_TIME > %s ORDER BY METRIC_NAME, SERVER_TIME",
+ getQueryHint(startTime), getColumnsClause(), inputTable, getMetricNamesLikeClause(), startTime);
+
+ runPhoenixQueryAndAddToResults(query);
+
+ try {
+ saveMetrics();
+ } catch (SQLException e) {
+ LOG.error(e);
+ }
+ long estimatedTime = System.currentTimeMillis() - startTimer;
+ LOG.debug(String.format("Copying took %s seconds from table %s to table %s for metric names %s", estimatedTime/ 1000.0, inputTable, outputTable, metricNames));
+
+ saveMetricsProgress();
+ }
+
+ private String getMetricNamesLikeClause() {
+ StringBuilder sb = new StringBuilder();
+ sb.append('(');
+ int i = 0;
+ for (String metricName : metricNames) {
+ sb.append("METRIC_NAME");
+ sb.append(" LIKE ");
+ sb.append("'");
+ sb.append(metricName);
+ sb.append("'");
+
+ if (i < metricNames.size() - 1) {
+ sb.append(" OR ");
+ }
+ i++;
+ }
+
+ sb.append(')');
+ return sb.toString();
+ }
+
+ protected abstract String getColumnsClause();
+
+ private void runPhoenixQueryAndAddToResults(String query) {
+ LOG.debug(String.format("Running query: %s", query));
+ Connection conn = null;
+ PreparedStatement stmt = null;
+ try {
+ conn = hBaseAccessor.getConnection();
+ stmt = conn.prepareStatement(query);
+ ResultSet rs = stmt.executeQuery();
+ while (rs.next()) {
+ addToResults(rs);
+ }
+ } catch (SQLException e) {
+ LOG.error(String.format("Exception during running phoenix query %s", query), e);
+ } finally {
+ if (stmt != null) {
+ try {
+ stmt.close();
+ } catch (SQLException e) {
+ // Ignore
+ }
+ }
+ if (conn != null) {
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ // Ignore
+ }
+ }
+ }
+ }
+
+ /**
+ * Saves processed metric names info provided file in format TABLE_NAME:METRIC_NAME
+ */
+ private void saveMetricsProgress() {
+ if (processedMetricsFile == null) {
+ LOG.info("Skipping metrics progress save as the file is null");
+ return;
+ }
+ for (String metricName : metricNames) {
+ try {
+ processedMetricsFile.append(inputTable + ":" + metricName + System.lineSeparator());
+ } catch (IOException e) {
+ LOG.error(e);
+ }
+ }
+ }
+
+ protected String getQueryHint(Long startTime) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("/*+ ");
+ sb.append("NATIVE_TIME_RANGE(");
+ sb.append(startTime - DEFAULT_NATIVE_TIME_RANGE_DELAY);
+ sb.append(") ");
+ sb.append("*/");
+ return sb.toString();
+ }
+
+ /**
+ * Saves aggregated metrics to the Hbase
+ * @throws SQLException
+ */
+ protected abstract void saveMetrics() throws SQLException;
+
+ /**
+ * Parses result set into aggregates map
+ * @param rs
+ * @throws SQLException
+ */
+ protected abstract void addToResults(ResultSet rs) throws SQLException;
+}
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java
new file mode 100644
index 0000000..7f4d93a
--- /dev/null
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.upgrade.core;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_V1_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_V1_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_V1_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_V1_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_V1_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_MINUTE_V1_TABLE_NAME;
+
+public class MetricsDataMigrationLauncher {
+ private static final Log LOG = LogFactory.getLog(MetricsDataMigrationLauncher.class);
+ private static final Long DEFAULT_TIMEOUT_MINUTES = 60*24L;
+ private static String patternPrefix = "._p_";
+ private static final int DEFAULT_BATCH_SIZE = 5;
+ public static final Map<String, String> CLUSTER_AGGREGATE_TABLES_MAPPING = new HashMap<>();
+ public static final Map<String, String> HOST_AGGREGATE_TABLES_MAPPING = new HashMap<>();
+ public static final String DEFAULT_PROCESSED_METRICS_FILE_LOCATION = "/var/log/ambari-metrics-collector/ambari-metrics-migration-state.txt";
+ public static final int DEFAULT_NUMBER_OF_THREADS = 3;
+ public static final long ONE_MONTH_MILLIS = 2592000000L;
+ public static final long DEFAULT_START_TIME = System.currentTimeMillis() - ONE_MONTH_MILLIS; //Last month
+
+ static {
+ CLUSTER_AGGREGATE_TABLES_MAPPING.put(METRICS_CLUSTER_AGGREGATE_MINUTE_V1_TABLE_NAME, METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME);
+ CLUSTER_AGGREGATE_TABLES_MAPPING.put(METRICS_CLUSTER_AGGREGATE_HOURLY_V1_TABLE_NAME, METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
+ CLUSTER_AGGREGATE_TABLES_MAPPING.put(METRICS_CLUSTER_AGGREGATE_DAILY_V1_TABLE_NAME, METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME);
+
+ HOST_AGGREGATE_TABLES_MAPPING.put(METRICS_AGGREGATE_MINUTE_V1_TABLE_NAME, METRICS_AGGREGATE_MINUTE_TABLE_NAME);
+ HOST_AGGREGATE_TABLES_MAPPING.put(METRICS_AGGREGATE_HOURLY_V1_TABLE_NAME, METRICS_AGGREGATE_HOURLY_TABLE_NAME);
+ HOST_AGGREGATE_TABLES_MAPPING.put(METRICS_AGGREGATE_DAILY_V1_TABLE_NAME, METRICS_AGGREGATE_DAILY_TABLE_NAME);
+ }
+
+ private final Set<Set<String>> metricNamesBatches;
+ private final String processedMetricsFilePath;
+ private Set<String> metricNames;
+
+ private Long startTime;
+ private Integer batchSize;
+ private Integer numberOfThreads;
+ private TimelineMetricConfiguration timelineMetricConfiguration;
+ private PhoenixHBaseAccessor hBaseAccessor;
+ private Map<String, Set<String>> processedMetrics;
+
+ public MetricsDataMigrationLauncher(String whitelistedFilePath, String processedMetricsFilePath, Long startTime, Integer numberOfThreads, Integer batchSize) throws Exception {
+ this.startTime = startTime == null? DEFAULT_START_TIME : startTime;
+ this.numberOfThreads = numberOfThreads == null? DEFAULT_NUMBER_OF_THREADS : numberOfThreads;
+ this.batchSize = batchSize == null? DEFAULT_BATCH_SIZE : batchSize;
+ this.processedMetricsFilePath = processedMetricsFilePath == null? DEFAULT_PROCESSED_METRICS_FILE_LOCATION : processedMetricsFilePath;
+
+ initializeHbaseAccessor();
+
+ LOG.info("Looking for whitelisted metric names...");
+
+ if (whitelistedFilePath != null) {
+ this.metricNames = readMetricWhitelistFromFile(whitelistedFilePath);
+ } else if (timelineMetricConfiguration.isWhitelistingEnabled()) {
+ String whitelistFile = timelineMetricConfiguration.getMetricsConf().get(TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_FILE, TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_FILE_LOCATION_DEFAULT);
+ metricNames = readMetricWhitelistFromFile(whitelistFile);
+ } else {
+ LOG.error("No whitelisted metrics specified. Exiting...");
+ throw new Exception("List of whitelisted metrics must be provided");
+ }
+
+ readProcessedMetricsMap();
+
+ LOG.info("Setting up batches...");
+ this.metricNamesBatches = new HashSet<>();
+
+ Iterables.partition(metricNames, this.batchSize)
+ .forEach(batch -> metricNamesBatches.add(new HashSet<>(batch)));
+ LOG.info(String.format("Split metric names into %s batches with size of %s", metricNamesBatches.size(), this.batchSize));
+ }
+
+
+ private void readProcessedMetricsMap() {
+ Map<String, Set<String>> result = new HashMap<>();
+ if (!Files.exists(Paths.get(processedMetricsFilePath))) {
+ LOG.info(String.format("The processed metrics file %s is missing, assuming there were no metrics processed.", processedMetricsFilePath));
+ this.processedMetrics = new HashMap<>();
+ }
+ LOG.info(String.format("Reading the list of already copied metrics from %s", processedMetricsFilePath));
+ try {
+ try (Stream<String> stream = Files.lines(Paths.get(processedMetricsFilePath))) {
+ stream.forEach( line -> {
+ String [] lineSplit = line.split(":");
+ if (!result.containsKey(lineSplit[0])) {
+ result.put(lineSplit[0], new HashSet<>(Collections.singletonList(lineSplit[1])));
+ } else {
+ result.get(lineSplit[0]).add(lineSplit[1]);
+ }
+ });
+ }
+ } catch (IOException e) {
+ LOG.error(e);
+ }
+ this.processedMetrics = result;
+ }
+
+ public void runMigration(Long timeoutInMinutes) throws IOException {
+
+ FileWriter processedMetricsFileWriter = new FileWriter(processedMetricsFilePath, true);
+ LOG.info("Setting up copiers...");
+ Set<AbstractPhoenixMetricsCopier> copiers = new HashSet<>();
+ for (Set<String> batch : metricNamesBatches) {
+ for (Map.Entry<String, String> entry : CLUSTER_AGGREGATE_TABLES_MAPPING.entrySet()) {
+ Set<String> filteredMetrics = filterProcessedMetrics(batch, this.processedMetrics, entry.getKey());
+ if (!filteredMetrics.isEmpty()) {
+ copiers.add(new PhoenixClusterMetricsCopier(entry.getKey(), entry.getValue(), hBaseAccessor,
+ filteredMetrics, startTime, processedMetricsFileWriter));
+ }
+ }
+
+ for (Map.Entry<String, String> entry : HOST_AGGREGATE_TABLES_MAPPING.entrySet()) {
+ Set<String> filteredMetrics = filterProcessedMetrics(batch, processedMetrics, entry.getKey());
+ if (!filteredMetrics.isEmpty()) {
+ copiers.add(new PhoenixHostMetricsCopier(entry.getKey(), entry.getValue(), hBaseAccessor,
+ filteredMetrics, startTime, processedMetricsFileWriter));
+ }
+ }
+ }
+
+ if (copiers.isEmpty()) {
+ LOG.info("No copy threads to run, looks like all metrics have been copied.");
+ processedMetricsFileWriter.close();
+ return;
+ }
+
+ LOG.info("Running the copy threads...");
+ long startTimer = System.currentTimeMillis();
+ ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads == null ? DEFAULT_NUMBER_OF_THREADS : numberOfThreads);
+ for (AbstractPhoenixMetricsCopier copier : copiers) {
+ executorService.submit(copier);
+ }
+
+ executorService.shutdown();
+
+ try {
+ executorService.awaitTermination(timeoutInMinutes, TimeUnit.MINUTES);
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ }
+
+ long estimatedTime = System.currentTimeMillis() - startTimer;
+ LOG.info(String.format("Copying took %s seconds", estimatedTime/1000.0));
+
+ processedMetricsFileWriter.close();
+ }
+
+ private void initializeHbaseAccessor() throws MalformedURLException, URISyntaxException {
+ this.hBaseAccessor = new PhoenixHBaseAccessor(null);
+ this.timelineMetricConfiguration = TimelineMetricConfiguration.getInstance();
+ timelineMetricConfiguration.initialize();
+
+ TimelineMetricMetadataManager timelineMetricMetadataManager = new TimelineMetricMetadataManager(hBaseAccessor);
+ timelineMetricConfiguration.initialize();
+ hBaseAccessor.setMetadataInstance(timelineMetricMetadataManager);
+ }
+
+ private static Set<String> filterProcessedMetrics(Set<String> metricNames, Map<String, Set<String>> processedMetrics, String tableName) {
+ if (!processedMetrics.containsKey(tableName)) {
+ return metricNames;
+ }
+ return Sets.difference(metricNames, processedMetrics.get(tableName));
+ }
+
+ /**
+ * reads metric names from given file
+ * replaces all * with % and removes all ._p_
+ */
+ private static Set<String> readMetricWhitelistFromFile(String whitelistFile) {
+ LOG.info(String.format("Reading metric names from %s", whitelistFile));
+ Set<String> whitelistedMetrics = new HashSet<>();
+
+ BufferedReader br = null;
+ String strLine;
+
+ try(FileInputStream fstream = new FileInputStream(whitelistFile)) {
+ br = new BufferedReader(new InputStreamReader(fstream));
+
+ while ((strLine = br.readLine()) != null) {
+ strLine = strLine.trim();
+ if (StringUtils.isEmpty(strLine)) {
+ continue;
+ }
+ if (strLine.startsWith(patternPrefix)) {
+ strLine = strLine.replace(patternPrefix, "");
+ }
+ if (strLine.contains("*")) {
+ strLine = strLine.replaceAll("\\*", "%");
+ }
+ whitelistedMetrics.add(strLine);
+ }
+ } catch (IOException ioEx) {
+ LOG.error(ioEx);
+ }
+ return whitelistedMetrics;
+ }
+
+
+ /**
+ *
+ * @param args
+ * REQUIRED args[0] - processedMetricsFilePath - full path to the file where processed metric are/will be stored
+ *
+ * OPTIONAL args[1] - whitelistedFilePath - full path to the file with whitelisted metrics filenames
+ * if not provided the default whitelist file location will be used if configured
+ * if not configured - will result in error
+ * args[2] - startTime - default value is set to the last 30 days
+ * args[3] - numberOfThreads - default value is 3
+ * args[4] - batchSize - default value is 5
+ * args[5] - timeoutInMinutes - default value is set to the equivalent of 24 hours
+ */
+ public static void main(String[] args) {
+ String processedMetricsFilePath = null;
+ String whitelistedFilePath = null;
+ Long startTime = null;
+ Integer numberOfThreads = null;
+ Integer batchSize = null;
+ Long timeoutInMinutes = DEFAULT_TIMEOUT_MINUTES;
+
+ if (args.length>0) {
+ processedMetricsFilePath = args[0];
+ }
+ if (args.length>1) {
+ whitelistedFilePath = args[1];
+ }
+ if (args.length>2) {
+ startTime = Long.valueOf(args[2]);
+ }
+ if (args.length>3) {
+ numberOfThreads = Integer.valueOf(args[3]);
+ }
+ if (args.length>4) {
+ batchSize = Integer.valueOf(args[4]);
+ }
+ if (args.length>5) {
+ timeoutInMinutes = Long.valueOf(args[5]);
+ }
+
+ MetricsDataMigrationLauncher dataMigrationLauncher = null;
+ try {
+ LOG.info("Initializing system...");
+ dataMigrationLauncher = new MetricsDataMigrationLauncher(whitelistedFilePath, processedMetricsFilePath, startTime, numberOfThreads, batchSize);
+ } catch (Exception e) {
+ LOG.error("Exception during system setup, exiting...", e);
+ System.exit(1);
+ }
+
+ try {
+ dataMigrationLauncher.runMigration(timeoutInMinutes);
+ } catch (IOException e) {
+ LOG.error("Exception during data migration, exiting...", e);
+ System.exit(1);
+ }
+
+ System.exit(0);
+
+ }
+}
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixClusterMetricsCopier.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixClusterMetricsCopier.java
new file mode 100644
index 0000000..ee65f00
--- /dev/null
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixClusterMetricsCopier.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.upgrade.core;
+
+import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
+
+import java.io.FileWriter;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class PhoenixClusterMetricsCopier extends AbstractPhoenixMetricsCopier {
+ private static final Log LOG = LogFactory.getLog(PhoenixClusterMetricsCopier.class);
+ private Map<TimelineClusterMetric, MetricHostAggregate> aggregateMap = new HashMap<>();
+
+ PhoenixClusterMetricsCopier(String inputTableName, String outputTableName, PhoenixHBaseAccessor hBaseAccessor, Set<String> metricNames, Long startTime, FileWriter processedMetricsFileWriter) {
+ super(inputTableName, outputTableName, hBaseAccessor, metricNames, startTime, processedMetricsFileWriter);
+ }
+
+ @Override
+ protected String getColumnsClause() {
+ return "METRIC_NAME, " +
+ "APP_ID, " +
+ "INSTANCE_ID, " +
+ "SERVER_TIME, " +
+ "METRIC_SUM, " +
+ "METRIC_COUNT, " +
+ "METRIC_MAX, " +
+ "METRIC_MIN";
+ }
+
+ @Override
+ protected void saveMetrics() throws SQLException {
+ LOG.debug(String.format("Saving %s results read from %s into %s", aggregateMap.size(), inputTable, outputTable));
+ hBaseAccessor.saveClusterAggregateRecordsSecond(aggregateMap, outputTable);
+ }
+
+ @Override
+ protected void addToResults(ResultSet rs) throws SQLException {
+ TimelineClusterMetric timelineMetric = new TimelineClusterMetric(
+ rs.getString("METRIC_NAME"), rs.getString("APP_ID"),
+ rs.getString("INSTANCE_ID"), rs.getLong("SERVER_TIME"));
+
+ MetricHostAggregate metricHostAggregate = new MetricHostAggregate();
+ metricHostAggregate.setSum(rs.getDouble("METRIC_SUM"));
+ metricHostAggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT"));
+ metricHostAggregate.setMax(rs.getDouble("METRIC_MAX"));
+ metricHostAggregate.setMin(rs.getDouble("METRIC_MIN"));
+
+ aggregateMap.put(timelineMetric, metricHostAggregate);
+
+ }
+}
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixHostMetricsCopier.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixHostMetricsCopier.java
new file mode 100644
index 0000000..a4f0c23
--- /dev/null
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixHostMetricsCopier.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.upgrade.core;
+
+import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+
+import java.io.FileWriter;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class PhoenixHostMetricsCopier extends AbstractPhoenixMetricsCopier {
+ private static final Log LOG = LogFactory.getLog(PhoenixHostMetricsCopier.class);
+ private Map<TimelineMetric, MetricHostAggregate> aggregateMap = new HashMap<>();
+
+ PhoenixHostMetricsCopier(String inputTableName, String outputTableName, PhoenixHBaseAccessor hBaseAccessor, Set<String> metricNames, Long startTime, FileWriter processedMetricsFileWriter) {
+ super(inputTableName, outputTableName, hBaseAccessor, metricNames, startTime, processedMetricsFileWriter);
+ }
+
+ @Override
+ protected String getColumnsClause() {
+ return "METRIC_NAME, " +
+ "HOSTNAME, " +
+ "APP_ID, " +
+ "INSTANCE_ID, " +
+ "SERVER_TIME, " +
+ "METRIC_SUM, " +
+ "METRIC_COUNT, " +
+ "METRIC_MAX, " +
+ "METRIC_MIN";
+ }
+
+ @Override
+ protected void saveMetrics() throws SQLException {
+ LOG.debug(String.format("Saving %s results read from %s into %s", aggregateMap.size(), inputTable, outputTable));
+ hBaseAccessor.saveHostAggregateRecords(aggregateMap, outputTable);
+ }
+
+ @Override
+ protected void addToResults(ResultSet rs) throws SQLException {
+ TimelineMetric timelineMetric = new TimelineMetric();
+ timelineMetric.setMetricName(rs.getString("METRIC_NAME"));
+ timelineMetric.setHostName(rs.getString("HOSTNAME"));
+ timelineMetric.setAppId(rs.getString("APP_ID"));
+ timelineMetric.setInstanceId(rs.getString("INSTANCE_ID"));
+ timelineMetric.setStartTime(rs.getLong("SERVER_TIME"));
+
+ MetricHostAggregate metricHostAggregate = new MetricHostAggregate();
+ metricHostAggregate.setSum(rs.getDouble("METRIC_SUM"));
+ metricHostAggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT"));
+ metricHostAggregate.setMax(rs.getDouble("METRIC_MAX"));
+ metricHostAggregate.setMin(rs.getDouble("METRIC_MIN"));
+
+ aggregateMap.put(timelineMetric, metricHostAggregate);
+ }
+}