/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.test;

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonNumber;
import javax.json.JsonObject;
import javax.json.JsonReader;
import javax.json.JsonValue;

import org.apache.drill.exec.server.rest.profile.CoreOperatorType;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Parses a query profile and provides access to various bits of the profile
 * for diagnostic purposes during tests.
 */

public class ProfileParser {
  private static final Logger logger = LoggerFactory.getLogger(ProfileParser.class);

  /**
   * The original JSON profile.
   */

  JsonObject profile;

  /**
   * Query text parsed out of the profile.
   */

  String query;

  /**
   * List of operator plans in the order in which they appear in the query
   * plan section of the profile. This is an intermediate representation used
   * to create the more fully analyzed structures.
   */

  List<String> plans;

  /**
   * Operations sorted by operator ID. The Operator ID serves as
   * an index into the list to get the information for that operator.
   * Operator ID is the one shown in the plan: xx-nn, where nn is the
   * operator ID. This is NOT the same as the operator type.
   */

  List<OperatorSummary> operations;

  /**
   * Map from major fragment number to fragment information. The major
   * fragment number is the nn in the nn-xx notation in the plan.
   */

  Map<Integer,FragInfo> fragments = new HashMap<>();

  /**
   * Operations in the original topological order as shown in the text
   * version of the query plan in the query profile.
   */
  private List<OperatorSummary> topoOrder;

  public ProfileParser(File file) throws IOException {
    try (FileReader fileReader = new FileReader(file);
         JsonReader reader = Json.createReader(fileReader)) {
      profile = (JsonObject) reader.read();
    }

    parse();
  }

  private void parse() {
    parseQuery();
    parsePlans();
    buildFrags();
    parseFragProfiles();
    mapOpProfiles();
    aggregateOpers();
    buildTree();
  }

  private void parseQuery() {
    query = profile.getString("query");
    query = query.replace("//n", "\n");
  }

  /**
   * Parse a text version of the plan as it appears in the JSON
   * query profile.
   */

  private static class PlanParser {

    List<String> plans = new ArrayList<>();
    List<OperatorSummary> operations = new ArrayList<>();
    List<OperatorSummary> sorted = new ArrayList<>();

    public void parsePlans(String plan) {
      plans = new ArrayList<>();
      String[] parts = plan.split("\n");
      for (String part : parts) {
        plans.add(part);
        OperatorSummary opDef = new OperatorSummary(part);
        operations.add(opDef);
      }
      sortList();
    }

    private void sortList() {
      List<OperatorSummary> raw = new ArrayList<>();
      raw.addAll(operations);
      Collections.sort(raw, new Comparator<OperatorSummary>() {
        @Override
        public int compare(OperatorSummary o1, OperatorSummary o2) {
          int result = Integer.compare(o1.majorId, o2.majorId);
          if (result == 0) {
            result = Integer.compare(o1.stepId, o2.stepId);
          }
          return result;
        }
      });
      int currentFrag = 0;
      int currentStep = 0;
      for (OperatorSummary opDef : raw) {
        if (currentFrag < opDef.majorId) {
          currentFrag++;
          OperatorSummary sender = new OperatorSummary(currentFrag, 0);
          sender.isInferred = true;
          sender.name = "Sender";
          sorted.add(sender);
          currentStep = 1;
          opDef.inferredParent = sender;
          sender.children.add(opDef);
        }
        if (opDef.stepId > currentStep) {
          OperatorSummary unknown = new OperatorSummary(currentFrag, currentStep);
          unknown.isInferred = true;
          unknown.name = "Unknown";
          sorted.add(unknown);
          opDef.inferredParent = unknown;
          unknown.children.add(opDef);
        }
        sorted.add(opDef);
        currentStep = opDef.stepId + 1;
      }
    }
  }

  /**
   * Parse the plan portion of the query profile. Unfortunately,
   * the plan is in text form an is awkward to parse. Also, there is no ID
   * to correlate operators shown in the plan with those referenced in the
   * profile JSON. Inference is needed.
   */

