/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.slider.funtest.framework

import groovy.transform.CompileStatic
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem as HadoopFS
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hdfs.HdfsConfiguration
import org.apache.hadoop.registry.client.api.RegistryConstants
import org.apache.hadoop.util.ExitUtil
import org.apache.hadoop.util.Shell
import org.apache.hadoop.yarn.api.records.YarnApplicationState
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.slider.api.StatusKeys
import org.apache.slider.common.tools.ConfigHelper
import org.apache.slider.core.exceptions.SliderException
import org.apache.slider.core.launch.SerializedApplicationReport
import org.apache.slider.core.main.ServiceLauncher
import org.apache.slider.common.SliderKeys
import org.apache.slider.common.SliderXmlConfKeys
import org.apache.slider.api.ClusterDescription
import org.apache.slider.common.tools.SliderUtils
import org.apache.slider.client.SliderClient
import org.apache.slider.core.persist.ApplicationReportSerDeser
import org.apache.slider.test.SliderTestUtils
import org.junit.Before
import org.junit.BeforeClass
import org.junit.Rule
import org.junit.rules.Timeout
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import static org.apache.slider.common.SliderExitCodes.*
import static org.apache.slider.core.main.LauncherExitCodes.*
import static org.apache.slider.funtest.framework.FuntestProperties.*
import static org.apache.slider.common.params.Arguments.*
import static org.apache.slider.common.params.SliderActions.*
import static org.apache.slider.common.SliderXMLConfKeysForTesting.*

@CompileStatic
abstract class CommandTestBase extends SliderTestUtils {
  private static final Logger log =
      LoggerFactory.getLogger(CommandTestBase.class);

  public static final String SLIDER_CONF_DIR = sysprop(SLIDER_CONF_DIR_PROP)
  public static final String SLIDER_TAR_DIR = sysprop(SLIDER_BIN_DIR_PROP)
  public static final File SLIDER_TAR_DIRECTORY = new File(
      SLIDER_TAR_DIR).canonicalFile
  public static final File SLIDER_SCRIPT = new File(
      SLIDER_TAR_DIRECTORY,
      BIN_SLIDER).canonicalFile
  public static final File SLIDER_SCRIPT_PYTHON = new File(
      SLIDER_TAR_DIRECTORY,
      BIN_SLIDER_PYTHON).canonicalFile
  public static final File SLIDER_CONF_DIRECTORY = new File(
      SLIDER_CONF_DIR).canonicalFile
  public static final File SLIDER_CONF_XML = new File(SLIDER_CONF_DIRECTORY,
      CLIENT_CONFIG_FILENAME).canonicalFile
  public static final YarnConfiguration SLIDER_CONFIG
  public static final int THAW_WAIT_TIME
  public static final int FREEZE_WAIT_TIME

  public static final int SLIDER_TEST_TIMEOUT

  public static final String YARN_RAM_REQUEST

  /**
   * Keytab for secure cluster
   */
  public static final String TEST_AM_KEYTAB
  static File keytabFile

  /**
   * shell-escaped ~ symbol. On windows this does
   * not need to be escaped
   */
  public static final String TILDE
  
  /*
  Static initializer for test configurations. If this code throws exceptions
  (which it may) the class will not be instantiable.
   */
  static {
    new HdfsConfiguration()
    ConfigHelper.registerDeprecatedConfigItems();
    SLIDER_CONFIG = ConfLoader.loadSliderConf(SLIDER_CONF_XML, true);
    THAW_WAIT_TIME = getTimeOptionMillis(SLIDER_CONFIG,
        KEY_TEST_THAW_WAIT_TIME,
        1000 * DEFAULT_THAW_WAIT_TIME_SECONDS)
    FREEZE_WAIT_TIME = getTimeOptionMillis(SLIDER_CONFIG,
        KEY_TEST_FREEZE_WAIT_TIME,
        1000 * DEFAULT_TEST_FREEZE_WAIT_TIME_SECONDS)
    SLIDER_TEST_TIMEOUT = getTimeOptionMillis(SLIDER_CONFIG,
        KEY_TEST_TIMEOUT,
        1000 * DEFAULT_TEST_TIMEOUT_SECONDS)

    YARN_RAM_REQUEST = SLIDER_CONFIG.getTrimmed(
        KEY_TEST_YARN_RAM_REQUEST,
        DEFAULT_YARN_RAM_REQUEST)

    TEST_AM_KEYTAB = SLIDER_CONFIG.getTrimmed(
        KEY_TEST_AM_KEYTAB)
    
    

    TILDE = Shell.WINDOWS? "~" : "\\~" 
  }

