/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ambari.metrics.core.timeline.aggregators;


import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.SERVER_SIDE_TIMESIFT_ADJUSTMENT;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_EVENT_METRIC_PATTERNS;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_SUPPORT_MULTIPLE_CLUSTERS;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS;
import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getTimeSlices;
import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.sliceFromTimelineMetric;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
import static org.apache.hadoop.metrics2.sink.timeline.TimelineMetricUtils.getJavaMetricPatterns;

import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
import org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController;
import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataKey;
import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.ambari.metrics.core.timeline.query.Condition;
import org.apache.ambari.metrics.core.timeline.query.DefaultCondition;

/**
 * Aggregates a metric across all hosts in the cluster. Reads metrics from
 * the precision table and saves into the aggregate.
 */
public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggregator {
  public Long timeSliceIntervalMillis;
  private TimelineMetricReadHelper timelineMetricReadHelper;
  // Aggregator to perform app-level aggregates for host metrics
  private final TimelineMetricAppAggregator appAggregator;
  // 1 minute client side buffering adjustment
  protected final Long serverTimeShiftAdjustment;
  protected final boolean interpolationEnabled;
  private TimelineMetricMetadataManager metadataManagerInstance;
  private String skipAggrPatternStrings;
  private List<String> skipInterpolationMetricPatterns = new ArrayList<>();
  private final static String liveHostsMetricName = "live_hosts";

  public TimelineMetricClusterAggregatorSecond(AGGREGATOR_NAME aggregatorName,
                                               TimelineMetricMetadataManager metadataManager,
                                               PhoenixHBaseAccessor hBaseAccessor,
                                               Configuration metricsConf,
                                               String checkpointLocation,
                                               Long sleepIntervalMillis,
                                               Integer checkpointCutOffMultiplier,
                                               String aggregatorDisabledParam,
                                               String tableName,
                                               String outputTableName,
                                               Long nativeTimeRangeDelay,
                                               Long timeSliceInterval,
                                               MetricCollectorHAController haController) {
    super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
      sleepIntervalMillis, checkpointCutOffMultiplier, aggregatorDisabledParam,
      tableName, outputTableName, nativeTimeRangeDelay, haController);

    this.metadataManagerInstance = metadataManager;
    appAggregator = new TimelineMetricAppAggregator(metadataManager, metricsConf);
    this.timeSliceIntervalMillis = timeSliceInterval;
    this.serverTimeShiftAdjustment = Long.parseLong(metricsConf.get(SERVER_SIDE_TIMESIFT_ADJUSTMENT, "90000"));
    this.interpolationEnabled = Boolean.parseBoolean(metricsConf.get(TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED, "true"));
    this.skipAggrPatternStrings = metricsConf.get(TIMELINE_METRIC_AGGREGATION_SQL_FILTERS);
    String skipInterpolationMetricPatternStrings = metricsConf.get(TIMELINE_METRICS_EVENT_METRIC_PATTERNS, "");

    if (StringUtils.isNotEmpty(skipInterpolationMetricPatternStrings)) {
      LOG.info("Skipping Interpolation for patterns : " + skipInterpolationMetricPatternStrings);
      skipInterpolationMetricPatterns.addAll(getJavaMetricPatterns(skipInterpolationMetricPatternStrings));
    }

    if (Boolean.valueOf(metricsConf.get(TIMELINE_METRICS_SUPPORT_MULTIPLE_CLUSTERS, "false"))) {
      this.timelineMetricReadHelper = new TimelineMetricReadHelper(metadataManager, true);
    } else {
      this.timelineMetricReadHelper = new TimelineMetricReadHelper(metadataManager);
    }
  }

  @Override
  protected void aggregate(ResultSet rs, long startTime, long endTime) throws SQLException, IOException {
    // Account for time shift due to client side buffering by shifting the
    // timestamps with the difference between server time and series start time
    // Also, we do not want to look at the shift time period from the end as well since we can interpolate those points
    // that come earlier than the expected, during the next run.
    List<Long[]> timeSlices = getTimeSlices(startTime - serverTimeShiftAdjustment, endTime - serverTimeShiftAdjustment, timeSliceIntervalMillis);
    // Initialize app aggregates for host metrics
    appAggregator.init();
    Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
      aggregateMetricsFromResultSet(rs, timeSlices);

    LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates.");
    hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics);
    appAggregator.cleanup();
  }

  @Override
  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {

    List<String> metricNames = new ArrayList<>();
    boolean metricNamesNotCondition = false;

    if (!StringUtils.isEmpty(skipAggrPatternStrings)) {
      LOG.info("Skipping aggregation for metric patterns : " + skipAggrPatternStrings);
      metricNames.addAll(Arrays.asList(skipAggrPatternStrings.split(",")));
      metricNamesNotCondition = true;
    }

    Condition condition = new DefaultCondition(metricNames, null, null, null, startTime - serverTimeShiftAdjustment,
      endTime, null, null, true);
    condition.setMetricNamesNotCondition(metricNamesNotCondition);
    condition.setNoLimit();
    condition.setFetchSize(resultsetFetchSize);
    condition.setStatement(String.format(GET_METRIC_SQL,
      METRICS_RECORD_TABLE_NAME));
    // Retaining order of the row-key avoids client side merge sort.
    condition.addOrderByColumn("UUID");
    condition.addOrderByColumn("SERVER_TIME");
    return condition;
  }

  Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
    throws SQLException, IOException {
    Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();

    TimelineMetric metric = null;
    Map<String, MutableInt> hostedAppCounter = new HashMap<>();
    if (rs.next()) {
      metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
      while (metric == null && rs.next()) {
        metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
      }

      // Call slice after all rows for a host are read
      while (rs.next()) {
        TimelineMetric nextMetric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
        // If rows belong to same host combine them before slicing. This
        // avoids issues across rows that belong to same hosts but get
        // counted as coming from different ones.
        if (nextMetric == null) {
          continue;
        }

        if (metric.equalsExceptTime(nextMetric)) {
          metric.addMetricValues(nextMetric.getMetricValues());
        } else {
          // Process the current metric
          int numHosts = processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
          if (!hostedAppCounter.containsKey(metric.getAppId())) {
            hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
          } else {
            int currentHostCount = hostedAppCounter.get(metric.getAppId()).intValue();
            if (currentHostCount < numHosts) {
              hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
            }
          }
          metric = nextMetric;
        }
      }
    }
    // Process last metric
    if (metric != null) {
      int numHosts = processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
      if (!hostedAppCounter.containsKey(metric.getAppId())) {
        hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
      } else {
        int currentHostCount = hostedAppCounter.get(metric.getAppId()).intValue();
        if (currentHostCount < numHosts) {
          hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
        }
      }
    }

    // Add app level aggregates to save
    aggregateClusterMetrics.putAll(appAggregator.getAggregateClusterMetrics());

    // Add liveHosts per AppId metrics.
    long timestamp = timeSlices.get(timeSlices.size() - 1)[1];
    processLiveAppCountMetrics(aggregateClusterMetrics, hostedAppCounter, timestamp);

    return aggregateClusterMetrics;
  }

  /**
   * Slice metric values into interval specified by :
   * timeline.metrics.cluster.aggregator.minute.timeslice.interval
   * Normalize value by averaging them within the interval
   */
  protected int processAggregateClusterMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
                                               TimelineMetric metric, List<Long[]> timeSlices) {
    // Create time slices
    TimelineMetricMetadataKey appKey =  new TimelineMetricMetadataKey(metric.getMetricName(), metric.getAppId(), metric.getInstanceId());
    TimelineMetricMetadata metricMetadata = metadataManagerInstance.getMetadataCacheValue(appKey);

    if (metricMetadata != null && !metricMetadata.isSupportsAggregates()) {
      LOG.debug("Skipping cluster aggregation for " + metric.getMetricName());
      return 0;
    }

    boolean skipInterpolationForMetric = shouldInterpolationBeSkipped(metric.getMetricName());

    Map<TimelineClusterMetric, Double> clusterMetrics = sliceFromTimelineMetric(metric, timeSlices, !skipInterpolationForMetric && interpolationEnabled);

    return aggregateClusterMetricsFromSlices(clusterMetrics, aggregateClusterMetrics, metric.getHostName());
  }

  protected int aggregateClusterMetricsFromSlices(Map<TimelineClusterMetric, Double> clusterMetrics,
                                                  Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
                                                  String hostname) {

    int numHosts = 0;
    if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
      for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry : clusterMetrics.entrySet()) {

        TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
        Double avgValue = clusterMetricEntry.getValue();

        MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric);

        if (aggregate == null) {
          aggregate = new MetricClusterAggregate(avgValue, 1, null, avgValue, avgValue);
          aggregateClusterMetrics.put(clusterMetric, aggregate);
        } else {
          aggregate.updateSum(avgValue);
          aggregate.updateNumberOfHosts(1);
          aggregate.updateMax(avgValue);
          aggregate.updateMin(avgValue);
        }

        numHosts = aggregate.getNumberOfHosts();
        // Update app level aggregates
        appAggregator.processTimelineClusterMetric(clusterMetric, hostname, avgValue);
      }
    }
    return numHosts;
  }

  /* Add cluster metric for number of hosts that are hosting an appId */
  protected void processLiveAppCountMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
                                            Map<String, MutableInt> appHostsCount, long timestamp) {

    for (Map.Entry<String, MutableInt> appHostsEntry : appHostsCount.entrySet()) {
      TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(
        liveHostsMetricName, appHostsEntry.getKey(), null, timestamp);

      Integer numOfHosts = appHostsEntry.getValue().intValue();

      MetricClusterAggregate metricClusterAggregate = new MetricClusterAggregate(
        (double) numOfHosts, 1, null, (double) numOfHosts, (double) numOfHosts);

      metadataManagerInstance.getUuid(timelineClusterMetric, true);
      aggregateClusterMetrics.put(timelineClusterMetric, metricClusterAggregate);
    }
  }

  private boolean shouldInterpolationBeSkipped(String metricName) {
    for (String pattern : skipInterpolationMetricPatterns) {
      if (metricName.matches(pattern)) {
        return true;
      }
    }
    return false;
  }
}
