/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation.router;

import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.newActiveNamenodeResolver;
import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.newFileSubclusterResolver;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.JvmPauseMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Router that provides a unified view of multiple federated HDFS clusters. It
 * has two main roles: (1) federated interface and (2) NameNode heartbeat.
 * <p>
 * For the federated interface, the Router receives a client request, checks the
 * State Store for the correct subcluster, and forwards the request to the
 * active Namenode of that subcluster. The reply from the Namenode then flows in
 * the opposite direction. The Routers are stateless and can be behind a load
 * balancer. HDFS clients connect to the router using the same interfaces as are
 * used to communicate with a namenode, namely the ClientProtocol RPC interface
 * and the WebHdfs HTTP interface exposed by the router. {@link RouterRpcServer}
 * {@link RouterHttpServer}
 * <p>
 * For NameNode heartbeat, the Router periodically checks the state of a
 * NameNode (usually on the same server) and reports their high availability
 * (HA) state and load/space status to the State Store. Note that this is an
 * optional role as a Router can be independent of any subcluster.
 * {@link StateStoreService} {@link NamenodeHeartbeatService}
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class Router extends CompositeService {

  private static final Logger LOG = LoggerFactory.getLogger(Router.class);


  /** Configuration for the Router. */
  private Configuration conf;

  /** Router address/identifier. */
  private String routerId;

  /** RPC interface to the client. */
  private RouterRpcServer rpcServer;
  private InetSocketAddress rpcAddress;

  /** RPC interface for the admin. */
  private RouterAdminServer adminServer;
  private InetSocketAddress adminAddress;

  /** HTTP interface and web application. */
  private RouterHttpServer httpServer;

  /** Interface with the State Store. */
  private StateStoreService stateStore;

  /** Interface to map global name space to HDFS subcluster name spaces. */
  private FileSubclusterResolver subclusterResolver;

  /** Interface to identify the active NN for a nameservice or blockpool ID. */
  private ActiveNamenodeResolver namenodeResolver;
  /** Updates the namenode status in the namenode resolver. */
  private Collection<NamenodeHeartbeatService> namenodeHearbeatServices;

  /** Router metrics. */
  private RouterMetricsService metrics;

  /** JVM pauses (GC and others). */
  private JvmPauseMonitor pauseMonitor;



  /////////////////////////////////////////////////////////
  // Constructor
  /////////////////////////////////////////////////////////

  public Router() {
    super(Router.class.getName());
  }

  /////////////////////////////////////////////////////////
  // Service management
  /////////////////////////////////////////////////////////

  @Override
  protected void serviceInit(Configuration configuration) throws Exception {
    this.conf = configuration;

    if (conf.getBoolean(
        DFSConfigKeys.DFS_ROUTER_STORE_ENABLE,
        DFSConfigKeys.DFS_ROUTER_STORE_ENABLE_DEFAULT)) {
      // Service that maintains the State Store connection
      this.stateStore = new StateStoreService();
      addService(this.stateStore);
    }

    // Resolver to track active NNs
    this.namenodeResolver = newActiveNamenodeResolver(
        this.conf, this.stateStore);
    if (this.namenodeResolver == null) {
      throw new IOException("Cannot find namenode resolver.");
    }

    // Lookup interface to map between the global and subcluster name spaces
    this.subclusterResolver = newFileSubclusterResolver(this.conf, this);
    if (this.subclusterResolver == null) {
      throw new IOException("Cannot find subcluster resolver");
    }

    if (conf.getBoolean(
        DFSConfigKeys.DFS_ROUTER_RPC_ENABLE,
        DFSConfigKeys.DFS_ROUTER_RPC_ENABLE_DEFAULT)) {
      // Create RPC server
      this.rpcServer = createRpcServer();
      addService(this.rpcServer);
      this.setRpcServerAddress(rpcServer.getRpcAddress());
    }

    if (conf.getBoolean(
        DFSConfigKeys.DFS_ROUTER_ADMIN_ENABLE,
        DFSConfigKeys.DFS_ROUTER_ADMIN_ENABLE_DEFAULT)) {
      // Create admin server
      this.adminServer = createAdminServer();
      addService(this.adminServer);
    }

    if (conf.getBoolean(
        DFSConfigKeys.DFS_ROUTER_HTTP_ENABLE,
        DFSConfigKeys.DFS_ROUTER_HTTP_ENABLE_DEFAULT)) {
      // Create HTTP server
      this.httpServer = createHttpServer();
      addService(this.httpServer);
    }

    if (conf.getBoolean(
        DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE,
        DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE_DEFAULT)) {

      // Create status updater for each monitored Namenode
      this.namenodeHearbeatServices = createNamenodeHearbeatServices();
      for (NamenodeHeartbeatService hearbeatService :
          this.namenodeHearbeatServices) {
        addService(hearbeatService);
      }

      if (this.namenodeHearbeatServices.isEmpty()) {
        LOG.error("Heartbeat is enabled but there are no namenodes to monitor");
      }
    }

    // Router metrics system
    if (conf.getBoolean(
        DFSConfigKeys.DFS_ROUTER_METRICS_ENABLE,
        DFSConfigKeys.DFS_ROUTER_METRICS_ENABLE_DEFAULT)) {

      DefaultMetricsSystem.initialize("Router");

      this.metrics = new RouterMetricsService(this);
      addService(this.metrics);

      // JVM pause monitor
      this.pauseMonitor = new JvmPauseMonitor();
      this.pauseMonitor.init(conf);
    }

    super.serviceInit(conf);
  }

  @Override
  protected void serviceStart() throws Exception {

    if (this.pauseMonitor != null) {
      this.pauseMonitor.start();
      JvmMetrics jvmMetrics = this.metrics.getJvmMetrics();
      if (jvmMetrics != null) {
        jvmMetrics.setPauseMonitor(pauseMonitor);
      }
    }

    super.serviceStart();
  }

  @Override
  protected void serviceStop() throws Exception {

    // JVM pause monitor
    if (this.pauseMonitor != null) {
      this.pauseMonitor.stop();
    }

    super.serviceStop();
  }

  /**
   * Shutdown the router.
   */
  public void shutDown() {
    new Thread() {
      @Override
      public void run() {
        Router.this.stop();
      }
    }.start();
  }

  /////////////////////////////////////////////////////////
  // RPC Server
  /////////////////////////////////////////////////////////

  /**
   * Create a new Router RPC server to proxy ClientProtocol requests.
   *
   * @return New Router RPC Server.
   * @throws IOException If the router RPC server was not started.
   */
  protected RouterRpcServer createRpcServer() throws IOException {
    return new RouterRpcServer(this.conf, this, this.getNamenodeResolver(),
        this.getSubclusterResolver());
  }

  /**
   * Get the Router RPC server.
   *
   * @return Router RPC server.
   */
  public RouterRpcServer getRpcServer() {
    return this.rpcServer;
  }

  /**
   * Set the current RPC socket for the router.
   *
   * @param rpcAddress RPC address.
   */
  protected void setRpcServerAddress(InetSocketAddress address) {
    this.rpcAddress = address;

    // Use the RPC address as our unique router Id
    if (this.rpcAddress != null) {
      try {
        String hostname = InetAddress.getLocalHost().getHostName();
        setRouterId(hostname + ":" + this.rpcAddress.getPort());
      } catch (UnknownHostException ex) {
        LOG.error("Cannot set unique router ID, address not resolvable {}",
            this.rpcAddress);
      }
    }
  }

  /**
   * Get the current RPC socket address for the router.
   *
   * @return InetSocketAddress
   */
  public InetSocketAddress getRpcServerAddress() {
    return this.rpcAddress;
  }

  /////////////////////////////////////////////////////////
  // Admin server
  /////////////////////////////////////////////////////////

  /**
   * Create a new router admin server to handle the router admin interface.
   *
   * @return RouterAdminServer
   * @throws IOException If the admin server was not successfully started.
   */
  protected RouterAdminServer createAdminServer() throws IOException {
    return new RouterAdminServer(this.conf, this);
  }

  /**
   * Set the current Admin socket for the router.
   *
   * @param adminAddress Admin RPC address.
   */
  protected void setAdminServerAddress(InetSocketAddress address) {
    this.adminAddress = address;
  }

  /**
   * Get the current Admin socket address for the router.
   *
   * @return InetSocketAddress Admin address.
   */
  public InetSocketAddress getAdminServerAddress() {
    return adminAddress;
  }

  /////////////////////////////////////////////////////////
  // HTTP server
  /////////////////////////////////////////////////////////

  /**
   * Create an HTTP server for this Router.
   *
   * @return HTTP server for this Router.
   */
  protected RouterHttpServer createHttpServer() {
    return new RouterHttpServer(this);
  }

  /**
   * Get the current HTTP socket address for the router.
   *
   * @return InetSocketAddress HTTP address.
   */
  public InetSocketAddress getHttpServerAddress() {
    if (httpServer != null) {
      return httpServer.getHttpAddress();
    }
    return null;
  }

  /////////////////////////////////////////////////////////
  // Namenode heartbeat monitors
  /////////////////////////////////////////////////////////

  /**
   * Create each of the services that will monitor a Namenode.
   *
   * @return List of heartbeat services.
   */
  protected Collection<NamenodeHeartbeatService>
      createNamenodeHearbeatServices() {

    Map<String, NamenodeHeartbeatService> ret = new HashMap<>();

    if (conf.getBoolean(
        DFSConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE,
        DFSConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE_DEFAULT)) {
      // Create a local heartbet service
      NamenodeHeartbeatService localHeartbeatService =
          createLocalNamenodeHearbeatService();
      if (localHeartbeatService != null) {
        String nnDesc = localHeartbeatService.getNamenodeDesc();
        ret.put(nnDesc, localHeartbeatService);
      }
    }

    // Create heartbeat services for a list specified by the admin
    String namenodes = this.conf.get(
        DFSConfigKeys.DFS_ROUTER_MONITOR_NAMENODE);
    if (namenodes != null) {
      for (String namenode : namenodes.split(",")) {
        String[] namenodeSplit = namenode.split("\\.");
        String nsId = null;
        String nnId = null;
        if (namenodeSplit.length == 2) {
          nsId = namenodeSplit[0];
          nnId = namenodeSplit[1];
        } else if (namenodeSplit.length == 1) {
          nsId = namenode;
        } else {
          LOG.error("Wrong Namenode to monitor: {}", namenode);
        }
        if (nsId != null) {
          NamenodeHeartbeatService heartbeatService =
              createNamenodeHearbeatService(nsId, nnId);
          if (heartbeatService != null) {
            ret.put(heartbeatService.getNamenodeDesc(), heartbeatService);
          }
        }
      }
    }

    return ret.values();
  }

  /**
   * Create a new status updater for the local Namenode.
   *
   * @return Updater of the status for the local Namenode.
   */
  protected NamenodeHeartbeatService createLocalNamenodeHearbeatService() {
    // Detect NN running in this machine
    String nsId = DFSUtil.getNamenodeNameServiceId(conf);
    String nnId = null;
    if (HAUtil.isHAEnabled(conf, nsId)) {
      nnId = HAUtil.getNameNodeId(conf, nsId);
      if (nnId == null) {
        LOG.error("Cannot find namenode id for local {}", nsId);
      }
    }

    return createNamenodeHearbeatService(nsId, nnId);
  }

  /**
   * Create a heartbeat monitor for a particular Namenode.
   *
   * @param nsId Identifier of the nameservice to monitor.
   * @param nnId Identifier of the namenode (HA) to monitor.
   * @return Updater of the status for the specified Namenode.
   */
  protected NamenodeHeartbeatService createNamenodeHearbeatService(
      String nsId, String nnId) {

    LOG.info("Creating heartbeat service for Namenode {} in {}", nnId, nsId);
    NamenodeHeartbeatService ret = new NamenodeHeartbeatService(
        namenodeResolver, nsId, nnId);
    return ret;
  }

  /////////////////////////////////////////////////////////
  // Submodule getters
  /////////////////////////////////////////////////////////

  /**
   * Get the State Store service.
   *
   * @return State Store service.
   */
  public StateStoreService getStateStore() {
    return this.stateStore;
  }

  /**
   * Get the metrics system for the Router.
   *
   * @return Router metrics.
   */
  public RouterMetrics getRouterMetrics() {
    if (this.metrics != null) {
      return this.metrics.getRouterMetrics();
    }
    return null;
  }

  /**
   * Get the federation metrics.
   *
   * @return Federation metrics.
   */
  public FederationMetrics getMetrics() {
    if (this.metrics != null) {
      return this.metrics.getFederationMetrics();
    }
    return null;
  }

  /**
   * Get the subcluster resolver for files.
   *
   * @return Subcluster resolver for files.
   */
  public FileSubclusterResolver getSubclusterResolver() {
    return this.subclusterResolver;
  }

  /**
   * Get the namenode resolver for a subcluster.
   *
   * @return The namenode resolver for a subcluster.
   */
  public ActiveNamenodeResolver getNamenodeResolver() {
    return this.namenodeResolver;
  }

  /////////////////////////////////////////////////////////
  // Router info
  /////////////////////////////////////////////////////////

  /**
   * Unique ID for the router, typically the hostname:port string for the
   * router's RPC server. This ID may be null on router startup before the RPC
   * server has bound to a port.
   *
   * @return Router identifier.
   */
  public String getRouterId() {
    return this.routerId;
  }

  /**
   * Sets a unique ID for this router.
   *
   * @param router Identifier of the Router.
   */
  public void setRouterId(String id) {
    this.routerId = id;
    if (this.stateStore != null) {
      this.stateStore.setIdentifier(this.routerId);
    }
    if (this.namenodeResolver != null) {
      this.namenodeResolver.setRouterId(this.routerId);
    }
  }
}