  @Rule
  public final Timeout testTimeout = new Timeout(SLIDER_TEST_TIMEOUT);


  @BeforeClass
  public static void setupTestBase() {
    Configuration conf = loadSliderConf();

    SliderShell.confDir = SLIDER_CONF_DIRECTORY
    
    // choose python script if on windows or the launch key recommends it
    // 
    boolean python = SLIDER_CONFIG.getBoolean(KEY_LAUNCH_PYTHON, false)
    SliderShell.scriptFile =
        (SliderShell.windows || python) ? SLIDER_SCRIPT_PYTHON : SLIDER_SCRIPT
    
    //set the property of the configuration directory
    def path = SLIDER_CONF_DIRECTORY.absolutePath
    SLIDER_CONFIG.set(ENV_SLIDER_CONF_DIR, path)
    // locate any hadoop conf dir
    def hadoopConf = SLIDER_CONFIG.getTrimmed(ENV_HADOOP_CONF_DIR)
    if (hadoopConf) {
      File hadoopConfDir = new File(hadoopConf).canonicalFile
      // propagate the value to the client config
      SliderShell.setEnv(ENV_HADOOP_CONF_DIR, hadoopConfDir.absolutePath)
    }

    if (SliderUtils.maybeInitSecurity(conf)) {
      log.debug("Security enabled")
      SliderUtils.forceLogin()
      // now look for the security key
/*
      if (!TEST_AM_KEYTAB) {
        fail("Security keytab is not defined in $KEY_TEST_AM_KEYTAB")
      }
      keytabFile = new File(TEST_AM_KEYTAB)
      if (!keytabFile.exists()) {
        throw new FileNotFoundException("Security keytab ${keytabFile} " +
                    " defined in $KEY_TEST_AM_KEYTAB")
      }
*/

    } else {
      log.info "Security is off"
    }
    
    log.info("Test using ${HadoopFS.getDefaultUri(SLIDER_CONFIG)} " +
             "and YARN RM @ ${SLIDER_CONFIG.get(YarnConfiguration.RM_ADDRESS)}")
  }

  public static void assertContainersLive(ClusterDescription clusterDescription,
      String component, int count) {
    log.info("Asserting component count.")
    int instanceCount = clusterDescription.instances[component].size()
    if (count != instanceCount) {
      log.warn(clusterDescription.toString())
    }
    assert count == instanceCount 
  }

  public static void logShell(SliderShell shell) {
    shell.dumpOutput();
  }

  /**
   * give the test thread a name
   */
  @Before
  public void nameThread() {
    Thread.currentThread().name = "JUnit"
  }

  /**
   * Add a configuration file at a given path
   * @param dir directory containing the file
   * @param filename filename
   * @return true if the file was found
   * @throws IOException loading problems (other than a missing file)
   */
  public static boolean maybeAddConfFile(File dir, String filename) throws IOException {
    File confFile = new File(dir, filename)
    if (confFile.isFile()) {
      ConfigHelper.addConfigurationFile(SLIDER_CONFIG, confFile, true)
      log.debug("Loaded $confFile")
      return true;
    } else {
      log.debug("Did not find $confFile —skipping load")
      return false;
    }
  }
  
  /**
   * Add a jar to the slider classpath by looking up a class and determining
   * its containing JAR
   * @param clazz class inside the JAR
   */
  public static void addExtraJar(Class clazz) {
    def jar = SliderUtils.findContainingJarOrFail(clazz)

    def path = jar.absolutePath
    if (!SliderShell.slider_classpath_extra.contains(path)) {
      SliderShell.slider_classpath_extra << path
    }
  }

