/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.slider.providers.accumulo

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import org.apache.accumulo.core.client.ZooKeeperInstance
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.slider.core.main.ServiceLauncher
import org.apache.slider.api.ClusterDescription
import org.apache.slider.api.ResourceKeys
import org.apache.slider.client.SliderClient
import org.apache.slider.test.YarnZKMiniClusterTestBase
import org.junit.internal.AssumptionViolatedException

import static org.apache.slider.common.SliderXMLConfKeysForTesting.*
import static org.apache.slider.providers.accumulo.AccumuloKeys.*
import static org.apache.slider.common.params.Arguments.ARG_PROVIDER
import static org.apache.slider.common.params.Arguments.ARG_RES_COMP_OPT

/**
 * test base for accumulo clusters
 */
@CompileStatic
@Slf4j
public abstract class AccumuloTestBase extends YarnZKMiniClusterTestBase {


  public final int accumulo_cluster_startup_time = accumuloLaunchWaitTime

  /**
   * The time to sleep before trying to talk to the HBase Master and
   * expect meaningful results.
   */
  public final int accumulo_cluster_startup_to_live_time = accumulo_cluster_startup_time
  public static final int ACCUMULO_GO_LIVE_TIME = 60000
  @Override
  public String getTestConfigurationPath() {
    return "src/main/resources/" + CONF_RESOURCE;
  }

  @Override
  void setup() {
    super.setup()
    assumeBoolOption(SLIDER_CONFIG, KEY_TEST_ACCUMULO_ENABLED, true)
    assumeArchiveDefined();
    assumeApplicationHome();
    YarnConfiguration conf = testConfiguration
    assumeOtherSettings(conf)
  }

  /**
   * Teardown 
   */
  @Override
  void teardown() {
    super.teardown();
    if (teardownKillall && kill_supported) {
      try {
        killAllAccumuloProcesses();
      } catch (AssumptionViolatedException e) {
        log.info e.toString();
      }
    }
  }

  void killAllAccumuloProcesses() {
    killJavaProcesses("org.apache.accumulo.start.Main", SIGKILL)
  }

  @Override
  public String getArchiveKey() {
    return KEY_TEST_ACCUMULO_TAR
  }

  /**
   * Get the key for the application
   * @return
   */
  @Override
  public String getApplicationHomeKey() {
    return KEY_TEST_ACCUMULO_HOME
  }

  /**
   * Assume that HBase home is defined. This does not check that the
   * path is valid -that is expected to be a failure on tests that require
   * HBase home to be set.
   */

  public void assumeOtherSettings(YarnConfiguration conf) {
    assumeStringOptionSet(conf, OPTION_ZK_HOME)
  }

  /**
   * Create a full cluster with a master & the requested no. of region servers
   * @param clustername cluster name
   * @param tablets # of nodes
   * @param extraArgs list of extra args to add to the creation command
   * @param deleteExistingData should the data of any existing cluster
   * of this name be deleted
   * @param blockUntilRunning block until the AM is running
   * @return launcher which will have executed the command.
   */
  public ServiceLauncher<SliderClient> createAccCluster(
      String clustername,
      int tablets,
      List<String> extraArgs,
      boolean deleteExistingData,
      boolean blockUntilRunning) {
    Map<String, Integer> roles = [
        (ROLE_MASTER): 1,
        (ROLE_TABLET): tablets,
    ];
    return createAccCluster(
        clustername,
        roles,
        extraArgs,
        deleteExistingData,
        blockUntilRunning);
  }

  /**
   * Create an accumulo cluster
   * @param clustername
   * @param roles
   * @param extraArgs
   * @param deleteExistingData
   * @param blockUntilRunning
   * @return the cluster launcher
   */
  public ServiceLauncher<SliderClient> createAccCluster(
      String clustername,
      Map<String, Integer> roles,
      List<String> extraArgs,
      boolean deleteExistingData,
      boolean blockUntilRunning) {
    extraArgs << ARG_PROVIDER << PROVIDER_ACCUMULO;

    YarnConfiguration conf = testConfiguration

    def clusterOps = [
        (OPTION_ZK_HOME)                                         : conf.getTrimmed(
            OPTION_ZK_HOME),
        (OPTION_HADOOP_HOME)                                     : conf.getTrimmed(
            OPTION_HADOOP_HOME),
        ("site." +
         AccumuloConfigFileOptions.MONITOR_PORT_CLIENT)          : AccumuloConfigFileOptions.MONITOR_PORT_CLIENT_DEFAULT,
        ("site." +
         AccumuloConfigFileOptions.MASTER_PORT_CLIENT)           : AccumuloConfigFileOptions.MASTER_PORT_CLIENT_DEFAULT,
    ]


    extraArgs << ARG_RES_COMP_OPT << ROLE_MASTER << ResourceKeys.YARN_MEMORY <<
    YRAM;
    extraArgs << ARG_RES_COMP_OPT << ROLE_TABLET << ResourceKeys.YARN_MEMORY <<
    YRAM
    extraArgs << ARG_RES_COMP_OPT << ROLE_MONITOR << ResourceKeys.YARN_MEMORY <<
    YRAM
    extraArgs << ARG_RES_COMP_OPT << ROLE_GARBAGE_COLLECTOR <<
    ResourceKeys.YARN_MEMORY << YRAM

    return createCluster(clustername,
        roles,
        extraArgs,
        deleteExistingData,
        blockUntilRunning,
        clusterOps)
  }

  def getAccClusterStatus() {
    ZooKeeperInstance instance = new ZooKeeperInstance("", "localhost:4");
    instance.getConnector("user", "pass").instanceOperations().tabletServers;
  }


  public String fetchLocalPage(int port, String page) {
    String url = "http://localhost:" + port + page
    return fetchWebPage(url)

  }

  public ClusterDescription flexAccClusterTestRun(
      String clustername, List<Map<String, Integer>> plan) {
    int planCount = plan.size()
    assert planCount > 0
    createMiniCluster(clustername, configuration,
        1,
        true);
    //now launch the cluster
    SliderClient sliderClient = null;
    ServiceLauncher launcher = createAccCluster(clustername,
        plan[0],
        [],
        true,
        true);
    sliderClient = (SliderClient) launcher.service;
    try {

      //verify the #of roles is as expected
      //get the hbase status
      waitForRoleCount(sliderClient, plan[0],
          accumulo_cluster_startup_to_live_time);
      sleep(ACCUMULO_GO_LIVE_TIME);

      plan.remove(0)

      ClusterDescription cd = null
      while (!plan.empty) {

        Map<String, Integer> flexTarget = plan.remove(0)
        //now flex
        describe(
            "Flexing " + roleMapToString(flexTarget));
        sliderClient.flex(clustername, flexTarget);
        cd = waitForRoleCount(sliderClient, flexTarget,
            accumulo_cluster_startup_to_live_time);

        sleep(ACCUMULO_GO_LIVE_TIME);

      }

      return cd;

    } finally {
      maybeStopCluster(sliderClient, null, "end of flex test run");
    }

  }

}
