/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;

import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.SERVER_SIDE_TIMESIFT_ADJUSTMENT;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.sink.timeline.PostProcessingUtil;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;

/**
 * Aggregates each metric across all hosts in the cluster. Reads metrics from
 * the precision table and saves the aggregated results into the cluster
 * aggregate table.
 */
public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggregator {
public Long timeSliceIntervalMillis;
private TimelineMetricReadHelper timelineMetricReadHelper = new TimelineMetricReadHelper(true);
// Aggregator to perform app-level aggregates for host metrics
private final TimelineMetricAppAggregator appAggregator;
  // Time shift to compensate for client side buffering; defaults to 90 seconds (90000 ms)
private final Long serverTimeShiftAdjustment;
private final boolean interpolationEnabled;
private TimelineMetricMetadataManager metadataManagerInstance;
private String skipAggrPatternStrings;
public TimelineMetricClusterAggregatorSecond(AGGREGATOR_NAME aggregatorName,
TimelineMetricMetadataManager metadataManager,
PhoenixHBaseAccessor hBaseAccessor,
Configuration metricsConf,
String checkpointLocation,
Long sleepIntervalMillis,
Integer checkpointCutOffMultiplier,
String aggregatorDisabledParam,
String tableName,
String outputTableName,
Long nativeTimeRangeDelay,
Long timeSliceInterval,
MetricCollectorHAController haController) {
super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
sleepIntervalMillis, checkpointCutOffMultiplier, aggregatorDisabledParam,
tableName, outputTableName, nativeTimeRangeDelay, haController);
this.metadataManagerInstance = metadataManager;
appAggregator = new TimelineMetricAppAggregator(metadataManager, metricsConf);
this.timeSliceIntervalMillis = timeSliceInterval;
this.serverTimeShiftAdjustment = Long.parseLong(metricsConf.get(SERVER_SIDE_TIMESIFT_ADJUSTMENT, "90000"));
this.interpolationEnabled = Boolean.parseBoolean(metricsConf.get(TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED, "true"));
this.skipAggrPatternStrings = metricsConf.get(TIMELINE_METRIC_AGGREGATION_SQL_FILTERS);
}
@Override
protected void aggregate(ResultSet rs, long startTime, long endTime) throws SQLException, IOException {
    // Account for client side buffering by shifting the aggregation window back
    // by serverTimeShiftAdjustment. The end of the window is shifted back as well,
    // so points in that trailing period, which may still be arriving, can be
    // interpolated during the next run.
List<Long[]> timeSlices = getTimeSlices(startTime - serverTimeShiftAdjustment, endTime - serverTimeShiftAdjustment);
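    // Illustrative example (assumed values): with a 120000 ms aggregation window,
    // the default 90000 ms adjustment and a 30000 ms slice interval, the shifted
    // window [startTime - 90000, endTime - 90000) is cut into four 30-second slices.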
// Initialize app aggregates for host metrics
appAggregator.init();
Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
aggregateMetricsFromResultSet(rs, timeSlices);
LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates.");
hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics);
appAggregator.cleanup();
}
@Override
protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
List<String> metricNames = new ArrayList<>();
boolean metricNamesNotCondition = false;
if (!StringUtils.isEmpty(skipAggrPatternStrings)) {
LOG.info("Skipping aggregation for metric patterns : " + skipAggrPatternStrings);
metricNames.addAll(Arrays.asList(skipAggrPatternStrings.split(",")));
metricNamesNotCondition = true;
}
Condition condition = new DefaultCondition(metricNames, null, null, null, startTime - serverTimeShiftAdjustment,
endTime, null, null, true);
condition.setMetricNamesNotCondition(metricNamesNotCondition);
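    // When skip patterns are configured via TIMELINE_METRIC_AGGREGATION_SQL_FILTERS,
    // metricNamesNotCondition = true turns the name list into an exclusion (NOT)
    // condition, so matching metrics are left out of the aggregation query.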
condition.setNoLimit();
condition.setFetchSize(resultsetFetchSize);
condition.setStatement(String.format(GET_METRIC_SQL,
getQueryHint(startTime), METRICS_RECORD_TABLE_NAME));
// Retaining order of the row-key avoids client side merge sort.
condition.addOrderByColumn("METRIC_NAME");
condition.addOrderByColumn("HOSTNAME");
condition.addOrderByColumn("APP_ID");
condition.addOrderByColumn("INSTANCE_ID");
condition.addOrderByColumn("SERVER_TIME");
return condition;
}
/**
* Return time slices to normalize the timeseries data.
*/
List<Long[]> getTimeSlices(long startTime, long endTime) {
List<Long[]> timeSlices = new ArrayList<Long[]>();
long sliceStartTime = startTime;
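    // e.g. (illustrative): startTime = 0, endTime = 90000, timeSliceIntervalMillis = 30000
    // yields the slices [0, 30000], [30000, 60000] and [60000, 90000].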
while (sliceStartTime < endTime) {
timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis });
sliceStartTime += timeSliceIntervalMillis;
}
return timeSlices;
}
Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
throws SQLException, IOException {
Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
TimelineMetric metric = null;
Map<String, MutableInt> hostedAppCounter = new HashMap<>();
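    // Tracks, per appId, the largest number of hosts seen contributing to a single
    // cluster metric during this run; it feeds the live_hosts metrics emitted below.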
if (rs.next()) {
metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
// Call slice after all rows for a host are read
while (rs.next()) {
TimelineMetric nextMetric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
        // If rows belong to the same host, combine them before slicing. This
        // avoids the same host being counted as multiple hosts when its values
        // are split across rows.
if (metric.equalsExceptTime(nextMetric)) {
metric.addMetricValues(nextMetric.getMetricValues());
} else {
// Process the current metric
int numHosts = processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
if (!hostedAppCounter.containsKey(metric.getAppId())) {
hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
} else {
int currentHostCount = hostedAppCounter.get(metric.getAppId()).intValue();
if (currentHostCount < numHosts) {
hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
}
}
metric = nextMetric;
}
}
}
// Process last metric
if (metric != null) {
int numHosts = processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
if (!hostedAppCounter.containsKey(metric.getAppId())) {
hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
} else {
int currentHostCount = hostedAppCounter.get(metric.getAppId()).intValue();
if (currentHostCount < numHosts) {
hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
}
}
}
// Add app level aggregates to save
aggregateClusterMetrics.putAll(appAggregator.getAggregateClusterMetrics());
// Add liveHosts per AppId metrics.
long timestamp = timeSlices.get(timeSlices.size() - 1)[1];
processLiveAppCountMetrics(aggregateClusterMetrics, hostedAppCounter, timestamp);
return aggregateClusterMetrics;
}
  /**
   * Slice metric values into the interval specified by
   * timeline.metrics.cluster.aggregator.minute.timeslice.interval and
   * normalize the values by averaging them within each interval.
   */
protected int processAggregateClusterMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
TimelineMetric metric, List<Long[]> timeSlices) {
// Create time slices
TimelineMetricMetadataKey appKey = new TimelineMetricMetadataKey(metric.getMetricName(), metric.getAppId());
TimelineMetricMetadata metricMetadata = metadataManagerInstance.getMetadataCacheValue(appKey);
if (metricMetadata != null && !metricMetadata.isSupportsAggregates()) {
LOG.debug("Skipping cluster aggregation for " + metric.getMetricName());
return 0;
}
Map<TimelineClusterMetric, Double> clusterMetrics = sliceFromTimelineMetric(metric, timeSlices);
int numHosts = 0;
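    // Each sliced average contributes this host once to the cluster aggregate:
    // sum, min and max are updated and the host count is incremented by one.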
if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry : clusterMetrics.entrySet()) {
TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
Double avgValue = clusterMetricEntry.getValue();
MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric);
if (aggregate == null) {
aggregate = new MetricClusterAggregate(avgValue, 1, null, avgValue, avgValue);
aggregateClusterMetrics.put(clusterMetric, aggregate);
} else {
aggregate.updateSum(avgValue);
aggregate.updateNumberOfHosts(1);
aggregate.updateMax(avgValue);
aggregate.updateMin(avgValue);
}
numHosts = aggregate.getNumberOfHosts();
// Update app level aggregates
appAggregator.processTimelineClusterMetric(clusterMetric, metric.getHostName(), avgValue);
}
}
return numHosts;
}
protected Map<TimelineClusterMetric, Double> sliceFromTimelineMetric(
TimelineMetric timelineMetric, List<Long[]> timeSlices) {
if (timelineMetric.getMetricValues().isEmpty()) {
return null;
}
Map<TimelineClusterMetric, Double> timelineClusterMetricMap =
new HashMap<TimelineClusterMetric, Double>();
Long timeShift = timelineMetric.getTimestamp() - timelineMetric.getStartTime();
if (timeShift < 0) {
LOG.debug("Invalid time shift found, possible discrepancy in clocks. " +
"timeShift = " + timeShift);
      timeShift = 0L;
}
    Long prevTimestamp = -1L;
TimelineClusterMetric prevMetric = null;
int count = 0;
double sum = 0.0;
Map<Long,Double> timeSliceValueMap = new HashMap<>();
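    // Per-slice averages keyed by slice end time; used below to detect and
    // interpolate slices that ended up with no value.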
for (Map.Entry<Long, Double> metric : timelineMetric.getMetricValues().entrySet()) {
// TODO: investigate null values - pre filter
if (metric.getValue() == null) {
continue;
}
      Long timestamp = getSliceTimeForMetric(timeSlices, metric.getKey());
if (timestamp != -1) {
// Metric is within desired time range
TimelineClusterMetric clusterMetric = new TimelineClusterMetric(
timelineMetric.getMetricName(),
timelineMetric.getAppId(),
timelineMetric.getInstanceId(),
timestamp,
timelineMetric.getType());
if (prevTimestamp < 0 || timestamp.equals(prevTimestamp)) {
Double newValue = metric.getValue();
if (newValue > 0.0) {
sum += newValue;
count++;
}
        } else {
          double metricValue = (count > 0) ? (sum / count) : 0.0;
          timelineClusterMetricMap.put(prevMetric, metricValue);
          timeSliceValueMap.put(prevMetric.getTimestamp(), metricValue);
          // Reset accumulators for the new slice, skipping non-positive values
          // just like the branch above.
          Double newValue = metric.getValue();
          sum = (newValue > 0.0) ? newValue : 0.0;
          count = (newValue > 0.0) ? 1 : 0;
        }
prevTimestamp = timestamp;
prevMetric = clusterMetric;
}
}
if (prevTimestamp > 0) {
double metricValue = (count > 0) ? (sum / count) : 0.0;
timelineClusterMetricMap.put(prevMetric, metricValue);
timeSliceValueMap.put(prevTimestamp, metricValue);
}
if (interpolationEnabled) {
interpolateMissingPeriods(timelineClusterMetricMap, timelineMetric, timeSlices, timeSliceValueMap);
}
return timelineClusterMetricMap;
}
private void interpolateMissingPeriods(Map<TimelineClusterMetric, Double> timelineClusterMetricMap,
TimelineMetric timelineMetric,
List<Long[]> timeSlices,
Map<Long, Double> timeSliceValueMap) {
if (StringUtils.isNotEmpty(timelineMetric.getType()) && "COUNTER".equalsIgnoreCase(timelineMetric.getType())) {
      // For counter-based metrics it is OK to do both interpolation and extrapolation.
List<Long> requiredTimestamps = new ArrayList<>();
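      // Collect every slice end time that has no sliced value; for counters the raw
      // series is assumed monotonic enough to interpolate (or extrapolate) safely.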
for (Long[] timeSlice : timeSlices) {
if (!timeSliceValueMap.containsKey(timeSlice[1])) {
requiredTimestamps.add(timeSlice[1]);
}
}
Map<Long, Double> interpolatedValuesMap = PostProcessingUtil.interpolate(timelineMetric.getMetricValues(), requiredTimestamps);
if (interpolatedValuesMap != null) {
for (Map.Entry<Long, Double> entry : interpolatedValuesMap.entrySet()) {
Double interpolatedValue = entry.getValue();
if (interpolatedValue != null) {
TimelineClusterMetric clusterMetric = new TimelineClusterMetric(
timelineMetric.getMetricName(),
timelineMetric.getAppId(),
timelineMetric.getInstanceId(),
entry.getKey(),
timelineMetric.getType());
timelineClusterMetricMap.put(clusterMetric, interpolatedValue);
} else {
LOG.debug("Cannot compute interpolated value, hence skipping.");
}
}
}
} else {
      // For other metrics, only interpolation is OK (no extrapolation).
Double defaultNextSeenValue = null;
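      // For each empty slice, look for the nearest populated slice before and after
      // it; defaultNextSeenValue is a fallback taken from the raw series when no
      // slice holds a value at all.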
if (MapUtils.isEmpty(timeSliceValueMap) && MapUtils.isNotEmpty(timelineMetric.getMetricValues())) {
//If no value was found within the start_time based slices, but the metric has value in the server_time range,
// use that.
LOG.debug("No value found within range for metric : " + timelineMetric.getMetricName());
Map.Entry<Long,Double> firstEntry = timelineMetric.getMetricValues().firstEntry();
defaultNextSeenValue = firstEntry.getValue();
LOG.debug("Found a data point outside timeslice range: " + new Date(firstEntry.getKey()) + ": " + defaultNextSeenValue);
}
for (int sliceNum = 0; sliceNum < timeSlices.size(); sliceNum++) {
Long[] timeSlice = timeSlices.get(sliceNum);
if (!timeSliceValueMap.containsKey(timeSlice[1])) {
LOG.debug("Found an empty slice : " + new Date(timeSlice[0]) + ", " + new Date(timeSlice[1]));
Double lastSeenValue = null;
int index = sliceNum - 1;
Long[] prevTimeSlice = null;
while (lastSeenValue == null && index >= 0) {
prevTimeSlice = timeSlices.get(index--);
lastSeenValue = timeSliceValueMap.get(prevTimeSlice[1]);
}
Double nextSeenValue = null;
index = sliceNum + 1;
Long[] nextTimeSlice = null;
while (nextSeenValue == null && index < timeSlices.size()) {
nextTimeSlice = timeSlices.get(index++);
nextSeenValue = timeSliceValueMap.get(nextTimeSlice[1]);
}
if (nextSeenValue == null) {
nextSeenValue = defaultNextSeenValue;
}
Double interpolatedValue = PostProcessingUtil.interpolate(timeSlice[1],
(prevTimeSlice != null ? prevTimeSlice[1] : null), lastSeenValue,
(nextTimeSlice != null ? nextTimeSlice[1] : null), nextSeenValue);
if (interpolatedValue != null) {
TimelineClusterMetric clusterMetric = new TimelineClusterMetric(
timelineMetric.getMetricName(),
timelineMetric.getAppId(),
timelineMetric.getInstanceId(),
timeSlice[1],
timelineMetric.getType());
LOG.debug("Interpolated value : " + interpolatedValue);
timelineClusterMetricMap.put(clusterMetric, interpolatedValue);
} else {
LOG.debug("Cannot compute interpolated value, hence skipping.");
}
}
}
}
}
/**
* Return end of the time slice into which the metric fits.
*/
private Long getSliceTimeForMetric(List<Long[]> timeSlices, Long timestamp) {
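    // e.g. (illustrative): for slices [0, 30000] and [30000, 60000], a timestamp of
    // 30000 maps to slice end 30000, 30001 maps to 60000, and -1 means out of range.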
for (Long[] timeSlice : timeSlices) {
if (timestamp > timeSlice[0] && timestamp <= timeSlice[1]) {
return timeSlice[1];
}
}
    return -1L;
}
/* Add cluster metric for number of hosts that are hosting an appId */
private void processLiveAppCountMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
Map<String, MutableInt> appHostsCount, long timestamp) {
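    // Emits one "live_hosts" metric per appId, timestamped at the end of the last
    // time slice, with the host count recorded as the aggregate's sum, min and max.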
for (Map.Entry<String, MutableInt> appHostsEntry : appHostsCount.entrySet()) {
TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(
"live_hosts", appHostsEntry.getKey(), null, timestamp, null);
Integer numOfHosts = appHostsEntry.getValue().intValue();
MetricClusterAggregate metricClusterAggregate = new MetricClusterAggregate(
(double) numOfHosts, 1, null, (double) numOfHosts, (double) numOfHosts);
aggregateClusterMetrics.put(timelineClusterMetric, metricClusterAggregate);
}
}
}