/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.server.coordination;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Duration;

import javax.annotation.Nullable;
import javax.servlet.http.HttpServletResponse;
import java.io.InputStream;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * This class facilitates the usage of long-polling HTTP endpoints powered by {@link ChangeRequestHistory}.
 * For example {@link org.apache.druid.client.HttpServerInventoryView} uses it to keep segment state in sync with data
 * nodes which expose the segment state via HTTP endpoint in {@link
 * org.apache.druid.server.http.SegmentListerResource#getSegments}.
 */
public class ChangeRequestHttpSyncer<T>
{
  private static final EmittingLogger log = new EmittingLogger(ChangeRequestHttpSyncer.class);

  public static final long HTTP_TIMEOUT_EXTRA_MS = 5000;

  private static final long MAX_RETRY_BACKOFF = TimeUnit.MINUTES.toMillis(2);

  private final ObjectMapper smileMapper;
  private final HttpClient httpClient;
  private final ScheduledExecutorService executor;
  private final URL baseServerURL;
  private final String baseRequestPath;
  private final TypeReference<ChangeRequestsSnapshot<T>> responseTypeReferences;
  private final long serverTimeoutMS;
  private final long serverUnstabilityTimeout;
  private final long serverHttpTimeout;

  private final Listener<T> listener;

  private final CountDownLatch initializationLatch = new CountDownLatch(1);

  /**
   * This lock is used to ensure proper start-then-stop semantics and making sure after stopping no state update happens
   * and {@link #sync} is not again scheduled in {@link #executor} and if there was a previously scheduled sync before
   * stopping, it is skipped and also, it is used to ensure that duplicate syncs are never scheduled in the executor.
   */
  private final LifecycleLock startStopLock = new LifecycleLock();

  private final String logIdentity;
  private long unstableStartTime = -1;
  private int consecutiveFailedAttemptCount = 0;
  private long lastSuccessfulSyncTime = 0;
  private long lastSyncTime = 0;

  @Nullable
  private ChangeRequestHistory.Counter counter = null;

  public ChangeRequestHttpSyncer(
      ObjectMapper smileMapper,
      HttpClient httpClient,
      ScheduledExecutorService executor,
      URL baseServerURL,
      String baseRequestPath,
      TypeReference<ChangeRequestsSnapshot<T>> responseTypeReferences,
      long serverTimeoutMS,
      long serverUnstabilityTimeout,
      Listener<T> listener
  )
  {
    this.smileMapper = smileMapper;
    this.httpClient = httpClient;
    this.executor = executor;
    this.baseServerURL = baseServerURL;
    this.baseRequestPath = baseRequestPath;
    this.responseTypeReferences = responseTypeReferences;
    this.serverTimeoutMS = serverTimeoutMS;
    this.serverUnstabilityTimeout = serverUnstabilityTimeout;
    this.serverHttpTimeout = serverTimeoutMS + HTTP_TIMEOUT_EXTRA_MS;
    this.listener = listener;
    this.logIdentity = StringUtils.format("%s_%d", baseServerURL, System.currentTimeMillis());
  }

  public void start()
  {
    synchronized (startStopLock) {
      if (!startStopLock.canStart()) {
        throw new ISE("Can't start ChangeRequestHttpSyncer[%s].", logIdentity);
      }

      log.info("Starting ChangeRequestHttpSyncer[%s].", logIdentity);
      startStopLock.started();
      startStopLock.exitStart();

      addNextSyncToWorkQueue();
    }
  }

  public void stop()
  {
    synchronized (startStopLock) {
      if (!startStopLock.canStop()) {
        throw new ISE("Can't stop ChangeRequestHttpSyncer[%s].", logIdentity);
      }

      log.info("Stopped ChangeRequestHttpSyncer[%s].", logIdentity);
    }
  }

  /** Wait for first fetch of segment listing from server. */
  public boolean awaitInitialization(long timeout, TimeUnit timeUnit) throws InterruptedException
  {
    return initializationLatch.await(timeout, timeUnit);
  }

  /**
   * This method returns the debugging information for printing, must not be used for any other purpose.
   */
  public Map<String, Object> getDebugInfo()
  {
    long currTime = System.currentTimeMillis();

    Object notSuccessfullySyncedFor;
    if (lastSuccessfulSyncTime == 0) {
      notSuccessfullySyncedFor = "Never Successfully Synced";
    } else {
      notSuccessfullySyncedFor = (currTime - lastSuccessfulSyncTime) / 1000;
    }
    return ImmutableMap.of(
        "notSyncedForSecs", lastSyncTime == 0 ? "Never Synced" : (currTime - lastSyncTime) / 1000,
        "notSuccessfullySyncedFor", notSuccessfullySyncedFor,
        "consecutiveFailedAttemptCount", consecutiveFailedAttemptCount,
        "syncScheduled", startStopLock.isStarted()
    );
  }

  /**
   * Exposed for monitoring use to see if sync is working fine and not stopped due to any coding bugs. If this
   * ever returns false then caller of this method must create an alert and it should be looked into for any
   * bugs.
   */
  public boolean isOK()
  {
    return (System.currentTimeMillis() - lastSyncTime) < MAX_RETRY_BACKOFF + 3 * serverHttpTimeout;
  }

  public long getServerHttpTimeout()
  {
    return serverHttpTimeout;
  }

  private void sync()
  {
    if (!startStopLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
      log.info("Skipping sync() call for server[%s].", logIdentity);
      return;
    }

    lastSyncTime = System.currentTimeMillis();

    try {
      final String req = getRequestString();

      BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();

      log.debug("Sending sync request to server[%s]", logIdentity);

      ListenableFuture<InputStream> syncRequestFuture = httpClient.go(
          new Request(HttpMethod.GET, new URL(baseServerURL, req))
              .addHeader(HttpHeaders.Names.ACCEPT, SmileMediaTypes.APPLICATION_JACKSON_SMILE)
              .addHeader(HttpHeaders.Names.CONTENT_TYPE, SmileMediaTypes.APPLICATION_JACKSON_SMILE),
          responseHandler,
          Duration.millis(serverHttpTimeout)
      );

      log.debug("Sent sync request to [%s]", logIdentity);

      Futures.addCallback(
          syncRequestFuture,
          new FutureCallback<InputStream>()
          {
            @Override
            public void onSuccess(InputStream stream)
            {
              synchronized (startStopLock) {
                if (!startStopLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
                  log.info("Skipping sync() success for server[%s].", logIdentity);
                  return;
                }

                try {
                  if (responseHandler.getStatus() == HttpServletResponse.SC_NO_CONTENT) {
                    log.debug("Received NO CONTENT from server[%s]", logIdentity);
                    lastSuccessfulSyncTime = System.currentTimeMillis();
                    return;
                  } else if (responseHandler.getStatus() != HttpServletResponse.SC_OK) {
                    handleFailure(new RE("Bad Sync Response."));
                    return;
                  }

                  log.debug("Received sync response from [%s]", logIdentity);

                  ChangeRequestsSnapshot<T> changes = smileMapper.readValue(stream, responseTypeReferences);

                  log.debug("Finished reading sync response from [%s]", logIdentity);

                  if (changes.isResetCounter()) {
                    log.info("[%s] requested resetCounter for reason [%s].", logIdentity, changes.getResetCause());
                    counter = null;
                    return;
                  }

                  if (counter == null) {
                    listener.fullSync(changes.getRequests());
                  } else {
                    listener.deltaSync(changes.getRequests());
                  }

                  counter = changes.getCounter();

                  if (initializationLatch.getCount() > 0) {
                    initializationLatch.countDown();
                    log.info("[%s] synced successfully for the first time.", logIdentity);
                  }

                  if (consecutiveFailedAttemptCount > 0) {
                    consecutiveFailedAttemptCount = 0;
                    log.info("[%s] synced successfully.", logIdentity);
                  }

                  lastSuccessfulSyncTime = System.currentTimeMillis();
                }
                catch (Exception ex) {
                  String logMsg = StringUtils.nonStrictFormat(
                      "Error processing sync response from [%s]. Reason [%s]",
                      logIdentity,
                      ex.getMessage()
                  );

                  if (incrementFailedAttemptAndCheckUnstabilityTimeout()) {
                    log.error(ex, logMsg);
                  } else {
                    log.info("Temporary Failure. %s", logMsg);
                    log.debug(ex, logMsg);
                  }
                }
                finally {
                  addNextSyncToWorkQueue();
                }
              }
            }

            @Override
            public void onFailure(Throwable t)
            {
              synchronized (startStopLock) {
                if (!startStopLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
                  log.info("Skipping sync() failure for URL[%s].", logIdentity);
                  return;
                }

                try {
                  handleFailure(t);
                }
                finally {
                  addNextSyncToWorkQueue();
                }
              }
            }

            private void handleFailure(Throwable t)
            {
              String logMsg = StringUtils.nonStrictFormat(
                  "failed to get sync response from [%s]. Return code [%s], Reason: [%s]",
                  logIdentity,
                  responseHandler.getStatus(),
                  responseHandler.getDescription()
              );

              if (incrementFailedAttemptAndCheckUnstabilityTimeout()) {
                log.error(t, logMsg);
              } else {
                log.info("Temporary Failure. %s", logMsg);
                log.debug(t, logMsg);
              }
            }
          },
          executor
      );
    }
    catch (Throwable th) {
      try {
        String logMsg = StringUtils.nonStrictFormat(
            "Fatal error while fetching segment list from [%s].", logIdentity
        );

        if (incrementFailedAttemptAndCheckUnstabilityTimeout()) {
          log.makeAlert(th, logMsg).emit();
        } else {
          log.info("Temporary Failure. %s", logMsg);
          log.debug(th, logMsg);
        }
      }
      finally {
        addNextSyncToWorkQueue();
      }
    }
  }

  private String getRequestString()
  {
    final String req;
    if (counter != null) {
      req = StringUtils.format(
          "%s?counter=%s&hash=%s&timeout=%s",
          baseRequestPath,
          counter.getCounter(),
          counter.getHash(),
          serverTimeoutMS
      );
    } else {
      req = StringUtils.format("%s?counter=-1&timeout=%s", baseRequestPath, serverTimeoutMS);
    }
    return req;
  }

  private void addNextSyncToWorkQueue()
  {
    synchronized (startStopLock) {
      if (!startStopLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
        log.info("Not scheduling sync for server[%s]. Instance stopped.", logIdentity);
        return;
      }

      try {
        if (consecutiveFailedAttemptCount > 0) {
          long sleepMillis = Math.min(
              MAX_RETRY_BACKOFF,
              RetryUtils.nextRetrySleepMillis(consecutiveFailedAttemptCount)
          );
          log.info("Scheduling next syncup in [%d] millis for server[%s].", sleepMillis, logIdentity);
          executor.schedule(this::sync, sleepMillis, TimeUnit.MILLISECONDS);
        } else {
          executor.execute(this::sync);
        }
      }
      catch (Throwable th) {
        if (executor.isShutdown()) {
          log.warn(
              th,
              "Couldn't schedule next sync. [%s] is not being synced any more, probably because executor is stopped.",
              logIdentity
          );
        } else {
          log.makeAlert(
              th,
              "Couldn't schedule next sync. [%s] is not being synced any more, restarting Druid process on that "
              + "server might fix the issue.",
              logIdentity
          ).emit();
        }
      }
    }
  }

  private boolean incrementFailedAttemptAndCheckUnstabilityTimeout()
  {
    if (consecutiveFailedAttemptCount > 0
        && (System.currentTimeMillis() - unstableStartTime) > serverUnstabilityTimeout) {
      return true;
    }

    if (consecutiveFailedAttemptCount++ == 0) {
      unstableStartTime = System.currentTimeMillis();
    }

    return false;
  }

  /**
   * Concurrency guarantees: all calls to {@link #fullSync} and {@link #deltaSync} (that is done within the {@link
   * #executor}) are linearizable.
   */
  public interface Listener<T>
  {
    /**
     * This method is called either if on the previous request the server had asked to reset the counter or it was the
     * first request to the server.
     */
    void fullSync(List<T> changes);
    void deltaSync(List<T> changes);
  }
}
