/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.extensions.gcp.util;

import com.google.api.client.http.HttpIOExceptionHandler;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpResponseInterceptor;
import com.google.api.client.http.HttpUnsuccessfulResponseHandler;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.ExponentialBackOff;
import com.google.api.client.util.NanoClock;
import com.google.api.client.util.Sleeper;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Implements a request initializer that adds retry handlers to all HttpRequests.
 *
 * <p>Also can take a HttpResponseInterceptor to be applied to the responses.
 */
public class RetryHttpRequestInitializer implements HttpRequestInitializer {

  private static final Logger LOG = LoggerFactory.getLogger(RetryHttpRequestInitializer.class);

  /** Http response codes that should be silently ignored. */
  private static final Set<Integer> DEFAULT_IGNORED_RESPONSE_CODES =
      new HashSet<>(
          Arrays.asList(
              307 /* Redirect, handled by the client library */,
              308 /* Resume Incomplete, handled by the client library */));

  /** Http response timeout to use for hanging gets. */
  private static final int HANGING_GET_TIMEOUT_SEC = 80;

  private int writeTimeout;

  /** Handlers used to provide additional logging information on unsuccessful HTTP requests. */
  private static class LoggingHttpBackOffHandler
      implements HttpIOExceptionHandler, HttpUnsuccessfulResponseHandler {

    private final Sleeper sleeper;
    private final BackOff ioExceptionBackOff;
    private final BackOff unsuccessfulResponseBackOff;
    private final Set<Integer> ignoredResponseCodes;
    // aggregate the total time spent in exponential backoff
    private final Counter throttlingSeconds =
        Metrics.counter(LoggingHttpBackOffHandler.class, "cumulativeThrottlingSeconds");
    private int ioExceptionRetries;
    private int unsuccessfulResponseRetries;
    @Nullable private CustomHttpErrors customHttpErrors;

    private LoggingHttpBackOffHandler(
        Sleeper sleeper,
        BackOff ioExceptionBackOff,
        BackOff unsucessfulResponseBackOff,
        Set<Integer> ignoredResponseCodes,
        @Nullable CustomHttpErrors customHttpErrors) {
      this.sleeper = sleeper;
      this.ioExceptionBackOff = ioExceptionBackOff;
      this.unsuccessfulResponseBackOff = unsucessfulResponseBackOff;
      this.ignoredResponseCodes = ignoredResponseCodes;
      this.customHttpErrors = customHttpErrors;
    }

    @Override
    public boolean handleIOException(HttpRequest request, boolean supportsRetry)
        throws IOException {
      // We will retry if the request supports retry or the backoff was successful.
      // Note that the order of these checks is important since
      // backOffWasSuccessful will perform a sleep.
      boolean willRetry = supportsRetry && backOffWasSuccessful(ioExceptionBackOff);
      if (willRetry) {
        ioExceptionRetries += 1;
        LOG.debug("Request failed with IOException, will retry: {}", request.getUrl());
      } else {
        String message =
            "Request failed with IOException, "
                + "performed {} retries due to IOExceptions, "
                + "performed {} retries due to unsuccessful status codes, "
                + "HTTP framework says request {} be retried, "
                + "(caller responsible for retrying): {}";
        LOG.warn(
            message,
            ioExceptionRetries,
            unsuccessfulResponseRetries,
            supportsRetry ? "can" : "cannot",
            request.getUrl());
      }
      return willRetry;
    }

    @Override
    public boolean handleResponse(HttpRequest request, HttpResponse response, boolean supportsRetry)
        throws IOException {
      // We will retry if the request supports retry and the status code requires a backoff
      // and the backoff was successful. Note that the order of these checks is important since
      // backOffWasSuccessful will perform a sleep.
      boolean willRetry =
          supportsRetry
              && retryOnStatusCode(response.getStatusCode())
              && backOffWasSuccessful(unsuccessfulResponseBackOff);
      if (willRetry) {
        unsuccessfulResponseRetries += 1;
        LOG.debug(
            "Request failed with code {}, will retry: {}",
            response.getStatusCode(),
            request.getUrl());
      } else {

        String message =
            "Request failed with code {}, "
                + "performed {} retries due to IOExceptions, "
                + "performed {} retries due to unsuccessful status codes, "
                + "HTTP framework says request {} be retried, "
                + "(caller responsible for retrying): {}. {}";
        String customLogMessage = "";
        if (customHttpErrors != null) {
          String error =
              customHttpErrors.getCustomError(
                  new HttpRequestWrapper(request), new HttpResponseWrapper(response));
          if (error != null) {
            customLogMessage = error;
          }
        }
        if (ignoredResponseCodes.contains(response.getStatusCode())) {
          // Log ignored response codes at a lower level
          LOG.debug(
              message,
              response.getStatusCode(),
              ioExceptionRetries,
              unsuccessfulResponseRetries,
              supportsRetry ? "can" : "cannot",
              request.getUrl(),
              customLogMessage);
        } else {
          LOG.warn(
              message,
              response.getStatusCode(),
              ioExceptionRetries,
              unsuccessfulResponseRetries,
              supportsRetry ? "can" : "cannot",
              request.getUrl(),
              customLogMessage);
        }
      }
      return willRetry;
    }

    /** Returns true iff performing the backoff was successful. */
    private boolean backOffWasSuccessful(BackOff backOff) {
      try {
        long backOffTime = backOff.nextBackOffMillis();
        if (backOffTime == BackOff.STOP) {
          return false;
        }
        throttlingSeconds.inc(backOffTime / 1000);
        sleeper.sleep(backOffTime);
        return true;
      } catch (InterruptedException | IOException e) {
        return false;
      }
    }

    /** Returns true iff the {@code statusCode} represents an error that should be retried. */
    private boolean retryOnStatusCode(int statusCode) {
      return (statusCode == 0) // Code 0 usually means no response / network error
          || (statusCode / 100 == 5) // 5xx: server error
          || statusCode == 429; // 429: Too many requests
    }
  }

