/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.nexmark;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.nexmark.model.Auction;
import org.apache.beam.sdk.nexmark.model.Bid;
import org.apache.beam.sdk.nexmark.model.Person;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testutils.publishing.BigQueryResultsPublisher;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.joda.time.Duration;
import org.joda.time.Instant;

/**
 * An implementation of the 'NEXMark queries' for Beam. These are multiple queries over a three
 * table schema representing an online auction system:
 *
 * <ul>
 *   <li>{@link Person} represents a person submitting an item for auction and/or making a bid on an
 *       auction.
 *   <li>{@link Auction} represents an item under auction.
 *   <li>{@link Bid} represents a bid for an item under auction.
 * </ul>
 *
 * The queries exercise many aspects of the Beam model.
 *
 * <p>We synthesize the creation of people, auctions and bids in real-time. The data is not
 * particularly sensible.
 *
 * <p>See <a
 * href="https://web.archive.org/web/20100620010601/http://datalab.cs.pdx.edu/niagaraST/NEXMark/">
 * Nexmark website</a>
 */
public class Main {

  /**
   * Outcome of a single {@link Run}: the configuration that was executed together with the perf
   * reported for it. The perf is stored as-is and is checked for {@code null} by the consumer.
   */
  private static class Result {
    /** Configuration the run was launched with. */
    private final NexmarkConfiguration configuration;

    /** Perf reported by the launcher for {@link #configuration}. */
    private final NexmarkPerf perf;

    private Result(NexmarkConfiguration ranConfiguration, NexmarkPerf measuredPerf) {
      configuration = ranConfiguration;
      perf = measuredPerf;
    }
  }

  /**
   * A schedulable unit of work that runs one configuration through its own {@link NexmarkLauncher}
   * and reports the outcome as a {@link Result}.
   */
  private static class Run implements Callable<Result> {
    private final NexmarkLauncher<NexmarkOptions> nexmarkLauncher;
    private final NexmarkConfiguration configuration;

    private Run(NexmarkOptions options, NexmarkConfiguration configuration) {
      this.configuration = configuration;
      this.nexmarkLauncher = new NexmarkLauncher<>(options, configuration);
    }

    /**
     * Runs the launcher and pairs whatever perf it returns with this run's configuration.
     *
     * @throws IOException if the launcher fails with an I/O error
     */
    @Override
    public Result call() throws IOException {
      return new Result(configuration, nexmarkLauncher.run());
    }
  }