  private void parsePlans() {
    PlanParser parser = new PlanParser();
    String plan = getPlan();
    parser.parsePlans(plan);
    plans = parser.plans;
    topoOrder = parser.operations;
    operations = parser.sorted;
  }

  private void buildFrags() {
    for (OperatorSummary opDef : operations) {
      FragInfo major = fragments.get(opDef.majorId);
      if (major == null) {
        major = new FragInfo(opDef.majorId);
        fragments.put(opDef.majorId, major);
      }
      major.ops.add(opDef);
    }
  }

  private static List<FieldDef> parseCols(String cols) {
    String[] parts = cols.split(", ");
    List<FieldDef> fields = new ArrayList<>();
    for (String part : parts) {
      String[] halves = part.split(" ");
      fields.add(new FieldDef(halves[1], halves[0]));
    }
    return fields;
  }

  private void parseFragProfiles() {
    JsonArray frags = getFragmentProfile();
    for (JsonObject fragProfile : frags.getValuesAs(JsonObject.class)) {
      int mId = fragProfile.getInt("majorFragmentId");
      FragInfo major = fragments.get(mId);
      major.parse(fragProfile);
    }
  }

  private void mapOpProfiles() {
    for (FragInfo major : fragments.values()) {
      for (MinorFragInfo minor : major.minors) {
        minor.mapOpProfiles(major);
      }
    }
  }

  /**
   * A typical plan has many operator details across multiple
   * minor fragments. Aggregate these totals to the "master"
   * definition of each operator.
   */

  private void aggregateOpers() {
    for (FragInfo major : fragments.values()) {
      for (OperatorSummary opDef : major.ops) {
        long sumPeak = 0;
        opDef.execCount = opDef.opExecs.size();
        for (OperatorProfile op : opDef.opExecs) {
          Preconditions.checkState(major.id == op.majorFragId);
          Preconditions.checkState(opDef.stepId == op.opId);
          opDef.actualRows += op.records;
          opDef.actualBatches += op.batches;
          opDef.setupMs += op.setupMs;
          opDef.processMs += op.processMs;
          sumPeak += op.peakMem;
        }
        opDef.actualMemory = sumPeak * 1024 * 1024;
      }
    }
  }

  /**
   * Reconstruct the operator tree from parsed information.
   */

  public void buildTree() {
    int currentLevel = 0;
    OperatorSummary[] opStack = new OperatorSummary[topoOrder.size()];
    for (OperatorSummary opDef : topoOrder) {
      currentLevel = opDef.globalLevel;
      opStack[currentLevel] = opDef;
      if (opDef.inferredParent == null) {
        if (currentLevel > 0) {
          opStack[currentLevel-1].children.add(opDef);
        }
      } else {
        opStack[currentLevel-1].children.add(opDef.inferredParent);
      }
    }
  }


  public String getQuery() {
    return profile.getString("query");
  }

  public String getPlan() {
    return profile.getString("plan");
  }

  public List<String> getPlans() {
    return plans;
  }

  public List<String> getScans() {
    List<String> scans = new ArrayList<>();
    int n = getPlans().size();
    for (int i = n-1; i >= 0;  i--) {
      String plan = plans.get(i);
      if (plan.contains(" Scan(")) {
        scans.add(plan);
      }
    }
    return scans;
  }

  public List<FieldDef> getColumns(String plan) {
    Pattern p = Pattern.compile("RecordType\\((.*)\\):");
    Matcher m = p.matcher(plan);
    if (! m.find()) { return null; }
    String frag = m.group(1);
    String[] parts = frag.split(", ");
    List<FieldDef> fields = new ArrayList<>();
    for (String part : parts) {
      String[] halves = part.split(" ");
      fields.add(new FieldDef(halves[1], halves[0]));
    }
    return fields;
  }

  public Map<Integer,String> getOperators() {
    Map<Integer,String> ops = new HashMap<>();
    int n = getPlans().size();
    Pattern p = Pattern.compile("\\d+-(\\d+)\\s+(\\w+)");
    for (int i = n-1; i >= 0;  i--) {
      String plan = plans.get(i);
      Matcher m = p.matcher(plan);
      if (! m.find()) { continue; }
      int index = Integer.parseInt(m.group(1));
      String op = m.group(2);
      ops.put(index,op);
    }
    return ops;
  }

