/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.metrics2.sink.ganglia;

import java.io.IOException;
import java.net.*;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.configuration.SubsetConfiguration;
import org.apache.hadoop.metrics2.MetricsSink;
import org.apache.hadoop.metrics2.util.Servers;
import org.apache.hadoop.net.DNS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This the base class for Ganglia sink classes using metrics2. Lot of the code
 * has been derived from org.apache.hadoop.metrics.ganglia.GangliaContext.
 * As per the documentation, sink implementations doesn't have to worry about
 * thread safety. Hence the code wasn't written for thread safety and should
 * be modified in case the above assumption changes in the future.
 */
public abstract class AbstractGangliaSink implements MetricsSink {

  public final Logger LOG = LoggerFactory.getLogger(this.getClass());

  /*
   * Output of "gmetric --help" showing allowable values
   * -t, --type=STRING
   *     Either string|int8|uint8|int16|uint16|int32|uint32|float|double
   * -u, --units=STRING Unit of measure for the value e.g. Kilobytes, Celcius
   *     (default='')
   * -s, --slope=STRING Either zero|positive|negative|both
   *     (default='both')
   * -x, --tmax=INT The maximum time in seconds between gmetric calls
   *     (default='60')
   */
  public static final String DEFAULT_UNITS = "";
  public static final int DEFAULT_TMAX = 60;
  public static final int DEFAULT_DMAX = 0;
  public static final GangliaSlope DEFAULT_SLOPE = GangliaSlope.both;
  public static final int DEFAULT_PORT = 8649;
  public static final boolean DEFAULT_MULTICAST_ENABLED = false;
  public static final int DEFAULT_MULTICAST_TTL = 1;
  public static final String SERVERS_PROPERTY = "servers";
  public static final String MULTICAST_ENABLED_PROPERTY = "multicast";
  public static final String MULTICAST_TTL_PROPERTY = "multicast.ttl";
  public static final int BUFFER_SIZE = 1500; // as per libgmond.c
  public static final String SUPPORT_SPARSE_METRICS_PROPERTY = "supportsparse";
  public static final boolean SUPPORT_SPARSE_METRICS_DEFAULT = false;
  public static final String EQUAL = "=";

  private String hostName = "UNKNOWN.example.com";
  private DatagramSocket datagramSocket;
  private List<? extends SocketAddress> metricsServers;
  private boolean multicastEnabled;
  private int multicastTtl;
  private byte[] buffer = new byte[BUFFER_SIZE];
  private int offset;
  private boolean supportSparseMetrics = SUPPORT_SPARSE_METRICS_DEFAULT;

  /**
   * Used for visiting Metrics
   */
  protected final GangliaMetricVisitor gangliaMetricVisitor =
    new GangliaMetricVisitor();

  private SubsetConfiguration conf;
  private Map<String, GangliaConf> gangliaConfMap;
  private GangliaConf DEFAULT_GANGLIA_CONF = new GangliaConf();

  /**
   * ganglia slope values which equal the ordinal
   */
  public enum GangliaSlope {
    zero,       // 0
    positive,   // 1
    negative,   // 2
    both        // 3
  };

  /**
   * define enum for various type of conf
   */
  public enum GangliaConfType {
    slope, units, dmax, tmax
  };

  /*
   * (non-Javadoc)
   *
   * @see
   * org.apache.hadoop.metrics2.MetricsPlugin#init(org.apache.commons.configuration
   * .SubsetConfiguration)
   */
  public void init(SubsetConfiguration conf) {
    LOG.debug("Initializing the GangliaSink for Ganglia metrics.");

    this.conf = conf;

    // Take the hostname from the DNS class.
    if (conf.getString("slave.host.name") != null) {
      hostName = conf.getString("slave.host.name");
    } else {
      try {
        hostName = DNS.getDefaultHost(
            conf.getString("dfs.datanode.dns.interface", "default"),
            conf.getString("dfs.datanode.dns.nameserver", "default"));
      } catch (UnknownHostException uhe) {
        LOG.error(uhe.toString());
        hostName = "UNKNOWN.example.com";
      }
    }

    // load the gannglia servers from properties
    metricsServers = Servers.parse(conf.getString(SERVERS_PROPERTY),
        DEFAULT_PORT);
    multicastEnabled = conf.getBoolean(MULTICAST_ENABLED_PROPERTY,
            DEFAULT_MULTICAST_ENABLED);
    multicastTtl = conf.getInt(MULTICAST_TTL_PROPERTY, DEFAULT_MULTICAST_TTL);

    // extract the Ganglia conf per metrics
    gangliaConfMap = new HashMap<String, GangliaConf>();
    loadGangliaConf(GangliaConfType.units);
    loadGangliaConf(GangliaConfType.tmax);
    loadGangliaConf(GangliaConfType.dmax);
    loadGangliaConf(GangliaConfType.slope);

    try {
      if (multicastEnabled) {
        LOG.info("Enabling multicast for Ganglia with TTL " + multicastTtl);
        datagramSocket = new MulticastSocket();
        ((MulticastSocket) datagramSocket).setTimeToLive(multicastTtl);
      } else {
        datagramSocket = new DatagramSocket();
      }
    } catch (IOException e) {
      LOG.error(e.toString());
    }

    // see if sparseMetrics is supported. Default is false
    supportSparseMetrics = conf.getBoolean(SUPPORT_SPARSE_METRICS_PROPERTY,
        SUPPORT_SPARSE_METRICS_DEFAULT);
  }