  /**
   * Resolve a system property, throwing an exception if it is not present
   * @param key property name
   * @return the value
   * @throws RuntimeException if the property is not set
   */
  public static String sysprop(String key) {
    def property = System.getProperty(key)
    if (!property) {
      throw new RuntimeException("Undefined property $key")
    }
    return property
  }

  /**
   * Print to system out
   * @param string
   */
  static void println(String s) {
    System.out.println(s)
  }
  
  /**
   * Print a newline to system out
   * @param string
   */
  static void println() {
    System.out.println()
  }
  
  /**
   * Exec any slider command
   * @param conf
   * @param commands
   * @return the shell
   */
  public static SliderShell slider(Collection<String> commands) {
    SliderShell shell = new SliderShell(commands)
    shell.execute()
    return shell
  }

  /**
   * Execute an operation, state the expected error code
   * @param exitCode exit code
   * @param commands commands
   * @return
   */
  public static SliderShell slider(int exitCode, Collection<String> commands) {
    return SliderShell.run(exitCode, commands)
  }

  /**
   * Load the client XML file
   * @return
   */
  public static Configuration loadSliderConf() {
    Configuration conf = ConfLoader.loadSliderConf(SLIDER_CONF_XML, true)
    return conf
  }

  public static HadoopFS getClusterFS() {
    return HadoopFS.get(SLIDER_CONFIG)
  }

  static SliderShell destroy(String name) {
    slider([
        ACTION_DESTROY, name
    ])
  }

  static SliderShell destroy(int result, String name) {
    slider(result, [
        ACTION_DESTROY, name
    ])
  }

  static SliderShell exists(String name, boolean live = true) {

    List<String> args = [
        ACTION_EXISTS, name
    ]
    if (live) {
      args << ARG_LIVE
    }
    slider(args)
  }

  static SliderShell exists(int result, String name, boolean live = true) {
    List<String> args = [
        ACTION_EXISTS, name
    ]
    if (live) {
      args << ARG_LIVE
    }
    slider(result, args)
  }

  static SliderShell freeze(String name) {
    slider([
        ACTION_FREEZE, name
    ])
  }


  static SliderShell freeze(
      int exitCode,
      String name,
      Collection<String> args) {
    slider(exitCode, [ACTION_FREEZE, name] + args)
  }

  /**
   * Stop cluster: no exit code checking
   * @param name
   * @param args
   * @return
   */
  static SliderShell freeze(String name, Collection<String> args) {
    slider([ACTION_FREEZE, name] + args)
  }

  static SliderShell freezeForce(String name) {
    freeze(name, [ARG_FORCE, ARG_WAIT, "10000"])
  }

  static SliderShell killContainer(String name, String containerID) {
    slider(0,
        [
            ACTION_KILL_CONTAINER,
            name,
            containerID
        ])
  }

  static SliderShell list(String name) {
    List<String> cmd = [
        ACTION_LIST
    ]
    if (name != null) {
      cmd << name
    }
    slider(cmd)
  }

  static SliderShell lookup(int result, String id, File out) {
    assert id
    def commands = [ACTION_LOOKUP, ARG_ID, id]
    if (out) commands += [ARG_OUTPUT, out.absolutePath]
    slider(result, commands)
  }
  
  static SliderShell lookup(String id, File out) {
    assert id
    def commands = [ACTION_LOOKUP, ARG_ID, id]
    if (out) commands += [ARG_OUTPUT, out.absolutePath]
    slider(commands)
  }

  static SliderShell list(int result, Collection<String> commands =[]) {
    slider(result, [ACTION_LIST] + commands )
  }

  static SliderShell status(String name) {
    slider([
        ACTION_STATUS, name
    ])
  }

  static SliderShell status(int result, String name) {
    slider(result,
        [
            ACTION_STATUS, name
        ])
  }

  static SliderShell thaw(String name) {
    slider([
        ACTION_THAW, name
    ])
  }

