/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig.tools.pigstats.tez;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.JobClient;
import org.apache.pig.PigRunner.ReturnCode;
import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType;
import org.apache.pig.backend.hadoop.executionengine.tez.TezJob;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerVisitor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.NativeTezOper;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.LogUtils;
import org.apache.pig.tools.pigstats.InputStats;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.ScriptState;
import org.apache.pig.tools.pigstats.tez.TezScriptState.TezDAGScriptInfo;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGStatus;

import com.google.common.collect.Maps;

/**
 * TezPigScriptStats encapsulates the statistics collected from a running script executed in Tez mode.
 * It includes the status of the execution, the Tez DAGs launched for the script, as well as
 * information about the outputs and inputs of the script.
 *
 * <p>TezPigScriptStats encapsulates multiple TezDAGStats, and each TezDAGStats encapsulates multiple
 * TezVertexStats.
 */
public class TezPigScriptStats extends PigStats {
    private static final Log LOG = LogFactory.getLog(TezPigScriptStats.class);

    private TezScriptState tezScriptState;
    private Map<String, TezDAGStats> tezDAGStatsMap;

    /**
     * Visitor over a {@link TezPlanContainer} that uses a {@link DependencyOrderWalker}.
     * For every visited node it creates a {@link TezDAGStats}, adds it to the
     * enclosing instance's job graph, connects it to the stats of the node's
     * predecessors, and records it in the DAG-stats map under the node's operator key.
     */
    private class DAGGraphBuilder extends TezPlanContainerVisitor {
        public DAGGraphBuilder(TezPlanContainer planContainer) {
            super(planContainer, new DependencyOrderWalker<TezPlanContainerNode, TezPlanContainer>(planContainer));
        }

        /**
         * Registers stats for one plan-container node and wires it to its predecessors.
         *
         * @param tezPlanNode the node currently being visited
         * @throws VisitorException declared by the visitor contract
         */
        @Override
        public void visitTezPlanContainerNode(TezPlanContainerNode tezPlanNode) throws VisitorException {
            TezDAGScriptInfo scriptInfo = TezScriptState.get().setDAGScriptInfo(tezPlanNode);
            TezDAGStats nodeStats = new TezDAGStats.TezDAGStatsBuilder(tezPlanNode, scriptInfo).build();
            jobPlan.add(nodeStats);

            List<TezPlanContainerNode> predecessors = getPlan().getPredecessors(tezPlanNode);
            if (predecessors != null) {
                for (TezPlanContainerNode predecessor : predecessors) {
                    String predecessorKey = predecessor.getOperatorKey().toString();
                    TezDAGStats upstreamStats = tezDAGStatsMap.get(predecessorKey);
                    // Skip edges that already exist in the job graph.
                    if (jobPlan.isConnected(upstreamStats, nodeStats)) {
                        continue;
                    }
                    jobPlan.connect(upstreamStats, nodeStats);
                }
            }

            // Recorded last, after all predecessor edges have been created.
            tezDAGStatsMap.put(tezPlanNode.getOperatorKey().toString(), nodeStats);
        }
    }

    /**
     * Creates a stats holder with an empty job graph and an empty DAG-stats map.
     *
     * @param pigContext the Pig context whose properties are consulted when
     *                   statistics are accumulated
     */
    public TezPigScriptStats(PigContext pigContext) {
        this.pigContext = pigContext;
        this.jobPlan = new JobGraph();
        // Insertion-ordered map: the numbered "DAG n:" listing produced by
        // getDisplayString() iterates over the map values, so the listing
        // follows the order in which DAGs were registered instead of an
        // arbitrary hash order.
        this.tezDAGStatsMap = Maps.newLinkedHashMap();
        this.tezScriptState = (TezScriptState) ScriptState.get();
    }

    /**
     * Calls the superclass {@code start()} and then walks the given plan
     * container with a {@code DAGGraphBuilder}. A {@link FrontendException}
     * raised by the walk is logged at WARN level and not propagated.
     *
     * @param tezPlanContainer the container of Tez plans to visit
     */
    public void initialize(TezPlanContainer tezPlanContainer) {
        super.start();
        DAGGraphBuilder graphBuilder = new DAGGraphBuilder(tezPlanContainer);
        try {
            graphBuilder.visit();
        } catch (FrontendException buildFailure) {
            LOG.warn("Unable to build Tez DAG", buildFailure);
        }
    }

    /**
     * Calls the superclass {@code stop()} and then logs the statistics report.
     * Any {@link Throwable} raised while producing or logging the report is
     * itself logged at WARN level and suppressed.
     */
    public void finish() {
        super.stop();
        try {
            display();
        } catch (Throwable displayFailure) {
            LOG.warn("Exception while displaying stats:", displayFailure);
        }
    }

    /** Logs the report returned by {@link #getDisplayString()} at INFO level. */
    private void display() {
        final String report = getDisplayString();
        LOG.info(report);
    }