  public JsonArray getFragmentProfile() {
    return profile.getJsonArray("fragmentProfile");
  }

  /**
   * Information for a fragment, including the operators
   * in that fragment and the set of minor fragments.
   */

  public static class FragInfo {
    public int baseLevel;
    public int id;
    public List<OperatorSummary> ops = new ArrayList<>();
    public List<MinorFragInfo> minors = new ArrayList<>();

    public FragInfo(int majorId) {
      this.id = majorId;
    }

    public OperatorSummary getRootOperator() {
      return ops.get(0);
    }

    public void parse(JsonObject fragProfile) {
      JsonArray minorList = fragProfile.getJsonArray("minorFragmentProfile");
      for (JsonObject minorProfile : minorList.getValuesAs(JsonObject.class)) {
        minors.add(new MinorFragInfo(id, minorProfile));
      }
    }
  }

  /**
   * Information about a minor fragment as parsed from the profile.
   */

  public static class MinorFragInfo {
    public final int majorId;
    public final int id;
    public final List<OperatorProfile> ops = new ArrayList<>();

    public MinorFragInfo(int majorId, JsonObject minorProfile) {
      this.majorId = majorId;
      id = minorProfile.getInt("minorFragmentId");
      JsonArray opList = minorProfile.getJsonArray("operatorProfile");
      for (JsonObject opProfile : opList.getValuesAs(JsonObject.class)) {
        ops.add(new OperatorProfile(majorId, id, opProfile));
      }
    }

    /**
     * Map each operator execution profiles back to the definition of that
     * operator. The only common key is the xx-yy value where xx is the fragment
     * number and yy is the operator ID.
     *
     * @param major major fragment that corresponds to the xx portion of the
     * operator id
     */

    public void mapOpProfiles(FragInfo major) {
      for (OperatorProfile op : ops) {
        OperatorSummary opDef = major.ops.get(op.opId);
        if (opDef == null) {
          logger.info("Can't find operator def: {}-{}", major.id, op.opId);
          continue;
        }
        op.opName = op.type.replace("_", " ");
        op.name = opDef.name;
        if (op.name.equalsIgnoreCase(op.opName)) {
          op.opName = null;
        }
        op.defn = opDef;
        opDef.opName = op.opName;
        opDef.type = op.type;
        opDef.opExecs.add(op);
      }
    }
  }

  /**
   * Detailed information about each operator within a minor fragment
   * for a major fragment. Gathers the detailed information from
   * the profile.
   */

  public static class OperatorProfile {
    public OperatorSummary defn;
    public String opName;
    public int majorFragId;
    public int minorFragId;
    public int opId;
    public String type;
    public String name;
    public long processMs;
    public long waitMs;
    public long setupMs;
    public long peakMem;
    public Map<Integer,JsonNumber> metrics = new HashMap<>();
    public long records;
    public int batches;
    public int schemas;

    public OperatorProfile(int majorId, int minorId, JsonObject opProfile) {
      majorFragId = majorId;
      minorFragId = minorId;
      opId = opProfile.getInt("operatorId");
      JsonValue.ValueType valueType = opProfile.get("operatorType").getValueType();
      type = valueType == JsonValue.ValueType.STRING
          ? opProfile.getString("operatorType")
          : Objects.requireNonNull(CoreOperatorType.valueOf(opProfile.getInt("operatorType"))).name();
      processMs = opProfile.getJsonNumber("processNanos").longValue() / 1_000_000;
      waitMs = opProfile.getJsonNumber("waitNanos").longValue() / 1_000_000;
      setupMs = opProfile.getJsonNumber("setupNanos").longValue() / 1_000_000;
      peakMem = opProfile.getJsonNumber("peakLocalMemoryAllocated").longValue() / (1024 * 1024);
      JsonArray array = opProfile.getJsonArray("inputProfile");
      if (array != null) {
        for (int i = 0; i < array.size(); i++) {
          JsonObject obj = array.getJsonObject(i);
          records += obj.getJsonNumber("records").longValue();
          batches += obj.getInt("batches");
          schemas += obj.getInt("schemas");
        }
      }
      array = opProfile.getJsonArray("metric");
      if (array != null) {
        for (int i = 0; i < array.size(); i++) {
          JsonObject metric = array.getJsonObject(i);
          metrics.put(metric.getJsonNumber("metricId").intValue(), metric.getJsonNumber("longValue"));
        }
      }
    }

