/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.falcon.service;

import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.LifeCycle;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.process.Cluster;
import org.apache.falcon.entity.v0.process.Clusters;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.jdbc.BacklogMetricStore;
import org.apache.falcon.metrics.MetricNotificationService;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.util.MetricInfo;
import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.falcon.workflow.WorkflowExecutionListener;
import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static org.apache.falcon.workflow.WorkflowEngineFactory.getWorkflowEngine;

/**
 * Backlog Metric Emitter Service to publish metrics to Graphite.
 */
public final class BacklogMetricEmitterService implements FalconService,
        EntitySLAListener, WorkflowExecutionListener, ConfigurationChangeListener {

    private static final String METRIC_PREFIX = StartupProperties.get().getProperty("falcon.graphite.prefix");
    private static final String METRIC_SEPARATOR = ".";
    private static final String BACKLOG_METRIC_EMIT_INTERVAL = "falcon.backlog.metricservice.emit.interval.millisecs";
    private static final String BACKLOG_METRIC_RECHECK_INTERVAL = "falcon.backlog.metricservice."
            + "recheck.interval.millisecs";
    private static final String DEFAULT_PIPELINE = "DEFAULT";

    private static final Logger LOG = LoggerFactory.getLogger(BacklogMetricEmitterService.class);

    private static BacklogMetricStore backlogMetricStore = new BacklogMetricStore();

    private static final BacklogMetricEmitterService SERVICE = new BacklogMetricEmitterService();

    private static MetricNotificationService metricNotificationService =
            Services.get().getService(MetricNotificationService.SERVICE_NAME);

    private static final List<LifeCycle> PROCESS_LIFE_CYCLE =
            Arrays.asList(LifeCycle.valueOf(LifeCycle.EXECUTION.name()));

    public static BacklogMetricEmitterService get() {
        return SERVICE;
    }

    private BacklogMetricEmitterService() {
    }

    private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor1 = new ScheduledThreadPoolExecutor(1);
    private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor2 = new ScheduledThreadPoolExecutor(1);


    public static final ThreadLocal<SimpleDateFormat> DATE_FORMAT = new ThreadLocal<SimpleDateFormat>() {
        @Override
        protected SimpleDateFormat initialValue() {
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH-mm'Z'");
            format.setTimeZone(TimeZone.getTimeZone("UTC"));
            return format;
        }
    };

    private static ConcurrentHashMap<Entity, List<MetricInfo>> entityBacklogs = new ConcurrentHashMap<>();

    @Override
    public void onAdd(Entity entity) throws FalconException{
        addToBacklog(entity);
    }

    @Override
    public void onRemove(Entity entity) throws FalconException{
        if (entity.getEntityType() != EntityType.PROCESS){
            return;
        }
        Process process = (Process) entity;
        if (process.getSla() != null) {
            backlogMetricStore.deleteEntityInstance(entity.getName());
            entityBacklogs.remove(entity);
            process = EntityUtil.getEntity(entity.getEntityType(), entity.getName());
            for (Cluster cluster : process.getClusters().getClusters()) {
                dropMetric(cluster.getName(), process);
            }
        }
    }

    public void dropMetric(String clusterName, Process process){
        String pipelinesStr = process.getPipelines();
        String metricName;

        if (pipelinesStr != null && !pipelinesStr.isEmpty()) {
            String[] pipelines = pipelinesStr.split(",");
            for (String pipeline : pipelines) {
                metricName = getMetricName(clusterName, process.getName(), pipeline);
                metricNotificationService.deleteMetric(metricName);
            }
        } else {
            metricName = getMetricName(clusterName, process.getName(), DEFAULT_PIPELINE);
            metricNotificationService.deleteMetric(metricName);
        }
    }

    @Override
    public void onChange(Entity oldEntity, Entity newEntity) throws FalconException{
        if (oldEntity.getEntityType() != EntityType.PROCESS){
            return;
        }
        Process newProcess = (Process) newEntity;
        Process oldProcess = EntityUtil.getEntity(oldEntity.getEntityType(), oldEntity.getName());
        if (newProcess.getSla() == null || newProcess.getSla().getShouldEndIn() == null){
            if (oldProcess.getSla() != null) {
                backlogMetricStore.deleteEntityInstance(newProcess.getName());
                entityBacklogs.remove(newProcess);
                for (Cluster cluster : oldProcess.getClusters().getClusters()) {
                    dropMetric(cluster.getName(), oldProcess);
                }
            }
        } else {
            addToBacklog(newEntity);
        }
    }

    @Override
    public void onReload(Entity entity) throws FalconException{
        addToBacklog(entity);
    }

    public void addToBacklog(Entity entity) {
        if (entity.getEntityType() != EntityType.PROCESS) {
            return;
        }
        Process process = (Process) entity;
        if (process.getSla() == null){
            return;
        }
        entityBacklogs.putIfAbsent(entity, Collections.synchronizedList(new ArrayList<MetricInfo>()));
    }

    @Override
    public void highSLAMissed(String entityName, String clusterName, EntityType entityType,
                              Date nominalTime) throws FalconException {
        if (entityType != EntityType.PROCESS) {
            return;
        }
        Entity entity = EntityUtil.getEntity(entityType, entityName);
        entityBacklogs.putIfAbsent(entity, Collections.synchronizedList(new ArrayList<MetricInfo>()));
        List<MetricInfo> metricInfoList = entityBacklogs.get(entity);
        String nominalTimeStr = DATE_FORMAT.get().format(nominalTime);
        MetricInfo metricInfo = new MetricInfo(nominalTimeStr, clusterName);
        if (!metricInfoList.contains(metricInfo)) {
            synchronized (metricInfoList) {
                backlogMetricStore.addInstance(entityName, clusterName, nominalTime, entityType);
                metricInfoList.add(metricInfo);
            }
        }
    }

    @Override
    public String getName() {
        return this.getClass().getSimpleName();
    }

    @Override
    public void init() throws FalconException {
        initInstances();
        int emitInterval = Integer.parseInt(StartupProperties.get().getProperty(BACKLOG_METRIC_EMIT_INTERVAL,
                "60000"));
        int recheckInterval = Integer.parseInt(StartupProperties.get().getProperty(BACKLOG_METRIC_RECHECK_INTERVAL,
                "60000"));
        scheduledThreadPoolExecutor1.scheduleAtFixedRate(new BacklogMetricEmitter(),
                1, emitInterval, TimeUnit.MILLISECONDS);
        scheduledThreadPoolExecutor2.scheduleAtFixedRate(new BacklogCheckService(),
                1, recheckInterval, TimeUnit.MILLISECONDS);
    }

    private void initInstances() throws FalconException {
        LOG.info("Initializing backlog instances from state store");
        Map<Entity, List<MetricInfo>> backlogInstances = backlogMetricStore.getAllInstances();
        if (backlogInstances != null && !backlogInstances.isEmpty()) {
            for (Map.Entry<Entity, List<MetricInfo>> entry : backlogInstances.entrySet()) {
                List<MetricInfo> metricsInDB = entry.getValue();
                List<MetricInfo> metricInfoList = Collections.synchronizedList(metricsInDB);
                entityBacklogs.put(entry.getKey(), metricInfoList);
                LOG.debug("Initializing backlog for entity " + entry.getKey().getName());
            }
        }
    }

    @Override
    public void destroy() throws FalconException {
        scheduledThreadPoolExecutor1.shutdown();
        scheduledThreadPoolExecutor2.shutdown();
    }

    @Override
    public synchronized void onSuccess(WorkflowExecutionContext context) throws FalconException {
        Entity entity = EntityUtil.getEntity(context.getEntityType(), context.getEntityName());
        if (entity.getEntityType() != EntityType.PROCESS) {
            return;
        }
        if (entityBacklogs.containsKey(entity)) {
            List<MetricInfo> metrics = entityBacklogs.get(entity);
            synchronized (metrics) {
                Date date = SchemaHelper.parseDateUTC(context.getNominalTimeAsISO8601());
                backlogMetricStore.deleteMetricInstance(entity.getName(), context.getClusterName(),
                        date, entity.getEntityType());
                metrics.remove(new MetricInfo(DATE_FORMAT.get().format(date), context.getClusterName()));
                if (metrics.isEmpty()) {
                    entityBacklogs.remove(entity);
                    publishBacklog((Process) entity, context.getClusterName(), 0L);
                }
            }
        }
    }

    @Override
    public void onFailure(WorkflowExecutionContext context) throws FalconException {
        // Do Nothing
    }

    @Override
    public void onStart(WorkflowExecutionContext context) throws FalconException {
        // Do Nothing
    }

    @Override
    public void onSuspend(WorkflowExecutionContext context) throws FalconException {
        // Do Nothing
    }

    @Override
    public void onWait(WorkflowExecutionContext context) throws FalconException {
        // Do Nothing
    }

    /**
     * Service which executes backlog evaluation and publishing metrics to Graphite parallel for entities.
     */
    public static class BacklogMetricEmitter implements Runnable {
        private ThreadPoolExecutor executor;

        @Override
        public void run() {
            LOG.debug("BacklogMetricEmitter running for entities");
            executor = new ScheduledThreadPoolExecutor(10);
            List<Future> futures = new ArrayList<>();
            try {
                for (Entity entity : entityBacklogs.keySet()) {
                    futures.add(executor.submit(new BacklogCalcService(entity, entityBacklogs.get(entity))));
                }
                waitForFuturesToComplete(futures);
            } finally {
                executor.shutdown();
            }
        }

        private void waitForFuturesToComplete(List<Future> futures) {
            try {
                for (Future future : futures) {
                    future.get();
                }
            } catch (InterruptedException e) {
                LOG.error("Interruption while executing tasks " + e);
            } catch (ExecutionException e) {
                LOG.error("Error in executing threads " + e);
            }
        }
    }

    /**
     * Service which calculates backlog for given entity and publish to graphite.
     */
    public static class BacklogCalcService implements Runnable {

        private Entity entityObj;
        private List<MetricInfo> metrics;

        BacklogCalcService(Entity entity, List<MetricInfo> metricInfoList) {
            this.entityObj = entity;
            this.metrics = metricInfoList;
        }

        @Override
        public void run() {

            MetricInfo metricInfo = null;
            HashMap<String, Long> backLogsCluster = new HashMap<>();
            synchronized (metrics) {
                if (metrics.isEmpty()){
                    Process process = (Process)entityObj;
                    Clusters clusters = process.getClusters();
                    for (Cluster cluster : clusters.getClusters()){
                        publishBacklog(process, cluster.getName(), 0L);
                    }
                }else{
                    long currentTime = System.currentTimeMillis();
                    Iterator iter = metrics.iterator();
                    while (iter.hasNext()) {
                        try {
                            metricInfo = (MetricInfo) iter.next();
                            long time = DATE_FORMAT.get().parse(metricInfo.getNominalTime()).getTime();
                            long backlog = backLogsCluster.containsKey(metricInfo.getCluster())
                                    ? backLogsCluster.get(metricInfo.getCluster()) : 0;
                            backlog += (currentTime - time);
                            backLogsCluster.put(metricInfo.getCluster(), backlog);
                        } catch (ParseException e) {
                            LOG.error("Unable to parse nominal time" + metricInfo.getNominalTime());
                        }
                    }

                }
            }
            org.apache.falcon.entity.v0.process.Process process = (Process) entityObj;

            if (backLogsCluster != null && !backLogsCluster.isEmpty()) {
                for (Map.Entry<String, Long> entry : backLogsCluster.entrySet()) {
                    String clusterName = entry.getKey();
                    Long backlog = entry.getValue() / (60 * 1000L); // Converting to minutes
                    publishBacklog(process, clusterName, backlog);
                }
            }
        }
    }


    public static void publishBacklog(Process process, String clusterName, Long backlog){
        String pipelinesStr = process.getPipelines();
        String metricName;

        if (pipelinesStr != null && !pipelinesStr.isEmpty()) {
            String[] pipelines = pipelinesStr.split(",");
            for (String pipeline : pipelines) {
                metricName = getMetricName(clusterName, process.getName(), pipeline);
                metricNotificationService.publish(metricName, backlog);
            }
        } else {
            metricName = getMetricName(clusterName, process.getName(), DEFAULT_PIPELINE);
            metricNotificationService.publish(metricName, backlog);
        }
    }

    public static String getMetricName(String clusterName, String processName, String pipeline){
        String metricName = METRIC_PREFIX + METRIC_SEPARATOR + clusterName + METRIC_SEPARATOR
                + pipeline + METRIC_SEPARATOR + LifeCycle.EXECUTION.name()
                + METRIC_SEPARATOR + processName + METRIC_SEPARATOR
                + "backlogInMins";
        return metricName;
    }

    /**
     * Service runs periodically and removes succeeded instances from backlog list.
     */
    public static class BacklogCheckService implements Runnable {

        @Override
        public void run() {
            LOG.trace("BacklogCheckService running for entities");
            try {
                AbstractWorkflowEngine wfEngine = getWorkflowEngine();
                for (Entity entity : entityBacklogs.keySet()) {
                    List<MetricInfo> metrics = entityBacklogs.get(entity);
                    if (!metrics.isEmpty()) {
                        synchronized (metrics) {
                            Iterator iterator = metrics.iterator();
                            while (iterator.hasNext()) {
                                MetricInfo metricInfo = (MetricInfo) iterator.next();
                                String nominalTimeStr = metricInfo.getNominalTime();
                                Date nominalTime;
                                try {
                                    nominalTime = DATE_FORMAT.get().parse(nominalTimeStr);
                                    authenticateUser(entity);
                                    if (wfEngine.isMissing(entity)) {
                                        LOG.info("Entity of name {} was deleted so removing instance of "
                                                + "nominaltime {} ", entity.getName(), nominalTimeStr);
                                        backlogMetricStore.deleteMetricInstance(entity.getName(),
                                                metricInfo.getCluster(), nominalTime, entity.getEntityType());
                                        iterator.remove();
                                        continue;
                                    }
                                    InstancesResult status = wfEngine.getStatus(entity, nominalTime,
                                            new Date(nominalTime.getTime() + 200), PROCESS_LIFE_CYCLE, false);
                                    if (status.getInstances().length > 0
                                            && status.getInstances()[0].status == InstancesResult.
                                            WorkflowStatus.SUCCEEDED) {
                                        LOG.debug("Instance of nominaltime {} of entity {} has succeeded, removing "
                                                + "from backlog entries", nominalTimeStr, entity.getName());
                                        backlogMetricStore.deleteMetricInstance(entity.getName(),
                                                metricInfo.getCluster(), nominalTime, entity.getEntityType());
                                        iterator.remove();
                                    }
                                } catch (ParseException e) {
                                    LOG.error("Unable to parse date " + nominalTimeStr);
                                }
                            }
                        }
                    }
                }
            } catch (Throwable e) {
                LOG.error("Error while checking backlog metrics" + e);
            }
        }
    }

    private static void authenticateUser(Entity entity){
        if (!CurrentUser.isAuthenticated()) {
            if (StringUtils.isNotBlank(entity.getACL().getOwner())) {
                CurrentUser.authenticate(entity.getACL().getOwner());
            } else {
                CurrentUser.authenticate(System.getProperty("user.name"));
            }
        }
    }

}
