/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


package org.apache.oozie.util;

import com.codahale.metrics.Counter;
import com.codahale.metrics.ExponentiallyDecayingReservoir;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.ganglia.GangliaReporter;
import com.codahale.metrics.graphite.Graphite;
import com.codahale.metrics.graphite.GraphiteReporter;
import com.codahale.metrics.json.MetricsModule;
import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import info.ganglia.gmetric4j.gmetric.GMetric;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.dropwizard.DropwizardExports;
import org.apache.oozie.service.ConfigurationService;

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Instrumentation framework that is mostly compatible with {@link Instrumentation} but is backed by Codahale Metrics.  This class
 * was designed to minimize the changes required to switch from {@link Instrumentation} to {@link MetricsInstrumentation} by keeping
 * the same API.  However, certain operations are obviously implemented differently or are no longer needed; and the output format
 * is a little different.  Internally, this class maps Cron to {@link com.codahale.metrics.Timer}, Variable to {@link Gauge},
 * counter to {@link com.codahale.metrics.Counter}, and Sampler to {@link Histogram}.
 * <p>
 * Note that inside this class the simple names {@code Counter} and {@code Timer} resolve to the nested types inherited from
 * {@link Instrumentation}, which is why the Codahale types are always written fully qualified.
 */
@SuppressWarnings("unchecked")
public class MetricsInstrumentation extends Instrumentation {

    private final MetricRegistry metricRegistry;
    private transient ObjectMapper jsonMapper;
    private ScheduledExecutorService scheduler;
    private final LoadingCache<String, com.codahale.metrics.Counter> counters;
    private final Map<String, Gauge> gauges;
    private final LoadingCache<String, com.codahale.metrics.Timer> timers;
    private final Map<String, Histogram> histograms;
    // One lock per metric family; each serializes the lookup/replace/register sequence of that family.
    private final Lock timersLock = new ReentrantLock();
    private final Lock gaugesLock = new ReentrantLock();
    private final Lock countersLock = new ReentrantLock();
    private final Lock histogramsLock = new ReentrantLock();

    public static final String EXTERNAL_MONITORING_ENABLE = "oozie.external_monitoring.enable";
    public static final String EXTERNAL_MONITORING_TYPE = "oozie.external_monitoring.type";
    public static final String EXTERNAL_MONITORING_ADDRESS = "oozie.external_monitoring.address";
    public static final String EXTERNAL_MONITORING_PREFIX = "oozie.external_monitoring.metricPrefix";
    public static final String EXTERNAL_MONITORING_INTERVAL = "oozie.external_monitoring.reporterIntervalSecs";
    public static final String JMX_MONITORING_ENABLE = "oozie.jmx_monitoring.enable";
    public static final String GRAPHITE = "graphite";
    public static final String GANGLIA = "ganglia";
    private String metricsAddress;
    private String metricsHost;
    private String metricsPrefix;
    private String metricsServerName;
    private int metricsPort;
    private GraphiteReporter graphiteReporter = null;
    private GangliaReporter gangliaReporter = null;
    private JmxReporter jmxReporter = null;
    private long metricsReportIntervalSec;
    private boolean isExternalMonitoringEnabled;
    private boolean isJMXMonitoringEnabled;

    private static final TimeUnit RATE_UNIT = TimeUnit.MILLISECONDS;
    private static final TimeUnit DURATION_UNIT = TimeUnit.MILLISECONDS;

    protected XLog LOG = XLog.getLog(getClass());