  static SliderShell thaw(int result, String name) {
    slider(result,
        [
            ACTION_THAW, name
        ])
  }

  static SliderShell thaw(String name, Collection<String> args) {
    slider(0, [ACTION_THAW, name] + args)
  }

  static SliderShell resolve(int result, Collection<String> commands) {
    slider(result,
        [ACTION_RESOLVE] + commands
    )
  }

  static SliderShell registry(int result, Collection<String> commands) {
    slider(result,
        [ACTION_REGISTRY] + commands
    )
  }

  static SliderShell registry(Collection<String> commands) {
    slider(0,
        [ACTION_REGISTRY] + commands
    )
  }

  /**
   * Ensure that a cluster has been destroyed
   * @param name
   */
  static void ensureClusterDestroyed(String name) {
    def froze = freezeForce(name)

    def result = froze.ret
    if (result != 0 && result != EXIT_UNKNOWN_INSTANCE) {
      froze.assertExitCode(0)
    }
    destroy(0, name)
  }

  /**
   * If the functional tests are enabled, set up the cluster
   *
   * @param cluster
   */
  static void setupCluster(String cluster) {
    ensureClusterDestroyed(cluster)
  }

  /**
   * Teardown operation -freezes cluster, and may destroy it
   * though for testing it is best if it is retained
   * @param name cluster name
   */
  static void teardown(String name) {
    freezeForce(name)
  }

  /**
   * Assert the exit code is that the cluster is 0
   * @param shell shell
   */
  public static void assertSuccess(SliderShell shell) {
    assertExitCode(shell, 0)
  }
  /**
   * Assert the exit code is that the cluster is unknown
   * @param shell shell
   */
  public static void assertUnknownCluster(SliderShell shell) {
    assertExitCode(shell, EXIT_UNKNOWN_INSTANCE)
  }

  /**
   * Assert a shell exited with a given error code
   * if not the output is printed and an assertion is raised
   * @param shell shell
   * @param errorCode expected error code
   */
  public static void assertExitCode(SliderShell shell, int errorCode) {
    shell.assertExitCode(errorCode)
  }

  /**
   * Assert that the stdout/stderr streams of the shell contain the string
   * to look for.
   * If the assertion does not hold, the output is logged before
   * the assertion is thrown
   * @param shell
   * @param lookThisUp
   * @param n number of times (default = 1)
   */
  public static void assertOutputContains(
      SliderShell shell,
      String lookThisUp,
      int n = 1) {
    if (!shell.outputContains(lookThisUp)) {
      log.error("Missing $lookThisUp from:")
      shell.dumpOutput()
      assert shell.outputContains(lookThisUp)
    }
  }
  
  /**
   * Create a connection to the cluster by execing the status command
   *
   * @param clustername
   * @return
   */
  SliderClient bondToCluster(Configuration conf, String clustername) {

    String address = getRequiredConfOption(conf, YarnConfiguration.RM_ADDRESS)

    ServiceLauncher<SliderClient> launcher = launchClientAgainstRM(
        address,
        ["exists", clustername],
        conf)

    int exitCode = launcher.serviceExitCode
    if (exitCode) {
      throw new ExitUtil.ExitException(exitCode, "exit code = $exitCode")
    }
    SliderClient sliderClient = launcher.service
    sliderClient.deployedClusterName = clustername
    return sliderClient;
  }

