/**
 * Copyright 2008 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation;

/**
 * This class creates a single process HBase cluster.
 * each server.  The master uses the 'default' FileSystem.  The RegionServers,
 * if we are running on DistributedFilesystem, create a FileSystem instance
 * each and will close down their instance on the way out.
 */
public class MiniHBaseCluster {
  static final Log LOG = LogFactory.getLog(MiniHBaseCluster.class.getName());
  private Configuration conf;
  public LocalHBaseCluster hbaseCluster;
  // Cache this.  For some reason only works first time I get it.  TODO: Figure
  // out why.
  private final static UserGroupInformation UGI;
  static {
    UGI = UserGroupInformation.getCurrentUGI();
  }

  static long PREFERRED_ASSIGNMENT = 1000L;
  static long WAIT_FOR_LOADBALANCER = 2000L;
  /**
   * Start a MiniHBaseCluster.
   * @param conf Configuration to be used for cluster
   * @param numRegionServers initial number of region servers to start.
   * @throws IOException
   */
  public MiniHBaseCluster(Configuration conf, int numRegionServers)
  throws IOException, InterruptedException {
    this(conf, 1, numRegionServers);
  }

  /**
   * Start a MiniHBaseCluster.
   * @param conf Configuration to be used for cluster
   * @param numMasters initial number of masters to start.
   * @param numRegionServers initial number of region servers to start.
   * @throws IOException
   */
  public MiniHBaseCluster(Configuration conf, int numMasters,
      int numRegionServers)
  throws IOException, InterruptedException {
    this.conf = conf;
    conf.set(HConstants.MASTER_PORT, "0");
    conf.setLong("hbase.master.applyPreferredAssignment.period",
        PREFERRED_ASSIGNMENT);
    conf.setLong("hbase.master.holdRegionForBestLocality.period",
        PREFERRED_ASSIGNMENT / 5);
    init(numMasters, numRegionServers);
  }

  /**
   * Override Master so can add inject behaviors testing.
   */
  public static class MiniHBaseClusterMaster extends HMaster {
    private final Map<HServerInfo, List<HMsg>> messages =
      new ConcurrentHashMap<HServerInfo, List<HMsg>>();

    private final Map<HServerInfo, IOException> exceptions =
      new ConcurrentHashMap<HServerInfo, IOException>();

    public MiniHBaseClusterMaster(final Configuration conf)
    throws IOException {
      super(conf);
    }

    /**
     * Add a message to send to a regionserver next time it checks in.
     * @param hsi RegionServer's HServerInfo.
     * @param msg Message to add.
     */
    void addMessage(final HServerInfo hsi, HMsg msg) {
      synchronized(this.messages) {
        List<HMsg> hmsgs = this.messages.get(hsi);
        if (hmsgs == null) {
          hmsgs = new ArrayList<HMsg>();
          this.messages.put(hsi, hmsgs);
        }
        hmsgs.add(msg);
      }
    }

    void addException(final HServerInfo hsi, final IOException ex) {
      this.exceptions.put(hsi, ex);
    }

    /**
     * This implementation is special, exceptions will be treated first and
     * message won't be sent back to the region servers even if some are
     * specified.
     * @param hsi the rs
     * @param msgs Messages to add to
     * @return
     * @throws IOException will be throw if any added for this region server
     */
    @Override
    protected HMsg[] adornRegionServerAnswer(final HServerInfo hsi,
        final HMsg[] msgs) throws IOException {
      IOException ex = this.exceptions.remove(hsi);
      if (ex != null) {
        throw ex;
      }
      HMsg [] answerMsgs = msgs;
      synchronized (this.messages) {
        List<HMsg> hmsgs = this.messages.get(hsi);
        if (hmsgs != null && !hmsgs.isEmpty()) {
          int size = answerMsgs.length;
          HMsg [] newAnswerMsgs = new HMsg[size + hmsgs.size()];
          System.arraycopy(answerMsgs, 0, newAnswerMsgs, 0, answerMsgs.length);
          for (int i = 0; i < hmsgs.size(); i++) {
            newAnswerMsgs[answerMsgs.length + i] = hmsgs.get(i);
          }
          answerMsgs = newAnswerMsgs;
          hmsgs.clear();
        }
      }
      return super.adornRegionServerAnswer(hsi, answerMsgs);
    }
  }

  /**
   * Subclass so can get at protected methods (none at moment).  Also, creates
   * a FileSystem instance per instantiation.  Adds a shutdown own FileSystem
   * on the way out. Shuts down own Filesystem only, not All filesystems as
   * the FileSystem system exit hook does.
   */
  public static class MiniHBaseClusterRegionServer extends HRegionServer {
    private static int index = 0;
    private Thread shutdownThread = null;

    public MiniHBaseClusterRegionServer(Configuration conf)
        throws IOException {
      super(setDifferentUser(conf));
    }

    public void setHServerInfo(final HServerInfo hsi) {
      this.serverInfo = hsi;
    }