  /**
   * Entry point: runs every configuration of the selected suite and reports the results.
   *
   * <p>The configurations are submitted to a fixed-size thread pool and their results are consumed
   * in completion order. Each non-null perf is appended to the perf file, recorded in the map of
   * actual results, and followed by a summary printed to the console. Once all results are
   * collected the perfs are optionally exported to BigQuery, and, when job monitoring is enabled,
   * the summary and javascript files are written even if a run failed.
   *
   * <p>If the calling thread is interrupted while waiting for a result, collection stops early,
   * the results gathered so far are still reported, and the thread's interrupt status is restored
   * before this method returns.
   *
   * @param args command-line arguments, parsed into {@link NexmarkOptions}
   * @throws IOException if a run failed with an {@link IOException}
   * @throws RuntimeException if a run failed with any other exception, or if a completed run
   *     reported errors or a {@code null} error list
   */
  void runAll(String[] args) throws IOException {
    NexmarkOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(NexmarkOptions.class);

    Instant start = Instant.now();
    Map<NexmarkConfiguration, NexmarkPerf> baseline = loadBaseline(options.getBaselineFilename());
    Map<NexmarkConfiguration, NexmarkPerf> actual = new LinkedHashMap<>();
    Set<NexmarkConfiguration> configurations = options.getSuite().getConfigurations(options);
    // A fixed thread pool rejects sizes below one, so clamp for an empty suite or a non-positive
    // parallelism setting.
    int nThreads = Math.max(1, Math.min(options.getNexmarkParallel(), configurations.size()));
    ExecutorService executor = Executors.newFixedThreadPool(nThreads);
    CompletionService<Result> completion = new ExecutorCompletionService<>(executor);

    boolean successful = true;
    boolean interrupted = false;
    try {
      // Schedule all the configurations.
      for (NexmarkConfiguration configuration : configurations) {
        // Each run gets its own options instance parsed from the same arguments.
        NexmarkOptions optionsCopy = PipelineOptionsFactory.fromArgs(args).as(NexmarkOptions.class);
        completion.submit(new Run(optionsCopy, configuration));
      }

      // Collect all the results.
      for (int scheduled = configurations.size(); scheduled > 0; scheduled--) {
        Result result;
        try {
          result = completion.take().get();
        } catch (InterruptedException e) {
          // Remember the interruption instead of swallowing it. The flag is restored only after
          // reporting, because the file writes below may fail on an interrupted thread.
          interrupted = true;
          break;
        } catch (ExecutionException e) {
          Throwable t = e.getCause();
          if (t instanceof IOException) {
            throw new IOException(t);
          } else {
            throw new RuntimeException(t);
          }
        }

        NexmarkConfiguration configuration = result.configuration;
        NexmarkPerf perf = result.perf;
        if (perf == null) {
          continue;
        } else if (perf.errors == null || !perf.errors.isEmpty()) {
          successful = false;
        }
        appendPerf(options.getPerfFilename(), configuration, perf);
        actual.put(configuration, perf);
        // Summarize what we've run so far.
        saveSummary(null, configurations, actual, baseline, start, options);
      }

      if (options.getExportSummaryToBigQuery()) {
        ImmutableMap<String, String> schema =
            ImmutableMap.<String, String>builder()
                .put("timestamp", "timestamp")
                .put("runtimeSec", "float")
                .put("eventsPerSec", "float")
                .put("numResults", "integer")
                .build();

        savePerfsToBigQuery(
            BigQueryResultsPublisher.create(options.getBigQueryDataset(), schema),
            options,
            actual,
            start);
      }
    } finally {
      try {
        if (options.getMonitorJobs()) {
          // Report overall performance.
          saveSummary(
              options.getSummaryFilename(), configurations, actual, baseline, start, options);
          saveJavascript(options.getJavascriptFilename(), configurations, actual, baseline, start);
        }
      } finally {
        // Always release the pool, even when writing the reports fails.
        executor.shutdown();
        if (interrupted) {
          Thread.currentThread().interrupt();
        }
      }
    }
    if (!successful) {
      throw new RuntimeException("Execution was not successful");
    }
  }

  /**
   * Publishes every perf in {@code perfs} through {@code publisher}.
   *
   * <p>The destination table name of each perf is derived from the query language in {@code
   * options} and the number or name of the configuration's query. The start time of the run, in
   * milliseconds, is passed along with every published perf.
   */
  @VisibleForTesting
  static void savePerfsToBigQuery(
      BigQueryResultsPublisher publisher,
      NexmarkOptions options,
      Map<NexmarkConfiguration, NexmarkPerf> perfs,
      Instant start) {

    for (Map.Entry<NexmarkConfiguration, NexmarkPerf> measurement : perfs.entrySet()) {
      NexmarkConfiguration configuration = measurement.getKey();
      NexmarkPerf perf = measurement.getValue();

      String queryName =
          NexmarkUtils.fullQueryName(
              options.getQueryLanguage(), configuration.query.getNumberOrName());
      String tableName = NexmarkUtils.tableName(options, queryName, 0L, null);

      publisher.publish(perf, tableName, start.getMillis());
    }
  }

  /**
   * Appends one entry describing {@code configuration} and {@code perf} to the perf file, creating
   * the file if it does not exist. Does nothing when {@code perfFilename} is {@code null}.
   *
   * <p>An entry is a blank line, two {@code #} comment lines (the current time and the short form
   * of the configuration), the full configuration, and the perf.
   */
  private void appendPerf(
      @Nullable String perfFilename, NexmarkConfiguration configuration, NexmarkPerf perf) {
    if (perfFilename == null) {
      return;
    }

    String timestampComment = String.format("# %s", Instant.now());
    String descriptionComment = String.format("# %s", configuration.toShortString());

    List<String> entry = new ArrayList<>();
    entry.add("");
    entry.add(timestampComment);
    entry.add(descriptionComment);
    entry.add(configuration.toString());
    entry.add(perf.toString());

    try {
      Files.write(
          Paths.get(perfFilename),
          entry,
          StandardCharsets.UTF_8,
          StandardOpenOption.CREATE,
          StandardOpenOption.APPEND);
    } catch (IOException e) {
      throw new RuntimeException("Unable to write perf file: ", e);
    }
    NexmarkUtils.console("appended results to perf file %s.", perfFilename);
  }