  /**
   * Create or build a slider cluster (the action is set by the first verb)
   * @param action operation to invoke: ACTION_CREATE or ACTION_BUILD
   * @param clustername cluster name
   * @param roles map of rolename to count
   * @param extraArgs list of extra args to add to the creation command
   * @param deleteExistingData should the data of any existing cluster
   * of this name be deleted
   * @param blockUntilRunning block until the AM is running
   * @param clusterOps map of key=value cluster options to set with the --option arg
   * @return shell which will have executed the command.
   */
  public SliderShell createOrBuildSliderCluster(
      String action,
      String clustername,
      Map<String, Integer> roles,
      List<String> extraArgs,
      boolean blockUntilRunning,
      Map<String, String> clusterOps) {
    assert action != null
    assert clustername != null

    List<String> argsList = [action, clustername]

    argsList << ARG_ZKHOSTS <<
    SLIDER_CONFIG.getTrimmed(RegistryConstants.KEY_REGISTRY_ZK_QUORUM)


    if (blockUntilRunning) {
      argsList << ARG_WAIT << Integer.toString(THAW_WAIT_TIME)
    }

    List<String> roleList = [];
    roles.each { String role, Integer val ->
      log.info("Role $role := $val")
      roleList << ARG_COMPONENT << role << Integer.toString(val)
    }

    argsList += roleList;

    //now inject any cluster options
    clusterOps.each { String opt, String val ->
      argsList << ARG_OPTION << opt.toString() << val.toString();
    }

    if (extraArgs != null) {
      argsList += extraArgs;
    }
    slider(0, argsList)
  }

  /**
   * Create a slider cluster
   * @param clustername cluster name
   * @param roles map of rolename to count
   * @param extraArgs list of extra args to add to the creation command
   * @param blockUntilRunning block until the AM is running
   * @param clusterOps map of key=value cluster options to set with the --option arg
   * @return launcher which will have executed the command.
   */
  public SliderShell createSliderApplication(
      String clustername,
      Map<String, Integer> roles,
      List<String> extraArgs,
      boolean blockUntilRunning,
      Map<String, String> clusterOps) {
    return createOrBuildSliderCluster(
        ACTION_CREATE,
        clustername,
        roles,
        extraArgs,
        blockUntilRunning,
        clusterOps)
  }

  /**
   * Create a templated slider app
   * @param name name
   * @param appTemplate application template
   * @param resourceTemplate resource template
   * @return the shell
   */
  public SliderShell createTemplatedSliderApplication(
      String name,
      String appTemplate,
      String resourceTemplate,
      List<String> extraArgs = [],
      File launchReport = null) {

    if (!launchReport) {
      launchReport = createAppReportFile()
    }
    
    List<String> commands = [
        ACTION_CREATE, name,
        ARG_TEMPLATE, appTemplate,
        ARG_RESOURCES, resourceTemplate,
        ARG_OUTPUT, launchReport.absolutePath,
        ARG_WAIT, Integer.toString(THAW_WAIT_TIME)
    ]

    maybeAddCommandOption(commands,
        [ARG_COMP_OPT, SliderKeys.COMPONENT_AM, SliderXmlConfKeys.KEY_AM_LOGIN_KEYTAB_NAME],
        SLIDER_CONFIG.getTrimmed(SliderXmlConfKeys.KEY_AM_LOGIN_KEYTAB_NAME));
    maybeAddCommandOption(commands,
        [ARG_COMP_OPT, SliderKeys.COMPONENT_AM, SliderXmlConfKeys.KEY_HDFS_KEYTAB_DIR],
        SLIDER_CONFIG.getTrimmed(SliderXmlConfKeys.KEY_HDFS_KEYTAB_DIR));
    maybeAddCommandOption(commands,
        [ARG_COMP_OPT, SliderKeys.COMPONENT_AM, SliderXmlConfKeys.KEY_AM_KEYTAB_LOCAL_PATH],
        SLIDER_CONFIG.getTrimmed(SliderXmlConfKeys.KEY_AM_KEYTAB_LOCAL_PATH));
    maybeAddCommandOption(commands,
        [ARG_COMP_OPT, SliderKeys.COMPONENT_AM, SliderXmlConfKeys.KEY_KEYTAB_PRINCIPAL],
        SLIDER_CONFIG.getTrimmed(SliderXmlConfKeys.KEY_KEYTAB_PRINCIPAL));
    commands.addAll(extraArgs)
    SliderShell shell = new SliderShell(commands)
    shell.execute()
    if (!shell.execute()) {
      // app has failed.

      // grab the app report of the last known instance of this app
      // which may not be there if it was a config failure; may be out of date
      // from a previous run
      log.error(
          "Launch failed with exit code ${shell.ret}")
      shell.dumpOutput()

      // now grab that app report if it is there
      def appReport = maybeLookupFromLaunchReport(launchReport)
      String extraText = ""
      if (appReport) {
        log.error("Application report:\n$appReport")
        extraText = appReport.diagnostics
      }

      fail("Application Launch Failure, exit code  ${shell.ret}\n${extraText}")
    }
    return shell
  }

