/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.metrics.core.timeline.upgrade.core;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_V1_TABLE_NAME;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_V1_TABLE_NAME;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_V1_TABLE_NAME;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_V1_TABLE_NAME;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_V1_TABLE_NAME;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_MINUTE_V1_TABLE_NAME;
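/**
 * Entry point for migrating metrics data from the old (v1) Phoenix aggregate tables into
 * their current counterparts as part of the Ambari Metrics upgrade. Whitelisted metric
 * names are split into batches, and one copier is run per batch and table mapping on a
 * fixed-size thread pool. Progress is appended to a state file so that an interrupted
 * migration can be resumed without re-copying already processed metrics.
 *
 * A typical invocation might look like the following (the paths are illustrative, not
 * mandated by the tool):
 * <pre>
 *   java org.apache.ambari.metrics.core.timeline.upgrade.core.MetricsDataMigrationLauncher \
 *     /var/log/ambari-metrics-collector/ambari-metrics-migration-state.txt \
 *     /etc/ambari-metrics-collector/conf/metrics_whitelist
 * </pre>
 */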
public class MetricsDataMigrationLauncher {
private static final Log LOG = LogFactory.getLog(MetricsDataMigrationLauncher.class);
private static final Long DEFAULT_TIMEOUT_MINUTES = 60 * 24L;
private static final String patternPrefix = "._p_";
private static final int DEFAULT_BATCH_SIZE = 5;
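// Mappings from the v1 (pre-upgrade) aggregate tables to their v2 counterparts; each entry drives one copier per batch.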
public static final Map<String, String> CLUSTER_AGGREGATE_TABLES_MAPPING = new HashMap<>();
public static final Map<String, String> HOST_AGGREGATE_TABLES_MAPPING = new HashMap<>();
public static final String DEFAULT_PROCESSED_METRICS_FILE_LOCATION = "/var/log/ambari-metrics-collector/ambari-metrics-migration-state.txt";
public static final int DEFAULT_NUMBER_OF_THREADS = 3;
public static final long ONE_MONTH_MILLIS = 2592000000L; // 30 days
public static final long DEFAULT_START_TIME = System.currentTimeMillis() - ONE_MONTH_MILLIS; // 30 days ago
static {
CLUSTER_AGGREGATE_TABLES_MAPPING.put(METRICS_CLUSTER_AGGREGATE_MINUTE_V1_TABLE_NAME, METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME);
CLUSTER_AGGREGATE_TABLES_MAPPING.put(METRICS_CLUSTER_AGGREGATE_HOURLY_V1_TABLE_NAME, METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
CLUSTER_AGGREGATE_TABLES_MAPPING.put(METRICS_CLUSTER_AGGREGATE_DAILY_V1_TABLE_NAME, METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME);
HOST_AGGREGATE_TABLES_MAPPING.put(METRICS_AGGREGATE_MINUTE_V1_TABLE_NAME, METRICS_AGGREGATE_MINUTE_TABLE_NAME);
HOST_AGGREGATE_TABLES_MAPPING.put(METRICS_AGGREGATE_HOURLY_V1_TABLE_NAME, METRICS_AGGREGATE_HOURLY_TABLE_NAME);
HOST_AGGREGATE_TABLES_MAPPING.put(METRICS_AGGREGATE_DAILY_V1_TABLE_NAME, METRICS_AGGREGATE_DAILY_TABLE_NAME);
}
private final Set<Set<String>> metricNamesBatches;
private final String processedMetricsFilePath;
private Set<String> metricNames;
private Long startTime;
private Integer batchSize;
private Integer numberOfThreads;
private TimelineMetricConfiguration timelineMetricConfiguration;
private PhoenixHBaseAccessor hBaseAccessor;
private Map<String, Set<String>> processedMetrics;
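/**
 * @param whitelistedFilePath      file with whitelisted metric names; may be null when
 *                                 whitelisting is enabled in the collector configuration
 * @param processedMetricsFilePath state file tracking already copied metrics; null falls
 *                                 back to the default location
 * @param startTime                copy data newer than this timestamp (epoch millis); null means 30 days ago
 * @param numberOfThreads          size of the copy thread pool; null means 3
 * @param batchSize                number of metric names per batch; null means 5
 */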
public MetricsDataMigrationLauncher(String whitelistedFilePath, String processedMetricsFilePath, Long startTime, Integer numberOfThreads, Integer batchSize) throws Exception {
this.startTime = startTime == null ? DEFAULT_START_TIME : startTime;
this.numberOfThreads = numberOfThreads == null ? DEFAULT_NUMBER_OF_THREADS : numberOfThreads;
this.batchSize = batchSize == null ? DEFAULT_BATCH_SIZE : batchSize;
this.processedMetricsFilePath = processedMetricsFilePath == null ? DEFAULT_PROCESSED_METRICS_FILE_LOCATION : processedMetricsFilePath;
initializeHbaseAccessor();
LOG.info("Looking for whitelisted metric names...");
if (whitelistedFilePath != null) {
this.metricNames = readMetricWhitelistFromFile(whitelistedFilePath);
} else if (timelineMetricConfiguration.isWhitelistingEnabled()) {
String whitelistFile = timelineMetricConfiguration.getMetricsConf().get(TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_FILE, TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_FILE_LOCATION_DEFAULT);
metricNames = readMetricWhitelistFromFile(whitelistFile);
} else {
LOG.error("No whitelisted metrics specified. Exiting...");
throw new Exception("List of whitelisted metrics must be provided");
}
readProcessedMetricsMap();
LOG.info("Setting up batches...");
this.metricNamesBatches = new HashSet<>();
Iterables.partition(metricNames, this.batchSize)
.forEach(batch -> metricNamesBatches.add(new HashSet<>(batch)));
LOG.info(String.format("Split metric names into %s batches with size of %s", metricNamesBatches.size(), this.batchSize));
}
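/**
 * Loads the map of already copied metrics from the state file. Each line is expected
 * to have the form {@code <tableName>:<metricName>}, as appended by the copiers.
 */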
private void readProcessedMetricsMap() {
Map<String, Set<String>> result = new HashMap<>();
if (!Files.exists(Paths.get(processedMetricsFilePath))) {
LOG.info(String.format("The processed metrics file %s is missing, assuming no metrics have been processed yet.", processedMetricsFilePath));
this.processedMetrics = result;
return;
}
LOG.info(String.format("Reading the list of already copied metrics from %s", processedMetricsFilePath));
try {
try (Stream<String> stream = Files.lines(Paths.get(processedMetricsFilePath))) {
stream.forEach(line -> {
String[] lineSplit = line.split(":");
if (lineSplit.length < 2) {
LOG.warn(String.format("Skipping malformed state file line: %s", line));
return;
}
result.computeIfAbsent(lineSplit[0], key -> new HashSet<>()).add(lineSplit[1]);
});
}
} catch (IOException e) {
LOG.error(e);
}
this.processedMetrics = result;
}
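/**
 * Creates one copier per (metric batch, table mapping) pair, skipping metrics already
 * recorded in the state file, and runs the copiers on a fixed-size thread pool,
 * waiting up to timeoutInMinutes for them to finish.
 */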
public void runMigration(Long timeoutInMinutes) throws IOException {
try (FileWriter processedMetricsFileWriter = new FileWriter(processedMetricsFilePath, true)) {
LOG.info("Setting up copiers...");
Set<AbstractPhoenixMetricsCopier> copiers = new HashSet<>();
for (Set<String> batch : metricNamesBatches) {
for (Map.Entry<String, String> entry : CLUSTER_AGGREGATE_TABLES_MAPPING.entrySet()) {
Set<String> filteredMetrics = filterProcessedMetrics(batch, this.processedMetrics, entry.getKey());
if (!filteredMetrics.isEmpty()) {
copiers.add(new PhoenixClusterMetricsCopier(entry.getKey(), entry.getValue(), hBaseAccessor,
filteredMetrics, startTime, processedMetricsFileWriter));
}
}
for (Map.Entry<String, String> entry : HOST_AGGREGATE_TABLES_MAPPING.entrySet()) {
Set<String> filteredMetrics = filterProcessedMetrics(batch, processedMetrics, entry.getKey());
if (!filteredMetrics.isEmpty()) {
copiers.add(new PhoenixHostMetricsCopier(entry.getKey(), entry.getValue(), hBaseAccessor,
filteredMetrics, startTime, processedMetricsFileWriter));
}
}
}
if (copiers.isEmpty()) {
LOG.info("No copy threads to run, looks like all metrics have already been copied.");
return;
}
LOG.info("Running the copy threads...");
long startTimer = System.currentTimeMillis();
ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
for (AbstractPhoenixMetricsCopier copier : copiers) {
executorService.submit(copier);
}
executorService.shutdown();
try {
if (!executorService.awaitTermination(timeoutInMinutes, TimeUnit.MINUTES)) {
LOG.warn(String.format("Copying did not finish within %d minutes, some metrics may not have been copied.", timeoutInMinutes));
}
} catch (InterruptedException e) {
LOG.error(e);
Thread.currentThread().interrupt();
}
long estimatedTime = System.currentTimeMillis() - startTimer;
LOG.info(String.format("Copying took %s seconds", estimatedTime/1000.0));
}
}
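/**
 * Initializes the Phoenix/HBase accessor, the collector configuration and the
 * metadata manager needed by the copiers.
 */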
private void initializeHbaseAccessor() throws MalformedURLException, URISyntaxException {
this.hBaseAccessor = new PhoenixHBaseAccessor(null);
this.timelineMetricConfiguration = TimelineMetricConfiguration.getInstance();
timelineMetricConfiguration.initialize();
TimelineMetricMetadataManager timelineMetricMetadataManager = new TimelineMetricMetadataManager(hBaseAccessor);
hBaseAccessor.setMetadataInstance(timelineMetricMetadataManager);
}
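/**
 * Returns the subset of the given metric names that has not yet been copied into the given table.
 */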
private static Set<String> filterProcessedMetrics(Set<String> metricNames, Map<String, Set<String>> processedMetrics, String tableName) {
if (!processedMetrics.containsKey(tableName)) {
return metricNames;
}
return Sets.difference(metricNames, processedMetrics.get(tableName));
}
/**
 * Reads metric names from the given whitelist file.
 * Strips the "._p_" pattern prefix and replaces '*' wildcards with the SQL '%' wildcard.
 */
private static Set<String> readMetricWhitelistFromFile(String whitelistFile) {
LOG.info(String.format("Reading metric names from %s", whitelistFile));
Set<String> whitelistedMetrics = new HashSet<>();
String strLine;
try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(whitelistFile)))) {
while ((strLine = br.readLine()) != null) {
strLine = strLine.trim();
if (StringUtils.isEmpty(strLine)) {
continue;
}
if (strLine.startsWith(patternPrefix)) {
strLine = strLine.replace(patternPrefix, "");
}
if (strLine.contains("*")) {
strLine = strLine.replaceAll("\\*", "%");
}
whitelistedMetrics.add(strLine);
}
} catch (IOException ioEx) {
LOG.error(ioEx);
}
return whitelistedMetrics;
}
/**
 * @param args
 * args[0] - processedMetricsFilePath - full path to the file where the names of already
 *           processed metrics are/will be stored;
 *           defaults to /var/log/ambari-metrics-collector/ambari-metrics-migration-state.txt
 * args[1] - whitelistedFilePath - full path to the file with whitelisted metric names;
 *           if not provided, the whitelist location from the collector configuration is used;
 *           if whitelisting is not configured either, the launcher exits with an error
 * args[2] - startTime - epoch millis to copy data from; defaults to 30 days ago
 * args[3] - numberOfThreads - defaults to 3
 * args[4] - batchSize - defaults to 5
 * args[5] - timeoutInMinutes - defaults to the equivalent of 24 hours
 */
public static void main(String[] args) {
String processedMetricsFilePath = null;
String whitelistedFilePath = null;
Long startTime = null;
Integer numberOfThreads = null;
Integer batchSize = null;
Long timeoutInMinutes = DEFAULT_TIMEOUT_MINUTES;
if (args.length > 0) {
processedMetricsFilePath = args[0];
}
if (args.length > 1) {
whitelistedFilePath = args[1];
}
if (args.length > 2) {
startTime = Long.valueOf(args[2]);
}
if (args.length > 3) {
numberOfThreads = Integer.valueOf(args[3]);
}
if (args.length > 4) {
batchSize = Integer.valueOf(args[4]);
}
if (args.length > 5) {
timeoutInMinutes = Long.valueOf(args[5]);
}
MetricsDataMigrationLauncher dataMigrationLauncher = null;
try {
LOG.info("Initializing system...");
dataMigrationLauncher = new MetricsDataMigrationLauncher(whitelistedFilePath, processedMetricsFilePath, startTime, numberOfThreads, batchSize);
} catch (Exception e) {
LOG.error("Exception during system setup, exiting...", e);
System.exit(1);
}
try {
dataMigrationLauncher.runMigration(timeoutInMinutes);
} catch (IOException e) {
LOG.error("Exception during data migration, exiting...", e);
System.exit(1);
}
System.exit(0);
}
}