    public long getMetric(int id) {
      JsonNumber value = metrics.get(id);
      if (value == null) {
        return 0;
      }
      return value.longValue();
    }

    @Override
    public String toString() {
      return String.format("[OperatorProfile %02d-%02d-%02d, type: %s, name: %s]",
          majorFragId, opId, minorFragId, type,
          (name == null) ? "null" : name);
    }
  }

  /**
   * Information about an operator definition: the plan-time information
   * that appears in the plan portion of the profile. Also holds the
   * "actuals" from the minor fragment portion of the profile.
   * Allows integrating the "planned" vs. "actual" performance of the
   * query.
   * <p>
   * There is one operator definition (represented here), each of which may
   * give rise to multiple operator executions (housed in minor fragments.)
   * The {@link #opExecs} field provides the list of operator executions
   * (which provides access to operator metrics.)
   */

  public static class OperatorSummary {
    public String type;
    public long processMs;
    public long setupMs;
    public int execCount;
    public String opName;
    public boolean isInferred;
    public int majorId;
    public int stepId;
    public String args;
    public List<FieldDef> columns;
    public int globalLevel;
    public int localLevel;
    public int id;
    public int branchId;
    public boolean isBranchRoot;
    public double estMemoryCost;
    public double estNetCost;
    public double estIOCost;
    public double estCpuCost;
    public double estRowCost;
    public double estRows;
    public String name;
    public long actualMemory;
    public int actualBatches;
    public long actualRows;
    public OperatorSummary inferredParent;
    public List<OperatorProfile> opExecs = new ArrayList<>();
    public List<OperatorSummary> children = new ArrayList<>();

    // 00-00    Screen : rowType = RecordType(VARCHAR(10) Year, VARCHAR(65536) Month, VARCHAR(100) Devices, VARCHAR(100) Tier, VARCHAR(100) LOB, CHAR(10) Gateway, BIGINT Day, BIGINT Hour, INTEGER Week, VARCHAR(100) Week_end_date, BIGINT Usage_Cnt): \
    // rowcount = 100.0, cumulative cost = {7.42124276972414E9 rows, 7.663067406383167E10 cpu, 0.0 io, 2.24645048816E10 network, 2.692766612982188E8 memory}, id = 129302
    //
    // 00-01      Project(Year=[$0], Month=[$1], Devices=[$2], Tier=[$3], LOB=[$4], Gateway=[$5], Day=[$6], Hour=[$7], Week=[$8], Week_end_date=[$9], Usage_Cnt=[$10]) :
    // rowType = RecordType(VARCHAR(10) Year, VARCHAR(65536) Month, VARCHAR(100) Devices, VARCHAR(100) Tier, VARCHAR(100) LOB, CHAR(10) Gateway, BIGINT Day, BIGINT Hour, INTEGER Week, VARCHAR(100) Week_end_date, BIGINT Usage_Cnt): rowcount = 100.0, cumulative cost = {7.42124275972414E9 rows, 7.663067405383167E10 cpu, 0.0 io, 2.24645048816E10 network, 2.692766612982188E8 memory}, id = 129301

