/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode.ha;

import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.ClientGSIContext;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.io.retry.AtMostOnce;
import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RpcInvocationHandler;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;

/**
 * A {@link org.apache.hadoop.io.retry.FailoverProxyProvider} implementation
 * that supports reading from observer namenode(s).
 *
 * This constructs a wrapper proxy that sends the request to observer
 * namenode(s), if observer read is enabled. In case there are multiple
 * observer namenodes, it will try them one by one in case the RPC failed. It
 * will fall back to the active namenode after it has exhausted all the
 * observer namenodes.
 *
 * Read and write requests will still be sent to active NN if reading from
 * observer is turned off.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ObserverReadProxyProvider<T>
    extends AbstractNNFailoverProxyProvider<T> {
  @VisibleForTesting
  static final Logger LOG = LoggerFactory.getLogger(
      ObserverReadProxyProvider.class);

  /**
   * Prefix of the configuration key for {@link #autoMsyncPeriodMs}. The
   * constructor appends "." and the host of the provider URI (the nameservice
   * ID) to form the full key.
   */
  static final String AUTO_MSYNC_PERIOD_KEY_PREFIX =
      HdfsClientConfigKeys.Failover.PREFIX + "observer.auto-msync-period";
  /** Auto-msync disabled by default. */
  static final long AUTO_MSYNC_PERIOD_DEFAULT = -1;

  /** Client-side context for syncing with the NameNode server side. */
  private final AlignmentContext alignmentContext;

  /** The inner proxy provider used for active/standby failover. */
  private final AbstractNNFailoverProxyProvider<T> failoverProxy;
  /** List of all NameNode proxies. */
  private final List<NNProxyInfo<T>> nameNodeProxies;

  /** The policy used to determine if an exception is fatal or retriable. */
  private final RetryPolicy observerRetryPolicy;
  /** The combined proxy which redirects to other proxies as necessary. */
  private final ProxyInfo<T> combinedProxy;

  /**
   * Whether reading from observer is enabled. If this is false, all read
   * requests are forwarded through the {@link #failoverProxy}. The
   * constructor enables this only when the combined proxy implements
   * {@link ClientProtocol}; tests may override it afterwards.
   */
  private boolean observerReadEnabled;

  /**
   * This adjusts how frequently this proxy provider should auto-msync to the
   * Active NameNode, automatically performing an msync() call to the active
   * to fetch the current transaction ID before submitting read requests to
   * observer nodes. See HDFS-14211 for more description of this feature.
   * If this is below 0, never auto-msync. If this is 0, perform an msync on
   * every read operation. If this is above 0, perform an msync after this many
   * ms have elapsed since the last msync.
   */
  private final long autoMsyncPeriodMs;

  /**
   * The time, as reported by {@link Time#monotonicNow()} (a monotonic
   * millisecond reading, not a wall-clock epoch time), at which the last
   * msync operation was performed. This includes any implicit msync (any
   * operation which is serviced through the {@link #failoverProxy}). Holds
   * -1 until the first such operation completes.
   */
  private volatile long lastMsyncTimeMs = -1;

  /**
   * A client using an ObserverReadProxyProvider should first sync with the
   * active NameNode on startup. This ensures that the client reads data which
   * is consistent with the state of the world as of the time of its
   * instantiation. This variable will be true after this initial sync has
   * been performed, or after any request has been successfully serviced
   * through the {@link #failoverProxy}.
   */
  private volatile boolean msynced = false;

  /**
   * The index into the nameNodeProxies list currently being used. Should only
   * be accessed in synchronized methods. Starts at -1 so that the first
   * proxy change selects index 0.
   */
  private int currentIndex = -1;

  /**
   * The proxy being used currently. Should only be accessed in synchronized
   * methods. Remains null until the first proxy change.
   */
  private NNProxyInfo<T> currentProxy;

  /**
   * The last proxy that has been used. Only used for testing. Cleared at the
   * start of every invocation on the combined proxy and set again once a
   * proxy has serviced the call.
   */
  private volatile ProxyInfo<T> lastProxy = null;

  /**
   * Create a provider which uses a {@link ConfiguredFailoverProxyProvider},
   * built from the same arguments, for active/standby failover.
   *
   * @param conf the configuration handed to this provider and to the inner
   *             failover provider
   * @param uri the URI handed to this provider and to the inner failover
   *            provider
   * @param xface the protocol interface the returned proxy implements
   * @param factory the factory handed to this provider and to the inner
   *                failover provider
   */
  public ObserverReadProxyProvider(
      Configuration conf, URI uri, Class<T> xface, HAProxyFactory<T> factory) {
    this(conf, uri, xface, factory,
        new ConfiguredFailoverProxyProvider<>(conf, uri, xface, factory));
  }

  /**
   * Create a provider which delegates active/standby failover to the given
   * provider and exposes all NameNode proxies behind one combined proxy.
   *
   * @param conf the configuration; also consulted for the auto-msync period
   * @param uri the URI of the target; its host is used as the nameservice ID
   *            when looking up the auto-msync period
   * @param xface the protocol interface the combined proxy implements
   * @param factory the proxy factory; this provider's alignment context is
   *                installed on it
   * @param failoverProxy the provider used whenever a request must be sent
   *                      to the active NameNode
   */
  @SuppressWarnings("unchecked")
  public ObserverReadProxyProvider(
      Configuration conf, URI uri, Class<T> xface, HAProxyFactory<T> factory,
      AbstractNNFailoverProxyProvider<T> failoverProxy) {
    super(conf, uri, xface, factory);
    this.failoverProxy = failoverProxy;
    this.alignmentContext = new ClientGSIContext();
    factory.setAlignmentContext(this.alignmentContext);

    // The retry counts on this policy are irrelevant: it is consulted only
    // to classify an exception as retriable or fatal.
    this.observerRetryPolicy = RetryPolicies.failoverOnNetworkException(
        RetryPolicies.TRY_ONCE_THEN_FAIL, 1);

    // Collect a proxy entry for every NameNode.
    this.nameNodeProxies = getProxyAddresses(uri,
        HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);

    // Describe the combined proxy as the bracketed, comma-separated list of
    // the descriptions of the proxies it redirects to.
    StringBuilder description = new StringBuilder("[");
    String separator = "";
    for (NNProxyInfo<T> nnProxy : this.nameNodeProxies) {
      description.append(separator).append(nnProxy.proxyInfo);
      separator = ",";
    }
    description.append("]");

    // The combined proxy only redirects to other proxies, so a single
    // instance can be shared by all invocations.
    T wrappedProxy = (T) Proxy.newProxyInstance(
        ObserverReadInvocationHandler.class.getClassLoader(),
        new Class<?>[] {xface}, new ObserverReadInvocationHandler());
    this.combinedProxy = new ProxyInfo<>(wrappedProxy, description.toString());

    // The host of the URI is the nameservice ID
    String autoMsyncPeriodKey = AUTO_MSYNC_PERIOD_KEY_PREFIX + "."
        + uri.getHost();
    this.autoMsyncPeriodMs = conf.getTimeDuration(autoMsyncPeriodKey,
        AUTO_MSYNC_PERIOD_DEFAULT, TimeUnit.MILLISECONDS);

    // TODO : make this configurable or remove this variable
    this.observerReadEnabled = wrappedProxy instanceof ClientProtocol;
    if (!this.observerReadEnabled) {
      LOG.info("Disabling observer reads for {} because the requested proxy "
          + "class does not implement {}", uri, ClientProtocol.class.getName());
    }
  }

  /**
   * @return the client-side alignment context created by this provider and
   *         installed on the proxy factory in the constructor.
   */
  public AlignmentContext getAlignmentContext() {
    return alignmentContext;
  }

  /**
   * @return the combined proxy built in the constructor; the same instance
   *         is returned on every call.
   */
  @Override
  public ProxyInfo<T> getProxy() {
    return combinedProxy;
  }

  /**
   * Delegate the failover to the inner {@link #failoverProxy}. The proxy
   * currently selected for observer reads is not changed here.
   *
   * @param currentProxy the proxy passed on to the inner provider
   */
  @Override
  public void performFailover(T currentProxy) {
    failoverProxy.performFailover(currentProxy);
  }

  /**
   * Decide whether a method counts as a read for the purpose of observer
   * reads, based on its {@link ReadOnly} annotation.
   *
   * @param method the method being invoked
   * @return true only if the method carries a {@link ReadOnly} annotation
   *         whose {@code activeOnly} attribute is false.
   */
  private static boolean isRead(Method method) {
    ReadOnly readOnly = method.getAnnotation(ReadOnly.class);
    return readOnly != null && !readOnly.activeOnly();
  }

  /**
   * Override whether read requests are attempted on observers.
   *
   * @param flag true to attempt reads on observers, false to forward every
   *             request through the inner failover provider
   */
  @VisibleForTesting
  void setObserverReadEnabled(boolean flag) {
    this.observerReadEnabled = flag;
  }

  /**
   * @return the proxy which serviced the most recent invocation, or null if
   *         an invocation has started but has not (yet) been serviced.
   */
  @VisibleForTesting
  ProxyInfo<T> getLastProxy() {
    return lastProxy;
  }

  /**
   * Return the currently used proxy. This always goes through
   * {@link #changeProxy(NNProxyInfo)} with a null argument: if no proxy has
   * been selected yet, that call selects the first one; otherwise the
   * argument does not match the current proxy, so the call changes nothing
   * and simply returns the current proxy.
   */
  private NNProxyInfo<T> getCurrentProxy() {
    return changeProxy(null);
  }

  /**
   * Advance to the next entry of the proxy list, wrapping around at the end.
   * The move only happens when the caller's view of the current proxy is
   * still accurate; a stale view means another caller already moved on, and
   * the request is ignored so that the proxy is not advanced twice. When a
   * move does happen, the service state of the newly selected proxy is
   * fetched and cached before returning.
   *
   * @param initial the proxy the caller believes to be current (null if the
   *                caller believes none has been selected yet)
   * @return the proxy that should be used from now on
   */
  private synchronized NNProxyInfo<T> changeProxy(NNProxyInfo<T> initial) {
    if (currentProxy == initial) {
      currentIndex = (currentIndex + 1) % nameNodeProxies.size();
      NNProxyInfo<T> selected =
          createProxyIfNeeded(nameNodeProxies.get(currentIndex));
      currentProxy = selected;
      selected.setCachedState(getHAServiceState(selected));
      String previousInfo = (initial == null) ? "none" : initial.proxyInfo;
      LOG.debug("Changed current proxy from {} to {}",
          previousInfo, selected.proxyInfo);
    }
    // Otherwise there must have been a concurrent modification; the move
    // request is ignored and the already-updated proxy is handed back.
    return currentProxy;
  }

  /**
   * Fetch the service state from a proxy.
   *
   * @param proxyInfo the proxy to query
   * @return the state reported by the NameNode;
   *         {@link HAServiceState#STANDBY} if the call was rejected with a
   *         {@link StandbyException}; or null if the state could not be
   *         fetched for any other I/O reason (the failure is logged at debug
   *         level and callers treat null as "unreachable").
   */
  private HAServiceState getHAServiceState(NNProxyInfo<T> proxyInfo) {
    try {
      ClientProtocol client = getProxyAsClientProtocol(proxyInfo.proxy);
      return client.getHAServiceState();
    } catch (IOException e) {
      // Though a Standby will allow a getHAServiceState call, it won't allow
      // delegation token lookup, so if DT is used it throws StandbyException
      boolean rejectedByStandby = e instanceof RemoteException
          && ((RemoteException) e).unwrapRemoteException()
              instanceof StandbyException;
      if (rejectedByStandby) {
        LOG.debug("NameNode {} threw StandbyException when fetching HAState",
            proxyInfo.getAddress());
        return HAServiceState.STANDBY;
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("Failed to connect to {} while fetching HAServiceState",
            proxyInfo.getAddress(), e);
      }
      return null;
    }
  }

  /**
   * Return the input proxy, cast as a {@link ClientProtocol}. This should
   * ONLY be called if the caller is certain that the proxy is, in fact, a
   * {@link ClientProtocol}.
   *
   * When assertions are enabled, a proxy that is not a
   * {@link ClientProtocol} (including a null proxy) fails with an
   * {@link AssertionError} carrying a descriptive message. When assertions
   * are disabled, a proxy of the wrong type fails with the plain
   * {@link ClassCastException} of the cast, and a null proxy is returned as
   * null.
   *
   * @param proxy the proxy to cast
   * @return the same proxy, typed as a {@link ClientProtocol}
   */
  private ClientProtocol getProxyAsClientProtocol(T proxy) {
    // Guard the getClass() call: a null proxy also fails the instanceof
    // check, and must produce the assertion message rather than an NPE
    // while that message is being built.
    assert proxy instanceof ClientProtocol : "BUG: Attempted to use proxy "
        + "of class " + (proxy == null ? null : proxy.getClass())
        + " as if it was a ClientProtocol.";
    return (ClientProtocol) proxy;
  }

  /**
   * Perform the one-time initial {@link ClientProtocol#msync()} against the
   * proxy supplied by the {@link #failoverProxy}, so that this client starts
   * from the active NameNode's state. Once {@link #msynced} is set, further
   * calls return without doing anything.
   *
   * @throws IOException if the msync call fails; in that case
   *                     {@link #msynced} stays false
   * @see #msynced
   */
  private synchronized void initializeMsync() throws IOException {
    if (!msynced) {
      ClientProtocol active =
          getProxyAsClientProtocol(failoverProxy.getProxy().proxy);
      active.msync();
      msynced = true;
      lastMsyncTimeMs = Time.monotonicNow();
    }
  }

  /**
   * Call {@link ClientProtocol#msync()} through the {@link #failoverProxy}
   * when the auto-msync configuration requires it: never for a negative
   * period, on every call for a period of zero, and otherwise only when more
   * than {@link #autoMsyncPeriodMs} ms have passed since the last recorded
   * msync.
   *
   * @throws IOException if the msync call fails
   * @see #autoMsyncPeriodMs
   */
  private void autoMsyncIfNecessary() throws IOException {
    if (autoMsyncPeriodMs < 0) {
      // Auto-msync is disabled
      return;
    }
    if (autoMsyncPeriodMs == 0) {
      // Always msync
      getProxyAsClientProtocol(failoverProxy.getProxy().proxy).msync();
      return;
    }
    if (Time.monotonicNow() - lastMsyncTimeMs <= autoMsyncPeriodMs) {
      // The last msync is still recent enough
      return;
    }
    // Take the lock so that only one thread will msync if many operations
    // are submitted around the same time.
    synchronized (this) {
      // Re-check the entry criterion, since another thread may have
      // performed the msync while this one was waiting for the lock.
      if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) {
        getProxyAsClientProtocol(failoverProxy.getProxy().proxy).msync();
        lastMsyncTimeMs = Time.monotonicNow();
      }
    }
  }

  /**
   * An InvocationHandler to handle incoming requests. This class's invoke
   * method contains the primary logic for redirecting to observers.
   *
   * If observer reads are enabled, attempt to send read operations to the
   * current proxy. If it is not an observer, or the observer fails, adjust
   * the current proxy and retry on the next one. If all proxies are tried
   * without success, the request is forwarded to the active.
   *
   * Write requests are always forwarded to the active.
   */
  private class ObserverReadInvocationHandler implements RpcInvocationHandler {

    /**
     * Route one call made on the combined proxy to an underlying NameNode
     * proxy.
     *
     * If observer reads are enabled and the method is a read, the client is
     * first synced with the active (initial msync, or auto-msync if
     * configured) and the call is then attempted on proxies whose cached
     * state is OBSERVER, making at most one pass over the proxy list. Every
     * other call, and every read that no observer serviced, is forwarded to
     * the proxy supplied by the inner failover provider.
     *
     * @param proxy the proxy instance the call was made on; not used
     * @param method the method being invoked
     * @param args the arguments of the invocation
     * @return the value returned by the underlying proxy
     * @throws Throwable the cause of an invocation failure that is not
     *                   retried on another observer, or an exception raised
     *                   while syncing with the active
     */
    @Override
    public Object invoke(Object proxy, final Method method, final Object[] args)
        throws Throwable {
      // Forget the previously recorded proxy; it is set again below once a
      // proxy has actually serviced this call.
      lastProxy = null;
      Object retVal;

      if (observerReadEnabled && isRead(method)) {
        if (!msynced) {
          // An msync() must first be performed to ensure that this client is
          // up-to-date with the active's state. This will only be done once.
          initializeMsync();
        } else {
          autoMsyncIfNecessary();
        }

        // Counters used only for the fallback log message below.
        int failedObserverCount = 0;
        int activeCount = 0;
        int standbyCount = 0;
        int unreachableCount = 0;
        // Make at most as many attempts as there are configured proxies.
        for (int i = 0; i < nameNodeProxies.size(); i++) {
          NNProxyInfo<T> current = getCurrentProxy();
          HAServiceState currState = current.getCachedState();
          if (currState != HAServiceState.OBSERVER) {
            // Not an observer: tally why it was skipped (a null state means
            // the state could not be fetched) and move on to the next proxy.
            if (currState == HAServiceState.ACTIVE) {
              activeCount++;
            } else if (currState == HAServiceState.STANDBY) {
              standbyCount++;
            } else if (currState == null) {
              unreachableCount++;
            }
            LOG.debug("Skipping proxy {} for {} because it is in state {}",
                current.proxyInfo, method.getName(),
                currState == null ? "unreachable" : currState);
            changeProxy(current);
            continue;
          }
          LOG.debug("Attempting to service {} using proxy {}",
              method.getName(), current.proxyInfo);
          try {
            retVal = method.invoke(current.proxy, args);
            lastProxy = current;
            LOG.debug("Invocation of {} using {} was successful",
                method.getName(), current.proxyInfo);
            return retVal;
          } catch (InvocationTargetException ite) {
            // Causes which are not Exceptions (e.g. Errors) are never
            // retried; rethrow them as-is.
            if (!(ite.getCause() instanceof Exception)) {
              throw ite.getCause();
            }
            Exception e = (Exception) ite.getCause();
            if (e instanceof RemoteException) {
              RemoteException re = (RemoteException) e;
              Exception unwrapped = re.unwrapRemoteException(
                  ObserverRetryOnActiveException.class);
              if (unwrapped instanceof ObserverRetryOnActiveException) {
                // Stop trying observers altogether and fall through to the
                // active below.
                LOG.info("Encountered ObserverRetryOnActiveException from {}." +
                    " Retry active namenode directly.", current.proxyInfo);
                break;
              }
            }
            // Use the retry policy purely to classify the exception.
            // NOTE(review): the two zero arguments are presumably the retry
            // and failover counts - confirm against RetryPolicy#shouldRetry.
            RetryAction retryInfo = observerRetryPolicy.shouldRetry(e, 0, 0,
                method.isAnnotationPresent(Idempotent.class)
                    || method.isAnnotationPresent(AtMostOnce.class));
            if (retryInfo.action == RetryAction.RetryDecision.FAIL) {
              // Fatal: surface the failure to the caller without trying
              // further proxies.
              throw e;
            } else {
              // Retriable: count the failure and try the next proxy.
              failedObserverCount++;
              LOG.warn(
                  "Invocation returned exception on [{}]; {} failure(s) so far",
                  current.proxyInfo, failedObserverCount, e);
              changeProxy(current);
            }
          }
        }

        // Only log message if there are actual observer failures.
        // Getting here with failedObserverCount = 0 could
        // be that there is simply no Observer node running at all.
        if (failedObserverCount > 0) {
          // If we get here, it means all observers have failed.
          LOG.warn("{} observers have failed for read request {}; also found "
                  + "{} standby, {} active, and {} unreachable. Falling back "
                  + "to active.", failedObserverCount, method.getName(),
              standbyCount, activeCount, unreachableCount);
        } else {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Read falling back to active without observer read "
                + "fail, is there no observer node running?");
          }
        }
      }

      // Either all observers have failed, observer reads are disabled,
      // or this is a write request. In any case, forward the request to
      // the active NameNode.
      LOG.debug("Using failoverProxy to service {}", method.getName());
      ProxyInfo<T> activeProxy = failoverProxy.getProxy();
      try {
        retVal = method.invoke(activeProxy.proxy, args);
      } catch (InvocationTargetException e) {
        // This exception will be handled by higher layers
        throw e.getCause();
      }
      // If this was reached, the request reached the active, so the
      // state is up-to-date with active and no further msync is needed.
      msynced = true;
      lastMsyncTimeMs = Time.monotonicNow();
      lastProxy = activeProxy;
      return retVal;
    }

    /**
     * Does nothing. The handler holds no resources of its own; the
     * underlying NameNode proxies are closed by the enclosing provider's
     * close method.
     */
    @Override
    public void close() throws IOException {}

    /**
     * @return the connection ID of the proxy currently selected for observer
     *         reads when observer reads are enabled, otherwise that of the
     *         proxy supplied by the inner failover provider.
     */
    @Override
    public ConnectionId getConnectionId() {
      T target;
      if (observerReadEnabled) {
        target = getCurrentProxy().proxy;
      } else {
        target = failoverProxy.getProxy().proxy;
      }
      return RPC.getConnectionIdForProxy(target);
    }
  }

  /**
   * Close every NameNode proxy held by this provider and then the inner
   * failover provider.
   *
   * An {@link IOException} raised while closing one proxy no longer prevents
   * the remaining proxies and the inner failover provider from being closed:
   * the first such exception is remembered, later ones are attached to it as
   * suppressed exceptions, and it is rethrown once everything has been
   * attempted.
   *
   * @throws IOException the first I/O failure encountered while closing
   */
  @Override
  public synchronized void close() throws IOException {
    IOException firstFailure = null;
    for (ProxyInfo<T> pi : nameNodeProxies) {
      if (pi.proxy == null) {
        continue;
      }
      try {
        if (pi.proxy instanceof Closeable) {
          ((Closeable) pi.proxy).close();
        } else {
          RPC.stopProxy(pi.proxy);
        }
      } catch (IOException e) {
        if (firstFailure == null) {
          firstFailure = e;
        } else if (e != firstFailure) {
          firstFailure.addSuppressed(e);
        }
      } finally {
        // Set to null to avoid the failoverProxy having to re-do the close
        // if it is sharing a proxy instance. This is done even when the
        // close failed, so that a failed close is not attempted twice.
        pi.proxy = null;
      }
    }
    try {
      failoverProxy.close();
    } catch (IOException e) {
      if (firstFailure == null) {
        firstFailure = e;
      } else if (e != firstFailure) {
        firstFailure.addSuppressed(e);
      }
    }
    if (firstFailure != null) {
      throw firstFailure;
    }
  }

  /**
   * @return whatever the inner failover provider reports for
   *         {@code useLogicalURI()}; this provider adds no logic of its own.
   */
  @Override
  public boolean useLogicalURI() {
    return failoverProxy.useLogicalURI();
  }
}
