/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation.store;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.StandardMBean;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMBean;
import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.impl.MembershipStoreImpl;
import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl;
import org.apache.hadoop.hdfs.server.federation.store.impl.RouterStoreImpl;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
import org.apache.hadoop.metrics2.MetricsException;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;

/**
 * A service to initialize a
 * {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver
 * StateStoreDriver} and maintain the connection to the data store. There are
 * multiple state store driver connections supported:
 * <ul>
 * <li>File
 * {@link org.apache.hadoop.hdfs.server.federation.store.driver.impl.
 * StateStoreFileImpl StateStoreFileImpl}
 * <li>ZooKeeper
 * {@link org.apache.hadoop.hdfs.server.federation.store.driver.impl.
 * StateStoreZooKeeperImpl StateStoreZooKeeperImpl}
 * </ul>
 * <p>
 * The service also supports the dynamic registration of record stores like:
 * <ul>
 * <li>{@link MembershipStore}: state of the Namenodes in the
 * federation.
 * <li>{@link MountTableStore}: Mount table between to subclusters.
 * See {@link org.apache.hadoop.fs.viewfs.ViewFs ViewFs}.
 * <li>{@link RebalancerStore}: Log of the rebalancing operations.
 * <li>{@link RouterStore}: Router state in the federation.
 * <li>{@link TokenStore}: Tokens in the federation.
 * </ul>
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class StateStoreService extends CompositeService {

  private static final Logger LOG =
      LoggerFactory.getLogger(StateStoreService.class);


  /** State Store configuration. */
  private Configuration conf;

  /** Identifier for the service. */
  private String identifier;

  /** Driver for the back end connection. */
  private StateStoreDriver driver;

  /** Service to maintain data store connection. */
  private StateStoreConnectionMonitorService monitorService;

  /** StateStore metrics. */
  private StateStoreMetrics metrics;

  /** Supported record stores. */
  private final Map<
      Class<? extends BaseRecord>, RecordStore<? extends BaseRecord>>
          recordStores;

  /** Service to maintain State Store caches. */
  private StateStoreCacheUpdateService cacheUpdater;
  /** Time the cache was last successfully updated. */
  private long cacheLastUpdateTime;
  /** List of internal caches to update. */
  private final List<StateStoreCache> cachesToUpdateInternal;
  /** List of external caches to update. */
  private final List<StateStoreCache> cachesToUpdateExternal;


  public StateStoreService() {
    super(StateStoreService.class.getName());

    // Records and stores supported by this implementation
    this.recordStores = new HashMap<>();

    // Caches to maintain
    this.cachesToUpdateInternal = new ArrayList<>();
    this.cachesToUpdateExternal = new ArrayList<>();
  }

  /**
   * Initialize the State Store and the connection to the backend.
   *
   * @param config Configuration for the State Store.
   * @throws IOException
   */
  @Override
  protected void serviceInit(Configuration config) throws Exception {
    this.conf = config;

    // Create implementation of State Store
    Class<? extends StateStoreDriver> driverClass = this.conf.getClass(
        RBFConfigKeys.FEDERATION_STORE_DRIVER_CLASS,
        RBFConfigKeys.FEDERATION_STORE_DRIVER_CLASS_DEFAULT,
        StateStoreDriver.class);
    this.driver = ReflectionUtils.newInstance(driverClass, this.conf);

    if (this.driver == null) {
      throw new IOException("Cannot create driver for the State Store");
    }

    // Add supported record stores
    addRecordStore(MembershipStoreImpl.class);
    addRecordStore(MountTableStoreImpl.class);
    addRecordStore(RouterStoreImpl.class);

    // Check the connection to the State Store periodically
    this.monitorService = new StateStoreConnectionMonitorService(this);
    this.addService(monitorService);

    // Set expirations intervals for each record
    MembershipState.setExpirationMs(conf.getLong(
        RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS,
        RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT));

    RouterState.setExpirationMs(conf.getTimeDuration(
        RBFConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_MS,
        RBFConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_MS_DEFAULT,
        TimeUnit.MILLISECONDS));

    // Cache update service
    this.cacheUpdater = new StateStoreCacheUpdateService(this);
    addService(this.cacheUpdater);

    // Create metrics for the State Store
    this.metrics = StateStoreMetrics.create(conf);

    // Adding JMX interface
    try {
      StandardMBean bean = new StandardMBean(metrics, StateStoreMBean.class);
      ObjectName registeredObject =
          MBeans.register("Router", "StateStore", bean);
      LOG.info("Registered StateStoreMBean: {}", registeredObject);
    } catch (NotCompliantMBeanException e) {
      throw new RuntimeException("Bad StateStoreMBean setup", e);
    } catch (MetricsException e) {
      LOG.info("Failed to register State Store bean {}", e.getMessage());
    }

    super.serviceInit(this.conf);
  }

  @Override
  protected void serviceStart() throws Exception {
    loadDriver();
    super.serviceStart();
  }

  @Override
  protected void serviceStop() throws Exception {
    closeDriver();

    if (metrics != null) {
      metrics.shutdown();
      metrics = null;
    }

    super.serviceStop();
  }

  /**
   * Add a record store to the State Store. It includes adding the store, the
   * supported record and the cache management.
   *
   * @param clazz Class of the record store to track.
   * @return New record store.
   * @throws ReflectiveOperationException
   */
  private <T extends RecordStore<?>> void addRecordStore(
      final Class<T> clazz) throws ReflectiveOperationException {

    assert this.getServiceState() == STATE.INITED :
        "Cannot add record to the State Store once started";

    T recordStore = RecordStore.newInstance(clazz, this.getDriver());
    Class<? extends BaseRecord> recordClass = recordStore.getRecordClass();
    this.recordStores.put(recordClass, recordStore);

    // Subscribe for cache updates
    if (recordStore instanceof StateStoreCache) {
      StateStoreCache cachedRecordStore = (StateStoreCache) recordStore;
      this.cachesToUpdateInternal.add(cachedRecordStore);
    }
  }

  /**
   * Get the record store in this State Store for a given interface.
   *
   * @param recordStoreClass Class of the record store.
   * @return Registered record store or null if not found.
   */
  public <T extends RecordStore<?>> T getRegisteredRecordStore(
      final Class<T> recordStoreClass) {
    for (RecordStore<? extends BaseRecord> recordStore :
        this.recordStores.values()) {
      if (recordStoreClass.isInstance(recordStore)) {
        @SuppressWarnings("unchecked")
        T recordStoreChecked = (T) recordStore;
        return recordStoreChecked;
      }
    }
    return null;
  }

  /**
   * List of records supported by this State Store.
   *
   * @return List of supported record classes.
   */
  public Collection<Class<? extends BaseRecord>> getSupportedRecords() {
    return this.recordStores.keySet();
  }

  /**
   * Load the State Store driver. If successful, refresh cached data tables.
   */
  public void loadDriver() {
    synchronized (this.driver) {
      if (!isDriverReady()) {
        String driverName = this.driver.getClass().getSimpleName();
        if (this.driver.init(
            conf, getIdentifier(), getSupportedRecords(), metrics)) {
          LOG.info("Connection to the State Store driver {} is open and ready",
              driverName);
          this.refreshCaches();
        } else {
          LOG.error("Cannot initialize State Store driver {}", driverName);
        }
      }
    }
  }

  /**
   * Check if the driver is ready to be used.
   *
   * @return If the driver is ready.
   */
  public boolean isDriverReady() {
    return this.driver.isDriverReady();
  }

  /**
   * Manually shuts down the driver.
   *
   * @throws Exception If the driver cannot be closed.
   */
  @VisibleForTesting
  public void closeDriver() throws Exception {
    if (this.driver != null) {
      this.driver.close();
    }
  }

  /**
   * Get the state store driver.
   *
   * @return State store driver.
   */
  public StateStoreDriver getDriver() {
    return this.driver;
  }

  /**
   * Fetch a unique identifier for this state store instance. Typically it is
   * the address of the router.
   *
   * @return Unique identifier for this store.
   */
  public String getIdentifier() {
    return this.identifier;
  }

  /**
   * Set a unique synchronization identifier for this store.
   *
   * @param id Unique identifier, typically the router's RPC address.
   */
  public void setIdentifier(String id) {
    this.identifier = id;
  }

  //
  // Cached state store data
  //
  /**
   * The last time the state store cache was fully updated.
   *
   * @return Timestamp.
   */
  public long getCacheUpdateTime() {
    return this.cacheLastUpdateTime;
  }

  /**
   * Stops the cache update service.
   */
  @VisibleForTesting
  public void stopCacheUpdateService() {
    if (this.cacheUpdater != null) {
      this.cacheUpdater.stop();
      removeService(this.cacheUpdater);
      this.cacheUpdater = null;
    }
  }

  /**
   * Register a cached record store for automatic periodic cache updates.
   *
   * @param client Client to the state store.
   */
  public void registerCacheExternal(StateStoreCache client) {
    this.cachesToUpdateExternal.add(client);
  }

  /**
   * Refresh the cache with information from the State Store. Called
   * periodically by the CacheUpdateService to maintain data caches and
   * versions.
   */
  public void refreshCaches() {
    refreshCaches(false);
  }

  /**
   * Refresh the cache with information from the State Store. Called
   * periodically by the CacheUpdateService to maintain data caches and
   * versions.
   * @param force If we force the refresh.
   */
  public void refreshCaches(boolean force) {
    boolean success = true;
    if (isDriverReady()) {
      List<StateStoreCache> cachesToUpdate = new LinkedList<>();
      cachesToUpdate.addAll(cachesToUpdateInternal);
      cachesToUpdate.addAll(cachesToUpdateExternal);
      for (StateStoreCache cachedStore : cachesToUpdate) {
        String cacheName = cachedStore.getClass().getSimpleName();
        boolean result = false;
        try {
          result = cachedStore.loadCache(force);
        } catch (IOException e) {
          LOG.error("Error updating cache for {}", cacheName, e);
          result = false;
        }
        if (!result) {
          success = false;
          LOG.error("Cache update failed for cache {}", cacheName);
        }
      }
    } else {
      success = false;
      LOG.info("Skipping State Store cache update, driver is not ready.");
    }
    if (success) {
      // Uses local time, not driver time.
      this.cacheLastUpdateTime = Time.now();
    }
  }

  /**
   * Update the cache for a specific record store.
   *
   * @param clazz Class of the record store.
   * @return If the cached was loaded.
   * @throws IOException if the cache update failed.
   */
  public boolean loadCache(final Class<?> clazz) throws IOException {
    return loadCache(clazz, false);
  }

  /**
   * Update the cache for a specific record store.
   *
   * @param clazz Class of the record store.
   * @param force Force the update ignoring cached periods.
   * @return If the cached was loaded.
   * @throws IOException if the cache update failed.
   */
  public boolean loadCache(Class<?> clazz, boolean force) throws IOException {
    List<StateStoreCache> cachesToUpdate =
        new LinkedList<StateStoreCache>();
    cachesToUpdate.addAll(this.cachesToUpdateInternal);
    cachesToUpdate.addAll(this.cachesToUpdateExternal);
    for (StateStoreCache cachedStore : cachesToUpdate) {
      if (clazz.isInstance(cachedStore)) {
        return cachedStore.loadCache(force);
      }
    }
    throw new IOException("Registered cache was not found for " + clazz);
  }

  /**
   * Get the metrics for the State Store.
   *
   * @return State Store metrics.
   */
  public StateStoreMetrics getMetrics() {
    return metrics;
  }

}