/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ambari.metrics.adservice.prototype.core;

import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.codehaus.jackson.map.AnnotationIntrospector;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.ObjectReader;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Serializable;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.TreeMap;

public class MetricsCollectorInterface implements Serializable {

  private static String hostName = null;
  private String instanceId = null;
  public final static String serviceName = "anomaly-engine";
  private String collectorHost;
  private String protocol;
  private String port;
  private static final String WS_V1_TIMELINE_METRICS = "/ws/v1/timeline/metrics";
  private static final Log LOG = LogFactory.getLog(MetricsCollectorInterface.class);
  private static ObjectMapper mapper;
  private final static ObjectReader timelineObjectReader;

  static {
    mapper = new ObjectMapper();
    AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
    mapper.setAnnotationIntrospector(introspector);
    mapper.getSerializationConfig()
      .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
    timelineObjectReader = mapper.reader(TimelineMetrics.class);
  }

  public MetricsCollectorInterface(String collectorHost, String protocol, String port) {
    this.collectorHost = collectorHost;
    this.protocol = protocol;
    this.port = port;
    this.hostName = getDefaultLocalHostName();
  }

  public static String getDefaultLocalHostName() {

    if (hostName != null) {
      return hostName;
    }

    try {
      return InetAddress.getLocalHost().getCanonicalHostName();
    } catch (UnknownHostException e) {
      LOG.info("Error getting host address");
    }
    return null;
  }

  public void publish(List<MetricAnomaly> metricAnomalies) {
    if (CollectionUtils.isNotEmpty(metricAnomalies)) {
      LOG.info("Sending metric anomalies of size : " + metricAnomalies.size());
      List<TimelineMetric> metricList = getTimelineMetricList(metricAnomalies);
      if (!metricList.isEmpty()) {
        TimelineMetrics timelineMetrics = new TimelineMetrics();
        timelineMetrics.setMetrics(metricList);
        emitMetrics(timelineMetrics);
      }
    } else {
      LOG.debug("No anomalies to send.");
    }
  }

  private List<TimelineMetric> getTimelineMetricList(List<MetricAnomaly> metricAnomalies) {
    List<TimelineMetric> metrics = new ArrayList<>();

    if (metricAnomalies.isEmpty()) {
      return metrics;
    }

    for (MetricAnomaly anomaly : metricAnomalies) {
      TimelineMetric timelineMetric = new TimelineMetric();
      timelineMetric.setMetricName(anomaly.getMetricKey());
      timelineMetric.setAppId(serviceName + "-" + anomaly.getMethodType());
      timelineMetric.setInstanceId(null);
      timelineMetric.setHostName(getDefaultLocalHostName());
      timelineMetric.setStartTime(anomaly.getTimestamp());
      HashMap<String, String> metadata = new HashMap<>();
      metadata.put("method", anomaly.getMethodType());
      metadata.put("anomaly-score", String.valueOf(anomaly.getAnomalyScore()));
      timelineMetric.setMetadata(metadata);
      TreeMap<Long,Double> metricValues = new TreeMap<>();
      metricValues.put(anomaly.getTimestamp(), anomaly.getMetricValue());
      timelineMetric.setMetricValues(metricValues);

      metrics.add(timelineMetric);
    }
    return metrics;
  }

  public boolean emitMetrics(TimelineMetrics metrics) {
    String connectUrl = constructTimelineMetricUri();
    String jsonData = null;
    LOG.debug("EmitMetrics connectUrl = " + connectUrl);
    try {
      jsonData = mapper.writeValueAsString(metrics);
      LOG.info(jsonData);
    } catch (IOException e) {
      LOG.error("Unable to parse metrics", e);
    }
    if (jsonData != null) {
      return emitMetricsJson(connectUrl, jsonData);
    }
    return false;
  }

  private HttpURLConnection getConnection(String spec) throws IOException {
    return (HttpURLConnection) new URL(spec).openConnection();
  }

  private boolean emitMetricsJson(String connectUrl, String jsonData) {
    int timeout = 10000;
    HttpURLConnection connection = null;
    try {
      if (connectUrl == null) {
        throw new IOException("Unknown URL. Unable to connect to metrics collector.");
      }
      connection = getConnection(connectUrl);

      connection.setRequestMethod("POST");
      connection.setRequestProperty("Content-Type", "application/json");
      connection.setRequestProperty("Connection", "Keep-Alive");
      connection.setConnectTimeout(timeout);
      connection.setReadTimeout(timeout);
      connection.setDoOutput(true);

      if (jsonData != null) {
        try (OutputStream os = connection.getOutputStream()) {
          os.write(jsonData.getBytes("UTF-8"));
        }
      }

      int statusCode = connection.getResponseCode();

      if (statusCode != 200) {
        LOG.info("Unable to POST metrics to collector, " + connectUrl + ", " +
          "statusCode = " + statusCode);
      } else {
        LOG.info("Metrics posted to Collector " + connectUrl);
      }
      return true;
    } catch (IOException ioe) {
      LOG.error(ioe.getMessage());
    }
    return false;
  }

  private String constructTimelineMetricUri() {
    StringBuilder sb = new StringBuilder(protocol);
    sb.append("://");
    sb.append(collectorHost);
    sb.append(":");
    sb.append(port);
    sb.append(WS_V1_TIMELINE_METRICS);
    return sb.toString();
  }

  public TimelineMetrics fetchMetrics(String metricName,
                                      String appId,
                                      String hostname,
                                      long startime,
                                      long endtime) {

    String url = constructTimelineMetricUri() + "?metricNames=" + metricName + "&appId=" + appId +
      "&hostname=" + hostname + "&startTime=" + startime + "&endTime=" + endtime;
    LOG.debug("Fetch metrics URL : " + url);

    URL obj = null;
    BufferedReader in = null;
    TimelineMetrics timelineMetrics = new TimelineMetrics();

    try {
      obj = new URL(url);
      HttpURLConnection con = (HttpURLConnection) obj.openConnection();
      con.setRequestMethod("GET");
      int responseCode = con.getResponseCode();
      LOG.debug("Sending 'GET' request to URL : " + url);
      LOG.debug("Response Code : " + responseCode);

      in = new BufferedReader(
        new InputStreamReader(con.getInputStream()));
      timelineMetrics = timelineObjectReader.readValue(in);
    } catch (Exception e) {
      LOG.error(e);
    } finally {
      if (in != null) {
        try {
          in.close();
        } catch (IOException e) {
          LOG.warn(e);
        }
      }
    }

    LOG.info("Fetched " + timelineMetrics.getMetrics().size() + " metrics.");
    return timelineMetrics;
  }
}