  public File createAppReportFile() {
    File reportFile = File.createTempFile(
        "launch",
        ".json",
        new File("target"))
    return reportFile
  }

  /**
   * If the option is not null/empty, add the command and the option
   * @param args arg list being built up
   * @param command command to add option
   * @param option option to probe and use
   * @return the (possibly extended) list
   */
  public List<String> maybeAddCommandOption(
      List<String> args, List<String> commands, String option) {
    if ( SliderUtils.isSet(option)) {
      args.addAll(commands)
      args << option
    }
    return args
  }
  
  public SerializedApplicationReport maybeLoadAppReport(File reportFile) {
    if (reportFile.exists() && reportFile.length()> 0) {
      ApplicationReportSerDeser serDeser = new ApplicationReportSerDeser()
      def report = serDeser.fromFile(reportFile)
      return report
    }    
    return null;
  }  
  
  public SerializedApplicationReport maybeLookupFromLaunchReport(File launchReport) {
    def report = maybeLoadAppReport(launchReport)
    if (report) {
      return lookupApplication(report.applicationId)
    } else {
      return null
    }
  }

  /**
   * Lookup an application, return null if loading failed
   * @param id application ID
   * @return an application report or null
   */
  public SerializedApplicationReport lookupApplication(String id) {
    File reportFile = createAppReportFile();
    try {
      def shell = lookup(id, reportFile)
      if (shell.ret) {
        return maybeLoadAppReport(reportFile)
      } else {
        log.warn("Lookup operation failed:\n" + shell.dumpOutput())
        return null
      }
    } finally {
      reportFile.delete()
      
    }
  }

  
  public Path buildClusterPath(String clustername) {
    return new Path(
        clusterFS.homeDirectory,
        "${SliderKeys.SLIDER_BASE_DIRECTORY}/cluster/${clustername}")
  }


  public ClusterDescription killAmAndWaitForRestart(
      SliderClient sliderClient, String cluster) {

    assert cluster
    slider(0, [
        ACTION_AM_SUICIDE, cluster,
        ARG_EXITCODE, "1",
        ARG_WAIT, "1000",
        ARG_MESSAGE, "suicide"
    ])


    sleep(5000)
    ensureApplicationIsUp(cluster)
    
/*
    def sleeptime = SLIDER_CONFIG.getInt(KEY_AM_RESTART_SLEEP_TIME,
        DEFAULT_AM_RESTART_SLEEP_TIME)
    sleep(sleeptime)
*/
    ClusterDescription status

    status = sliderClient.clusterDescription
    return status
  }

  protected void ensureRegistryCallSucceeds(String application) {
    repeatUntilTrue(this.&isRegistryAccessible,
        10,
        5 * 1000,
        [application: application],
        true,
        'Application registry is not accessible, failing test.') {
      describe "final state of app that tests say is not able to access registry"
      exists(application, true).dumpOutput()
    }
  }

  protected void ensureApplicationIsUp(String application) {
    repeatUntilTrue(this.&isApplicationRunning,
        30,
        SLIDER_CONFIG.getInt(KEY_TEST_INSTANCE_LAUNCH_TIME,
            DEFAULT_INSTANCE_LAUNCH_TIME_SECONDS),
        [application: application],
        true,
        'Application did not start, failing test.') {
      describe "final state of app that tests say is not up"
      exists(application, true).dumpOutput()
    }
  }

  protected boolean isRegistryAccessible(Map<String, String> args) {
    String applicationName = args['application'];
    SliderShell shell = slider(
        [
            ACTION_REGISTRY,
            ARG_NAME,
            applicationName,
            ARG_LISTEXP])
    if (EXIT_SUCCESS != shell.execute()) {
      logShell(shell)
    }
    return EXIT_SUCCESS == shell.execute()
  }