  private final HttpResponseInterceptor responseInterceptor; // response Interceptor to use

  private CustomHttpErrors customHttpErrors = null;

  private final NanoClock nanoClock; // used for testing

  private final Sleeper sleeper; // used for testing

  private Set<Integer> ignoredResponseCodes = new HashSet<>(DEFAULT_IGNORED_RESPONSE_CODES);

  public RetryHttpRequestInitializer() {
    this(Collections.emptyList());
  }

  /**
   * @param additionalIgnoredResponseCodes a list of HTTP status codes that should not be logged.
   */
  public RetryHttpRequestInitializer(Collection<Integer> additionalIgnoredResponseCodes) {
    this(additionalIgnoredResponseCodes, null);
  }

  /**
   * @param additionalIgnoredResponseCodes a list of HTTP status codes that should not be logged.
   * @param responseInterceptor HttpResponseInterceptor to be applied on all requests. May be null.
   */
  public RetryHttpRequestInitializer(
      Collection<Integer> additionalIgnoredResponseCodes,
      @Nullable HttpResponseInterceptor responseInterceptor) {
    this(NanoClock.SYSTEM, Sleeper.DEFAULT, additionalIgnoredResponseCodes, responseInterceptor);
  }

  /**
   * Visible for testing.
   *
   * @param nanoClock used as a timing source for knowing how much time has elapsed.
   * @param sleeper used to sleep between retries.
   * @param additionalIgnoredResponseCodes a list of HTTP status codes that should not be logged.
   */
  RetryHttpRequestInitializer(
      NanoClock nanoClock,
      Sleeper sleeper,
      Collection<Integer> additionalIgnoredResponseCodes,
      HttpResponseInterceptor responseInterceptor) {
    this.nanoClock = nanoClock;
    this.sleeper = sleeper;
    this.ignoredResponseCodes.addAll(additionalIgnoredResponseCodes);
    this.responseInterceptor = responseInterceptor;
    this.writeTimeout = 0;
  }

  @Override
  public void initialize(HttpRequest request) throws IOException {
    // Set a timeout for hanging-gets.
    // TODO: Do this exclusively for work requests.
    request.setReadTimeout(HANGING_GET_TIMEOUT_SEC * 1000);
    request.setWriteTimeout(this.writeTimeout);

    LoggingHttpBackOffHandler loggingHttpBackOffHandler =
        new LoggingHttpBackOffHandler(
            sleeper,
            // Back off on retryable http errors and IOExceptions.
            // A back-off multiplier of 2 raises the maximum request retrying time
            // to approximately 5 minutes (keeping other back-off parameters to
            // their default values).
            new ExponentialBackOff.Builder().setNanoClock(nanoClock).setMultiplier(2).build(),
            new ExponentialBackOff.Builder().setNanoClock(nanoClock).setMultiplier(2).build(),
            ignoredResponseCodes,
            this.customHttpErrors);

    request.setUnsuccessfulResponseHandler(loggingHttpBackOffHandler);
    request.setIOExceptionHandler(loggingHttpBackOffHandler);

    // Set response initializer
    if (responseInterceptor != null) {
      request.setResponseInterceptor(responseInterceptor);
    }
  }

  public void setCustomErrors(CustomHttpErrors customErrors) {
    this.customHttpErrors = customErrors;
  }

  public void setWriteTimeout(int writeTimeout) {
    this.writeTimeout = writeTimeout;
  }
}
