/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ambari.metrics.core.timeline.aggregators;

import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.SERVER_SIDE_TIMESIFT_ADJUSTMENT;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_EVENT_METRIC_PATTERNS;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS;
import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getTimeSlices;
import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.sliceFromTimelineMetric;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
import static org.apache.hadoop.metrics2.sink.timeline.TimelineMetricUtils.getJavaMetricPatterns;

import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
import org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController;
import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataKey;
import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.ambari.metrics.core.timeline.query.Condition;
import org.apache.ambari.metrics.core.timeline.query.DefaultCondition;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;

/**
 * Aggregates a metric across all hosts in the cluster. Reads metrics from
 * the precision table and saves the results into the aggregate table.
 */
public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggregator {
  // Width of one aggregation time slice, in milliseconds
  public Long timeSliceIntervalMillis;

  private TimelineMetricReadHelper timelineMetricReadHelper;

  // Aggregator to perform app-level aggregates for host metrics
  private final TimelineMetricAppAggregator appAggregator;

  // Server-side time shift adjustment (default 90000 ms = 90 seconds) that
  // accounts for client-side buffering of metric timestamps
  protected final Long serverTimeShiftAdjustment;

  protected final boolean interpolationEnabled;
  private TimelineMetricMetadataManager metadataManagerInstance;
  private String skipAggrPatternStrings;
  private List<String> skipInterpolationMetricPatterns = new ArrayList<>();

  private static final String liveHostsMetricName = "live_hosts";
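
  /**
   * Creates the second-level cluster aggregator. Most arguments are passed
   * through to {@link AbstractTimelineAggregator}; the remaining ones control
   * time slicing, interpolation, and app-level aggregation for this class.
   */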
  public TimelineMetricClusterAggregatorSecond(AGGREGATOR_NAME aggregatorName,
                                               TimelineMetricMetadataManager metadataManager,
                                               PhoenixHBaseAccessor hBaseAccessor,
                                               Configuration metricsConf,
                                               String checkpointLocation,
                                               Long sleepIntervalMillis,
                                               Integer checkpointCutOffMultiplier,
                                               String aggregatorDisabledParam,
                                               String tableName,
                                               String outputTableName,
                                               Long nativeTimeRangeDelay,
                                               Long timeSliceInterval,
                                               MetricCollectorHAController haController) {
    super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
      sleepIntervalMillis, checkpointCutOffMultiplier, aggregatorDisabledParam,
      tableName, outputTableName, nativeTimeRangeDelay, haController);

    this.metadataManagerInstance = metadataManager;
    appAggregator = new TimelineMetricAppAggregator(metadataManager, metricsConf);
    this.timeSliceIntervalMillis = timeSliceInterval;
    this.serverTimeShiftAdjustment = Long.parseLong(metricsConf.get(SERVER_SIDE_TIMESIFT_ADJUSTMENT, "90000"));
    this.interpolationEnabled = Boolean.parseBoolean(metricsConf.get(TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED, "true"));
    this.skipAggrPatternStrings = metricsConf.get(TIMELINE_METRIC_AGGREGATION_SQL_FILTERS);

    String skipInterpolationMetricPatternStrings = metricsConf.get(TIMELINE_METRICS_EVENT_METRIC_PATTERNS, "");
    if (StringUtils.isNotEmpty(skipInterpolationMetricPatternStrings)) {
      LOG.info("Skipping interpolation for patterns: " + skipInterpolationMetricPatternStrings);
      skipInterpolationMetricPatterns.addAll(getJavaMetricPatterns(skipInterpolationMetricPatternStrings));
    }

    this.timelineMetricReadHelper = new TimelineMetricReadHelper(metadataManager);
  }
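
  /**
   * Aggregates all rows in the result set into cluster-level aggregates,
   * one slice interval at a time, and persists them through the
   * {@link PhoenixHBaseAccessor}.
   */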
  @Override
  protected void aggregate(ResultSet rs, long startTime, long endTime) throws SQLException, IOException {
    // Account for time shift due to client side buffering by shifting the
    // timestamps with the difference between server time and series start time.
    // Also, do not look at the shift time period from the end, since points
    // that arrive earlier than expected can be interpolated during the next run.
    List<Long[]> timeSlices = getTimeSlices(startTime - serverTimeShiftAdjustment, endTime - serverTimeShiftAdjustment, timeSliceIntervalMillis);

    // Initialize app aggregates for host metrics
    appAggregator.init();

    Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
      aggregateMetricsFromResultSet(rs, timeSlices);

    LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates.");
    hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics);
    appAggregator.cleanup();
  }
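
  /**
   * Builds the precision-table query for one aggregation run. Metric name
   * patterns configured through the aggregation SQL filters are applied as a
   * NOT condition, so matching metrics are excluded from aggregation.
   */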
  @Override
  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
    List<String> metricNames = new ArrayList<>();
    boolean metricNamesNotCondition = false;

    if (!StringUtils.isEmpty(skipAggrPatternStrings)) {
      LOG.info("Skipping aggregation for metric patterns: " + skipAggrPatternStrings);
      metricNames.addAll(Arrays.asList(skipAggrPatternStrings.split(",")));
      metricNamesNotCondition = true;
    }

    Condition condition = new DefaultCondition(metricNames, null, null, null, startTime - serverTimeShiftAdjustment,
      endTime, null, null, true);
    condition.setMetricNamesNotCondition(metricNamesNotCondition);
    condition.setNoLimit();
    condition.setFetchSize(resultsetFetchSize);
    condition.setStatement(String.format(GET_METRIC_SQL, METRICS_RECORD_TABLE_NAME));
    // Retaining the order of the row key avoids a client-side merge sort.
    condition.addOrderByColumn("UUID");
    condition.addOrderByColumn("SERVER_TIME");
    return condition;
  }
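
  /**
   * Walks the result set in row-key order, combining consecutive rows that
   * belong to the same metric series before slicing them into cluster
   * aggregates. Also tracks the highest number of hosts seen per appId so
   * that a "live_hosts" metric can be emitted at the end of the run.
   */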
  Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
      throws SQLException, IOException {
    Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics = new HashMap<>();
    TimelineMetric metric = null;
    Map<String, MutableInt> hostedAppCounter = new HashMap<>();

    if (rs.next()) {
      metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);

      // Call slice after all rows for a host are read
      while (rs.next()) {
        TimelineMetric nextMetric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
        if (nextMetric == null) {
          continue;
        }
        if (metric == null) {
          // Guard against the first row failing to parse into a metric
          metric = nextMetric;
          continue;
        }

        // If rows belong to the same host, combine them before slicing. This
        // avoids issues across rows that belong to the same host but get
        // counted as coming from different ones.
        if (metric.equalsExceptTime(nextMetric)) {
          metric.addMetricValues(nextMetric.getMetricValues());
        } else {
          // Process the current metric
          int numHosts = processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
          MutableInt hostCount = hostedAppCounter.get(metric.getAppId());
          if (hostCount == null || hostCount.intValue() < numHosts) {
            hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
          }
          metric = nextMetric;
        }
      }
    }

    // Process last metric
    if (metric != null) {
      int numHosts = processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
      MutableInt hostCount = hostedAppCounter.get(metric.getAppId());
      if (hostCount == null || hostCount.intValue() < numHosts) {
        hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
      }
    }

    // Add app level aggregates to save
    aggregateClusterMetrics.putAll(appAggregator.getAggregateClusterMetrics());

    // Add live host count per appId, stamped with the end of the last time slice
    long timestamp = timeSlices.get(timeSlices.size() - 1)[1];
    processLiveAppCountMetrics(aggregateClusterMetrics, hostedAppCounter, timestamp);

    return aggregateClusterMetrics;
  }

  /**
   * Slice metric values into the interval specified by
   * timeline.metrics.cluster.aggregator.minute.timeslice.interval and
   * normalize values by averaging them within each interval.
   */
  protected int processAggregateClusterMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
                                               TimelineMetric metric, List<Long[]> timeSlices) {
    // Create time slices
    TimelineMetricMetadataKey appKey = new TimelineMetricMetadataKey(metric.getMetricName(), metric.getAppId(), metric.getInstanceId());
    TimelineMetricMetadata metricMetadata = metadataManagerInstance.getMetadataCacheValue(appKey);

    if (metricMetadata != null && !metricMetadata.isSupportsAggregates()) {
      LOG.debug("Skipping cluster aggregation for " + metric.getMetricName());
      return 0;
    }

    boolean skipInterpolationForMetric = shouldInterpolationBeSkipped(metric.getMetricName());
    Map<TimelineClusterMetric, Double> clusterMetrics = sliceFromTimelineMetric(metric, timeSlices, !skipInterpolationForMetric && interpolationEnabled);

    return aggregateClusterMetricsFromSlices(clusterMetrics, aggregateClusterMetrics, metric.getHostName());
  }
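
  /**
   * Folds one host's per-slice averages into the running cluster-wide
   * aggregates (sum, host count, min, max) and forwards each value to the
   * app-level aggregator. Returns the number of hosts seen so far for the
   * last processed slice.
   */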
  protected int aggregateClusterMetricsFromSlices(Map<TimelineClusterMetric, Double> clusterMetrics,
                                                  Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
                                                  String hostname) {
    int numHosts = 0;

    if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
      for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry : clusterMetrics.entrySet()) {
        TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
        Double avgValue = clusterMetricEntry.getValue();

        MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric);

        if (aggregate == null) {
          aggregate = new MetricClusterAggregate(avgValue, 1, null, avgValue, avgValue);
          aggregateClusterMetrics.put(clusterMetric, aggregate);
        } else {
          aggregate.updateSum(avgValue);
          aggregate.updateNumberOfHosts(1);
          aggregate.updateMax(avgValue);
          aggregate.updateMin(avgValue);
        }
        numHosts = aggregate.getNumberOfHosts();

        // Update app level aggregates
        appAggregator.processTimelineClusterMetric(clusterMetric, hostname, avgValue);
      }
    }
    return numHosts;
  }

  /** Add a cluster metric for the number of hosts that are hosting an appId. */
  protected void processLiveAppCountMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
                                            Map<String, MutableInt> appHostsCount, long timestamp) {
    for (Map.Entry<String, MutableInt> appHostsEntry : appHostsCount.entrySet()) {
      TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(
        liveHostsMetricName, appHostsEntry.getKey(), null, timestamp);
      int numOfHosts = appHostsEntry.getValue().intValue();

      MetricClusterAggregate metricClusterAggregate = new MetricClusterAggregate(
        (double) numOfHosts, 1, null, (double) numOfHosts, (double) numOfHosts);

      // Make sure the synthetic live_hosts metric has a UUID mapping before it is saved
      metadataManagerInstance.getUuid(timelineClusterMetric, true);
      aggregateClusterMetrics.put(timelineClusterMetric, metricClusterAggregate);
    }
  }
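
  /**
   * Returns true when the metric name matches one of the configured event
   * metric patterns; interpolation is skipped for such event-style metrics.
   */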
  private boolean shouldInterpolationBeSkipped(String metricName) {
    for (String pattern : skipInterpolationMetricPatterns) {
      if (metricName.matches(pattern)) {
        return true;
      }
    }
    return false;
  }
}