[AMBARI-24146] Metrics migrated during AMS upgrade are not saved into… (#1580)

* [AMBARI-24146] Metrics migrated during AMS upgrade are not saved into metadata table (dsen)

* [AMBARI-24146] Metrics migrated during AMS upgrade are not saved into metadata table (dsen) - shutdown hook
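
In short: the data migration launcher now initializes the metadata manager without scheduling the periodic background sync, and a JVM shutdown hook merges metadata read from the old V1 (non-UUID) METRICS_METADATA table into the in-memory cache and forces a one-time sync, so migrated metrics land in the new metadata table. A minimal sketch of that flow follows; class and method names mirror the diff below, while the surrounding wiring (hBaseAccessor, LOG) is assumed to already exist on the launcher and a lambda stands in for the anonymous Thread used in the patch.

    TimelineMetricMetadataManager metadataManager =
        new TimelineMetricMetadataManager(hBaseAccessor);

    // Short-lived migration process: load the metadata cache once and skip
    // scheduling the periodic metadata sync executor.
    metadataManager.initializeMetadata(false);
    hBaseAccessor.setMetadataInstance(metadataManager);

    // On shutdown, merge metadata from the V1 (non-UUID) table into the
    // cache and flush it to the new metadata table in one forced sync.
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
      try {
        metadataManager.updateMetadataCacheUsingV1Tables();
        metadataManager.forceMetricsMetadataSync();
      } catch (SQLException e) {
        LOG.error("Exception during metadata saving, exiting...", e);
      }
    }));
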
diff --git a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricMetadata.java b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricMetadata.java
index 6c9712f..b1266b1 100644
--- a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricMetadata.java
+++ b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricMetadata.java
@@ -142,6 +142,10 @@
     return isWhitelisted;
   }
 
+  public void setIsWhitelisted(boolean isWhitelisted) {
+    this.isWhitelisted = isWhitelisted;
+  }
+
   public void setSupportsAggregates(boolean supportsAggregates) {
     this.supportsAggregates = supportsAggregates;
   }
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java
index e3da8b2..b6da012 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java
@@ -61,6 +61,7 @@
 import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_HOSTED_APPS_METADATA_SQL;
 import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_INSTANCE_HOST_METADATA_SQL;
 import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_METRIC_METADATA_SQL;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_METRIC_METADATA_SQL_V1;
 import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME;
 import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
 import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
@@ -1975,6 +1976,63 @@
     return metadataMap;
   }
 
+  // No support for filter criteria for now.
+  public Map<TimelineMetricMetadataKey, TimelineMetricMetadata> getTimelineMetricMetadataV1() throws SQLException {
+    Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadataMap = new HashMap<>();
+    Connection conn = getConnection();
+    PreparedStatement stmt = null;
+    ResultSet rs = null;
+
+    try {
+      stmt = conn.prepareStatement(GET_METRIC_METADATA_SQL_V1);
+      rs = stmt.executeQuery();
+
+      while (rs.next()) {
+        String metricName = rs.getString("METRIC_NAME");
+        String appId = rs.getString("APP_ID");
+        TimelineMetricMetadata metadata = new TimelineMetricMetadata(
+          metricName,
+          appId,
+          null,
+          rs.getString("UNITS"),
+          rs.getString("TYPE"),
+          rs.getLong("START_TIME"),
+          rs.getBoolean("SUPPORTS_AGGREGATION"),
+          rs.getBoolean("IS_WHITELISTED")
+        );
+
+        TimelineMetricMetadataKey key = new TimelineMetricMetadataKey(metricName, appId, null);
+        metadata.setIsPersisted(false);
+        metadataMap.put(key, metadata);
+      }
+
+    } finally {
+      if (rs != null) {
+        try {
+          rs.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+    }
+
+    return metadataMap;
+  }
+
   public void setMetadataInstance(TimelineMetricMetadataManager metadataManager) {
     this.metadataManagerInstance = metadataManager;
     TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper(this.metadataManagerInstance);
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataManager.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataManager.java
index 86226ec..4124fb6 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataManager.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataManager.java
@@ -131,6 +131,13 @@
    * Initialize Metadata from the store
    */
   public void initializeMetadata() {
+    initializeMetadata(true);
+  }
+
+  /**
+   * Initialize Metadata from the store, optionally scheduling the periodic metadata sync.
+   */
+  public void initializeMetadata(boolean scheduleMetadataSync) {
 
     //Create metadata schema
     Connection conn = null;
@@ -180,10 +187,12 @@
 
       metricMetadataSync = new TimelineMetricMetadataSync(this);
     // Schedule the executor to sync to store
-    executorService.scheduleWithFixedDelay(metricMetadataSync,
-      metricsConf.getInt(METRICS_METADATA_SYNC_INIT_DELAY, 120), // 2 minutes
-      metricsConf.getInt(METRICS_METADATA_SYNC_SCHEDULE_DELAY, 300), // 5 minutes
-      TimeUnit.SECONDS);
+    if (scheduleMetadataSync) {
+      executorService.scheduleWithFixedDelay(metricMetadataSync,
+          metricsConf.getInt(METRICS_METADATA_SYNC_INIT_DELAY, 120), // 2 minutes
+          metricsConf.getInt(METRICS_METADATA_SYNC_SCHEDULE_DELAY, 300), // 5 minutes
+          TimeUnit.SECONDS);
+    }
     // Read from store and initialize map
     try {
       Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadata = getMetadataFromStore();
@@ -357,6 +366,14 @@
   }
 
   /**
+   * Fetch metrics metadata from the V1 store table (the schema without UUIDs)
+   * @throws SQLException
+   */
+  Map<TimelineMetricMetadataKey, TimelineMetricMetadata> getMetadataFromStoreV1() throws SQLException {
+    return hBaseAccessor.getTimelineMetricMetadataV1();
+  }
+
+  /**
    * Fetch metrics metadata from store
    * @throws SQLException
    */
@@ -510,6 +527,10 @@
       timelineMetric.getInstanceId(), -1l), createIfNotPresent);
     byte[] hostUuid = getUuidForHostname(timelineMetric.getHostName(), createIfNotPresent);
 
+    if (hostUuid != null) {
+      putIfModifiedHostedAppsMetadata(timelineMetric.getHostName(), timelineMetric.getAppId());
+    }
+
     if (metricUuid == null || hostUuid == null) {
       return null;
     }
@@ -797,4 +818,35 @@
     return false;
   }
 
+  /**
+   * Run TimelineMetricMetadataSync once
+   */
+  public void forceMetricsMetadataSync() {
+    metricMetadataSync.run();
+  }
+
+  public void updateMetadataCacheUsingV1Tables() throws SQLException {
+    Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadataV1Map = getMetadataFromStoreV1();
+    for (TimelineMetricMetadataKey key: METADATA_CACHE.keySet()) {
+      TimelineMetricMetadata cacheValue = METADATA_CACHE.get(key);
+      TimelineMetricMetadata oldValue = metadataV1Map.get(key);
+
+      if (oldValue != null) {
+        if (!cacheValue.isPersisted()) {
+          LOG.info(String.format("Updating properties for %s", key));
+          cacheValue.setSeriesStartTime(oldValue.getSeriesStartTime());
+          cacheValue.setSupportsAggregates(oldValue.isSupportsAggregates());
+          cacheValue.setType(oldValue.getType());
+          cacheValue.setIsWhitelisted(oldValue.isWhitelisted());
+        } else if (oldValue.getSeriesStartTime() < cacheValue.getSeriesStartTime() &&
+                   cacheValue.getSeriesStartTime() != 0L &&
+                   cacheValue.isWhitelisted()) {
+          LOG.info(String.format("Updating startTime for %s", key));
+          cacheValue.setSeriesStartTime(oldValue.getSeriesStartTime());
+          cacheValue.setIsPersisted(false);
+        }
+      }
+    }
+  }
 }
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java
index 539bb17..42bdd60 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java
@@ -366,6 +366,10 @@
     "METRIC_NAME, APP_ID, INSTANCE_ID, UUID, UNITS, TYPE, START_TIME, " +
     "SUPPORTS_AGGREGATION, IS_WHITELISTED FROM METRICS_METADATA_UUID";
 
+  public static final String GET_METRIC_METADATA_SQL_V1 = "SELECT " +
+    "METRIC_NAME, APP_ID, UNITS, TYPE, START_TIME, " +
+    "SUPPORTS_AGGREGATION, IS_WHITELISTED FROM METRICS_METADATA";
+
   public static final String GET_HOSTED_APPS_METADATA_SQL = "SELECT " +
     "HOSTNAME, UUID, APP_IDS FROM HOSTED_APPS_METADATA_UUID";
 
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java
index 7f4d93a..5c4ee4e 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java
@@ -35,6 +35,7 @@
 import java.net.URISyntaxException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.sql.SQLException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -89,6 +90,7 @@
   private Integer numberOfThreads;
   private TimelineMetricConfiguration timelineMetricConfiguration;
   private PhoenixHBaseAccessor hBaseAccessor;
+  private TimelineMetricMetadataManager timelineMetricMetadataManager;
   private Map<String, Set<String>> processedMetrics;
 
   public MetricsDataMigrationLauncher(String whitelistedFilePath, String processedMetricsFilePath, Long startTime, Integer numberOfThreads, Integer batchSize) throws Exception {
@@ -201,8 +203,9 @@
     this.timelineMetricConfiguration = TimelineMetricConfiguration.getInstance();
     timelineMetricConfiguration.initialize();
 
-    TimelineMetricMetadataManager timelineMetricMetadataManager = new TimelineMetricMetadataManager(hBaseAccessor);
-    timelineMetricConfiguration.initialize();
+    timelineMetricMetadataManager = new TimelineMetricMetadataManager(hBaseAccessor);
+    timelineMetricMetadataManager.initializeMetadata(false);
+
     hBaseAccessor.setMetadataInstance(timelineMetricMetadataManager);
   }
 
@@ -246,6 +249,13 @@
     return whitelistedMetrics;
   }
 
+  private void saveMetadata() throws SQLException {
+    LOG.info("Saving metadata to store...");
+    timelineMetricMetadataManager.updateMetadataCacheUsingV1Tables();
+    timelineMetricMetadataManager.forceMetricsMetadataSync();
+    LOG.info("Metadata was saved.");
+  }
+
 
   /**
    *
@@ -297,6 +307,18 @@
     }
 
     try {
+      // Set up a shutdown hook to save the metadata on exit.
+      MetricsDataMigrationLauncher finalDataMigrationLauncher = dataMigrationLauncher;
+      Runtime.getRuntime().addShutdownHook(new Thread() {
+        public void run() {
+          try {
+            finalDataMigrationLauncher.saveMetadata();
+          } catch (SQLException e) {
+            LOG.error("Exception during metadata saving, exiting...", e);
+          }
+        }
+      });
+
       dataMigrationLauncher.runMigration(timeoutInMinutes);
     } catch (IOException e) {
       LOG.error("Exception during data migration, exiting...", e);