    public OperatorSummary(String plan) {
      Pattern p = Pattern.compile("^(\\d+)-(\\d+)(\\s+)(\\w+)(?:\\((.*)\\))?\\s*:\\s*(.*)$");
      Matcher m = p.matcher(plan);
      if (!m.matches()) {
        throw new IllegalStateException("Could not parse plan: " + plan);
      }
      majorId = Integer.parseInt(m.group(1));
      stepId = Integer.parseInt(m.group(2));
      name = m.group(4);
      args = m.group(5);
      String tail = m.group(6);
      String indent = m.group(3);
      globalLevel = (indent.length() - 4) / 2;

      p = Pattern.compile("rowType = RecordType\\((.*)\\): (rowcount .*)");
      m = p.matcher(tail);
      if (m.matches()) {
        columns = parseCols(m.group(1));
        tail = m.group(2);
      }

      p = Pattern.compile("rowcount = ([\\d.E]+), cumulative cost = \\{([\\d.E]+) rows, ([\\d.E]+) cpu, ([\\d.E]+) io, ([\\d.E]+) network, ([\\d.E]+) memory\\}, id = (\\d+)");
      m = p.matcher(tail);
      if (! m.find()) {
        throw new IllegalStateException("Could not parse costs: " + tail);
      }
      estRows = Double.parseDouble(m.group(1));
      estRowCost = Double.parseDouble(m.group(2));
      estCpuCost = Double.parseDouble(m.group(3));
      estIOCost = Double.parseDouble(m.group(4));
      estNetCost = Double.parseDouble(m.group(5));
      estMemoryCost = Double.parseDouble(m.group(6));
      id = Integer.parseInt(m.group(7));
    }

    public void printTree(String indent) {
      new TreePrinter().visit(this);
    }

    public OperatorSummary(int major, int id) {
      majorId = major;
      stepId = id;
    }

    @Override
    public String toString() {
      String head = "[OpDefInfo " + majorId + "-" + stepId + ": " + name;
      if (isInferred) {
        head += " (" + opName + ")";
      }
      return head + "]";
    }
  }

  /**
   * Visit a tree of operator definitions to support printing,
   * analysis and other tasks.
   */

  public static class TreeVisitor
  {
    public void visit(OperatorSummary root) {
      visit(root, 0);
    }

    public void visit(OperatorSummary node, int indent) {
      visitOp(node, indent);
      if (node.children.isEmpty()) {
        return;
      }
      if (node.children.size() == 1) {
        visit(node.children.get(0), indent);
        return;
      }
      indent++;
      int i = 0;
      for (OperatorSummary child : node.children) {
        visitSubtree(node, i++, indent);
        visit(child, indent+1);
      }
    }

    protected void visitOp(OperatorSummary node, int indent) {
    }

    protected void visitSubtree(OperatorSummary node, int i, int indent) {
    }

    public String indentString(int indent, String pad) {
      StringBuilder buf = new StringBuilder();
      for (int i = 0; i < indent; i++) {
        buf.append(pad);
      }
      return buf.toString();
    }

    public String indentString(int indent) {
      return indentString(indent, "  ");
    }

    public String subtreeLabel(OperatorSummary node, int branch) {
      if (node.name.equals("HashJoin")) {
        return (branch == 0) ? "Probe" : "Build";
      } else {
        return "Input " + (branch + 1);
      }
    }
  }

  /**
   * Print the operator tree for analysis.
   */

  public static class TreePrinter extends TreeVisitor
  {
    @Override
    protected void visitOp(OperatorSummary node, int indent) {
      logger.info("{}{}", indentString(indent), node.toString());
    }

    @Override
    protected void visitSubtree(OperatorSummary node, int i, int indent) {
      logger.info("{}{}", indentString(indent), subtreeLabel(node, i));
    }
  }

  /**
   * Print out the tree showing a comparison of estimated vs.
   * actual costs. Example:
   * <p><pre>
   * 03-05 HashJoin (HASH JOIN)
   *                                 Estimate:       2,521,812 rows,      1 MB
   *                                 Actual:           116,480 rows,     52 MB
   *         Probe
   * 03-07 . . Project
   *                                 Estimate:       2,521,812 rows,      1 MB
   *                                 Actual:                 0 rows,      0 MB
   * </pre>
   */