    /*
     * @param c
     * @param currentfs We return this if we did not make a new one.
     * @param uniqueName Same name used to help identify the created fs.
     * @return A new fs instance if we are up on DistributeFileSystem.
     * @throws IOException
     */
    private static Configuration setDifferentUser(final Configuration c)
    throws IOException {
      FileSystem currentfs = FileSystem.get(c);
      if (!(currentfs instanceof DistributedFileSystem)) return c;
      // Else distributed filesystem.  Make a new instance per daemon.  Below
      // code is taken from the AppendTestUtil over in hdfs.
      Configuration c2 = new Configuration(c);
      String username = UGI.getUserName() + ".hrs." + index++;
      UnixUserGroupInformation.saveToConf(c2,
        UnixUserGroupInformation.UGI_PROPERTY_NAME,
        new UnixUserGroupInformation(username, new String[]{"supergroup"}));
      return c2;
    }

    @Override
    protected void init(MapWritable c) throws IOException {
      super.init(c);
      // Run this thread to shutdown our filesystem on way out.
      this.shutdownThread = new SingleFileSystemShutdownThread(getFileSystem());
    }

    @Override
    public void run() {
      try {
        super.run();
      } finally {
        // Run this on the way out.
        if (this.shutdownThread != null) {
          this.shutdownThread.start();
          Threads.shutdown(this.shutdownThread, 30000);
        }
      }
    }

    public void kill() {
      super.kill();
    }
  }

  /**
   * Alternate shutdown hook.
   * Just shuts down the passed fs, not all as default filesystem hook does.
   */
  static class SingleFileSystemShutdownThread extends Thread {
    private final FileSystem fs;
    SingleFileSystemShutdownThread(final FileSystem fs) {
      super("Shutdown of " + fs);
      this.fs = fs;
    }
    @Override
    public void run() {
      try {
        LOG.info("Hook closing fs=" + this.fs);
        this.fs.close();
      } catch (IOException e) {
        LOG.warn("Running hook", e);
      }
    }
  }