    /**
     * Builds the human-readable statistics report.
     *
     * <p>The report consists of, in order: a header of version/user/time/feature
     * rows, a one-line outcome derived from the return code, the error message
     * (only for FAILURE or PARTIAL_FAILURE and only when one is set), one section
     * per entry of the DAG-stats map, and finally the input and output listings.
     *
     * @return the report text, starting with {@code "Script Statistics:\n"}
     */
    @Override
    public String getDisplayString() {
        final String rowFormat = "%1$20s: %2$-100s%n";
        SimpleDateFormat dateFormatter = new SimpleDateFormat(DATE_FORMAT);
        StringBuilder report = new StringBuilder("Script Statistics:\n");

        // Header rows.
        report.append("\n");
        report.append(String.format(rowFormat, "HadoopVersion", getHadoopVersion()));
        report.append(String.format(rowFormat, "PigVersion", getPigVersion()));
        report.append(String.format(rowFormat, "TezVersion", TezExecType.getTezVersion()));
        report.append(String.format(rowFormat, "UserId", userId));
        report.append(String.format(rowFormat, "FileName", getFileName()));
        report.append(String.format(rowFormat, "StartedAt", dateFormatter.format(new Date(startTime))));
        report.append(String.format(rowFormat, "FinishedAt", dateFormatter.format(new Date(endTime))));
        report.append(String.format(rowFormat, "Features", getFeatures()));
        report.append("\n");

        // Overall outcome line.
        final String outcome;
        if (returnCode == ReturnCode.SUCCESS) {
            outcome = "Success!\n";
        } else if (returnCode == ReturnCode.PARTIAL_FAILURE) {
            outcome = "Some tasks have failed! Stop running all dependent tasks\n";
        } else {
            outcome = "Failed!\n";
        }
        report.append(outcome).append("\n");

        // Diagnostic info is printed only for failed or partially failed runs.
        boolean failedRun = returnCode == ReturnCode.FAILURE
                || returnCode == ReturnCode.PARTIAL_FAILURE;
        if (failedRun && errorMessage != null) {
            String[] messageLines = errorMessage.split("\n");
            for (int idx = 0; idx < messageLines.length; idx++) {
                String trimmed = messageLines[idx].trim();
                boolean firstLine = idx == 0;
                // The first line is always printed (it carries the label);
                // later lines are printed only when non-empty.
                if (!firstLine && StringUtils.isEmpty(trimmed)) {
                    continue;
                }
                report.append(String.format(rowFormat, firstLine ? "ErrorMessage" : "", trimmed));
            }
            report.append("\n");
        }

        // One numbered section per DAG.
        int dagIndex = 0;
        for (TezDAGStats dagStats : tezDAGStatsMap.values()) {
            report.append("\n")
                    .append("DAG ").append(dagIndex).append(":\n")
                    .append(dagStats.getDisplayString())
                    .append("\n");
            dagIndex++;
        }

        report.append("Input(s):\n");
        for (InputStats input : getInputStats()) {
            report.append(input.getDisplayString().trim()).append("\n");
        }
        report.append("\n");

        report.append("Output(s):\n");
        for (OutputStats output : getOutputStats()) {
            report.append(output.getDisplayString().trim()).append("\n");
        }
        return report.toString();
    }