  public static class CostPrinter extends TreeVisitor
  {
    @Override
    protected void visitOp(OperatorSummary node, int indentLevel) {
      final StringBuilder nodeBuilder = new StringBuilder();
      nodeBuilder.append(String.format("%02d-%02d ", node.majorId, node.stepId));
      String indent = indentString(indentLevel, ". ");
      nodeBuilder.append(indent).append(node.name);
      if (node.opName != null) {
        nodeBuilder.append(" (").append(node.opName).append(")");
      }
      logger.info(nodeBuilder.toString());

      final StringBuilder costBuilder = new StringBuilder();
      indent = indentString(15);
      costBuilder.append(indent);
      costBuilder.append(String.format("  Estimate: %,15.0f rows, %,7.0f MB",
                         node.estRows, node.estMemoryCost / 1024 / 1024));
      costBuilder.append(indent);
      costBuilder.append(String.format("  Actual:   %,15d rows, %,7d MB",
                         node.actualRows, node.actualMemory / 1024 / 1024));
      logger.info(nodeBuilder.toString());
    }

    @Override
    protected void visitSubtree(OperatorSummary node, int i, int indent) {
      logger.info("{}      {}", indentString(indent), subtreeLabel(node, i));
    }
  }

  public static class FindOpVisitor extends TreeVisitor
  {
    private List<OperatorSummary> ops;
    private String type;

    public List<OperatorSummary> find(String type, OperatorSummary node) {
      ops = new ArrayList<>();
      this.type = type;
      visit(node);
      return ops;
    }

    @Override
    protected void visitOp(OperatorSummary node, int indentLevel) {
      if (type.equals(node.type)) {
        ops.add(node);
      }
    }
  }

  /**
   * We often run test queries single threaded to make analysis of the profile
   * easier. For a single-threaded (single slice) query, get a map from
   * operator ID to operator information as preparation for additional
   * analysis.
   *
   * @return
   */

  public Map<Integer,OperatorProfile> getOpInfo() {
    Map<Integer,String> ops = getOperators();
    Map<Integer,OperatorProfile> info = new HashMap<>();
    JsonArray frags = getFragmentProfile();
    JsonObject fragProfile = frags.getJsonObject(0).getJsonArray("minorFragmentProfile").getJsonObject(0);
    JsonArray opList = fragProfile.getJsonArray("operatorProfile");
    for (JsonObject opProfile : opList.getValuesAs(JsonObject.class)) {
      parseOpProfile(ops, info, opProfile);
    }
    return info;
  }

  /**
   * For a single-slice query, get all operators of a given numeric operator
   * type.
   * @param type the operator type
   * @return a list of operators of the given type
   */

  public List<OperatorProfile> getOpsOfType(String type) {
    List<OperatorProfile> ops = new ArrayList<>();
    List<OperatorSummary> opDefs = getOpDefsOfType(type);
    for (OperatorSummary opDef : opDefs) {
      ops.addAll(opDef.opExecs);
    }
    return ops;
  }

  public List<OperatorSummary> getOpDefsOfType(String type) {
    return new FindOpVisitor().find(type, topoOrder.get(0));
  }

  private void parseOpProfile(Map<Integer, String> ops,
      Map<Integer, OperatorProfile> info, JsonObject opProfile) {
    OperatorProfile opInfo = new OperatorProfile(0, 0, opProfile);
    opInfo.name = ops.get(opInfo.opId);
    info.put(opInfo.opId, opInfo);
  }

  public void printPlan() {
    new CostPrinter().visit(topoOrder.get(0));
  }

  public void printTime() {
    new TimePrinter().visit(topoOrder.get(0));
  }

  public static class Aggregator extends TreeVisitor
  {
    protected int n;
    protected long totalSetup;
    protected long totalProcess;
    protected long total;
    protected int maxFrag;
    protected boolean isTree;

    @Override
    public void visit(OperatorSummary root) {
      super.visit(root, 0);
      total = totalSetup + totalProcess;
    }

    @Override
    protected void visitOp(OperatorSummary node, int indentLevel) {
      n++;
      totalSetup += node.setupMs;
      totalProcess += node.processMs;
      maxFrag = Math.max(maxFrag, node.majorId);
      isTree |= (node.children.size() > 1);
    }
  }

  public static class TimePrinter extends TreeVisitor
  {
    private Aggregator totals;
    private boolean singleThread;
    private boolean singleFragment;