    /**
     * Creates the MetricsInstrumentation and starts taking some metrics.
     * <p>
     * Depending on configuration this also starts an external (Graphite or Ganglia) reporter and/or a JMX reporter, and it
     * always registers the metric registry with the default Prometheus {@link CollectorRegistry}.
     *
     * @throws RuntimeException if external monitoring is enabled but its type is missing or unknown, its address is not a
     * valid URL, or the Ganglia client cannot be created
     */
    public MetricsInstrumentation() {
        metricRegistry = new MetricRegistry();

        isExternalMonitoringEnabled = ConfigurationService.getBoolean(EXTERNAL_MONITORING_ENABLE);
        if (isExternalMonitoringEnabled) {
            startExternalReporter();
        }

        // Used for writing the json for the metrics (see com.codahale.metrics.servlets.MetricsServlet)
        // The "false" is to prevent it from printing out all of the values used in the histograms and timers
        this.jsonMapper = new ObjectMapper().registerModule(new MetricsModule(RATE_UNIT, DURATION_UNIT, false));

        // Register the JVM memory gauges and prefix the keys. The gauge map is built once and walked once.
        new MemoryUsageGaugeSet().getMetrics().forEach(
                (key, metric) -> metricRegistry.register(MetricRegistry.name("jvm", "memory", key), metric));

        // By setting this up as a cache, if a counter doesn't exist when we try to retrieve it, it will automatically be created
        counters = CacheBuilder.newBuilder().build(
                new CacheLoader<String, com.codahale.metrics.Counter>() {
                    @Override
                    public com.codahale.metrics.Counter load(String key) throws Exception {
                        com.codahale.metrics.Counter counter = new com.codahale.metrics.Counter();
                        metricRegistry.register(key, counter);
                        return counter;
                    }
                }
        );
        // Same lazy-creation approach for timers
        timers = CacheBuilder.newBuilder().build(
                new CacheLoader<String, com.codahale.metrics.Timer>() {
                    @Override
                    public com.codahale.metrics.Timer load(String key) throws Exception {
                        com.codahale.metrics.Timer timer
                                = new com.codahale.metrics.Timer(new ExponentiallyDecayingReservoir());
                        metricRegistry.register(key, timer);
                        return timer;
                    }
                }
        );
        gauges = new ConcurrentHashMap<>();
        histograms = new ConcurrentHashMap<>();

        isJMXMonitoringEnabled = ConfigurationService.getBoolean(JMX_MONITORING_ENABLE);
        if (isJMXMonitoringEnabled) {
            jmxReporter = JmxReporter.forRegistry(metricRegistry).build();
            jmxReporter.start();
        }
        CollectorRegistry.defaultRegistry.register(new DropwizardExports(metricRegistry));
    }

    /**
     * Reads the external monitoring configuration and starts the matching reporter.
     *
     * @throws RuntimeException if the monitoring type is missing or is neither graphite nor ganglia, if the address is not
     * a valid URL, or if the Ganglia client cannot be created
     */
    private void startExternalReporter() {
        metricsServerName = ConfigurationService.get(EXTERNAL_MONITORING_TYPE);
        if (metricsServerName == null) {
            throw new RuntimeException("Metrics Server Name is not specified");
        }

        // equalsIgnoreCase is locale independent, unlike toLowerCase() which e.g. breaks "GRAPHITE" in a Turkish locale
        final String serverType = metricsServerName.trim();
        final boolean useGraphite = GRAPHITE.equalsIgnoreCase(serverType);
        final boolean useGanglia = GANGLIA.equalsIgnoreCase(serverType);
        if (!useGraphite && !useGanglia) {
            throw new RuntimeException("Metrics Server Name should be either graphite or ganglia");
        }

        metricsAddress = ConfigurationService.get(EXTERNAL_MONITORING_ADDRESS);
        metricsPrefix = ConfigurationService.get(EXTERNAL_MONITORING_PREFIX);
        metricsReportIntervalSec = ConfigurationService.getLong(EXTERNAL_MONITORING_INTERVAL);
        LOG.debug("Publishing external monitoring to [{0}]  at host [{1}] every [{2}] seconds with prefix " +
                "[{3}]", metricsServerName, metricsAddress, metricsReportIntervalSec, metricsPrefix);

        try {
            final URL url = new URL(metricsAddress);
            metricsHost = url.getHost();
            metricsPort = url.getPort();
        } catch (MalformedURLException e) {
            // Without a host and port no reporter can be built, so fail here with the offending value instead of
            // continuing with a null host
            LOG.error("Exception, ", e);
            throw new RuntimeException("Invalid external monitoring address [" + metricsAddress + "]", e);
        }

        if (useGraphite) {
            startGraphiteReporter();
        } else {
            startGangliaReporter();
        }
    }