  protected boolean isApplicationRunning(Map<String, String> args) {
    String applicationName = args['application'];
    return isApplicationInState(YarnApplicationState.RUNNING, applicationName);
  }

  protected boolean isApplicationUp(String applicationName) {
    return isApplicationInState(YarnApplicationState.RUNNING, applicationName);
  }

  /**
   * 
   * @param yarnState
   * @param applicationName
   * @return
   */
  public static boolean isApplicationInState(YarnApplicationState yarnState, String applicationName) {
    SliderShell shell = slider(
      [ACTION_EXISTS, applicationName, ARG_STATE, yarnState.toString()])

    return shell.ret == 0
  }

  /**
   * Repeat a probe until it succeeds, if it does not execute a failure
   * closure then raise an exception with the supplied message
   * @param probe probe
   * @param maxAttempts max number of attempts
   * @param sleepDur sleep between failing attempts
   * @param args map of arguments to the probe
   * @param failIfUnsuccessful if the probe fails after all the attempts
   * —should it raise an exception
   * @param failureMessage message to include in exception raised
   * @param failureHandler closure to invoke prior to the failure being raised
   */
  protected void repeatUntilTrue(Closure probe,
      int maxAttempts, int sleepDur, Map args,
      boolean failIfUnsuccessful = false,
      String failureMessage,
      Closure failureHandler) {
    int attemptCount = 0
    boolean succeeded = false;
    while (attemptCount < maxAttempts) {
      if (probe(args)) {
        // finished
        log.debug("Success after $attemptCount attempt(s)")
        succeeded = true;
        break
      };
      attemptCount++;

      sleep(sleepDur)
    }
    
    if (failIfUnsuccessful & !succeeded) {
      if (failureHandler) {
        failureHandler()
      }
      fail(failureMessage)
    }
  }

  public String getInfoAmWebUrl(String applicationName) {
    ClusterDescription cd = execStatus(applicationName);
    String urlString = cd.getInfo("info.am.web.url");
    return urlString;
  }

  public ClusterDescription execStatus(String application) {
    ClusterDescription cd
    File statusFile = File.createTempFile("status", ".json")
    try {
      slider(EXIT_SUCCESS,
          [
              ACTION_STATUS,
              application,
              ARG_OUTPUT, statusFile.absolutePath
          ])

      assert statusFile.exists()
      return ClusterDescription.fromFile(statusFile)
    } finally {
      statusFile.delete()
    }
  }

  public int queryRequestedCount(String  application, String role) {
    ClusterDescription cd = execStatus(application)

    if (cd.statistics.size() == 0) {
      log.debug("No statistics entries")
    }
    
    if (!cd.statistics[role]) {
      log.debug("No stats for role $role")
      return 0;
    }
    def statsForRole = cd.statistics[role]

    def requested = statsForRole[StatusKeys.STATISTICS_CONTAINERS_REQUESTED]
    assert null != statsForRole[StatusKeys.STATISTICS_CONTAINERS_REQUESTED]
    return requested
  }

  boolean hasRequestedContainerCountReached(Map<String, String> args) {
    String application = args['application']
    String role = args['role']
    int expectedCount = args['limit'].toInteger();

    int requestedCount = queryRequestedCount(application, role)
    log.debug("requested count = $requestedCount; expected=$expectedCount")
    return requestedCount >= expectedCount
  }

  void expectContainerRequestedCountReached(String application, String role, int limit) {

    repeatUntilTrue(
        this.&hasRequestedContainerCountReached,
        90,
        1000,
        [limit      : Integer.toString(limit),
         role       : role,
         application: application],
        true,
        "countainer count not reached") {
      describe "container count not reached"
      ClusterDescription cd = execStatus(application);
      log.info("Parsed status \n$cd")
      status(application).dumpOutput()
    };

  }

  public ClusterDescription expectContainersLive(String clustername,
      String component,
      int count) {
    ClusterDescription cd = execStatus(clustername)
    assertContainersLive(cd, component, count)
    return cd;
  }
}