  /*
   * (non-Javadoc)
   *
   * @see org.apache.hadoop.metrics2.MetricsSink#flush()
   */
  public void flush() {
    // nothing to do as we are not buffering data
  }

  // Load the configurations for a conf type
  private void loadGangliaConf(GangliaConfType gtype) {
    String propertyarr[] = conf.getStringArray(gtype.name());
    if (propertyarr != null && propertyarr.length > 0) {
      for (String metricNValue : propertyarr) {
        String metricNValueArr[] = metricNValue.split(EQUAL);
        if (metricNValueArr.length != 2 || metricNValueArr[0].length() == 0) {
          LOG.error("Invalid propertylist for " + gtype.name());
        }

        String metricName = metricNValueArr[0].trim();
        String metricValue = metricNValueArr[1].trim();
        GangliaConf gconf = gangliaConfMap.get(metricName);
        if (gconf == null) {
          gconf = new GangliaConf();
          gangliaConfMap.put(metricName, gconf);
        }

        switch (gtype) {
        case units:
          gconf.setUnits(metricValue);
          break;
        case dmax:
          gconf.setDmax(Integer.parseInt(metricValue));
          break;
        case tmax:
          gconf.setTmax(Integer.parseInt(metricValue));
          break;
        case slope:
          gconf.setSlope(GangliaSlope.valueOf(metricValue));
          break;
        }
      }
    }
  }

  /**
   * Lookup GangliaConf from cache. If not found, return default values
   *
   * @param metricName
   * @return looked up GangliaConf
   */
  protected GangliaConf getGangliaConfForMetric(String metricName) {
    GangliaConf gconf = gangliaConfMap.get(metricName);

    return gconf != null ? gconf : DEFAULT_GANGLIA_CONF;
  }

  /**
   * @return the hostName
   */
  protected String getHostName() {
    return hostName;
  }

  /**
   * Puts a string into the buffer by first writing the size of the string as an
   * int, followed by the bytes of the string, padded if necessary to a multiple
   * of 4.
   * @param s the string to be written to buffer at offset location
   */
  protected void xdr_string(String s) {
    byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
    int len = bytes.length;
    xdr_int(len);
    System.arraycopy(bytes, 0, buffer, offset, len);
    offset += len;
    pad();
  }

  // Pads the buffer with zero bytes up to the nearest multiple of 4.
  private void pad() {
    int newOffset = ((offset + 3) / 4) * 4;
    while (offset < newOffset) {
      buffer[offset++] = 0;
    }
  }

  /**
   * Puts an integer into the buffer as 4 bytes, big-endian.
   */
  protected void xdr_int(int i) {
    buffer[offset++] = (byte) ((i >> 24) & 0xff);
    buffer[offset++] = (byte) ((i >> 16) & 0xff);
    buffer[offset++] = (byte) ((i >> 8) & 0xff);
    buffer[offset++] = (byte) (i & 0xff);
  }

  /**
   * Sends Ganglia Metrics to the configured hosts
   * @throws IOException
   */
  protected void emitToGangliaHosts() throws IOException {
    try {
      for (SocketAddress socketAddress : metricsServers) {
        if (socketAddress == null || !(socketAddress instanceof InetSocketAddress))
          throw new IllegalArgumentException("Unsupported Address type");
        InetSocketAddress inetAddress = (InetSocketAddress)socketAddress;
        if(inetAddress.isUnresolved()) {
          throw new UnknownHostException("Unresolved host: " + inetAddress);
        }
        DatagramPacket packet =
          new DatagramPacket(buffer, offset, socketAddress);
        datagramSocket.send(packet);
      }
    } finally {
      // reset the buffer for the next metric to be built
      offset = 0;
    }
  }

  /**
   * Reset the buffer for the next metric to be built
   */
  void resetBuffer() {
    offset = 0;
  }

  /**
   * @return whether sparse metrics are supported
   */
  protected boolean isSupportSparseMetrics() {
    return supportSparseMetrics;
  }

  /**
   * Used only by unit test
   * @param datagramSocket the datagramSocket to set.
   */
  void setDatagramSocket(DatagramSocket datagramSocket) {
    this.datagramSocket = datagramSocket;
  }

  /**
   * Used only by unit tests
   * @return the datagramSocket for this sink
   */
  DatagramSocket getDatagramSocket() {
    return datagramSocket;
  }
}