    /**
     * Builds and starts the Graphite reporter against the configured host and port.
     */
    private void startGraphiteReporter() {
        final Graphite graphite = new Graphite(new InetSocketAddress(metricsHost, metricsPort));
        graphiteReporter = GraphiteReporter.forRegistry(metricRegistry).prefixedWith(metricsPrefix)
                .convertDurationsTo(TimeUnit.SECONDS).filter(MetricFilter.ALL).build(graphite);
        graphiteReporter.start(metricsReportIntervalSec, TimeUnit.SECONDS);
    }

    /**
     * Builds and starts the Ganglia reporter against the configured host and port.
     *
     * @throws RuntimeException if the Ganglia client cannot be created
     */
    private void startGangliaReporter() {
        final GMetric ganglia;
        try {
            ganglia = new GMetric(metricsHost, metricsPort, GMetric.UDPAddressingMode.MULTICAST, 1);
        } catch (IOException e) {
            LOG.error("Exception, ", e);
            throw new RuntimeException(e);
        }
        gangliaReporter = GangliaReporter.forRegistry(metricRegistry).prefixedWith(metricsPrefix)
                .convertRatesTo(TimeUnit.SECONDS)
                .convertDurationsTo(TimeUnit.MILLISECONDS)
                .build(ganglia);
        gangliaReporter.start(metricsReportIntervalSec, TimeUnit.SECONDS);
    }

    /**
     * Reporting final metrics into the server before stopping.
     * <p>
     * Also stops the JMX reporter, if any, and clears the default Prometheus {@link CollectorRegistry}. Note that
     * clearing removes every collector registered there, not only the one added by this instance.
     */
    @Override
    public void stop() {
        if (graphiteReporter != null) {
            try {
                // reporting final metrics into graphite before stopping
                graphiteReporter.report();
            } finally {
                graphiteReporter.stop();
            }
        }
        if (gangliaReporter != null) {
            try {
                // reporting final metrics into ganglia before stopping
                gangliaReporter.report();
            } finally {
                gangliaReporter.stop();
            }
        }

        if (jmxReporter != null) {
            jmxReporter.stop();
        }

        CollectorRegistry.defaultRegistry.clear();
    }

    /**
     * Add a cron to an instrumentation timer. The timer is created if it does not exists. <p>
     * Internally, this is backed by a {@link com.codahale.metrics.Timer}.
     *
     * @param group timer group.
     * @param name timer name.
     * @param cron cron to add to the timer.
     * @throws RuntimeException wrapping the {@link ExecutionException} if the timer cannot be loaded.
     */
    @Override
    public void addCron(String group, String name, Cron cron) {
        final String key = MetricRegistry.name(group, name, "timer");
        // Acquire outside the try block so that unlock() only ever runs when the lock is actually held
        timersLock.lock();
        try {
            timers.get(key).update(cron.getOwn(), TimeUnit.MILLISECONDS);
        } catch (ExecutionException ee) {
            throw new RuntimeException(ee);
        } finally {
            timersLock.unlock();
        }
    }