  private void init(final int nMasterNodes, final int nRegionNodes)
  throws IOException {
    try {
      // start up a LocalHBaseCluster
      hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, nRegionNodes,
          MiniHBaseCluster.MiniHBaseClusterMaster.class,
          MiniHBaseCluster.MiniHBaseClusterRegionServer.class);
      hbaseCluster.startup();
    } catch(IOException e) {
      shutdown();
      throw e;
    }
  }

  /**
   * Starts a region server thread running
   *
   * @throws IOException
   * @return New RegionServerThread
   */
  public JVMClusterUtil.RegionServerThread startRegionServer() throws IOException {
    JVMClusterUtil.RegionServerThread t = this.hbaseCluster.addRegionServer();
    t.start();
    t.waitForServerOnline();
    return t;
  }

  /**
   * @return Returns the rpc address actually used by the master server, because
   * the supplied port is not necessarily the actual port used.
   */
  public HServerAddress getHMasterAddress() {
    return this.hbaseCluster.getMaster().getMasterAddress();
  }

  /**
   * @return the HMaster
   */
  public HMaster getMaster() {
    return this.hbaseCluster.getMaster();
  }

  /**
   * Adds a new master to the cluster and starts the master thread. Useful if
   * the existing master dies and a live master is needed for cleanup.
   */
  public void startNewMaster() throws IOException {
    hbaseCluster.addMaster().start();
  }

  /**
   * Cause a region server to exit doing basic clean up only on its way out.
   * @param serverNumber  Used as index into a list.
   */
  public String abortRegionServer(int serverNumber) {
    HRegionServer server = getRegionServer(serverNumber);
    LOG.info("Aborting " + server.toString());
    server.abort("Aborting for tests", new Exception("Trace info"));
    return server.toString();
  }

  /**
   * Shut down the specified region server cleanly
   *
   * @param serverNumber  Used as index into a list.
   * @return the region server that was stopped
   */
  public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) {
    return stopRegionServer(serverNumber, true);
  }

  /**
   * Shut down the specified region server cleanly
   *
   * @param serverNumber  Used as index into a list.
   * @param shutdownFS True is we are to shutdown the filesystem as part of this
   * regionserver's shutdown.  Usually we do but you do not want to do this if
   * you are running multiple regionservers in a test and you shut down one
   * before end of the test.
   * @return the region server that was stopped
   */
  public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber,
      final boolean shutdownFS) {
    JVMClusterUtil.RegionServerThread server =
      hbaseCluster.getRegionServers().get(serverNumber);
    LOG.info("Stopping " + server.toString());
    server.getRegionServer().stop();
    return server;
  }

  /**
   * Wait for the specified region server to stop. Removes this thread from list
   * of running threads.
   * @param serverNumber
   * @return Name of region server that just went down.
   */
  public String waitOnRegionServer(final int serverNumber) {
    return this.hbaseCluster.waitOnRegionServer(serverNumber);
  }

  /**
   * Wait for Mini HBase Cluster to shut down.
   */
  public void join() {
    this.hbaseCluster.join();
  }

  /**
   * Shut down the mini HBase cluster
   * @throws IOException
   */
  public void shutdown() throws IOException {
    if (this.hbaseCluster != null) {
      this.hbaseCluster.shutdown();
    }
    HConnectionManager.deleteAllConnections(false);
  }

  /**
   * Call flushCache on all regions on all participating regionservers.
   * @throws IOException
   */
  public void flushcache() throws IOException {
    for (JVMClusterUtil.RegionServerThread t:
        this.hbaseCluster.getRegionServers()) {
      for(HRegion r: t.getRegionServer().getOnlineRegions()) {
        r.flushcache();
      }
    }
  }

  /**
   * Call flushCache on all regions of the specified table.
   * @throws IOException
   */
  public void flushcache(byte [] tableName) throws IOException {
    for (JVMClusterUtil.RegionServerThread t:
        this.hbaseCluster.getRegionServers()) {
      for(HRegion r: t.getRegionServer().getOnlineRegions()) {
        if(Bytes.equals(r.getTableDesc().getName(), tableName)) {
          r.flushcache();
        }
      }
    }
  }

  /**
   * @return List of region server threads.
   */
  public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() {
    return this.hbaseCluster.getRegionServers();
  }

  /**
   * @return List of live region server threads (skips the aborted and the killed)
   */
  public List<JVMClusterUtil.RegionServerThread> getLiveRegionServerThreads() {
    return this.hbaseCluster.getLiveRegionServers();
  }

  /**
   * Grab a numbered region server of your choice.
   * @param serverNumber
   * @return region server
   */
  public HRegionServer getRegionServer(int serverNumber) {
    return hbaseCluster.getRegionServer(serverNumber);
  }

  public List<HRegion> getRegions(byte[] tableName) {
    List<HRegion> ret = new ArrayList<HRegion>();
    for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
      HRegionServer hrs = rst.getRegionServer();
      for (HRegion region : hrs.getOnlineRegions()) {
        if (Bytes.equals(region.getTableDesc().getName(), tableName)) {
          ret.add(region);
        }
      }
    }
    return ret;
  }

  /**
   * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()}
   * of HRS carrying regionName. Returns -1 if none found.
   */
  public int getServerWithMeta() {
    return getServerWith(HRegionInfo.FIRST_META_REGIONINFO.getRegionName());
  }

  /**
   * Get the location of the specified region
   * @param regionName Name of the region in bytes
   * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()}
   * of HRS carrying .META.. Returns -1 if none found.
   */
  public int getServerWith(byte[] regionName) {
    int index = -1;
    int count = 0;
    for (JVMClusterUtil.RegionServerThread rst: getRegionServerThreads()) {
      HRegionServer hrs = rst.getRegionServer();
      HRegion metaRegion =
        hrs.getOnlineRegion(regionName);
      if (metaRegion != null) {
        index = count;
        break;
      }
      count++;
    }
    return index;
  }

  /**
   * Add an exception to send when a region server checks back in
   * @param serverNumber Which server to send it to
   * @param ex The exception that will be sent
   * @throws IOException
   */
  public void addExceptionToSendRegionServer(final int serverNumber,
      IOException ex) throws IOException {
    MiniHBaseClusterRegionServer hrs =
      (MiniHBaseClusterRegionServer)getRegionServer(serverNumber);
    addExceptionToSendRegionServer(hrs, ex);
  }

  /**
   * Add an exception to send when a region server checks back in
   * @param hrs Which server to send it to
   * @param ex The exception that will be sent
   * @throws IOException
   */
  public void addExceptionToSendRegionServer(
      final MiniHBaseClusterRegionServer hrs, IOException ex)
      throws IOException {
    ((MiniHBaseClusterMaster)getMaster()).addException(hrs.getHServerInfo(),ex);
  }

  /**
   * Add a message to include in the responses send a regionserver when it
   * checks back in.
   * @param serverNumber Which server to send it to.
   * @param msg The MESSAGE
   * @throws IOException
   */
  public void addMessageToSendRegionServer(final int serverNumber,
    final HMsg msg)
  throws IOException {
    MiniHBaseClusterRegionServer hrs =
      (MiniHBaseClusterRegionServer)getRegionServer(serverNumber);
    addMessageToSendRegionServer(hrs, msg);
  }

  /**
   * Add a message to include in the responses send a regionserver when it
   * checks back in.
   * @param hrs Which region server.
   * @param msg The MESSAGE
   * @throws IOException
   */
  public void addMessageToSendRegionServer(final MiniHBaseClusterRegionServer hrs,
    final HMsg msg)
  throws IOException {
    ((MiniHBaseClusterMaster)getMaster()).addMessage(hrs.getHServerInfo(), msg);
  }
}
