/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.dataflow;

import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
import static org.hamcrest.MatcherAssert.assertThat;

import com.google.api.services.dataflow.model.JobMessage;
import com.google.api.services.dataflow.model.JobMetrics;
import com.google.api.services.dataflow.model.MetricUpdate;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.List;
import java.util.concurrent.Callable;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link TestDataflowRunner} is a pipeline runner that wraps a {@link DataflowRunner} when running
 * tests against the {@link TestPipeline}.
 *
 * @see TestPipeline
 */
public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
  private static final String TENTATIVE_COUNTER = "tentative";
  private static final Logger LOG = LoggerFactory.getLogger(TestDataflowRunner.class);

  private final TestDataflowPipelineOptions options;
  private final DataflowClient dataflowClient;
  private final DataflowRunner runner;
  private int expectedNumberOfAssertions = 0;

  TestDataflowRunner(TestDataflowPipelineOptions options, DataflowClient client) {
    this.options = options;
    this.dataflowClient = client;
    this.runner = DataflowRunner.fromOptions(options);
  }

  /** Constructs a runner from the provided options. */
  public static TestDataflowRunner fromOptions(PipelineOptions options) {
    TestDataflowPipelineOptions dataflowOptions = options.as(TestDataflowPipelineOptions.class);
    String tempLocation =
        Joiner.on("/")
            .join(dataflowOptions.getTempRoot(), dataflowOptions.getJobName(), "output", "results");
    dataflowOptions.setTempLocation(tempLocation);

    return new TestDataflowRunner(
        dataflowOptions, DataflowClient.create(options.as(DataflowPipelineOptions.class)));
  }

  @VisibleForTesting
  static TestDataflowRunner fromOptionsAndClient(
      TestDataflowPipelineOptions options, DataflowClient client) {
    return new TestDataflowRunner(options, client);
  }

  @Override
  public DataflowPipelineJob run(Pipeline pipeline) {
    return run(pipeline, runner);
  }

  DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) {
    updatePAssertCount(pipeline);

    TestPipelineOptions testPipelineOptions = options.as(TestPipelineOptions.class);
    final DataflowPipelineJob job;
    job = runner.run(pipeline);

    LOG.info(
        "Running Dataflow job {} with {} expected assertions.",
        job.getJobId(),
        expectedNumberOfAssertions);

    assertThat(job, testPipelineOptions.getOnCreateMatcher());

    Boolean jobSuccess;
    Optional<Boolean> allAssertionsPassed;

    ErrorMonitorMessagesHandler messageHandler =
        new ErrorMonitorMessagesHandler(job, new MonitoringUtil.LoggingHandler());

    if (options.isStreaming()) {
      jobSuccess = waitForStreamingJobTermination(job, messageHandler);
      // No metrics in streaming
      allAssertionsPassed = Optional.absent();
    } else {
      jobSuccess = waitForBatchJobTermination(job, messageHandler);
      allAssertionsPassed = checkForPAssertSuccess(job);
    }

    // If there is a certain assertion failure, throw the most precise exception we can.
    // There are situations where the metric will not be available, but as long as we recover
    // the actionable message from the logs it is acceptable.
    if (!allAssertionsPassed.isPresent()) {
      LOG.warn("Dataflow job {} did not output a success or failure metric.", job.getJobId());
    } else if (!allAssertionsPassed.get()) {
      throw new AssertionError(errorMessage(job, messageHandler));
    }

    // Other failures, or jobs where metrics fell through for some reason, will manifest
    // as simply job failures.
    if (!jobSuccess) {
      throw new RuntimeException(errorMessage(job, messageHandler));
    }

    // If there is no reason to immediately fail, run the success matcher.
    assertThat(job, testPipelineOptions.getOnSuccessMatcher());
    return job;
  }

  /**
   * Return {@code true} if the job succeeded or {@code false} if it terminated in any other manner.
   */
  @SuppressWarnings("FutureReturnValueIgnored") // Job status checked via job.waitUntilFinish
  private boolean waitForStreamingJobTermination(
      final DataflowPipelineJob job, ErrorMonitorMessagesHandler messageHandler) {
    // In streaming, there are infinite retries, so rather than timeout
    // we try to terminate early by polling and canceling if we see
    // an error message
    options.getExecutorService().submit(new CancelOnError(job, messageHandler));

    // Whether we canceled or not, this gets the final state of the job or times out
    State finalState;
    try {
      finalState =
          job.waitUntilFinish(
              Duration.standardSeconds(options.getTestTimeoutSeconds()), messageHandler);
    } catch (IOException e) {
      throw new RuntimeException(e);
    } catch (InterruptedException e) {
      Thread.interrupted();
      return false;
    }

    // Getting the final state may have timed out; it may not indicate a failure.
    // This cancellation may be the second
    if (finalState == null || !finalState.isTerminal()) {
      LOG.info(
          "Dataflow job {} took longer than {} seconds to complete, cancelling.",
          job.getJobId(),
          options.getTestTimeoutSeconds());
      try {
        job.cancel();
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
      return false;
    } else {
      return finalState == State.DONE && !messageHandler.hasSeenError();
    }
  }

  /** Return {@code true} if job state is {@code State.DONE}. {@code false} otherwise. */
  private boolean waitForBatchJobTermination(
      DataflowPipelineJob job, ErrorMonitorMessagesHandler messageHandler) {
    {
      try {
        job.waitUntilFinish(Duration.standardSeconds(-1), messageHandler);
      } catch (IOException e) {
        throw new RuntimeException(e);
      } catch (InterruptedException e) {
        Thread.interrupted();
        return false;
      }

      return job.getState() == State.DONE;
    }
  }

  private static String errorMessage(
      DataflowPipelineJob job, ErrorMonitorMessagesHandler messageHandler) {
    return Strings.isNullOrEmpty(messageHandler.getErrorMessage())
        ? String.format(
            "Dataflow job %s terminated in state %s but did not return a failure reason.",
            job.getJobId(), job.getState())
        : messageHandler.getErrorMessage();
  }

  @VisibleForTesting
  void updatePAssertCount(Pipeline pipeline) {
    if (hasExperiment(options, "beam_fn_api")) {
      // TODO[BEAM-1866]: FnAPI does not support metrics, so expect 0 assertions.
      expectedNumberOfAssertions = 0;
    } else {
      expectedNumberOfAssertions = PAssert.countAsserts(pipeline);
    }
  }

  /**
   * Check that PAssert expectations were met.
   *
   * <p>If the pipeline is not in a failed/cancelled state and no PAsserts were used within the
   * pipeline, then this method will state that all PAsserts succeeded.
   *
   * @return Optional.of(false) if we are certain a PAssert failed. Optional.of(true) if we are
   *     certain all PAsserts passed. Optional.absent() if the evidence is inconclusive, including
   *     when the pipeline may have failed for other reasons.
   */
  @VisibleForTesting
  Optional<Boolean> checkForPAssertSuccess(DataflowPipelineJob job) {

    JobMetrics metrics = getJobMetrics(job);
    if (metrics == null || metrics.getMetrics() == null) {
      LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId());
      return Optional.absent();
    }

    int successes = 0;
    int failures = 0;
    for (MetricUpdate metric : metrics.getMetrics()) {
      if (metric.getName() == null
          || metric.getName().getContext() == null
          || !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) {
        // Don't double count using the non-tentative version of the metric.
        continue;
      }
      if (PAssert.SUCCESS_COUNTER.equals(metric.getName().getName())) {
        successes += ((BigDecimal) metric.getScalar()).intValue();
      } else if (PAssert.FAILURE_COUNTER.equals(metric.getName().getName())) {
        failures += ((BigDecimal) metric.getScalar()).intValue();
      }
    }

    if (failures > 0) {
      LOG.info(
          "Failure result for Dataflow job {}. Found {} success, {} failures out of "
              + "{} expected assertions.",
          job.getJobId(),
          successes,
          failures,
          expectedNumberOfAssertions);
      return Optional.of(false);
    } else if (successes >= expectedNumberOfAssertions) {
      LOG.info(
          "Success result for Dataflow job {}."
              + " Found {} success, {} failures out of {} expected assertions.",
          job.getJobId(),
          successes,
          failures,
          expectedNumberOfAssertions);
      return Optional.of(true);
    }

    // If the job failed, this is a definite failure. We only cancel jobs when they fail.
    State state = job.getState();
    if (state == State.FAILED || state == State.CANCELLED) {
      LOG.info(
          "Dataflow job {} terminated in failure state {} without reporting a failed assertion",
          job.getJobId(),
          state);
      return Optional.absent();
    }

    LOG.info(
        "Inconclusive results for Dataflow job {}."
            + " Found {} success, {} failures out of {} expected assertions.",
        job.getJobId(),
        successes,
        failures,
        expectedNumberOfAssertions);
    return Optional.absent();
  }

  @Nullable
  @VisibleForTesting
  JobMetrics getJobMetrics(DataflowPipelineJob job) {
    JobMetrics metrics = null;
    try {
      metrics = dataflowClient.getJobMetrics(job.getJobId());
    } catch (IOException e) {
      LOG.warn("Failed to get job metrics: ", e);
    }
    return metrics;
  }

  @Override
  public String toString() {
    return "TestDataflowRunner#" + options.getAppName();
  }

  /**
   * Monitors job log output messages for errors.
   *
   * <p>Creates an error message representing the concatenation of all error messages seen.
   */
  private static class ErrorMonitorMessagesHandler implements JobMessagesHandler {
    private final DataflowPipelineJob job;
    private final JobMessagesHandler messageHandler;
    private final StringBuilder errorMessage;
    private volatile boolean hasSeenError;

    private ErrorMonitorMessagesHandler(
        DataflowPipelineJob job, JobMessagesHandler messageHandler) {
      this.job = job;
      this.messageHandler = messageHandler;
      this.errorMessage = new StringBuilder();
      this.hasSeenError = false;
    }

    @Override
    public void process(List<JobMessage> messages) {
      messageHandler.process(messages);
      for (JobMessage message : messages) {
        if ("JOB_MESSAGE_ERROR".equals(message.getMessageImportance())) {
          LOG.info(
              "Dataflow job {} threw exception. Failure message was: {}",
              job.getJobId(),
              message.getMessageText());
          errorMessage.append(message.getMessageText());
          hasSeenError = true;
        }
      }
    }

    boolean hasSeenError() {
      return hasSeenError;
    }

    String getErrorMessage() {
      return errorMessage.toString();
    }
  }

  private static class CancelOnError implements Callable<Void> {

    private final DataflowPipelineJob job;
    private final ErrorMonitorMessagesHandler messageHandler;

    public CancelOnError(DataflowPipelineJob job, ErrorMonitorMessagesHandler messageHandler) {
      this.job = job;
      this.messageHandler = messageHandler;
    }

    @Override
    public Void call() throws Exception {
      while (true) {
        State jobState = job.getState();

        // If we see an error, cancel and note failure
        if (messageHandler.hasSeenError() && !job.getState().isTerminal()) {
          job.cancel();
          LOG.info("Cancelling Dataflow job {}", job.getJobId());
          return null;
        }

        if (jobState.isTerminal()) {
          return null;
        }

        Thread.sleep(3000L);
      }
    }
  }
}