    @Override
    public void visit(OperatorSummary root) {
      totals = new Aggregator();
      totals.visit(root);
      singleThread = ! totals.isTree;
      singleFragment = (totals.maxFrag == 0);
      super.visit(root, 0);
      logger.info("Total:");
      String indent = singleThread? "  " : indentString(15);
      logger.info("{}{}", indent, String.format("Setup:   %,6d ms", totals.totalSetup));
      logger.info("{}{}", indent, String.format("Process: %,6d ms", totals.totalProcess));
    }

    @Override
    protected void visitOp(OperatorSummary node, int indentLevel) {
      if (singleThread) {
        printSimpleFormat(node);
      } else {
        printTreeFormat(node, indentLevel);
      }
    }

    private void printSimpleFormat(OperatorSummary node) {
      final StringBuilder sb = new StringBuilder();

      if (singleFragment) {
        sb.append(String.format("%02d ", node.stepId));
      } else {
        sb.append(String.format("%02d-%02d ", node.majorId, node.stepId));
      }
      sb.append(node.name);
      if (node.opName != null) {
        sb.append(" (").append(node.opName).append(")");
      }
      logger.info(sb.toString());
      printTimes(node, "  ");
    }

    private void printTimes(OperatorSummary node, String indent) {
      logger.info("{}{}", indent, String.format("Setup:   %,6d ms - %3d%%, %3d%%", node.setupMs,
                         percent(node.setupMs, totals.totalSetup),
                         percent(node.setupMs, totals.total)));
      logger.info("{}{}", indent, String.format("Process: %,6d ms - %3d%%, %3d%%", node.processMs,
                         percent(node.processMs, totals.totalProcess),
                         percent(node.processMs, totals.total)));
    }

    private void printTreeFormat(OperatorSummary node, int indentLevel) {
      final StringBuilder sb = new StringBuilder();
      sb.append(String.format("%02d-%02d ", node.majorId, node.stepId));
      String indent = indentString(indentLevel, ". ");
      sb.append(indent).append(node.name);
      if (node.opName != null) {
        sb.append(" (").append(node.opName).append(")");
      }
      logger.info(sb.toString());
      indent = indentString(15);
      printTimes(node, indent);
    }
  }

  /**
   * For a single-slice query, print a summary of the operator stack
   * and costs. At present, works for a linear query with on single-input
   * operators.
   */

  public void print() {
    printTime();
  }

  public void simplePrint() {
    Map<Integer, OperatorProfile> opInfo = getOpInfo();
    int n = opInfo.size();
    long totalSetup = 0;
    long totalProcess = 0;
    for (int i = 0;  i <= n;  i++) {
      OperatorProfile op = opInfo.get(i);
      if (op == null) { continue; }
      totalSetup += op.setupMs;
      totalProcess += op.processMs;
    }
    long total = totalSetup + totalProcess;
    for (int i = 0;  i <= n;  i++) {
      OperatorProfile op = opInfo.get(i);
      if (op == null) { continue; }
      logger.info("Op: {} {}", op.opId, op.name);
      logger.info("Setup:   {} - {}%, {}%", op.setupMs, percent(op.setupMs, totalSetup), percent(op.setupMs, total));
      logger.info("Process: {} - {}%, {}%", op.processMs, percent(op.processMs, totalProcess), percent(op.processMs, total));
      if (op.type.equals("EXTERNAL_SORT")) {
        long value = op.getMetric(0);
        logger.info("  Spills: {}", value);
      }
      if (op.waitMs > 0) {
        logger.info("  Wait:    {}", op.waitMs);
      }
      if (op.peakMem > 0) {
        logger.info("  Memory: {}", op.peakMem);
      }
    }
    logger.info("Total:");
    logger.info("  Setup:   {}", totalSetup);
    logger.info("  Process: {}", totalProcess);
  }

  public static long percent(long value, long total) {
    if (total == 0) {
      return 0; }
    return Math.round(value * 100 / (double)total);
  }

  public List<OperatorSummary> getOpDefn(String target) {
    List<OperatorSummary> ops = new ArrayList<>();
    for (OperatorSummary opDef : operations) {
      if (opDef.name.startsWith(target)) {
        ops.add(opDef);
      }
    }
    return ops;
  }
}
