/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo;

import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.Options;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
import org.apache.tajo.master.rm.YarnTajoResourceManager;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.worker.TajoWorker;

import java.io.*;
import java.net.InetSocketAddress;
import java.net.URL;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class TajoTestingCluster {
	private static Log LOG = LogFactory.getLog(TajoTestingCluster.class);
	private TajoConf conf;

  protected MiniTajoYarnCluster yarnCluster;
  private FileSystem defaultFS;
  private MiniDFSCluster dfsCluster;
	private MiniCatalogServer catalogServer;

  private TajoMaster tajoMaster;
  private List<TajoWorker> tajoWorkers = new ArrayList<TajoWorker>();
  private boolean standbyWorkerMode = false;

	// If non-null, then already a cluster running.
	private File clusterTestBuildDir = null;

	/**
	 * System property key to get test directory value.
	 * Name is as it is because mini dfs has hard-codings to put test data here.
	 */
	public static final String TEST_DIRECTORY_KEY = MiniDFSCluster.PROP_TEST_BUILD_DATA;

	/**
	 * Default parent directory for test output.
	 */
	public static final String DEFAULT_TEST_DIRECTORY = "target/test-data";

  public TajoTestingCluster() {
    this.conf = new TajoConf();
    initPropertiesAndConfigs();
	}

  void initPropertiesAndConfigs() {
    if (System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname) != null) {
      String testResourceManager = System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname);
      Preconditions.checkState(
          testResourceManager.equals(TajoWorkerResourceManager.class.getCanonicalName()) ||
              testResourceManager.equals(YarnTajoResourceManager.class.getCanonicalName()),
          ConfVars.RESOURCE_MANAGER_CLASS.varname + " must be either " + TajoWorkerResourceManager.class.getCanonicalName() + " or " +
              YarnTajoResourceManager.class.getCanonicalName() +"."
      );
      conf.set(ConfVars.RESOURCE_MANAGER_CLASS.varname, System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname));
    }
    conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB.varname, 1024);
    conf.setFloat(ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS.varname, 2.0f);

    this.standbyWorkerMode = conf.getVar(ConfVars.RESOURCE_MANAGER_CLASS)
        .indexOf(TajoWorkerResourceManager.class.getName()) >= 0;
    conf.set(CommonTestingUtil.TAJO_TEST, "TRUE");
  }

	public TajoConf getConfiguration() {
		return this.conf;
	}

	public void initTestDir() {
		if (System.getProperty(TEST_DIRECTORY_KEY) == null) {
			clusterTestBuildDir = setupClusterTestBuildDir();
			System.setProperty(TEST_DIRECTORY_KEY,
          clusterTestBuildDir.getAbsolutePath());
		}
	}

	/**
	 * @return Where to write test data on local filesystem; usually
	 * {@link #DEFAULT_TEST_DIRECTORY}
	 * @see #setupClusterTestBuildDir()
	 */
	public static File getTestDir() {
		return new File(System.getProperty(TEST_DIRECTORY_KEY,
			DEFAULT_TEST_DIRECTORY));
	}

	/**
	 * @param subdirName
	 * @return Path to a subdirectory named <code>subdirName</code> under
	 * {@link #getTestDir()}.
	 * @see #setupClusterTestBuildDir()
	 */
	public static File getTestDir(final String subdirName) {
		return new File(getTestDir(), subdirName);
  }

	public File setupClusterTestBuildDir() {
		String randomStr = UUID.randomUUID().toString();
		String dirStr = getTestDir(randomStr).toString();
		File dir = new File(dirStr).getAbsoluteFile();
		// Have it cleaned up on exit
		dir.deleteOnExit();
		return dir;
	}

  ////////////////////////////////////////////////////////
  // HDFS Section
  ////////////////////////////////////////////////////////
  /**
   * Start a minidfscluster.
   * @param servers How many DNs to start.
   * @throws Exception
   * @see {@link #shutdownMiniDFSCluster()}
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
    return startMiniDFSCluster(servers, null, null);
  }

  /**
   * Start a minidfscluster.
   * Can only create one.
   * @param servers How many DNs to start.
   * @param dir Where to home your dfs cluster.
   * @param hosts hostnames DNs to run on.
   * @throws Exception
   * @see {@link #shutdownMiniDFSCluster()}
   * @return The mini dfs cluster created.
   * @throws java.io.IOException
   */
  public MiniDFSCluster startMiniDFSCluster(int servers,
                                            final File dir,
                                            final String hosts[])
      throws IOException {
    if (dir == null) {
      this.clusterTestBuildDir = setupClusterTestBuildDir();
    } else {
      this.clusterTestBuildDir = dir;
    }

    System.setProperty(MiniDFSCluster.PROP_TEST_BUILD_DATA,
        this.clusterTestBuildDir.toString());

    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(new HdfsConfiguration(conf));
    builder.hosts(hosts);
    builder.numDataNodes(servers);
    builder.format(true);
    builder.manageNameDfsDirs(true);
    builder.manageDataDfsDirs(true);
    builder.waitSafeMode(true);
    this.dfsCluster = builder.build();

    // Set this just-started cluser as our filesystem.
    this.defaultFS = this.dfsCluster.getFileSystem();
    this.conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultFS.getUri().toString());
    this.conf.setVar(TajoConf.ConfVars.ROOT_DIR, defaultFS.getUri() + "/tajo");

    return this.dfsCluster;
  }

  public void shutdownMiniDFSCluster() throws Exception {
    if (this.dfsCluster != null) {
      try {
        FileSystem fs = this.dfsCluster.getFileSystem();
        if (fs != null) fs.close();
      } catch (IOException e) {
        System.err.println("error closing file system: " + e);
      }
      // The below throws an exception per dn, AsynchronousCloseException.
      this.dfsCluster.shutdown();
    }
  }

  public boolean isRunningDFSCluster() {
    return this.defaultFS != null;
  }

  public MiniDFSCluster getMiniDFSCluster() {
    return this.dfsCluster;
  }

  public FileSystem getDefaultFileSystem() {
    return this.defaultFS;
  }

  ////////////////////////////////////////////////////////
  // Catalog Section
  ////////////////////////////////////////////////////////
  public MiniCatalogServer startCatalogCluster() throws Exception {
    TajoConf c = getConfiguration();

    if(clusterTestBuildDir == null) {
      clusterTestBuildDir = setupClusterTestBuildDir();
    }

    conf.set(CatalogConstants.STORE_CLASS, "org.apache.tajo.catalog.store.MemStore");
    conf.set(CatalogConstants.CATALOG_URI, "jdbc:derby:" + clusterTestBuildDir.getAbsolutePath() + "/db");
    LOG.info("Apache Derby repository is set to "+conf.get(CatalogConstants.CATALOG_URI));
    conf.setVar(ConfVars.CATALOG_ADDRESS, "localhost:0");

    catalogServer = new MiniCatalogServer(conf);
    CatalogServer catServer = catalogServer.getCatalogServer();
    InetSocketAddress sockAddr = catServer.getBindAddress();
    c.setVar(ConfVars.CATALOG_ADDRESS, NetUtils.normalizeInetSocketAddress(sockAddr));

    return this.catalogServer;
  }

  public void shutdownCatalogCluster() {
    if (catalogServer != null) {
      this.catalogServer.shutdown();
    }
  }

  public MiniCatalogServer getMiniCatalogCluster() {
    return this.catalogServer;
  }

  ////////////////////////////////////////////////////////
  // Tajo Cluster Section
  ////////////////////////////////////////////////////////
  private void startMiniTajoCluster(File testBuildDir,
                                               final int numSlaves,
                                               boolean local) throws Exception {
    TajoConf c = getConfiguration();
    c.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, "localhost:0");
    c.setVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS, "localhost:0");
    c.setVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, "localhost:0");
    c.setVar(ConfVars.WORKER_PEER_RPC_ADDRESS, "localhost:0");
    c.setVar(ConfVars.CATALOG_ADDRESS, "localhost:0");
    c.set(CatalogConstants.STORE_CLASS, "org.apache.tajo.catalog.store.MemStore");
    c.set(CatalogConstants.CATALOG_URI, "jdbc:derby:" + testBuildDir.getAbsolutePath() + "/db");
    c.setVar(ConfVars.WORKER_TEMPORAL_DIR, "file://" + testBuildDir.getAbsolutePath() + "/tajo-localdir");

    LOG.info("derby repository is set to "+conf.get(CatalogConstants.CATALOG_URI));

    if (!local) {
      c.setVar(ConfVars.ROOT_DIR,
          getMiniDFSCluster().getFileSystem().getUri() + "/tajo");
    } else {
      c.setVar(ConfVars.ROOT_DIR,
          clusterTestBuildDir.getAbsolutePath() + "/tajo");
    }

    tajoMaster = new TajoMaster();
    tajoMaster.init(c);
    tajoMaster.start();

    this.conf.setVar(ConfVars.WORKER_PEER_RPC_ADDRESS, c.getVar(ConfVars.WORKER_PEER_RPC_ADDRESS));
    this.conf.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, c.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS));

    InetSocketAddress tajoMasterAddress = tajoMaster.getContext().getTajoMasterService().getBindAddress();

    this.conf.setVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS,
        tajoMasterAddress.getHostName() + ":" + tajoMasterAddress.getPort());
    this.conf.setVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, c.getVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS));
    this.conf.setVar(ConfVars.CATALOG_ADDRESS, c.getVar(ConfVars.CATALOG_ADDRESS));

    if(standbyWorkerMode) {
      startTajoWorkers(numSlaves);
    }
    LOG.info("Mini Tajo cluster is up");
  }

  private void startTajoWorkers(int numSlaves) throws Exception {
    for(int i = 0; i < 1; i++) {
      TajoWorker tajoWorker = new TajoWorker();

      TajoConf workerConf  = new TajoConf(this.conf);

      workerConf.setVar(ConfVars.WORKER_INFO_ADDRESS, "localhost:0");
      workerConf.setVar(ConfVars.WORKER_CLIENT_RPC_ADDRESS, "localhost:0");
      workerConf.setVar(ConfVars.WORKER_PEER_RPC_ADDRESS, "localhost:0");

      workerConf.setVar(ConfVars.WORKER_QM_RPC_ADDRESS, "localhost:0");
      
      tajoWorker.startWorker(workerConf, new String[]{"standby"});

      LOG.info("MiniTajoCluster Worker #" + (i + 1) + " started.");
      tajoWorkers.add(tajoWorker);
    }
  }

  public void restartTajoCluster(int numSlaves) throws Exception {
    tajoMaster.stop();
    tajoMaster.start();

    LOG.info("Minicluster has been restarted");
  }

  public TajoMaster getMaster() {
    return this.tajoMaster;
  }

  public List<TajoWorker> getTajoWorkers() {
    return this.tajoWorkers;
  }

  public void shutdownMiniTajoCluster() {
    if(this.tajoMaster != null) {
      this.tajoMaster.stop();
    }
    for(TajoWorker eachWorker: tajoWorkers) {
      eachWorker.stopWorkerForce();
    }
    tajoWorkers.clear();
    this.tajoMaster= null;
  }

  ////////////////////////////////////////////////////////
  // Meta Cluster Section
  ////////////////////////////////////////////////////////
  /**
   * @throws java.io.IOException If a cluster -- dfs or engine -- already running.
   */
  void isRunningCluster(String passedBuildPath) throws IOException {
    if (this.clusterTestBuildDir == null || passedBuildPath != null) return;
    throw new IOException("Cluster already running at " +
        this.clusterTestBuildDir);
  }

  /**
   * This method starts up a tajo cluster with a given number of clusters in
   * distributed mode.
   *
   * @param numSlaves the number of tajo cluster to start up
   * @throws Exception
   */
  public void startMiniCluster(final int numSlaves)
      throws Exception {
    startMiniCluster(numSlaves, null);
  }

  public void startMiniCluster(final int numSlaves, final String [] dataNodeHosts) throws Exception {

    int numDataNodes = numSlaves;
    if(dataNodeHosts != null && dataNodeHosts.length != 0) {
      numDataNodes = dataNodeHosts.length;
    }

    LOG.info("Starting up minicluster with 1 master(s) and " +
        numSlaves + " worker(s) and " + numDataNodes + " datanode(s)");

    // If we already put up a cluster, fail.
    String testBuildPath = conf.get(TEST_DIRECTORY_KEY, null);
    isRunningCluster(testBuildPath);
    if (testBuildPath != null) {
      LOG.info("Using passed path: " + testBuildPath);
    }

    // Make a new random dir to home everything in.  Set it as system property.
    // minidfs reads home from system property.
    this.clusterTestBuildDir = testBuildPath == null?
        setupClusterTestBuildDir() : new File(testBuildPath);

    System.setProperty(TEST_DIRECTORY_KEY,
        this.clusterTestBuildDir.getAbsolutePath());

    startMiniDFSCluster(numDataNodes, this.clusterTestBuildDir, dataNodeHosts);
    this.dfsCluster.waitClusterUp();

    if(!standbyWorkerMode) {
      startMiniYarnCluster();
    }

    startMiniTajoCluster(this.clusterTestBuildDir, numSlaves, false);
  }

  private void startMiniYarnCluster() throws Exception {
    LOG.info("Starting up YARN cluster");
    // Scheduler properties required for YARN to work
    conf.set("yarn.scheduler.capacity.root.queues", "default");
    conf.set("yarn.scheduler.capacity.root.default.capacity", "100");

    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 384);
    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 3000);
    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 1);
    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 2);

    if (yarnCluster == null) {
      yarnCluster = new MiniTajoYarnCluster(TajoTestingCluster.class.getName(), 3);
      yarnCluster.init(conf);
      yarnCluster.start();

      ResourceManager resourceManager = yarnCluster.getResourceManager();
      InetSocketAddress rmAddr = resourceManager.getClientRMService().getBindAddress();
      InetSocketAddress rmSchedulerAddr = resourceManager.getApplicationMasterService().getBindAddress();
      conf.set(YarnConfiguration.RM_ADDRESS, NetUtils.normalizeInetSocketAddress(rmAddr));
      conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, NetUtils.normalizeInetSocketAddress(rmSchedulerAddr));

      URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml");
      if (url == null) {
        throw new RuntimeException("Could not find 'yarn-site.xml' dummy file in classpath");
      }
      yarnCluster.getConfig().set("yarn.application.classpath", new File(url.getPath()).getParent());
      OutputStream os = new FileOutputStream(new File(url.getPath()));
      yarnCluster.getConfig().writeXml(os);
      os.close();
    }
  }

  public void startMiniClusterInLocal(final int numSlaves) throws Exception {
    // If we already put up a cluster, fail.
    String testBuildPath = conf.get(TEST_DIRECTORY_KEY, null);
    isRunningCluster(testBuildPath);
    if (testBuildPath != null) {
      LOG.info("Using passed path: " + testBuildPath);
    }

    // Make a new random dir to home everything in.  Set it as system property.
    // minidfs reads home from system property.
    this.clusterTestBuildDir = testBuildPath == null?
        setupClusterTestBuildDir() : new File(testBuildPath);

    System.setProperty(TEST_DIRECTORY_KEY,
        this.clusterTestBuildDir.getAbsolutePath());

    startMiniTajoCluster(this.clusterTestBuildDir, numSlaves, true);
  }

  public void shutdownMiniCluster() throws IOException {
    LOG.info("========================================");
    LOG.info("Minicluster is stopping");
    LOG.info("========================================");

    try {
      Thread.sleep(3000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

    shutdownMiniTajoCluster();

    if(this.catalogServer != null) {
      shutdownCatalogCluster();
    }

    if(this.yarnCluster != null) {
      this.yarnCluster.stop();
    }

    try {
      Thread.sleep(3000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

    if(this.dfsCluster != null) {

      try {
        FileSystem fs = this.dfsCluster.getFileSystem();
        if (fs != null) fs.close();
        this.dfsCluster.shutdown();
      } catch (IOException e) {
        System.err.println("error closing file system: " + e);
      }
    }

    if(this.clusterTestBuildDir != null && this.clusterTestBuildDir.exists()) {
      if(!ShutdownHookManager.get().isShutdownInProgress()) {
        //TODO clean test dir when ShutdownInProgress
        LocalFileSystem localFS = LocalFileSystem.getLocal(conf);
        localFS.delete(new Path(clusterTestBuildDir.toString()), true);
        localFS.close();
      }
      this.clusterTestBuildDir = null;
    }
    LOG.info("Minicluster is down");
  }

  public static ResultSet run(String[] names,
                              Schema[] schemas,
                              Options option,
                              String[][] tables,
                              String query) throws Exception {
    TpchTestBase instance = TpchTestBase.getInstance();
    TajoTestingCluster util = instance.getTestingCluster();
    while(true) {
      if(util.getMaster().isMasterRunning()) {
        break;
      }
      Thread.sleep(1000);
    }
    TajoConf conf = util.getConfiguration();
    TajoClient client = new TajoClient(conf);

    FileSystem fs = util.getDefaultFileSystem();
    Path rootDir = util.getMaster().
        getStorageManager().getWarehouseDir();
    fs.mkdirs(rootDir);
    for (int i = 0; i < names.length; i++) {
      Path tablePath = new Path(rootDir, names[i]);
      fs.mkdirs(tablePath);
      Path dfsPath = new Path(tablePath, names[i] + ".tbl");
      FSDataOutputStream out = fs.create(dfsPath);
      for (int j = 0; j < tables[i].length; j++) {
        out.write((tables[i][j]+"\n").getBytes());
      }
      out.close();
      TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV, option);
      client.createExternalTable(names[i], schemas[i], tablePath, meta);
    }
    Thread.sleep(1000);
    ResultSet res = client.executeQueryAndGetResult(query);
    return res;
  }

    /**
    * Write lines to a file.
    *
    * @param file File to write lines to
    * @param lines Strings written to the file
    * @throws java.io.IOException
    */
  private static void writeLines(File file, String... lines)
      throws IOException {
    Writer writer = Files.newWriter(file, Charsets.UTF_8);
    try {
      for (String line : lines) {
        writer.write(line);
        writer.write('\n');
      }
    } finally {
      Closeables.closeQuietly(writer);
    }
  }
}