    /**
     * Updates the statistics after a DAG is finished.
     *
     * <p>The stats entry is looked up by the job's name. If no entry is
     * registered under that name (for example because the DAG graph could not
     * be built in {@code initialize}, which only logs a warning), a warning is
     * logged and nothing else is done.
     *
     * <p>If the job has no DAG status, the DAG is marked unsuccessful and a
     * job-failed notification is emitted. Otherwise the DAG stats are
     * accumulated from the job, an output-completed notification is emitted
     * for every output, and a finished or failed notification is emitted for
     * the SUCCEEDED and FAILED states respectively, followed by a
     * DAG-completed notification.
     *
     * <p>Finally, when the DAG is not successful and outputs are committed only
     * on DAG success (per the Tez configuration property or its default), all
     * outputs of the DAG are marked unsuccessful.
     *
     * @param tezJob the job whose DAG has finished
     * @throws IOException declared for the underlying job and stats calls
     */
    public void accumulateStats(TezJob tezJob) throws IOException {
        DAGStatus dagStatus = tezJob.getDAGStatus();
        String dagName = tezJob.getName();
        TezDAGStats tezDAGStats = tezDAGStatsMap.get(dagName);
        if (tezDAGStats == null) {
            // Nothing to update; avoid a NullPointerException on the lookups below.
            LOG.warn("No statistics registered for Tez DAG " + dagName
                    + "; skipping stats accumulation");
            return;
        }

        if (dagStatus == null) {
            tezDAGStats.setSuccessful(false);
            tezScriptState.emitJobFailedNotification(tezDAGStats);
            return;
        }

        tezDAGStats.accumulateStats(tezJob);
        for (OutputStats output : tezDAGStats.getOutputs()) {
            tezScriptState.emitOutputCompletedNotification(output);
        }

        DAGStatus.State dagState = dagStatus.getState();
        if (dagState == DAGStatus.State.SUCCEEDED) {
            tezDAGStats.setSuccessful(true);
            tezScriptState.emitjobFinishedNotification(tezDAGStats);
        } else if (dagState == DAGStatus.State.FAILED) {
            tezDAGStats.setSuccessful(false);
            String diagnostics = tezJob.getDiagnostics();
            tezDAGStats.setErrorMsg(diagnostics);
            tezDAGStats.setBackendException(new TezException(diagnostics));
            boolean stopOnFailure = Boolean.parseBoolean(
                    pigContext.getProperties().getProperty("stop.on.failure", "false"));
            if (stopOnFailure) {
                LogUtils.writeLog("Backend error message",
                        diagnostics,
                        pigContext.getProperties().getProperty("pig.logfile"),
                        LOG);
            }
            tezScriptState.emitJobFailedNotification(tezDAGStats);
        }
        tezScriptState.dagCompletedNotification(dagName, tezDAGStats);

        if (!tezDAGStats.isSuccessful()) {
            String outputCommitOnDAGSuccess = pigContext.getProperties().getProperty(
                    TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS);
            // An unset property falls back to the Tez default; otherwise only
            // the literal "true" enables commit-on-DAG-success.
            boolean commitOnDagSuccess =
                    (outputCommitOnDAGSuccess == null
                            && TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS_DEFAULT)
                    || "true".equals(outputCommitOnDAGSuccess);
            if (commitOnDagSuccess) {
                for (OutputStats stats : tezDAGStats.getOutputs()) {
                    stats.setSuccessful(false);
                }
            }
        }
    }

    /**
     * Records the job id and outcome of a native Tez operator on the stats
     * entry registered under the given DAG name.
     *
     * @param dagName name under which the DAG stats were registered
     * @param tezOper native operator supplying the job id
     * @param success whether the native job succeeded
     * @return the updated stats entry, or {@code null} when no entry is
     *         registered under {@code dagName} (a warning is logged in that case)
     */
    public TezDAGStats addTezJobStatsForNative(String dagName, NativeTezOper tezOper, boolean success) {
        TezDAGStats nativeDagStats = tezDAGStatsMap.get(dagName);
        if (nativeDagStats == null) {
            // Mirrors getVertexStats(): a missing entry yields null instead of
            // a NullPointerException.
            LOG.warn("No statistics registered for Tez DAG " + dagName
                    + "; cannot record native job stats");
            return null;
        }
        nativeDagStats.setJobId(tezOper.getJobId());
        nativeDagStats.setSuccessful(success);
        return nativeDagStats;
    }

    /**
     * Looks up the stats of one vertex of one DAG.
     *
     * @param dagName    name under which the DAG stats were registered
     * @param vertexName name of the vertex inside that DAG
     * @return {@code null} when no DAG stats are registered under
     *         {@code dagName}; otherwise whatever the DAG stats return for
     *         {@code vertexName}
     */
    public TezVertexStats getVertexStats(String dagName, String vertexName) {
        TezDAGStats owningDag = tezDAGStatsMap.get(dagName);
        if (owningDag == null) {
            return null;
        }
        return owningDag.getVertexStats(vertexName);
    }

    /**
     * @return always {@code false} for this implementation
     */
    @Override
    public boolean isEmbedded() {
        return false;
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public JobClient getJobClient() {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Map<String, List<PigStats>> getAllStats() {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public List<String> getAllErrorMessages() {
        throw new UnsupportedOperationException();
    }

    /**
     * @return the sum of {@code getSMMSpillCount()} over all registered DAG stats
     */
    @Override
    public long getSMMSpillCount() {
        long total = 0L;
        for (TezDAGStats perDag : tezDAGStatsMap.values()) {
            total = total + perDag.getSMMSpillCount();
        }
        return total;
    }

    /**
     * @return the sum of {@code getProactiveSpillCountObjects()} over all
     *         registered DAG stats
     */
    @Override
    public long getProactiveSpillCountObjects() {
        long total = 0L;
        for (TezDAGStats perDag : tezDAGStatsMap.values()) {
            total = total + perDag.getProactiveSpillCountObjects();
        }
        return total;
    }

    /**
     * @return the sum of {@code getProactiveSpillCountRecs()} over all
     *         registered DAG stats
     */
    @Override
    public long getProactiveSpillCountRecords() {
        long total = 0L;
        for (TezDAGStats perDag : tezDAGStatsMap.values()) {
            total = total + perDag.getProactiveSpillCountRecs();
        }
        return total;
    }

}