  /**
   * Load the baseline perf.
   *
   * <p>Blank lines and lines whose first non-whitespace character is {@code #} are skipped. Every
   * other line is parsed as a configuration and must be immediately followed by the line holding
   * its perf.
   *
   * @param baselineFilename file to read, or {@code null} if there is no baseline
   * @return the baseline perf per configuration in file order, or {@code null} if {@code
   *     baselineFilename} is {@code null}
   * @throws RuntimeException if the file cannot be read, or if it ends with a configuration line
   *     that has no perf line after it
   */
  @Nullable
  private static Map<NexmarkConfiguration, NexmarkPerf> loadBaseline(
      @Nullable String baselineFilename) {
    if (baselineFilename == null) {
      return null;
    }
    Map<NexmarkConfiguration, NexmarkPerf> baseline = new LinkedHashMap<>();
    List<String> lines;
    try {
      lines = Files.readAllLines(Paths.get(baselineFilename), StandardCharsets.UTF_8);
    } catch (IOException e) {
      throw new RuntimeException("Unable to read baseline perf file: ", e);
    }
    for (int i = 0; i < lines.size(); i++) {
      // Classify on the trimmed text so that indented comments are skipped as well.
      String trimmed = lines.get(i).trim();
      if (trimmed.isEmpty() || trimmed.startsWith("#")) {
        continue;
      }
      if (i + 1 >= lines.size()) {
        // A truncated file would otherwise fail with a bare IndexOutOfBoundsException.
        throw new RuntimeException(
            String.format(
                "Baseline perf file %s is truncated: configuration on line %d has no perf line",
                baselineFilename, i + 1));
      }
      NexmarkConfiguration configuration = NexmarkConfiguration.fromString(lines.get(i));
      // The perf is on the line right after its configuration; consume it here.
      NexmarkPerf perf = NexmarkPerf.fromString(lines.get(++i));
      baseline.put(configuration, perf);
    }
    NexmarkUtils.console(
        "loaded %d entries from baseline file %s.", baseline.size(), baselineFilename);
    return baseline;
  }

  /** Horizontal rule that opens and closes the summary report. */
  private static final String LINE =
      "==========================================================================================";

  /**
   * Builds a textual report comparing {@code actual} with {@code baseline} (when non-null), prints
   * it to standard output and, if {@code summaryFilename} is non-null, appends it to that file.
   *
   * <p>The report lists the default configuration, every configuration together with the job it
   * ran as (if known), and one performance row per configuration followed by the errors that
   * configuration reported.
   */
  private static void saveSummary(
      @Nullable String summaryFilename,
      Iterable<NexmarkConfiguration> configurations,
      Map<NexmarkConfiguration, NexmarkPerf> actual,
      @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline,
      Instant start,
      NexmarkOptions options) {

    List<String> report = new ArrayList<>();

    // Header.
    report.add("");
    report.add(LINE);
    Duration elapsed = new Duration(start, Instant.now());
    report.add(String.format("Run started %s and ran for %s", start, elapsed));
    report.add("");

    report.add("Default configuration:");
    report.add(NexmarkConfiguration.DEFAULT.toString());
    report.add("");

    // One line per configuration, plus the job id when the run has one.
    report.add("Configurations:");
    report.add("  Conf  Description");
    int index = 0;
    for (NexmarkConfiguration configuration : configurations) {
      report.add(String.format("  %04d  %s", index, configuration.toShortString()));
      index++;
      NexmarkPerf measured = actual.get(configuration);
      if (measured == null || measured.jobId == null) {
        continue;
      }
      report.add(String.format("  %4s  [Ran as job %s]", "", measured.jobId));
    }

    // Performance table.
    report.add("");
    report.add("Performance:");
    report.add(
        String.format(
            "  %4s  %12s  %12s  %12s  %12s  %12s  %12s",
            "Conf",
            "Runtime(sec)",
            "(Baseline)",
            "Events(/sec)",
            "(Baseline)",
            "Results",
            "(Baseline)"));
    index = 0;
    for (NexmarkConfiguration configuration : configurations) {
      StringBuilder row = new StringBuilder(String.format("  %04d  ", index));
      index++;

      NexmarkPerf measured = actual.get(configuration);
      if (measured == null) {
        row.append("*** not run ***");
        report.add(row.toString());
        continue;
      }

      NexmarkPerf reference = null;
      if (baseline != null) {
        reference = baseline.get(configuration);
      }
      String emptyColumn = String.format("%12s  ", "");

      // Runtime and its relative change against the baseline.
      double runtimeSec = measured.runtimeSec;
      row.append(String.format("%12.1f  ", runtimeSec));
      if (reference != null) {
        double baselineRuntimeSec = reference.runtimeSec;
        row.append(formatPercentChange(runtimeSec, baselineRuntimeSec));
      } else {
        row.append(emptyColumn);
      }

      // Throughput and its relative change against the baseline.
      double eventsPerSec = measured.eventsPerSec;
      row.append(String.format("%12.1f  ", eventsPerSec));
      if (reference != null) {
        double baselineEventsPerSec = reference.eventsPerSec;
        row.append(formatPercentChange(eventsPerSec, baselineEventsPerSec));
      } else {
        row.append(emptyColumn);
      }

      // Result count and its absolute change; this is the last column, so no trailing padding.
      long numResults = measured.numResults;
      row.append(String.format("%12d  ", numResults));
      if (reference != null) {
        long baselineNumResults = reference.numResults;
        row.append(String.format("%+12d", numResults - baselineNumResults));
      } else {
        row.append(String.format("%12s", ""));
      }
      report.add(row.toString());

      // Errors of this configuration go directly under its row.
      List<String> errors = measured.errors;
      if (errors == null) {
        errors = new ArrayList<>();
        errors.add("NexmarkGoogleRunner returned null errors list");
      }
      for (String error : errors) {
        report.add(String.format("  %4s  *** %s ***", "", error));
      }
    }

    report.add(LINE);
    report.add("");

    for (String reportLine : report) {
      System.out.println(reportLine);
    }

    if (summaryFilename == null) {
      return;
    }
    try {
      Files.write(
          Paths.get(summaryFilename),
          report,
          StandardCharsets.UTF_8,
          StandardOpenOption.CREATE,
          StandardOpenOption.APPEND);
    } catch (IOException e) {
      throw new RuntimeException("Unable to save summary file: ", e);
    }
    NexmarkUtils.console("appended summary to summary file %s.", summaryFilename);
  }