    /**
     * Add an instrumentation variable. <p>
     * Internally, this is backed by a {@link Gauge}. If a gauge with the same key is already registered it is replaced.
     *
     * @param group counter group.
     * @param name counter name.
     * @param variable variable to add.
     */
    @Override
    public void addVariable(String group, String name, final Variable variable) {
        final Gauge gauge = new Gauge() {
            @Override
            public Object getValue() {
                return variable.getValue();
            }
        };
        final String key = MetricRegistry.name(group, name);

        gaugesLock.lock();
        try {
            gauges.put(key, gauge);
            // Metrics throws an Exception if we don't do this when the key already exists
            if (metricRegistry.getGauges().containsKey(key)) {
                XLog.getLog(MetricsInstrumentation.class).debug("A Variable with name [" + key + "] already exists. "
                        + " The old Variable will be overwritten, but this is not recommended");
                metricRegistry.remove(key);
            }
            metricRegistry.register(key, gauge);
        } finally {
            gaugesLock.unlock();
        }
    }

    /**
     * Increment an instrumentation counter. The counter is created if it does not exists. <p>
     * Internally, this is backed by a {@link com.codahale.metrics.Counter}.
     *
     * @param group counter group.
     * @param name counter name.
     * @param count increment to add to the counter.
     * @throws RuntimeException wrapping the {@link ExecutionException} if the counter cannot be loaded.
     */
    @Override
    public void incr(String group, String name, long count) {
        final String key = MetricRegistry.name(group, name);
        countersLock.lock();
        try {
            counters.get(key).inc(count);
        } catch (ExecutionException ee) {
            throw new RuntimeException(ee);
        } finally {
            countersLock.unlock();
        }
    }

    /**
     * Add a sampling variable. <p>
     * Internally, this is backed by a biased (decaying) {@link Histogram}. If a histogram with the same key is already
     * registered it is replaced; the task scheduled for the previous sampler is not cancelled by this method.
     *
     * @param group timer group.
     * @param name timer name.
     * @param period (ignored)
     * @param interval sampling frequency, how often the variable is probed.
     * @param variable variable to sample.
     * @throws IllegalStateException if no scheduler has been set.
     */
    @Override
    public void addSampler(String group, String name, int period, int interval, Variable<Long> variable) {
        if (scheduler == null) {
            throw new IllegalStateException("scheduler not set, cannot sample");
        }
        final Histogram histogram = new Histogram(new ExponentiallyDecayingReservoir());
        scheduler.scheduleAtFixedRate(new Sampler(variable, histogram), 0, interval, TimeUnit.SECONDS);
        final String key = MetricRegistry.name(group, name, "histogram");

        histogramsLock.lock();
        try {
            histograms.put(key, histogram);
            // Metrics throws an Exception if we don't do this when the key already exists
            if (metricRegistry.getHistograms().containsKey(key)) {
                XLog.getLog(MetricsInstrumentation.class).debug("A Sampler with name [" + key + "] already exists. "
                        + " The old Sampler will be overwritten, but this is not recommended");
                metricRegistry.remove(key);
            }
            metricRegistry.register(key, histogram);
        } finally {
            histogramsLock.unlock();
        }
    }

    /**
     * Periodic task that reads a {@link Variable} and feeds its current value into a {@link Histogram}.
     */
    public static class Sampler implements Runnable {
        private final Variable<Long> variable;
        private final Histogram histogram;

        /**
         * @param variable variable to probe on every run.
         * @param histogram histogram that receives the probed values.
         */
        public Sampler(Variable<Long> variable, Histogram histogram) {
            this.variable = variable;
            this.histogram = histogram;
        }

        /**
         * Records the variable's current value. A {@code null} value is skipped: unboxing it would throw, and an
         * exception thrown by a fixed-rate task suppresses all of its subsequent executions.
         */
        @Override
        public void run() {
            final Long sample = variable.getValue();
            if (sample != null) {
                histogram.update(sample.longValue());
            }
        }
    }

    /**
     * Set the scheduler instance to handle the samplers.
     *
     * @param scheduler scheduler instance.
     */
    @Override
    public void setScheduler(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }

    /**
     * Return the string representation of the instrumentation.  It does a JSON pretty-print.
     *
     * @return the string representation of the instrumentation.
     * @throws RuntimeException wrapping the {@link JsonProcessingException} if serialization fails.
     */
    @Override
    public String toString() {
        try {
            return jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(metricRegistry);
        } catch (JsonProcessingException jpe) {
            throw new RuntimeException(jpe);
        }
    }

    /**
     * Converts the current state of the metrics and writes them to the OutputStream.
     *
     * @param os The OutputStream to write the metrics to
     * @throws IOException in case of error during writing to the stream
     */
    public void writeJSONResponse(OutputStream os) throws IOException {
        jsonMapper.writer().writeValue(os, metricRegistry);
    }

    /**
     * Returns the MetricRegistry: public for unit tests -- do not use.
     *
     * @return the MetricRegistry
     */
    @VisibleForTesting
    public MetricRegistry getMetricRegistry() {
        return metricRegistry;
    }

    /**
     * Not Supported: throws {@link UnsupportedOperationException}
     *
     * @return nothing
     */
    @Override
    public Map<String, Map<String, Map<String, Object>>> getAll() {
        throw new UnsupportedOperationException();
    }

    /**
     * For backwards compatibility reasons with {@link Instrumentation}, create a deep copy of {@link #counters}:
     * <ul>
     *     <li>counter groups and names are separated by {@code "."}. Here we use Codahale Metrics internals to concatenate group
     *     and name pairs with {@code "."}; the key is split again at its first {@code "."}</li>
     *     <li>a key without any {@code "."} (which {@link MetricRegistry#name(String, String...)} produces when the group or the
     *     name was {@code null} or empty) is reported under the empty group {@code ""} with the whole key as its name</li>
     *     <li>no synchronization is done on {@link #counters} while iterating over the {@link ConcurrentMap#entrySet()} of
     *     {@link LoadingCache#asMap()}. Hence it's possible to get values that are not present anymore. It's also possible
     *     to get values that have been updated in the meanwhile, and not to get values that have been inserted in the
     *     meanwhile</li>
     * </ul>
     *
     * @return a deep copy of counter groups, names, and values
     */
    @Override
    public Map<String, Map<String, Element<Long>>> getCounters() {
        final ConcurrentMap<String, com.codahale.metrics.Counter> countersAsMap = counters.asMap();
        final Map<String, Map<String, Element<Long>>> countersAsDeepMap = new HashMap<>();

        for (final Map.Entry<String, com.codahale.metrics.Counter> counterEntry : countersAsMap.entrySet()) {
            final String groupAndName = counterEntry.getKey();
            final int separatorIndex = groupAndName.indexOf('.');
            // separatorIndex is -1 for a key without a group part; substring(0, -1) would throw
            final String group = separatorIndex < 0 ? "" : groupAndName.substring(0, separatorIndex);
            final String name = groupAndName.substring(separatorIndex + 1);

            final Instrumentation.Counter counter = new Instrumentation.Counter();
            counter.set(counterEntry.getValue().getCount());
            countersAsDeepMap.computeIfAbsent(group, ignored -> new HashMap<>()).put(name, counter);
        }

        return countersAsDeepMap;
    }

    /**
     * Not Supported: throws {@link UnsupportedOperationException}
     *
     * @return nothing
     */
    @Override
    public Map<String, Map<String, Element<Double>>> getSamplers() {
        throw new UnsupportedOperationException();
    }

    /**
     * Not Supported: throws {@link UnsupportedOperationException}
     *
     * @return nothing
     */
    @Override
    public Map<String, Map<String, Element<Timer>>> getTimers() {
        throw new UnsupportedOperationException();
    }

    /**
     * Not Supported: throws {@link UnsupportedOperationException}
     *
     * @return nothing
     */
    @Override
    public Map<String, Map<String, Element<Variable>>> getVariables() {
        throw new UnsupportedOperationException();
    }
}