  /** Formats the signed relative change of {@code value} against {@code reference} as a column. */
  private static String formatPercentChange(double value, double reference) {
    double diff = ((value - reference) / reference) * 100.0;
    return String.format("%+11.2f%%  ", diff);
  }

  /**
   * Writes all perf data, and any matching baselines, to a javascript file as a single {@code all}
   * array so that it can be consumed by a graphing page etc. An existing file is overwritten.
   * Does nothing when {@code javascriptFilename} is {@code null}.
   */
  private static void saveJavascript(
      @Nullable String javascriptFilename,
      Iterable<NexmarkConfiguration> configurations,
      Map<NexmarkConfiguration, NexmarkPerf> actual,
      @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline,
      Instant start) {
    if (javascriptFilename == null) {
      return;
    }

    List<String> script = new ArrayList<>();
    Duration elapsed = new Duration(start, Instant.now());
    script.add(String.format("// Run started %s and ran for %s", start, elapsed));
    script.add("var all = [");

    for (NexmarkConfiguration configuration : configurations) {
      script.add("  {");
      script.add(String.format("    config: %s", configuration));

      // The perf and baseline members are only emitted when available.
      NexmarkPerf measured = actual.get(configuration);
      if (measured != null) {
        script.add(String.format("    ,perf: %s", measured));
      }
      if (baseline != null) {
        NexmarkPerf reference = baseline.get(configuration);
        if (reference != null) {
          script.add(String.format("    ,baseline: %s", reference));
        }
      }

      script.add("  },");
    }

    script.add("];");

    try {
      Files.write(
          Paths.get(javascriptFilename),
          script,
          StandardCharsets.UTF_8,
          StandardOpenOption.CREATE,
          StandardOpenOption.TRUNCATE_EXISTING);
    } catch (IOException e) {
      throw new RuntimeException("Unable to save javascript file: ", e);
    }
    NexmarkUtils.console("saved javascript to file %s.", javascriptFilename);
  }

  /**
   * Command-line entry point: creates a {@link Main} and runs all configurations with {@code args}.
   *
   * @throws IOException if running the configurations fails with an I/O error
   */
  public static void main(String[] args) throws IOException {
    Main launcher = new Main();
    launcher.runAll(args);
  }
}
