/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/

package org.apache.slider.client;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathNotFoundException;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.registry.client.api.RegistryConstants;
import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
import org.apache.hadoop.registry.client.types.RegistryPathStatus;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.alias.CredentialProvider;
import org.apache.hadoop.security.alias.CredentialProviderFactory;
import org.apache.hadoop.security.alias.CredentialShell;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.registry.client.api.RegistryOperations;

import static org.apache.hadoop.registry.client.binding.RegistryUtils.*;

import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.registry.client.exceptions.NoRecordException;
import org.apache.hadoop.registry.client.types.Endpoint;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.slider.api.ClusterDescription;
import org.apache.slider.api.ClusterNode;
import org.apache.slider.api.InternalKeys;
import org.apache.slider.api.OptionKeys;
import org.apache.slider.api.ResourceKeys;
import org.apache.slider.api.SliderClusterProtocol;
import org.apache.slider.api.proto.Messages;
import org.apache.slider.common.Constants;
import org.apache.slider.common.SliderExitCodes;
import org.apache.slider.common.SliderKeys;
import org.apache.slider.common.params.AbstractActionArgs;
import org.apache.slider.common.params.AbstractClusterBuildingActionArgs;
import org.apache.slider.common.params.ActionDiagnosticArgs;
import org.apache.slider.common.params.ActionExistsArgs;
import org.apache.slider.common.params.ActionInstallKeytabArgs;
import org.apache.slider.common.params.ActionInstallPackageArgs;
import org.apache.slider.common.params.ActionAMSuicideArgs;
import org.apache.slider.common.params.ActionCreateArgs;
import org.apache.slider.common.params.ActionEchoArgs;
import org.apache.slider.common.params.ActionFlexArgs;
import org.apache.slider.common.params.ActionFreezeArgs;
import org.apache.slider.common.params.ActionKillContainerArgs;
import org.apache.slider.common.params.ActionListArgs;
import org.apache.slider.common.params.ActionLookupArgs;
import org.apache.slider.common.params.ActionRegistryArgs;
import org.apache.slider.common.params.ActionResolveArgs;
import org.apache.slider.common.params.ActionStatusArgs;
import org.apache.slider.common.params.ActionThawArgs;
import org.apache.slider.common.params.Arguments;
import org.apache.slider.common.params.ClientArgs;
import org.apache.slider.common.params.CommonArgs;
import org.apache.slider.common.params.LaunchArgsAccessor;
import org.apache.slider.common.tools.ConfigHelper;
import org.apache.slider.common.tools.Duration;
import org.apache.slider.common.tools.SliderFileSystem;
import org.apache.slider.common.tools.SliderUtils;
import org.apache.slider.common.tools.SliderVersionInfo;
import org.apache.slider.core.build.InstanceBuilder;
import org.apache.slider.core.build.InstanceIO;
import org.apache.slider.core.conf.AggregateConf;
import org.apache.slider.core.conf.ConfTree;
import org.apache.slider.core.conf.ConfTreeOperations;
import org.apache.slider.core.conf.MapOperations;
import org.apache.slider.core.conf.ResourcesInputPropertiesValidator;
import org.apache.slider.core.conf.TemplateInputPropertiesValidator;
import org.apache.slider.core.exceptions.BadClusterStateException;
import org.apache.slider.core.exceptions.BadCommandArgumentsException;
import org.apache.slider.core.exceptions.BadConfigException;
import org.apache.slider.core.exceptions.ErrorStrings;
import org.apache.slider.core.exceptions.NoSuchNodeException;
import org.apache.slider.core.exceptions.NotFoundException;
import org.apache.slider.core.exceptions.SliderException;
import org.apache.slider.core.exceptions.UnknownApplicationInstanceException;
import org.apache.slider.core.exceptions.WaitTimeoutException;
import org.apache.slider.core.launch.AppMasterLauncher;
import org.apache.slider.core.launch.ClasspathConstructor;
import org.apache.slider.core.launch.CommandLineBuilder;
import org.apache.slider.core.launch.JavaCommandLineBuilder;
import org.apache.slider.core.launch.LaunchedApplication;
import org.apache.slider.core.launch.RunningApplication;
import org.apache.slider.core.launch.SerializedApplicationReport;
import org.apache.slider.core.main.RunService;
import org.apache.slider.core.persist.ApplicationReportSerDeser;
import org.apache.slider.core.persist.ConfPersister;
import org.apache.slider.core.persist.LockAcquireFailedException;
import org.apache.slider.core.registry.SliderRegistryUtils;
import org.apache.slider.core.registry.YarnAppListClient;
import org.apache.slider.core.registry.docstore.ConfigFormat;
import org.apache.slider.core.registry.docstore.PublishedConfigSet;
import org.apache.slider.core.registry.docstore.PublishedConfiguration;
import org.apache.slider.core.registry.docstore.PublishedConfigurationOutputter;
import org.apache.slider.core.registry.docstore.PublishedExports;
import org.apache.slider.core.registry.docstore.PublishedExportsOutputter;
import org.apache.slider.core.registry.docstore.PublishedExportsSet;
import org.apache.slider.core.registry.retrieve.RegistryRetriever;
import org.apache.slider.core.zk.BlockingZKWatcher;
import org.apache.slider.core.zk.ZKIntegration;
import org.apache.slider.core.zk.ZKPathBuilder;
import org.apache.slider.providers.AbstractClientProvider;
import org.apache.slider.providers.SliderProviderFactory;
import org.apache.slider.providers.agent.AgentKeys;
import org.apache.slider.providers.slideram.SliderAMClientProvider;
import org.apache.slider.server.appmaster.SliderAppMaster;
import org.apache.slider.server.appmaster.rpc.RpcBinder;
import org.apache.slider.server.services.utility.AbstractSliderLaunchedService;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.io.StringWriter;
import java.io.Writer;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.regex.Pattern;

import static org.apache.slider.common.params.SliderActions.*;

/**
 * Client service for Slider
 */

public class SliderClient extends AbstractSliderLaunchedService implements RunService,
    SliderExitCodes, SliderKeys, ErrorStrings, SliderClientAPI {
  private static final Logger log = LoggerFactory.getLogger(SliderClient.class);

  private ClientArgs serviceArgs;
  public ApplicationId applicationId;
  
  private String deployedClusterName;
  /**
   * Cluster operations against the deployed cluster -will be null
   * if no bonding has yet taken place
   */
  private SliderClusterOperations sliderClusterOperations;

  protected SliderFileSystem sliderFileSystem;

  /**
   * Yarn client service
   */
  private SliderYarnClientImpl yarnClient;
  private YarnAppListClient YarnAppListClient;
  private AggregateConf launchedInstanceDefinition;

  /**
   * The YARN registry service
   */
  private RegistryOperations registryOperations;

  /**
   * Constructor: names the service "Slider Client" and instantiates (then
   * discards) an HDFS and a YARN configuration.
   * <p>
   * NOTE(review): the two throwaway instances are presumably created so that
   * the HDFS and YARN configuration classes are loaded and register their
   * default resources before any other configuration is built -confirm
   * against {@code HdfsConfiguration} and {@code YarnConfiguration}.
   */
  public SliderClient() {
    super("Slider Client");
    // the instances themselves are not retained
    new HdfsConfiguration();
    new YarnConfiguration();
  }

  /**
   * Bind the command line arguments to this client. This is invoked
   * <i>before</i> <code>serviceInit()</code> is called.
   * @param config the initial configuration built up by the service launcher.
   * @param args list of arguments passed to the command line
   * after any launcher-specific commands have been stripped.
   * @return the post-binding configuration to pass to the <code>init()</code>
   * operation.
   * @throws Exception on any failure binding or parsing the arguments
   */
  @Override
  public Configuration bindArgs(Configuration config, String... args) throws Exception {
    Configuration boundConf = super.bindArgs(config, args);
    // the field is assigned before parsing, so it is set even if parse() fails
    serviceArgs = new ClientArgs(args);
    serviceArgs.parse();
    // yarn-ify, then patch
    return SliderUtils.patchConfiguration(new YarnConfiguration(boundConf));
  }

  /**
   * Initialize the service: merge the client configuration resource into
   * the supplied configuration, apply the command line definitions and
   * filesystem binding, set up process security when the cluster is secure,
   * and create the Hadoop bindings if the chosen action requires them.
   * @param conf configuration to initialize the service with
   * @throws Exception on any failure
   */
  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    ConfigHelper.mergeConfigurations(conf,
        SliderUtils.loadClientConfigurationResource(),
        CLIENT_RESOURCE,
        true);
    serviceArgs.applyDefinitions(conf);
    serviceArgs.applyFileSystemBinding(conf);

    // init security with our conf
    boolean secureCluster = SliderUtils.isHadoopClusterSecure(conf);
    if (secureCluster) {
      SliderUtils.forceLogin();
      SliderUtils.initProcessSecurity(conf);
    }

    // only bind to YARN/the filesystem when the action asks for it
    if (serviceArgs.getCoreAction().getHadoopServicesRequired()) {
      initHadoopBinding();
    }
    super.serviceInit(conf);
  }

  /**
   * this is where the work is done.
   * @return the exit code
   * @throws Throwable anything that went wrong
   */
/* JDK7

  @Override
  public int runService() throws Throwable {

    // choose the action
    String action = serviceArgs.getAction();
    int exitCode = EXIT_SUCCESS;
    String clusterName = serviceArgs.getClusterName();
    // actions
    switch (action) {
      case ACTION_BUILD:
        exitCode = actionBuild(clusterName, serviceArgs.getActionBuildArgs());
        break;
      case ACTION_UPDATE:
        exitCode = actionUpdate(clusterName, serviceArgs.getActionUpdateArgs());
        break;
      case ACTION_CREATE:
        exitCode = actionCreate(clusterName, serviceArgs.getActionCreateArgs());
        break;
      case ACTION_FREEZE:
        exitCode = actionFreeze(clusterName, serviceArgs.getActionFreezeArgs());
        break;
      case ACTION_THAW:
        exitCode = actionThaw(clusterName, serviceArgs.getActionThawArgs());
        break;
      case ACTION_DESTROY:
        exitCode = actionDestroy(clusterName);
        break;
      case ACTION_EXISTS:
        exitCode = actionExists(clusterName,
            serviceArgs.getActionExistsArgs().live);
        break;
      case ACTION_FLEX:
        exitCode = actionFlex(clusterName, serviceArgs.getActionFlexArgs());
        break;
      case ACTION_GETCONF:
        exitCode =
            actionGetConf(clusterName, serviceArgs.getActionGetConfArgs());
        break;
      case ACTION_HELP:
      case ACTION_USAGE:
        log.info(serviceArgs.usage());
        break;
      case ACTION_KILL_CONTAINER:
        exitCode = actionKillContainer(clusterName,
            serviceArgs.getActionKillContainerArgs());
        break;
      case ACTION_AM_SUICIDE:
        exitCode = actionAmSuicide(clusterName,
            serviceArgs.getActionAMSuicideArgs());
        break;
      case ACTION_LIST:
        exitCode = actionList(clusterName, serviceArgs.getActionListArgs());
        break;
      case ACTION_REGISTRY:
        exitCode = actionRegistry(
            serviceArgs.getActionRegistryArgs());
        break;
      case ACTION_STATUS:
        exitCode = actionStatus(clusterName,
            serviceArgs.getActionStatusArgs());
        break;
      case ACTION_VERSION:
        exitCode = actionVersion();
        break;
      default:
        throw new SliderException(EXIT_UNIMPLEMENTED,
            "Unimplemented: " + action);
    }

    return exitCode;
  }

*/

  /**
   * Launched service execution. This runs {@link #exec()} and converts
   * "file not found" and "path not found" failures into a
   * {@link NotFoundException} which wraps the original exception.
   * @return the exit code returned by {@link #exec()}
   * @throws Throwable anything else raised by the command
   */
  @Override
  public int runService() throws Throwable {
    int exitCode;
    try {
      exitCode = exec();
    } catch (FileNotFoundException fnfe) {
      throw new NotFoundException(fnfe, fnfe.toString());
    } catch (PathNotFoundException pnfe) {
      throw new NotFoundException(pnfe, pnfe.toString());
    }
    return exitCode;
  }

  /**
   * Execute the command line: look up the action named in the parsed
   * arguments and dispatch to the matching <code>actionXxx()</code> method.
   * @return the exit code of the executed action; {@code EXIT_SUCCESS} for
   * the help action
   * @throws SliderException with {@code EXIT_USAGE} if no action was
   * supplied, or {@code EXIT_UNIMPLEMENTED} if the action is not recognized
   * @throws Throwable on a failure of the action itself
   */
  public int exec() throws Throwable {

    // choose the action
    final String action = serviceArgs.getAction();
    final String clusterName = serviceArgs.getClusterName();

    // each recognized action returns directly
    if (ACTION_INSTALL_PACKAGE.equals(action)) {
      return actionInstallPkg(serviceArgs.getActionInstallPackageArgs());
    }
    if (ACTION_INSTALL_KEYTAB.equals(action)) {
      return actionInstallKeytab(serviceArgs.getActionInstallKeytabArgs());
    }
    if (ACTION_BUILD.equals(action)) {
      return actionBuild(clusterName, serviceArgs.getActionBuildArgs());
    }
    if (ACTION_CREATE.equals(action)) {
      return actionCreate(clusterName, serviceArgs.getActionCreateArgs());
    }
    if (ACTION_FREEZE.equals(action)) {
      return actionFreeze(clusterName, serviceArgs.getActionFreezeArgs());
    }
    if (ACTION_THAW.equals(action)) {
      return actionThaw(clusterName, serviceArgs.getActionThawArgs());
    }
    if (ACTION_DESTROY.equals(action)) {
      return actionDestroy(clusterName);
    }
    if (ACTION_DIAGNOSTICS.equals(action)) {
      return actionDiagnostic(serviceArgs.getActionDiagnosticArgs());
    }
    if (ACTION_EXISTS.equals(action)) {
      return actionExists(clusterName, serviceArgs.getActionExistsArgs());
    }
    if (ACTION_FLEX.equals(action)) {
      return actionFlex(clusterName, serviceArgs.getActionFlexArgs());
    }
    if (ACTION_HELP.equals(action)) {
      log.info(serviceArgs.usage());
      return EXIT_SUCCESS;
    }
    if (ACTION_KILL_CONTAINER.equals(action)) {
      return actionKillContainer(clusterName,
          serviceArgs.getActionKillContainerArgs());
    }
    if (ACTION_AM_SUICIDE.equals(action)) {
      return actionAmSuicide(clusterName,
          serviceArgs.getActionAMSuicideArgs());
    }
    if (ACTION_LIST.equals(action)) {
      return actionList(clusterName, serviceArgs.getActionListArgs());
    }
    if (ACTION_LOOKUP.equals(action)) {
      return actionLookup(serviceArgs.getActionLookupArgs());
    }
    if (ACTION_REGISTRY.equals(action)) {
      return actionRegistry(serviceArgs.getActionRegistryArgs());
    }
    if (ACTION_RESOLVE.equals(action)) {
      return actionResolve(serviceArgs.getActionResolveArgs());
    }
    if (ACTION_STATUS.equals(action)) {
      return actionStatus(clusterName, serviceArgs.getActionStatusArgs());
    }
    if (ACTION_UPDATE.equals(action)) {
      return actionUpdate(clusterName, serviceArgs.getActionUpdateArgs());
    }
    if (ACTION_VERSION.equals(action)) {
      return actionVersion();
    }

    // nothing matched: either no action was given, or it is unknown
    if (SliderUtils.isUnset(action)) {
      throw new SliderException(EXIT_USAGE, serviceArgs.usage());
    }
    throw new SliderException(EXIT_UNIMPLEMENTED,
        "Unimplemented: " + action);
  }

/**
   * Perform everything needed to init the hadoop binding.
   * This assumes that the service is already  in inited or started state
   * @throws IOException
   * @throws SliderException
   */
  protected void initHadoopBinding() throws IOException, SliderException {
    // validate the client environment before creating anything
    SliderUtils.validateSliderClientEnvironment(null);

    // create the YARN client and initialize it with this service's config
    yarnClient = new SliderYarnClientImpl();
    final Configuration conf = getConfig();
    yarnClient.init(conf);

    // start the client straight away only if this service is already started,
    // then register it as a child service
    boolean alreadyStarted = getServiceState() == STATE.STARTED;
    if (alreadyStarted) {
      yarnClient.start();
    }
    addService(yarnClient);

    // application listing client for the current user
    YarnAppListClient = new YarnAppListClient(yarnClient, getUsername(), conf);

    // create the filesystem
    sliderFileSystem = new SliderFileSystem(conf);
  }

  /**
   * Delete the zookeeper node associated with the calling user and the cluster.
   * ZooKeeper, interruption and configuration failures are not raised: they
   * are logged at debug level and reported through the return value.
   * TODO: YARN registry operations
   * @param clusterName name of the cluster whose node is to be deleted
   * @return true if a ZK client was obtained and the recursive delete
   * completed without an exception; false otherwise
   * @throws YarnException if raised while obtaining the user or the ZK client
   * @throws IOException if raised while obtaining the user
   **/
  @VisibleForTesting
  public boolean deleteZookeeperNode(String clusterName) throws YarnException, IOException {
    String user = getUsername();
    String zkPath = ZKIntegration.mkClusterPath(user, clusterName);
    Exception failure = null;
    try {
      ZKIntegration client = getZkClient(clusterName, user);
      if (client != null) {
        // the existence probe only controls the log message; the delete
        // is attempted either way
        if (client.exists(zkPath)) {
          log.info("Deleting zookeeper path {}", zkPath);
        }
        client.deleteRecursive(zkPath);
        return true;
      }
    } catch (InterruptedException e) {
      // restore the interrupt status so that callers can still observe it
      Thread.currentThread().interrupt();
      failure = e;
    } catch (KeeperException e) {
      failure = e;
    } catch (BadConfigException e) {
      failure = e;
    }
    if (failure != null) {
      log.debug("Unable to recursively delete zk node {}", zkPath);
      log.debug("Reason: ", failure);
    }

    return false;
  }

  /**
   * Create the zookeeper node associated with the calling user and the cluster.
   * The node is created as a persistent node with the
   * {@code OPEN_ACL_UNSAFE} ACL and an empty string as its data.
   * @param clusterName name of the cluster
   * @param nameOnly if true, only the path is computed and returned and no
   * ZK connection is made. Must not be null: the value is unboxed.
   * @return the path of the node; null if no ZK client could be obtained or
   * the creation failed (the failure is logged as a warning)
   * @throws YarnException if raised while obtaining the user or the ZK client
   * @throws IOException if raised while obtaining the user
   */
  @VisibleForTesting
  public String createZookeeperNode(String clusterName, Boolean nameOnly) throws YarnException, IOException {
    String user = getUsername();
    String zkPath = ZKIntegration.mkClusterPath(user, clusterName);
    if (nameOnly) {
      return zkPath;
    }
    ZKIntegration client = getZkClient(clusterName, user);
    if (client != null) {
      try {
        client.createPath(zkPath, "", ZooDefs.Ids.OPEN_ACL_UNSAFE,
                          CreateMode.PERSISTENT);
        return zkPath;
      } catch (InterruptedException e) {
        // restore the interrupt status so that callers can still observe it
        Thread.currentThread().interrupt();
        log.warn("Unable to create default zk node {}", zkPath, e);
      } catch (KeeperException e) {
        log.warn("Unable to create default zk node {}", zkPath, e);
      }
    }

    return null;
  }

  /**
   * Gets a zookeeper client bound to the quorum returned by
   * {@code lookupZKQuorum()}, waiting on a {@link BlockingZKWatcher} for the
   * connection (argument {@code 2 * 1000}, presumably milliseconds).
   * <p>
   * Returns null if the wait is interrupted, or if an {@link IOException}
   * is raised before the client instance has been created.
   * NOTE(review): when an {@link IOException} is raised <i>after</i> the
   * instance has been created (by {@code init()} or the connection wait),
   * the client is still returned although the failure is logged; callers
   * must not assume that a non-null result is connected.
   * @param clusterName name of the cluster
   * @param user user name
   * @return the client, or null
   * @throws YarnException if raised by the quorum lookup
   **/
  protected ZKIntegration getZkClient(String clusterName, String user) throws YarnException {
    String registryQuorum = lookupZKQuorum();
    ZKIntegration client = null;
    try {
      BlockingZKWatcher watcher = new BlockingZKWatcher();
      client = ZKIntegration.newInstance(registryQuorum, user, clusterName, true, false, watcher);
      client.init();
      watcher.waitForZKConnection(2 * 1000);
    } catch (InterruptedException e) {
      // restore the interrupt status so that callers can still observe it
      Thread.currentThread().interrupt();
      client = null;
      log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e);
    } catch (IOException e) {
      log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e);
    }
    return client;
  }

  /**
   * Destroy an application instance. After validating the name and checking
   * that no live instance exists, the instance directory is deleted (along
   * with its zookeeper node) if the directory is present, and the registry
   * entry is removed. Registry failures are logged and do not stop the
   * operation. A final check for live instances detects a concurrent create.
   * @param clustername name of the application instance
   * @return {@code EXIT_SUCCESS}
   * @throws SliderException with {@code EXIT_APPLICATION_IN_USE} if a live
   * instance is found after the destroy
   * @throws YarnException Yarn problems
   * @throws IOException other problems
   */
  @Override
  public int actionDestroy(String clustername) throws YarnException,
                                                      IOException {
    // verify that a live cluster isn't there
    SliderUtils.validateClusterName(clustername);
    // no-op, it is now mandatory.
    verifyBindingsDefined();
    verifyNoLiveClusters(clustername, "Destroy");

    // work out where the instance lives, then delete it if it is there
    Path instanceDir = sliderFileSystem.buildClusterDirPath(clustername);
    if (!sliderFileSystem.getFileSystem().exists(instanceDir)) {
      log.debug("Application Instance {} already destroyed", clustername);
    } else {
      log.debug("Application Instance {} found at {}: destroying", clustername, instanceDir);
      if (!sliderFileSystem.getFileSystem().delete(instanceDir, true)) {
        log.warn("Filesystem returned false from delete() operation");
      }

      boolean zkNodeRemoved = deleteZookeeperNode(clustername);
      if (!zkNodeRemoved) {
        log.warn("Unable to perform node cleanup in Zookeeper.");
      }
    }

    // rm the registry entry; do not let this block the destroy operations
    String registryPath =
        SliderRegistryUtils.registryPathForInstance(clustername);
    try {
      getRegistryOperations().delete(registryPath, true);
    } catch (IOException e) {
      log.warn("Error deleting registry entry {}: {} ", registryPath, e, e);
    } catch (SliderException e) {
      log.warn("Error binding to registry {} ", e, e);
    }

    // detect any race leading to cluster creation during the check/destroy
    // process and report a problem.
    List<ApplicationReport> liveInstances = findAllLiveInstances(clustername);
    if (!liveInstances.isEmpty()) {
      String raceMessage = clustername + ": "
          + E_DESTROY_CREATE_RACE_CONDITION
          + " :" + liveInstances.get(0);
      throw new SliderException(EXIT_APPLICATION_IN_USE, raceMessage);
    }
    log.info("Destroyed cluster {}", clustername);
    return EXIT_SUCCESS;
  }
  
  /**
   * Ask the application master of a cluster to terminate itself.
   * @param clustername name of the cluster
   * @param args supplies the message, exit code and wait time that are
   * passed on to the AM
   * @return {@code EXIT_SUCCESS}
   * @throws YarnException Yarn problems
   * @throws IOException other problems
   */
  @Override
  public int actionAmSuicide(String clustername,
      ActionAMSuicideArgs args) throws YarnException, IOException {
    createClusterOperations(clustername)
        .amSuicide(args.message, args.exitcode, args.waittime);
    return EXIT_SUCCESS;
  }

  /**
   * Create the client provider for a named provider, via the provider
   * factory registered under that name.
   * @param provider provider name
   * @return the client provider created by the factory
   * @throws SliderException if raised while creating the factory or provider
   */
  @Override
  public AbstractClientProvider createClientProvider(String provider)
    throws SliderException {
    return SliderProviderFactory
        .createSliderProviderFactory(provider)
        .createClientProvider();
  }

  /**
   * Create the cluster: build and persist the instance definition, set up
   * any credentials requested in its application configuration, then start
   * the cluster.
   * @param clustername cluster name
   * @param createArgs arguments used both for the build and for the start
   * @return the status code of the cluster start
   * @throws YarnException Yarn problems
   * @throws IOException other problems, including any failure while
   * checking credentials (which is wrapped)
   * @throws BadCommandArgumentsException bad arguments.
   */
  public int actionCreate(String clustername, ActionCreateArgs createArgs) throws
                                               YarnException,
                                               IOException {

    // save the specification first
    actionBuild(clustername, createArgs);

    // reload what was saved, unresolved, to find requested credentials
    AggregateConf savedDefinition = loadInstanceDefinitionUnresolved(
        clustername,
        sliderFileSystem.buildClusterDirPath(clustername));
    try {
      checkForCredentials(getConfig(), savedDefinition.getAppConf());
    } catch (Exception e) {
      throw new IOException("problem setting credentials", e);
    }

    return startCluster(clustername, createArgs);
  }

  /**
   * Create any credentials requested by a configuration tree which do not
   * yet exist. For every provider listed in <code>tree.credentials</code>,
   * each alias which the provider does not already hold is created by
   * running the "create" command of a {@link CredentialShell}. A non-zero
   * exit code from the shell is logged as a warning and does not stop the
   * remaining aliases from being processed.
   * @param conf base configuration; a copy is made per provider
   * @param tree configuration tree whose credentials map is examined
   * @throws Exception any failure raised by the provider lookup or the shell
   */
  private void checkForCredentials(Configuration conf,
      ConfTree tree) throws Exception {
    if (tree.credentials == null || tree.credentials.isEmpty()) {
      log.info("No credentials requested");
      return;
    }
    // created on demand: only needed once there is an alias list to process
    CredentialShell credentialShell = null;
    for (Entry<String, List<String>> cred : tree.credentials.entrySet()) {
      String provider = cred.getKey();
      List<String> aliases = cred.getValue();
      if (aliases == null || aliases.isEmpty()) {
        continue;
      }
      if (credentialShell == null) {
        credentialShell = new CredentialShell();
        credentialShell.setConf(conf);
      }
      // bind a private copy of the configuration to this provider only
      Configuration c = new Configuration(conf);
      c.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, provider);
      CredentialProvider credentialProvider =
          CredentialProviderFactory.getProviders(c).get(0);
      Set<String> existingAliases = new HashSet<String>(credentialProvider.getAliases());
      for (String alias : aliases) {
        // the existence check is made against the lower-cased alias
        if (existingAliases.contains(alias.toLowerCase(Locale.ENGLISH))) {
          log.warn("Skipping creation of credentials for {}, " +
              "alias already exists in {}", alias, provider);
          continue;
        }
        String[] csarg = new String[]{
            "create", alias, "-provider", provider};
        log.info("Creating credentials for {} in {}", alias, provider);
        int exitCode = credentialShell.run(csarg);
        if (exitCode != 0) {
          // surface the failure rather than silently dropping the exit code
          log.warn("Creation of credentials for {} in {} returned exit code {}",
              alias, provider, exitCode);
        }
      }
    }
  }

  /**
   * Build and persist an instance definition without starting it. An
   * existing cluster directory may not be overwritten and no live cluster
   * of the same name is allowed.
   * @param clustername name of the cluster
   * @param buildInfo the arguments needed to build the cluster
   * @return {@code EXIT_SUCCESS}
   * @throws YarnException Yarn problems
   * @throws IOException other problems
   */
  @Override
  public int actionBuild(String clustername,
      AbstractClusterBuildingActionArgs buildInfo) throws
                                               YarnException,
                                               IOException {
    final boolean overwrite = false;
    final boolean liveClusterAllowed = false;
    buildInstanceDefinition(clustername, buildInfo, overwrite, liveClusterAllowed);
    return EXIT_SUCCESS;
  }

  /**
   * Install a local keytab file into the keytab installation directory of
   * the cluster filesystem. The directory is created and restricted to its
   * owner (all permissions); the copied file is restricted to owner
   * read/write.
   * @param installKeytabInfo supplies the destination sub-folder, the local
   * keytab location and the overwrite flag
   * @return {@code EXIT_SUCCESS}
   * @throws BadCommandArgumentsException if the folder or keytab location is
   * empty, the local file is missing or is a directory, or the destination
   * exists and overwrite was not requested
   * @throws YarnException Yarn problems
   * @throws IOException filesystem problems
   */
  @Override
  public int actionInstallKeytab(ActionInstallKeytabArgs installKeytabInfo)
      throws YarnException, IOException {

    // argument validation: fail fast on each missing or unusable value
    if (StringUtils.isEmpty(installKeytabInfo.folder)) {
      throw new BadCommandArgumentsException(
          "A valid destination keytab sub-folder name is required (e.g. 'security').\n"
              + CommonArgs.usage(serviceArgs, ACTION_INSTALL_KEYTAB));
    }
    if (StringUtils.isEmpty(installKeytabInfo.keytabUri)) {
      throw new BadCommandArgumentsException("A valid local keytab location is required.");
    }
    File localKeytab = new File(installKeytabInfo.keytabUri);
    boolean usable = localKeytab.exists() && !localKeytab.isDirectory();
    if (!usable) {
      throw new BadCommandArgumentsException("Unable to access supplied keytab file at " +
                                             localKeytab.getAbsolutePath());
    }
    Path srcFile = new Path(localKeytab.toURI());

    // destination directory, accessible by the owner only
    Path keytabDir = sliderFileSystem.buildKeytabInstallationDirPath(installKeytabInfo.folder);
    sliderFileSystem.getFileSystem().mkdirs(keytabDir);
    sliderFileSystem.getFileSystem().setPermission(keytabDir,
        new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));

    Path fileInFs = new Path(keytabDir, srcFile.getName());
    log.info("Installing keytab {} at {} and overwrite is {}.", srcFile, fileInFs, installKeytabInfo.overwrite);
    boolean alreadyInstalled = sliderFileSystem.getFileSystem().exists(fileInFs);
    if (alreadyInstalled && !installKeytabInfo.overwrite) {
      throw new BadCommandArgumentsException("Keytab exists at " +
                                             fileInFs.toUri().toString() +
                                             ". Use --overwrite to overwrite.");
    }

    // copy (keeping the local source), then restrict the file to its owner
    sliderFileSystem.getFileSystem().copyFromLocalFile(false, installKeytabInfo.overwrite, srcFile, fileInFs);
    sliderFileSystem.getFileSystem().setPermission(fileInFs,
        new FsPermission(FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE));

    return EXIT_SUCCESS;
  }

  /**
   * Install a local application package file into the package directory of
   * the cluster filesystem for the named application type.
   * @param installPkgInfo supplies the application type name, the local
   * package location and the replace flag
   * @return {@code EXIT_SUCCESS}
   * @throws BadCommandArgumentsException if the name or package location is
   * empty, the local file is missing or is a directory, or the destination
   * exists and replacement was not requested
   * @throws YarnException Yarn problems
   * @throws IOException filesystem problems
   */
  @Override
  public int actionInstallPkg(ActionInstallPackageArgs installPkgInfo) throws
      YarnException,
      IOException {

    // argument validation: fail fast on each missing or unusable value
    if (StringUtils.isEmpty(installPkgInfo.name)) {
      throw new BadCommandArgumentsException(
          "A valid application type name is required (e.g. HBASE).\n"
              + CommonArgs.usage(serviceArgs, ACTION_INSTALL_PACKAGE));
    }
    if (StringUtils.isEmpty(installPkgInfo.packageURI)) {
      throw new BadCommandArgumentsException("A valid application package location required.");
    }
    File localPackage = new File(installPkgInfo.packageURI);
    boolean usable = localPackage.exists() && !localPackage.isDirectory();
    if (!usable) {
      throw new BadCommandArgumentsException("Unable to access supplied pkg file at " +
                                             localPackage.getAbsolutePath());
    }
    Path srcFile = new Path(localPackage.toURI());

    // destination directory for this application type
    Path packageDir = sliderFileSystem.buildPackageDirPath(installPkgInfo.name);
    sliderFileSystem.getFileSystem().mkdirs(packageDir);

    Path fileInFs = new Path(packageDir, srcFile.getName());
    log.info("Installing package {} at {} and overwrite is {}.", srcFile, fileInFs, installPkgInfo.replacePkg);
    boolean alreadyInstalled = sliderFileSystem.getFileSystem().exists(fileInFs);
    if (alreadyInstalled && !installPkgInfo.replacePkg) {
      throw new BadCommandArgumentsException("Pkg exists at " +
                                             fileInFs.toUri().toString() +
                                             ". Use --replacepkg to overwrite.");
    }

    // copy, keeping the local source file
    sliderFileSystem.getFileSystem().copyFromLocalFile(false, installPkgInfo.replacePkg, srcFile, fileInFs);
    return EXIT_SUCCESS;
  }

  /**
   * Rebuild and persist the definition of an existing instance. The existing
   * cluster directory may be overwritten, and the cluster is allowed to be
   * live while this is done.
   * @param clustername name of the cluster
   * @param buildInfo the arguments needed to build the cluster
   * @return {@code EXIT_SUCCESS}
   * @throws YarnException Yarn problems
   * @throws IOException other problems
   */
  @Override
  public int actionUpdate(String clustername,
      AbstractClusterBuildingActionArgs buildInfo) throws
      YarnException, IOException {
    final boolean overwrite = true;
    final boolean liveClusterAllowed = true;
    buildInstanceDefinition(clustername, buildInfo, overwrite, liveClusterAllowed);
    return EXIT_SUCCESS;
  }

  /**
   * Build up the {@link AggregateConf} for an application instance, then
   * persist it.
   * <p>
   * The definition is assembled in this order: provider defaults, any
   * resources/template files named in <code>buildInfo</code>, command line
   * options, component (role) counts and per-component options, key
   * propagation into the internal and resource trees, ZooKeeper bindings,
   * the package URI and finally token substitution. The result is validated
   * by the Slider AM provider and by the chosen provider before it is saved.
   * @param clustername name of the cluster
   * @param buildInfo the arguments needed to build the cluster
   * @param overwrite true if existing cluster directory can be overwritten
   * @param liveClusterAllowed true if live cluster can be modified; when false
   * the build fails if a live instance of that name is found
   * @throws BadConfigException if the resources or template file cannot be
   * merged, or if no ZooKeeper quorum is defined
   * @throws BadClusterStateException if the lock needed to persist the
   * definition cannot be acquired
   * @throws YarnException
   * @throws IOException
   */

  public void buildInstanceDefinition(String clustername,
      AbstractClusterBuildingActionArgs buildInfo, boolean overwrite, boolean liveClusterAllowed)
        throws YarnException, IOException {
    // validate the name and the RM binding before doing any work
    SliderUtils.validateClusterName(clustername);
    verifyBindingsDefined();
    // unless a live cluster may be modified, verify that one isn't there
    if (!liveClusterAllowed) {
      verifyNoLiveClusters(clustername, "Create");
    }

    Configuration conf = getConfig();
    String registryQuorum = lookupZKQuorum();

    Path appconfdir = buildInfo.getConfdir();
    // Provider: the name is mandatory
    String providerName = buildInfo.getProvider();
    requireArgumentSet(Arguments.ARG_PROVIDER, providerName);
    log.debug("Provider is {}", providerName);
    SliderAMClientProvider sliderAM = new SliderAMClientProvider(conf);
    AbstractClientProvider provider =
      createClientProvider(providerName);
    InstanceBuilder builder =
      new InstanceBuilder(sliderFileSystem, 
                          getConfig(),
                          clustername);
    
    // the three configuration trees that make up the instance definition
    AggregateConf instanceDefinition = new AggregateConf();
    ConfTreeOperations appConf = instanceDefinition.getAppConfOperations();
    ConfTreeOperations resources = instanceDefinition.getResourceOperations();
    ConfTreeOperations internal = instanceDefinition.getInternalOperations();
    // initial definition is set by the providers: Slider AM first, then
    // the application's own provider
    sliderAM.prepareInstanceConfiguration(instanceDefinition);
    provider.prepareInstanceConfiguration(instanceDefinition);

    // load in any resources file specified on the command line;
    // IO failures are reported as a bad argument
    if (buildInfo.resources != null) {
      try {
        resources.mergeFile(buildInfo.resources,
                            new ResourcesInputPropertiesValidator());

      } catch (IOException e) {
        throw new BadConfigException(e,
               "incorrect argument to %s: \"%s\" : %s ", 
                                     Arguments.ARG_RESOURCES,
                                     buildInfo.resources,
                                     e.toString());
      }
    }
    // likewise for any application template file
    if (buildInfo.template != null) {
      try {
        appConf.mergeFile(buildInfo.template,
                          new TemplateInputPropertiesValidator());
      } catch (IOException e) {
        throw new BadConfigException(e,
                                     "incorrect argument to %s: \"%s\" : %s ",
                                     Arguments.ARG_TEMPLATE,
                                     buildInfo.template,
                                     e.toString());
      }
    }

    // get the command line options
    ConfTree cmdLineAppOptions = buildInfo.buildAppOptionsConfTree();
    ConfTree cmdLineResourceOptions = buildInfo.buildResourceOptionsConfTree();

    // command line application options are merged after the template
    appConf.merge(cmdLineAppOptions);

    // put the role counts into the resources file
    Map<String, String> argsRoleMap = buildInfo.getComponentMap();
    for (Map.Entry<String, String> roleEntry : argsRoleMap.entrySet()) {
      String count = roleEntry.getValue();
      String key = roleEntry.getKey();
      log.debug("{} => {}", key, count);
      resources.getOrAddComponent(key)
                 .put(ResourceKeys.COMPONENT_INSTANCES, count);
    }

    // all CLI role options
    Map<String, Map<String, String>> appOptionMap =
      buildInfo.getCompOptionMap();
    appConf.mergeComponents(appOptionMap);

    // internal picks up only the global keys prefixed "slider." or "internal."
    internal.propagateGlobalKeys(appConf, "slider.");
    internal.propagateGlobalKeys(appConf, "internal.");

    // copy over component., role. and yarn. values ONLY to the resources;
    // this is done only when PROPAGATE_RESOURCE_OPTION is set
    if (PROPAGATE_RESOURCE_OPTION) {
      resources.propagateGlobalKeys(appConf, "component.");
      resources.propagateGlobalKeys(appConf, "role.");
      resources.propagateGlobalKeys(appConf, "yarn.");
      resources.mergeComponentsPrefix(appOptionMap, "component.", true);
      resources.mergeComponentsPrefix(appOptionMap, "yarn.", true);
      resources.mergeComponentsPrefix(appOptionMap, "role.", true);
    }

    // resource component args: note that the command line resource options
    // are merged into the application configuration as well as the resources
    appConf.merge(cmdLineResourceOptions);
    resources.merge(cmdLineResourceOptions);
    resources.mergeComponents(buildInfo.getResourceCompOptionMap());

    // hand the assembled definition to the builder and fill in the
    // remaining builder-managed details
    builder.init(providerName, instanceDefinition);
    builder.propagateFilename();
    builder.propagatePrincipals();
    builder.setImageDetailsIfAvailable(buildInfo.getImage(),
                                       buildInfo.getAppHomeDir());
    builder.setQueue(buildInfo.queue);

    // ZK quorum: the command line value wins, else the registry quorum;
    // one of them must be set
    String quorum = buildInfo.getZKhosts();
    if (SliderUtils.isUnset(quorum)) {
      quorum = registryQuorum;
    }
    if (isUnset(quorum)) {
      throw new BadConfigException("No Zookeeper quorum defined");
    }
    ZKPathBuilder zkPaths = new ZKPathBuilder(getAppName(),
        getUsername(),
        clustername,
        registryQuorum,
        quorum);
    String zookeeperRoot = buildInfo.getAppZKPath();

    if (isSet(zookeeperRoot)) {
      // an explicit application ZK path was supplied
      zkPaths.setAppPath(zookeeperRoot);
    } else {
      // no explicit path: the application option CREATE_DEF_ZK_NODE selects
      // the second argument passed to createZookeeperNode()
      String createDefaultZkNode = appConf.getGlobalOptions().getOption(AgentKeys.CREATE_DEF_ZK_NODE, "false");
      if (createDefaultZkNode.equals("true")) {
        String defaultZKPath = createZookeeperNode(clustername, false);
        log.debug("ZK node created for application instance: {}", defaultZKPath);
        // the path is only set when a non-null path came back
        if (defaultZKPath != null) {
          zkPaths.setAppPath(defaultZKPath);
        }
      } else {
        // create AppPath if default is being used
        String defaultZKPath = createZookeeperNode(clustername, true);
        log.debug("ZK node assigned to application instance: {}", defaultZKPath);
        zkPaths.setAppPath(defaultZKPath);
      }
    }

    builder.addZKBinding(zkPaths);

    // then propagate any package URI
    if (buildInfo.packageURI != null) {
      appConf.set(AgentKeys.PACKAGE_PATH, buildInfo.packageURI);
    }

    // make any substitutions needed at this stage
    replaceTokens(appConf.getConfTree(), getUsername(), clustername);

    // providers to validate what there is
    AggregateConf instanceDescription = builder.getInstanceDescription();
    validateInstanceDefinition(sliderAM, instanceDescription, sliderFileSystem);
    validateInstanceDefinition(provider, instanceDescription, sliderFileSystem);
    try {
      persistInstanceDefinition(overwrite, appconfdir, builder);
    } catch (LockAcquireFailedException e) {
      // NOTE(review): the lock failure is only included as text in the new
      // exception's message; it is not passed on as the cause
      log.warn("Failed to get a Lock on {} : {}", builder, e);
      throw new BadClusterStateException("Failed to save " + clustername
                                         + ": " + e);
    }
  }

  /**
   * Persist the instance definition held by the builder. This simply
   * invokes <code>builder.persist(appconfdir, overwrite)</code>.
   * @param overwrite flag passed through to the builder's persist operation
   * @param appconfdir path passed through to the builder's persist operation
   * @param builder builder holding the instance definition to save
   * @throws IOException
   * @throws SliderException
   * @throws LockAcquireFailedException
   */
  protected void persistInstanceDefinition(
      boolean overwrite,
      Path appconfdir,
      InstanceBuilder builder)
      throws IOException, SliderException, LockAcquireFailedException {
    builder.persist(appconfdir, overwrite);
  }

  /**
   * Apply token substitution to a configuration tree, in place.
   * <p>
   * Every value in <code>conf.global</code> is rewritten under its existing
   * key. In <code>conf.credentials</code> both the keys and every element of
   * the value lists are rewritten; the map is then cleared and repopulated
   * with the substituted entries.
   * @param conf configuration tree to update
   * @param userName user name to substitute
   * @param clusterName cluster name to substitute
   * @throws IOException
   */
  @VisibleForTesting
  public static void replaceTokens(ConfTree conf,
      String userName, String clusterName) throws IOException {
    // global options: the keys stay, only the values are substituted
    Map<String, String> substitutedGlobals = new HashMap<String, String>();
    for (Map.Entry<String, String> option : conf.global.entrySet()) {
      String substituted =
          replaceTokens(option.getValue(), userName, clusterName);
      substitutedGlobals.put(option.getKey(), substituted);
    }
    conf.global.putAll(substitutedGlobals);

    // credentials: substitute the values first, then the key
    Map<String, List<String>> substitutedCredentials =
        new HashMap<String, List<String>>();
    for (Map.Entry<String, List<String>> credential
        : conf.credentials.entrySet()) {
      List<String> substitutedValues = new ArrayList<String>();
      for (String value : credential.getValue()) {
        substitutedValues.add(replaceTokens(value, userName, clusterName));
      }
      String substitutedKey =
          replaceTokens(credential.getKey(), userName, clusterName);
      substitutedCredentials.put(substitutedKey, substitutedValues);
    }
    // keys may have changed, so the old entries are dropped completely
    conf.credentials.clear();
    conf.credentials.putAll(substitutedCredentials);
  }

  /**
   * Substitute the <code>${USER}</code>, <code>${USER_NAME}</code> and
   * <code>${CLUSTER_NAME}</code> tokens in a string.
   * <p>
   * The substitution is purely literal. The literal
   * <code>String.replace()</code> is used rather than
   * <code>String.replaceAll()</code>, because the latter interprets
   * <code>$</code> and <code>\</code> in the <i>replacement</i> text as group
   * references and escapes; a user or cluster name containing either
   * character would otherwise fail or be corrupted.
   * @param s string to process
   * @param userName value replacing <code>${USER}</code> and
   * <code>${USER_NAME}</code>
   * @param clusterName value replacing <code>${CLUSTER_NAME}</code>
   * @return the string with all tokens substituted
   * @throws IOException never thrown by this implementation; retained so
   * that the signature is unchanged
   */
  private static String replaceTokens(String s, String userName,
      String clusterName) throws IOException {
    return s.replace("${USER}", userName)
        .replace("${USER_NAME}", userName)
        .replace("${CLUSTER_NAME}", clusterName);
  }

  /**
   * Get the permissions to use for the cluster directory.
   * The value of the configuration option
   * <code>CLUSTER_DIRECTORY_PERMISSIONS</code> is read, falling back to
   * <code>DEFAULT_CLUSTER_DIRECTORY_PERMISSIONS</code>, and is handed to the
   * string constructor of {@link FsPermission}.
   * @param conf configuration to read the option from
   * @return the permissions built from the configured string
   */
  public FsPermission getClusterDirectoryPermissions(Configuration conf) {
    return new FsPermission(
        conf.get(CLUSTER_DIRECTORY_PERMISSIONS,
            DEFAULT_CLUSTER_DIRECTORY_PERMISSIONS));
  }

  /**
   * Verify that a Resource Manager address is defined, failing with a
   * useful error message if not. The address check is skipped when
   * <code>YarnConfiguration.RM_HA_ENABLED</code> is set to true.
   * @throws BadCommandArgumentsException if RM HA is not enabled and the
   * Resource Manager address is not defined
   */
  public void verifyBindingsDefined() throws BadCommandArgumentsException {
    Configuration conf = getConfig();
    InetSocketAddress rmAddr = SliderUtils.getRmAddress(conf);
    boolean haEnabled = conf.getBoolean(YarnConfiguration.RM_HA_ENABLED, false);
    if (haEnabled || SliderUtils.isAddressDefined(rmAddr)) {
      // either HA is on, or there is a usable address: nothing to report
      return;
    }
    throw new BadCommandArgumentsException(
        "No valid Resource Manager address provided in the argument "
        + Arguments.ARG_MANAGER
        + " or the configuration property "
        + YarnConfiguration.RM_ADDRESS
        + " value :" + rmAddr);
  }

  /**
   * Load a cluster specification and launch it.
   * This assumes that all validation of args and cluster state
   * have already taken place.
   * <p>
   * If the launch arguments name an output file, the application report of
   * the launched application is serialized to it. If the launch arguments
   * carry a wait time greater than zero, the method waits for the
   * application via {@link #waitForAppRunning}; otherwise it returns
   * immediately.
   *
   * @param clustername name of the cluster.
   * @param launchArgs launch arguments
   * @return the exit code
   * @throws YarnException
   * @throws IOException
   */
  private int startCluster(String clustername, LaunchArgsAccessor launchArgs)
      throws YarnException, IOException {
    Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
    AggregateConf instanceDefinition =
        loadInstanceDefinitionUnresolved(clustername, clusterDirectory);

    LaunchedApplication launched = launchApplication(clustername,
        clusterDirectory,
        instanceDefinition,
        serviceArgs.isDebug());
    applicationId = launched.getApplicationId();

    if (launchArgs.getOutputFile() != null) {
      // an output file was requested: serialize the application report,
      // stamped with the current time as the submit time
      SerializedApplicationReport serializedReport =
          new SerializedApplicationReport(launched.getApplicationReport());
      serializedReport.submitTime = System.currentTimeMillis();
      new ApplicationReportSerDeser()
          .save(serializedReport, launchArgs.getOutputFile());
    }

    int waittime = launchArgs.getWaittime();
    // a wait time of zero or less means "do not wait"
    return waittime > 0
        ? waitForAppRunning(launched, waittime, waittime)
        : EXIT_SUCCESS;
  }

  /**
   * Load the instance definition from a cluster directory and set its name.
   * The definition is not resolved at this point.
   * @param name cluster name
   * @param clusterDirectory cluster dir
   * @return the loaded configuration
   * @throws IOException
   * @throws SliderException
   * @throws UnknownApplicationInstanceException if the file is not found
   */
  public AggregateConf loadInstanceDefinitionUnresolved(String name,
      Path clusterDirectory) throws IOException, SliderException {
    try {
      AggregateConf loaded = InstanceIO.loadInstanceDefinitionUnresolved(
          sliderFileSystem, clusterDirectory);
      loaded.setName(name);
      return loaded;
    } catch (FileNotFoundException notFound) {
      // a missing file is reported as an unknown application instance
      throw UnknownApplicationInstanceException.unknownInstance(name, notFound);
    }
  }
    /**
   * Load the instance definition of a named cluster from its cluster
   * directory, optionally resolving it.
   * @param name cluster name
   * @param resolved flag to indicate the cluster should be resolved
   * @return the loaded configuration
   * @throws IOException IO problems
   * @throws SliderException slider explicit issues
   * @throws UnknownApplicationInstanceException if the file is not found
   */
  public AggregateConf loadInstanceDefinition(String name, boolean resolved)
      throws IOException, SliderException {
    AggregateConf definition = loadInstanceDefinitionUnresolved(
        name,
        sliderFileSystem.buildClusterDirPath(name));
    if (resolved) {
      definition.resolve();
    }
    return definition;
  }

  /**
   * Launch the Slider Application Master for an application instance.
   * <p>
   * The method validates the cluster name, verifies that no live instance
   * of that name exists, resolves the instance definition, stages
   * configuration and AM resources in the cluster filesystem, builds the
   * AM environment, classpath and command line, and finally submits the
   * application through an {@link AppMasterLauncher}.
   * <p>
   * As side effects, <code>deployedClusterName</code> and
   * <code>launchedInstanceDefinition</code> are assigned, and the temporary
   * files of the application instance are purged before new ones are created.
   *
   * @param clustername name of the cluster
   * @param clusterDirectory cluster dir
   * @param instanceDefinition the instance definition; it is resolved in place
   * @param debugAM enable debug AM options
   * @return the launched application
   * @throws YarnException
   * @throws IOException
   */
  public LaunchedApplication launchApplication(String clustername,
                                               Path clusterDirectory,
                                               AggregateConf instanceDefinition,
                                               boolean debugAM)
    throws YarnException, IOException {


    deployedClusterName = clustername;
    SliderUtils.validateClusterName(clustername);
    verifyNoLiveClusters(clustername, "Launch");
    Configuration config = getConfig();
    // the returned quorum is not used here
    lookupZKQuorum();
    boolean clusterSecure = SliderUtils.isHadoopClusterSecure(config);
    // create the Slider AM provider -this helps set up the AM
    SliderAMClientProvider sliderAM = new SliderAMClientProvider(config);

    instanceDefinition.resolve();
    launchedInstanceDefinition = instanceDefinition;

    ConfTreeOperations internalOperations =
      instanceDefinition.getInternalOperations();
    MapOperations internalOptions = internalOperations.getGlobalOptions();
    ConfTreeOperations resourceOperations =
      instanceDefinition.getResourceOperations();
    ConfTreeOperations appOperations =
      instanceDefinition.getAppConfOperations();
    // both configuration paths are mandatory internal options and
    // must already exist
    Path generatedConfDirPath =
      createPathThatMustExist(internalOptions.getMandatoryOption(
        InternalKeys.INTERNAL_GENERATED_CONF_PATH));
    Path snapshotConfPath =
      createPathThatMustExist(internalOptions.getMandatoryOption(
        InternalKeys.INTERNAL_SNAPSHOT_CONF_PATH));


    // cluster Provider, looked up by the (mandatory) internal provider name
    AbstractClientProvider provider = createClientProvider(
      internalOptions.getMandatoryOption(
        InternalKeys.INTERNAL_PROVIDER_NAME));

    // dump the resolved instance definition when debugging
    if (log.isDebugEnabled()) {
      log.debug(instanceDefinition.toString());
    }
    MapOperations sliderAMResourceComponent =
      resourceOperations.getOrAddComponent(SliderKeys.COMPONENT_AM);
    MapOperations resourceGlobalOptions = resourceOperations.getGlobalOptions();

    // add the tags if available
    Set<String> applicationTags = provider.getApplicationTags(sliderFileSystem,
        appOperations.getGlobalOptions().get(AgentKeys.APP_DEF));
    AppMasterLauncher amLauncher = new AppMasterLauncher(clustername,
        SliderKeys.APP_TYPE,
        config,
        sliderFileSystem,
        yarnClient,
        clusterSecure,
        sliderAMResourceComponent,
        resourceGlobalOptions,
        applicationTags);

    ApplicationId appId = amLauncher.getApplicationId();
    // request that containers are kept over AM restarts
    amLauncher.setKeepContainersOverRestarts(true);

    // the AM restart limit comes from the client configuration
    int maxAppAttempts = config.getInt(KEY_AM_RESTART_LIMIT, 0);
    amLauncher.setMaxAppAttempts(maxAppAttempts);

    // clean out any old temp files of this instance, then create a fresh
    // temp dir for this application ID with a "lib" subdirectory
    sliderFileSystem.purgeAppInstanceTempFiles(clustername);
    Path tempPath = sliderFileSystem.createAppInstanceTempPath(
        clustername,
        appId.toString() + "/am");
    String libdir = "lib";
    Path libPath = new Path(tempPath, libdir);
    sliderFileSystem.getFileSystem().mkdirs(libPath);
    log.debug("FS={}, tempPath={}, libdir={}", sliderFileSystem.toString(),
              tempPath, libPath);
    // set local resources for the application master
    // local files or archives as needed
    // In this scenario, the jar file for the application master is part of the local resources
    Map<String, LocalResource> localResources = amLauncher.getLocalResources();
    
    // look for the configuration directory named in the system property
    // SliderKeys.PROPERTY_CONF_DIR; if set it must exist, and it is copied
    // into the cluster directory
    Path remoteConfPath = null;
    String relativeConfDir = null;
    String confdirProp =
      System.getProperty(SliderKeys.PROPERTY_CONF_DIR);
    if (confdirProp == null || confdirProp.isEmpty()) {
      log.debug("No local configuration directory provided as system property");
    } else {
      File confDir = new File(confdirProp);
      if (!confDir.exists()) {
        throw new BadConfigException(E_CONFIGURATION_DIRECTORY_NOT_FOUND,
                                     confDir);
      }
      Path localConfDirPath = SliderUtils.createLocalPath(confDir);
      log.debug("Slide configuration directory is {}; remote to be {}",
          localConfDirPath, SliderKeys.SUBMITTED_CONF_DIR);
      remoteConfPath = new Path(clusterDirectory, SliderKeys.SUBMITTED_CONF_DIR);
      SliderUtils.copyDirectory(config, localConfDirPath, remoteConfPath, null);
    }
    // the assumption here is that minimr cluster => this is a test run
    // and the classpath can look after itself

    boolean usingMiniMRCluster = getUsingMiniMRCluster();
    if (!usingMiniMRCluster) {

      log.debug("Destination is not a MiniYARNCluster -copying full classpath");

      // insert conf dir first; relativeConfDir stays null when no
      // configuration directory was submitted
      if (remoteConfPath != null) {
        relativeConfDir = SliderKeys.SUBMITTED_CONF_DIR;
        Map<String, LocalResource> submittedConfDir =
          sliderFileSystem.submitDirectory(remoteConfPath,
                                         relativeConfDir);
        SliderUtils.mergeMaps(localResources, submittedConfDir);
      }
    }
    // build up the configuration 
    // IMPORTANT: it is only after this call that site configurations
    // will be valid.

    propagatePrincipals(config, instanceDefinition);
    // validate security data
/*
    // turned off until tested
    SecurityConfiguration securityConfiguration =
        new SecurityConfiguration(config,
            instanceDefinition, clustername);
    
*/
    Configuration clientConfExtras = new Configuration(false);
    // then build up the generated path: the snapshot configuration is
    // copied to the generated configuration directory with the cluster
    // directory permissions
    FsPermission clusterPerms = getClusterDirectoryPermissions(config);
    SliderUtils.copyDirectory(config, snapshotConfPath, generatedConfDirPath,
        clusterPerms);


    // standard AM resources
    sliderAM.prepareAMAndConfigForLaunch(sliderFileSystem,
                                       config,
                                       amLauncher,
                                       instanceDefinition,
                                       snapshotConfPath,
                                       generatedConfDirPath,
                                       clientConfExtras,
                                       libdir,
                                       tempPath,
                                       usingMiniMRCluster);
    // add provider-specific resources
    provider.prepareAMAndConfigForLaunch(sliderFileSystem,
                                         config,
                                         amLauncher,
                                         instanceDefinition,
                                         snapshotConfPath,
                                         generatedConfDirPath,
                                         clientConfExtras,
                                         libdir,
                                         tempPath,
                                         usingMiniMRCluster);

    // now that the site config is fully generated, the provider gets
    // to do a quick review of them.
    log.debug("Preflight validation of cluster configuration");


    sliderAM.preflightValidateClusterConfiguration(sliderFileSystem,
                                                 clustername,
                                                 config,
                                                 instanceDefinition,
                                                 clusterDirectory,
                                                 generatedConfDirPath,
                                                 clusterSecure
                                                );

    provider.preflightValidateClusterConfiguration(sliderFileSystem,
                                                   clustername,
                                                   config,
                                                   instanceDefinition,
                                                   clusterDirectory,
                                                   generatedConfDirPath,
                                                   clusterSecure
                                                  );


    // TODO: consider supporting apps that don't have an image path
    Path imagePath =
        SliderUtils.extractImagePath(sliderFileSystem, internalOptions);
    if (sliderFileSystem.maybeAddImagePath(localResources, imagePath)) {
      log.debug("Registered image path {}", imagePath);
    }

    // build the environment from the AM's resource component
    amLauncher.putEnv(
      SliderUtils.buildEnvMap(sliderAMResourceComponent));
    ClasspathConstructor classpath = SliderUtils.buildClasspath(relativeConfDir,
        libdir,
        getConfig(),
        usingMiniMRCluster);
    amLauncher.setClasspath(classpath);
    if (log.isDebugEnabled()) {
      log.debug("AM classpath={}", classpath);
      log.debug("Environment Map:\n{}",
                SliderUtils.stringifyMap(amLauncher.getEnv()));
      log.debug("Files in lib path\n{}", sliderFileSystem.listFSDir(libPath));
    }

    // rm address: an invalid scheduler address is reported as a bad
    // configuration.
    // NOTE(review): the caught exception is not passed on as the cause.

    InetSocketAddress rmSchedulerAddress;
    try {
      rmSchedulerAddress = SliderUtils.getRmSchedulerAddress(config);
    } catch (IllegalArgumentException e) {
      throw new BadConfigException("%s Address invalid: %s",
                                   YarnConfiguration.RM_SCHEDULER_ADDRESS,
                                   config.get(
                                     YarnConfiguration.RM_SCHEDULER_ADDRESS)
      );

    }
    String rmAddr = NetUtils.getHostPortString(rmSchedulerAddress);

    JavaCommandLineBuilder commandLine = new JavaCommandLineBuilder();
    // insert any JVM options
    sliderAM.addJVMOptions(instanceDefinition, commandLine);
    // enable Java assertions (the call is unconditional at this point)
    commandLine.enableJavaAssertions();
    // add the AM service entry point
    commandLine.add(SliderAppMaster.SERVICE_CLASSNAME);

    // create action and the cluster name
    commandLine.add(ACTION_CREATE, clustername);

    // debug
    if (debugAM) {
      commandLine.add(Arguments.ARG_DEBUG);
    }

    // set the cluster directory path
    commandLine.add(Arguments.ARG_CLUSTER_URI, clusterDirectory.toUri());

    // pass down the RM scheduler address if there is one
    if (!isUnset(rmAddr)) {
      commandLine.add(Arguments.ARG_RM_ADDR, rmAddr);
    }

    // pass down any filesystem binding given to this client
    if (serviceArgs.getFilesystemBinding() != null) {
      commandLine.add(Arguments.ARG_FILESYSTEM, serviceArgs.getFilesystemBinding());
    }

    // pass the registry binding: the registry path falls back to its
    // default, while the registry ZK quorum must be set in the configuration
    addConfOptionToCLI(commandLine, config, REGISTRY_PATH,
        DEFAULT_REGISTRY_PATH);
    addMandatoryConfOptionToCLI(commandLine, config,
        RegistryConstants.KEY_REGISTRY_ZK_QUORUM);

    if (clusterSecure) {
      // if the cluster is secure, make sure that
      // the relevant security settings go over
/*
      addConfOptionToCLI(commandLine, config, KEY_SECURITY);
*/
      addConfOptionToCLI(commandLine,
          config,
          DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY);
    }
    // add the stdout and stderr files of the AM to the command line
    commandLine.addOutAndErrFiles(STDOUT_AM, STDERR_AM);

    // the built string is only used for the debug log; the builder itself
    // is what is handed to the launcher
    String cmdStr = commandLine.build();
    log.debug("Completed setting up app master command {}", cmdStr);

    amLauncher.addCommandLine(commandLine);

    // the Slider AM gets to configure the AM requirements, not the custom provider
    sliderAM.prepareAMResourceRequirements(sliderAMResourceComponent,
        amLauncher.getResource());


    // Set the priority for the application master

    int amPriority = config.getInt(KEY_YARN_QUEUE_PRIORITY,
                                   DEFAULT_YARN_QUEUE_PRIORITY);


    amLauncher.setPriority(amPriority);

    // Set the queue to which this application is to be submitted in the RM
    // Queue for App master: a queue recorded in the internal options of the
    // instance definition overrides the one from the client configuration
    String amQueue = config.get(KEY_YARN_QUEUE, DEFAULT_YARN_QUEUE);
    String suppliedQueue = internalOperations.getGlobalOptions().get(InternalKeys.INTERNAL_QUEUE);
    if(!SliderUtils.isUnset(suppliedQueue)) {
      amQueue = suppliedQueue;
      log.info("Using queue {} for the application instance.", amQueue);
    }

    if (amQueue != null) {
      amLauncher.setQueue(amQueue);
    }

    // submit the application
    LaunchedApplication launchedApplication = amLauncher.submitApplication();
    return launchedApplication;
  }
  
  
  /**
   * Wait for the launched app to be accepted and, optionally, running.
   * <p>
   * The application is first monitored until it reaches the
   * <code>ACCEPTED</code> state. If it has already finished by then, the
   * exit code is built from its report. Otherwise, when a run wait is
   * requested, the application is monitored until it reaches the
   * <code>RUNNING</code> state.
   *
   * @param launchedApplication application
   * @param acceptWaitMillis limit passed unchanged to the {@link Duration}
   * used while waiting for the app to be accepted
   * @param runWaitMillis wait for the app to be running; 0 means no wait
   * takes place. Note that, despite the parameter name, this value is
   * multiplied by 1000 before being used as the {@link Duration} limit.
   * @return exit code: <code>EXIT_SUCCESS</code> if the app is running (or
   * no run wait was requested), otherwise the code built from the last
   * application report
   * @throws YarnException
   * @throws IOException
   */
  public int waitForAppRunning(LaunchedApplication launchedApplication,
      int acceptWaitMillis, int runWaitMillis) throws YarnException, IOException {
    assert launchedApplication != null;
    // wait for the submit state to be reached
    ApplicationReport report = launchedApplication.monitorAppToState(
        YarnApplicationState.ACCEPTED,
        new Duration(acceptWaitMillis));

    // may have failed, so check that
    if (SliderUtils.hasAppFinished(report)) {
      return buildExitCode(report);
    }

    // exit unless there is a wait
    if (runWaitMillis == 0) {
      return EXIT_SUCCESS;
    }

    // waiting for state to change; the scaling is done in long arithmetic
    // so that large wait values cannot overflow the int range
    Duration duration = new Duration(runWaitMillis * 1000L);
    duration.start();
    report = launchedApplication.monitorAppToState(
        YarnApplicationState.RUNNING, duration);
    if (report != null
        && report.getYarnApplicationState() == YarnApplicationState.RUNNING) {
      return EXIT_SUCCESS;
    }
    return buildExitCode(report);
  }


  /**
   * Propagate the DFS namenode kerberos principal from the client
   * configuration into the application configuration of the cluster spec.
   * If the principal is defined, it is stored in the global application
   * options under the same key prefixed with
   * <code>OptionKeys.SITE_XML_PREFIX</code>, using <code>putIfUnset</code>.
   * @param config config to read from
   * @param clusterSpec cluster spec
   */
  private void propagatePrincipals(Configuration config,
                                   AggregateConf clusterSpec) {
    String dfsPrincipal =
        config.get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY);
    if (dfsPrincipal == null) {
      // nothing defined in the client configuration: nothing to propagate
      return;
    }
    String siteDfsPrincipal = OptionKeys.SITE_XML_PREFIX
        + DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
    MapOperations globalAppOptions =
        clusterSpec.getAppConfOperations().getGlobalOptions();
    globalAppOptions.putIfUnset(siteDfsPrincipal, dfsPrincipal);
  }


  /**
   * Add a configuration option to the command line as a definition, if
   * the option has a value in the configuration.
   * @param cmdLine command line
   * @param conf configuration to read the value from
   * @param key key of the option
   * @return true if the option was set and so added to the command line
   */
  private boolean addConfOptionToCLI(CommandLineBuilder cmdLine,
      Configuration conf,
      String key) {
    return defineIfSet(cmdLine, key, conf.get(key));
  }

  /**
   * Add a configuration option to the command line as a definition,
   * using a default value if the configuration has no value for it.
   * @param cmdLine command line
   * @param conf configuration to read the value from
   * @param key key of the option
   * @param defVal value to use when the configuration has none
   * @return the value that was added
   */
  private String addConfOptionToCLI(CommandLineBuilder cmdLine,
      Configuration conf,
      String key,
      String defVal) {
    String resolved = conf.get(key, defVal);
    define(cmdLine, key, resolved);
    return resolved;
  }

  /**
   * Add a definition to the CLI: the <code>Arguments.ARG_DEFINE</code>
   * argument followed by <code>key=val</code>.
   * @param cmdLine command line
   * @param key key; must not be null
   * @param val value; must not be null
   * @throws IllegalArgumentException if the key or the value is null
   */
  private void define(CommandLineBuilder cmdLine, String key, String val) {
    if (key == null) {
      throw new IllegalArgumentException("null key");
    }
    if (val == null) {
      throw new IllegalArgumentException("null value");
    }
    String definition = key + "=" + val;
    cmdLine.add(Arguments.ARG_DEFINE, definition);
  }

  /**
   * Add a <code>key=val</code> definition to the CLI if <code>val</code>
   * is not null; do nothing to the command line otherwise.
   * @param cmdLine command line
   * @param key key; must not be null
   * @param val value; may be null
   * @return true if the definition was added, false if the value was null
   */
  private boolean defineIfSet(CommandLineBuilder cmdLine, String key, String val) {
    Preconditions.checkArgument(key != null, "null key");
    if (val == null) {
      return false;
    }
    define(cmdLine, key, val);
    return true;
  }

  /**
   * Add a configuration option to the command line as a definition,
   * failing if the option has no value in the configuration.
   * @param cmdLine command line
   * @param conf configuration to read the value from
   * @param key key of the option
   * @throws BadConfigException if the option is not set
   */
  private void addMandatoryConfOptionToCLI(CommandLineBuilder cmdLine,
      Configuration conf,
      String key) throws BadConfigException {
    boolean added = addConfOptionToCLI(cmdLine, conf, key);
    if (!added) {
      throw new BadConfigException("Missing configuration option: " + key);
    }
  }
  
  /**
   * Create a path that must exist in the cluster fs.
   * The work is delegated to the Slider filesystem binding.
   * @param uri uri to create
   * @return the path
   * @throws SliderException
   * @throws IOException
   */
  public Path createPathThatMustExist(String uri)
      throws SliderException, IOException {
    Path existingPath = sliderFileSystem.createPathThatMustExist(uri);
    return existingPath;
  }

  /**
   * Verify that no live instance of the named cluster exists.
   * @param clustername cluster name
   * @param action name of the action being attempted; it is used as the
   * start of the error message
   * @throws SliderException with exit code EXIT_APPLICATION_IN_USE
   * if at least one live instance of that name is found
   * @throws IOException
   * @throws YarnException
   */
  public void verifyNoLiveClusters(String clustername, String action)
      throws IOException, YarnException {
    List<ApplicationReport> liveInstances = findAllLiveInstances(clustername);
    if (liveInstances.isEmpty()) {
      return;
    }
    // report the first live instance found
    ApplicationReport firstLive = liveInstances.get(0);
    String message = action + " failed for "
        + clustername
        + ": "
        + E_CLUSTER_RUNNING + " :"
        + firstLive;
    throw new SliderException(EXIT_APPLICATION_IN_USE, message);
  }

  /**
   * Get the current user name, as returned by
   * <code>RegistryUtils.currentUser()</code>.
   * @return the user name
   * @throws IOException
   */
  public String getUsername() throws IOException {
    String currentUser = RegistryUtils.currentUser();
    return currentUser;
  }

  /**
   * Get the deployed cluster name currently recorded by this client.
   * @return the cluster name
   */
  public String getDeployedClusterName() {
    return this.deployedClusterName;
  }

  /**
   * Set the deployed cluster name recorded by this client.
   * @param deployedClusterName the new cluster name
   */
  @VisibleForTesting
  public void setDeployedClusterName(String deployedClusterName) {
    this.deployedClusterName = deployedClusterName;
  }

  /**
   * Ask if the client is using a mini cluster, by reading the boolean
   * option <code>YarnConfiguration.IS_MINI_YARN_CLUSTER</code> from the
   * client configuration.
   * @return true if the option is set to true; false if it is false or unset
   */
  private boolean getUsingMiniMRCluster() {
    Configuration conf = getConfig();
    return conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false);
  }

  /**
   * Get the application name used in the zookeeper root paths.
   * @return the fixed application name
   */
  private String getAppName() {
    final String appName = "slider";
    return appName;
  }

  /**
   * Wait for the app to start running (or go past that state).
   * This is {@link #monitorAppToState} with the desired state
   * <code>RUNNING</code>.
   * @param duration time to wait
   * @return the app report, as returned by {@link #monitorAppToState}
   * @throws YarnException YARN or app issues
   * @throws IOException IO problems
   */
  @VisibleForTesting
  public ApplicationReport monitorAppToRunning(Duration duration)
      throws YarnException, IOException {
    ApplicationReport report =
        monitorAppToState(YarnApplicationState.RUNNING, duration);
    return report;
  }

  /**
   * Build an exit code for an application from its report.
   * If the report parameter is null, it is interpreted as a timeout.
   * <p>
   * Only the <code>FINISHED</code>, <code>KILLED</code> and
   * <code>FAILED</code> YARN states map to specific codes; any other state
   * is treated as success.
   * @param report application report; may be null
   * @return the exit code
   * @throws IOException
   * @throws YarnException
   */
  private int buildExitCode(ApplicationReport report) throws
                                                      IOException,
                                                      YarnException {
    if (null == report) {
      return EXIT_TIMED_OUT;
    }

    YarnApplicationState state = report.getYarnApplicationState();
    FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
    switch (state) {
      case FINISHED:
        // a finished app is only a success if its final status says so
        if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
          log.info("Application has completed successfully");
          return EXIT_SUCCESS;
        } else {
          // note the space after the full stop: the two string halves are
          // concatenated into a single message
          log.info("Application finished unsuccessfully. " +
                   "YarnState = {}, DSFinalStatus = {} Breaking monitoring loop",
                   state, dsStatus);
          return EXIT_YARN_SERVICE_FINISHED_WITH_ERROR;
        }

      case KILLED:
        log.info("Application did not finish. YarnState={}, DSFinalStatus={}",
                 state, dsStatus);
        return EXIT_YARN_SERVICE_KILLED;

      case FAILED:
        log.info("Application Failed. YarnState={}, DSFinalStatus={}", state,
                 dsStatus);
        return EXIT_YARN_SERVICE_FAILED;

      default:
        //not in any of these states
        return EXIT_SUCCESS;
    }
  }

  /**
   * Monitor the submitted application for reaching the requested state.
   * <p>
   * A {@link LaunchedApplication} is created for the application ID
   * currently held by this client, and the monitoring (including any
   * handling of the duration expiring) is delegated to it.
   * Prerequisite: the application was launched.
   * @param desiredState desired state.
   * @param duration how long to wait
   * @return the application report returned by the launched application's
   * monitor operation
   * @throws YarnException
   * @throws IOException
   */
  @VisibleForTesting
  public ApplicationReport monitorAppToState(
      YarnApplicationState desiredState,
      Duration duration)
      throws YarnException, IOException {
    return new LaunchedApplication(applicationId, yarnClient)
        .monitorAppToState(desiredState, duration);
  }

  /**
   * Get the report of the application whose ID this client currently holds.
   * @return whatever {@link #getApplicationReport(ApplicationId)} returns
   * for that ID
   * @throws IOException
   * @throws YarnException
   */
  @Override
  public ApplicationReport getApplicationReport()
      throws IOException, YarnException {
    final ApplicationReport current = getApplicationReport(applicationId);
    return current;
  }

  /**
   * Force kill the application whose ID this client currently holds.
   * @param reason text passed on to the kill operation
   * @return true if there was an application ID to act on, false if
   * no application ID is set
   * @throws YarnException
   * @throws IOException
   */
  @Override
  public boolean forceKillApplication(String reason)
    throws YarnException, IOException {
    // nothing to kill when no application is being tracked
    if (applicationId == null) {
      return false;
    }
    LaunchedApplication target =
      new LaunchedApplication(applicationId, yarnClient);
    target.forceKill(reason);
    return true;
  }

  /**
   * Enumerate the Slider instances of a user. Failed and killed instances
   * are included, and the result may contain duplicates.
   * @param user user: "" means all users, null means "default"
   * @return a possibly empty list of Slider AMs
   */
  @VisibleForTesting
  public List<ApplicationReport> listSliderInstances(String user)
    throws YarnException, IOException {
    final List<ApplicationReport> found =
      YarnAppListClient.listInstances(user);
    return found;
  }

  /**
   * Convenience form of the list action: lists live instances only.
   * Builds list arguments with the "live" flag set and delegates to
   * {@link #actionList(String, ActionListArgs)}.
   * @param clustername cluster name
   * @return success if the listing was considered successful
   * @throws IOException
   * @throws YarnException
   */
  public int actionList(String clustername) throws IOException, YarnException {
    ActionListArgs liveOnly = new ActionListArgs();
    liveOnly.live = true;
    return actionList(clustername, liveOnly);
  }

  /**
   * Implement the list action.
   * <p>
   * The persistent instances known to the filesystem are matched against
   * the application reports from the RM, filtered by the requested state
   * range; each matching instance is printed.
   * A null {@code args.state} is treated the same as an empty one.
   * @param clustername List out specific instance name
   * @param args Action list arguments
   * @return 0 if one or more entries were listed, or if nothing is persisted
   * and no instance was named; {@code EXIT_FALSE} if nothing was listed
   * @throws IOException
   * @throws YarnException
   * @throws BadCommandArgumentsException if both the live flag and a state
   * are set, or the state is not a known YARN application state
   * @throws UnknownApplicationInstanceException if a specific instance
   * was named but it was not found
   */
  @Override
  @VisibleForTesting
  public int actionList(String clustername, ActionListArgs args)
      throws IOException, YarnException {
    verifyBindingsDefined();

    boolean live = args.live;
    String state = args.state;
    boolean verbose = args.verbose;
    // null-safe check: a null state must not trigger an NPE
    boolean stateSet = isSet(state);

    if (live && stateSet) {
      throw new BadCommandArgumentsException(
          Arguments.ARG_LIVE + " and " + Arguments.ARG_STATE + " are exclusive");
    }
    // flag to indicate only services in a specific state are to be listed
    boolean listOnlyInState = live || stateSet;

    // work out the (inclusive) range of YARN states to accept
    YarnApplicationState min, max;
    if (live) {
      min = YarnApplicationState.NEW;
      max = YarnApplicationState.RUNNING;
    } else if (stateSet) {
      YarnApplicationState stateVal = extractYarnApplicationState(state);
      min = max = stateVal;
    } else {
      min = YarnApplicationState.NEW;
      max = YarnApplicationState.KILLED;
    }
    // get the complete list of persistent instances
    Map<String, Path> persistentInstances = sliderFileSystem.listPersistentInstances();

    if (persistentInstances.isEmpty() && isUnset(clustername)) {
      // an empty listing is a success if no cluster was named
      log.debug("No application instances found");
      return EXIT_SUCCESS;
    }

    // and those the RM knows about
    List<ApplicationReport> instances = listSliderInstances(null);
    SliderUtils.sortApplicationsByMostRecent(instances);
    Map<String, ApplicationReport> reportMap =
        SliderUtils.buildApplicationReportMap(instances, min, max);
    log.debug("Persisted {} deployed {} filtered[{}-{}] & de-duped to {}",
        persistentInstances.size(),
        instances.size(),
        min, max,
        reportMap.size() );

    if (isSet(clustername)) {
      // only one instance is expected
      // resolve the persistent value
      Path persistent = persistentInstances.get(clustername);
      if (persistent == null) {
        throw unknownClusterException(clustername);
      }
      // create a new map with only that instance in it.
      // this restricts the output of results to this instance
      persistentInstances = new HashMap<String, Path>();
      persistentInstances.put(clustername, persistent);
    }

    // at this point there is either the entire list or a stripped down instance
    int listed = 0;

    for (String name : persistentInstances.keySet()) {
      ApplicationReport report = reportMap.get(name);
      if (!listOnlyInState || report != null) {
        // list the details if all were requested, or the filtering contained
        // a report
        listed++;
        String details = instanceDetailsToString(name, report, verbose);
        print(details);
      }
    }

    return listed > 0 ? EXIT_SUCCESS: EXIT_FALSE;
  }

  /**
   * Build the listing text for one application instance.
   * Without a report only the (padded) name is emitted; with a report the
   * state and application ID are added, plus the tracking URL when the
   * application is running. The text always ends with a newline.
   * @param name instance name
   * @param report the application report; may be null
   * @param verbose if true and a report exists, append the full report text
   * @return a string
   */
  String instanceDetailsToString(String name,
      ApplicationReport report,
      boolean verbose) {
    // format strings, each one extending the previous
    final String nameFormat = "%-30s";
    final String reportFormat = nameFormat + "  %10s  %-40s";
    final String runningFormat = reportFormat + " %s";
    StringBuilder text = new StringBuilder(200);
    if (report != null) {
      String appId = report.getApplicationId().toString();
      YarnApplicationState yarnState = report.getYarnApplicationState();
      String state = yarnState.toString();
      // only a running application gets its tracking URL listed
      String line = yarnState == YarnApplicationState.RUNNING
          ? String.format(runningFormat, name, state, appId, report.getTrackingUrl())
          : String.format(reportFormat, name, state, appId);
      text.append(line);
      if (verbose) {
        text.append('\n')
            .append(SliderUtils.appReportToString(report, "\n  "));
      }
    } else {
      text.append(String.format(nameFormat, name));
    }
    return text.append('\n').toString();
  }

  /**
   * Map the value of a --state argument to a YARN application state.
   * The match is case-insensitive: the argument is upper-cased (English
   * locale) before the enum lookup.
   * @param state state argument
   * @return the application state
   * @throws BadCommandArgumentsException if the argument did not match
   * any known state
   */
  private YarnApplicationState extractYarnApplicationState(String state) throws
      BadCommandArgumentsException {
    try {
      return YarnApplicationState.valueOf(state.toUpperCase(Locale.ENGLISH));
    } catch (IllegalArgumentException e) {
      throw new BadCommandArgumentsException("Unknown state: " + state);
    }
  }
  
  /**
   * Probe for an application being active, that is: in the RUNNING or
   * the ACCEPTED state.
   * @param report the application report
   * @return true if it is running or scheduled to run.
   */
  public boolean isApplicationActive(ApplicationReport report) {
    YarnApplicationState current = report.getYarnApplicationState();
    if (current == YarnApplicationState.RUNNING) {
      return true;
    }
    return current == YarnApplicationState.ACCEPTED;
  }

  /**
   * Implement the flex action: parse the requested component counts from
   * the arguments and hand them to {@link #flex(String, Map)}.
   * @param name name of the application instance
   * @param args flex arguments; the component map holds one
   * (component name, count) entry per component
   * @return the exit code of the flex operation
   * @throws BadCommandArgumentsException if a requested count is not a number
   * @throws YarnException
   * @throws IOException
   */

  @Override
  @VisibleForTesting
  public int actionFlex(String name, ActionFlexArgs args) throws YarnException, IOException {
    verifyBindingsDefined();
    SliderUtils.validateClusterName(name);
    log.debug("actionFlex({})", name);
    Map<String, Integer> requested = new HashMap<String, Integer>();
    for (Map.Entry<String, String> component : args.getComponentMap().entrySet()) {
      String role = component.getKey();
      String countText = component.getValue();
      Integer count;
      try {
        count = Integer.valueOf(countText);
      } catch (NumberFormatException e) {
        throw new BadCommandArgumentsException(
            "Requested count of role %s is not a number: \"%s\"",
            role, countText);
      }
      requested.put(role, count);
    }
    return flex(name, requested);
  }

  /**
   * Probe for an application instance existing, optionally requiring it
   * to be live. Wraps the flag in an {@link ActionExistsArgs} and delegates
   * to {@link #actionExists(String, ActionExistsArgs)}.
   * @param name name of the application instance
   * @param checkLive value for the "live" flag of the arguments
   * @return the exit code of the delegated operation
   * @throws YarnException
   * @throws IOException
   */
  @Override
  public int actionExists(String name, boolean checkLive) throws YarnException, IOException {
    ActionExistsArgs existsArgs = new ActionExistsArgs();
    existsArgs.live = checkLive;
    return actionExists(name, existsArgs);
  }

  /**
   * Implement the exists action.
   * <ol>
   *   <li>The cluster directory must exist in the filesystem, otherwise an
   *   unknown-cluster exception is thrown.</li>
   *   <li>With neither the live flag nor a state set, existence of the
   *   directory is enough.</li>
   *   <li>Otherwise an application report is looked up; with the live flag
   *   the application must be in a state before FINISHED, with a state
   *   argument an instance in exactly that state must be found.</li>
   * </ol>
   * @param name name of the application instance
   * @param args exists arguments (live flag, state)
   * @return {@code EXIT_SUCCESS} if the probe succeeded, {@code EXIT_FALSE}
   * if no report was found or the instance is not in the desired state
   * @throws YarnException
   * @throws IOException
   */
  public int actionExists(String name, ActionExistsArgs args) throws YarnException, IOException {
    verifyBindingsDefined();
    SliderUtils.validateClusterName(name);
    boolean checkLive = args.live;
    log.debug("actionExists({}, {}, {})", name, checkLive, args.state);

    // first probe: is there a cluster directory in the filesystem?
    Path clusterDirectory = sliderFileSystem.buildClusterDirPath(name);
    boolean onFilesystem =
        sliderFileSystem.getFileSystem().exists(clusterDirectory);
    if (!onFilesystem) {
      throw unknownClusterException(name);
    }
    String state = args.state;
    if (!checkLive && SliderUtils.isUnset(state)) {
      log.info("Application {} exists", name);
      return EXIT_SUCCESS;
    }

    // second probe: liveness or a specific state
    ApplicationReport instance = findInstance(name);
    if (instance == null) {
      log.info("Application {} not running", name);
      return EXIT_FALSE;
    }
    boolean inDesiredState;
    if (checkLive) {
      // live means: not yet in any of the terminated states
      YarnApplicationState appstate = instance.getYarnApplicationState();
      log.debug(" current app state = {}", appstate);
      inDesiredState =
          appstate.ordinal() < YarnApplicationState.FINISHED.ordinal();
    } else {
      // look for an instance in the single --state state
      List<ApplicationReport> userInstances = yarnClient.listInstances("");
      state = state.toUpperCase(Locale.ENGLISH);
      YarnApplicationState desiredState = extractYarnApplicationState(state);
      ApplicationReport foundInstance =
          yarnClient.findAppInInstanceList(userInstances, name, desiredState);
      inDesiredState = foundInstance != null;
      if (inDesiredState) {
        // this is the instance to report on
        instance = foundInstance;
      }
    }

    SliderUtils.OnDemandReportStringifier report =
        new SliderUtils.OnDemandReportStringifier(instance);
    if (inDesiredState) {
      log.debug("Application instance is in desired state");
      log.info("Application {} is {}\n{}", name,
          instance.getYarnApplicationState(), report);
      return EXIT_SUCCESS;
    }
    // a report exists, but not in the state asked for
    log.info("Application {} found but is in wrong state {}", name,
        instance.getYarnApplicationState());
    log.debug("State {}", report);
    return EXIT_FALSE;
  }


  /**
   * Ask the AM of a named application instance to kill one container.
   * @param name name of the application instance
   * @param args arguments carrying the container ID
   * @return {@code EXIT_SUCCESS} once the kill request has been made
   * @throws BadCommandArgumentsException if no container ID was supplied
   * @throws BadClusterStateException if the container is not known
   * @throws YarnException
   * @throws IOException
   */
  @Override
  public int actionKillContainer(String name,
      ActionKillContainerArgs args) throws YarnException, IOException {
    String id = args.id;
    if (SliderUtils.isUnset(id)) {
      throw new BadCommandArgumentsException("Missing container id");
    }
    log.info("killingContainer {}:{}", name, id);
    SliderClusterProtocol appMaster = bondToCluster(name);
    SliderClusterOperations clusterOps =
      new SliderClusterOperations(appMaster);
    try {
      clusterOps.killContainer(id);
      return EXIT_SUCCESS;
    } catch (NoSuchNodeException e) {
      throw new BadClusterStateException("Container %s not found in cluster %s",
                                         id, name);
    }
  }

  /**
   * Send an echo request to the AM of a named application instance.
   * @param name name of the application instance
   * @param args arguments carrying the message
   * @return the result of the echo operation
   * @throws BadCommandArgumentsException if the message is null
   * @throws YarnException
   * @throws IOException
   */
  @Override
  public String actionEcho(String name, ActionEchoArgs args)
      throws YarnException, IOException {
    String message = args.message;
    if (message == null) {
      throw new BadCommandArgumentsException("missing message");
    }
    SliderClusterProtocol appMaster = bondToCluster(name);
    return new SliderClusterOperations(appMaster).echo(message);
  }

  /**
   * Get the client used to list and look up YARN application instances.
   * @return the YARN application list client held by this instance
   */
  public YarnAppListClient getYarnAppListClient() {
    return this.YarnAppListClient;
  }

  /**
   * Look up an instance of an application belonging to the current user.
   * The lookup is delegated to the YARN application list client.
   * @param appname application name
   * @return the app report or null if none is found
   * @throws YarnException YARN issues
   * @throws IOException IO problems
   */
  private ApplicationReport findInstance(String appname)
      throws YarnException, IOException {
    final ApplicationReport match = YarnAppListClient.findInstance(appname);
    return match;
  }
  
  /**
   * Look up an application instance by name and wrap its report.
   * @param appname application name
   * @return a {@link RunningApplication} built from the report,
   * or null if no report was found
   * @throws YarnException YARN issues
   * @throws IOException IO problems
   */
  private RunningApplication findApplication(String appname)
      throws YarnException, IOException {
    ApplicationReport applicationReport = findInstance(appname);
    if (applicationReport == null) {
      return null;
    }
    return new RunningApplication(yarnClient, applicationReport);
  }

  /**
   * Find every live instance of a specific app; if there is more than one
   * in the cluster, all of them are returned. State should be running or
   * less. The lookup is delegated to the YARN application list client.
   * @param appname application name
   * @return the list of all matching application instances
   */
  private List<ApplicationReport> findAllLiveInstances(String appname)
    throws YarnException, IOException {
    final List<ApplicationReport> live =
      YarnAppListClient.findAllLiveInstances(appname);
    return live;
  }

  /**
   * Connect to a Slider AM.
   * If the thread is interrupted while the proxy is being obtained, the
   * interrupt status is restored before the failure is reported as a
   * {@link SliderException} with the timed-out exit code.
   * @param app application report providing the details on the application
   * @return an instance
   * @throws YarnException
   * @throws IOException
   */
  private SliderClusterProtocol connect(ApplicationReport app)
      throws YarnException, IOException {

    try {
      return RpcBinder.getProxy(getConfig(),
                                yarnClient.getRmClient(),
                                app,
                                Constants.CONNECT_TIMEOUT,
                                Constants.RPC_TIMEOUT);
    } catch (InterruptedException e) {
      // the exception cleared the interrupt flag: set it again so that code
      // further up the stack can still see that the thread was interrupted
      Thread.currentThread().interrupt();
      throw new SliderException(SliderExitCodes.EXIT_TIMED_OUT,
                              e,
                              "Interrupted waiting for communications with the Slider AM");
    }
  }

  /**
   * Implement the status action: fetch the cluster description of a named
   * application instance and either log it as JSON or save it to a file.
   * @param clustername name of the application instance
   * @param statusArgs status arguments; a null output means "log the JSON"
   * @return {@code EXIT_SUCCESS}
   * @throws YarnException
   * @throws IOException
   */
  @Override
  @VisibleForTesting
  public int actionStatus(String clustername, ActionStatusArgs statusArgs) throws
                                              YarnException,
                                              IOException {
    verifyBindingsDefined();
    SliderUtils.validateClusterName(clustername);
    String outfile = statusArgs.getOutput();
    ClusterDescription status = getClusterDescription(clustername);
    if (outfile == null) {
      // the JSON text is only needed (and so only generated) for logging
      log.info(status.toJsonString());
    } else {
      status.save(new File(outfile).getAbsoluteFile());
    }
    return EXIT_SUCCESS;
  }

  /**
   * Implement the version action: load the version information and
   * print it via this class's log.
   * @return {@code EXIT_SUCCESS}
   */
  @Override
  public int actionVersion() {
    final int exitCode = EXIT_SUCCESS;
    SliderVersionInfo.loadAndPrintVersionInfo(log);
    return exitCode;
  }

  /**
   * Implement the freeze (stop) action.
   * <p>
   * An instance that has no application report, or whose application is
   * already in a terminated state, is treated as stopped. An application
   * that has not yet reached RUNNING is always force killed, as the stop
   * RPC can only be issued to a running AM. Otherwise the AM is asked to
   * stop the cluster, unless a forced kill was requested.
   * If a wait time is set, the method then waits for the application to
   * finish; exceptions raised during that wait are logged, not rethrown.
   * @param clustername name of the application instance
   * @param freezeArgs arguments: wait time in seconds, message, force flag
   * @return {@code EXIT_SUCCESS} if the instance was already stopped or the
   * stop was issued; {@code EXIT_FALSE} if the stop request failed or the
   * application did not finish within the wait time
   * @throws YarnException
   * @throws IOException
   */
  @Override
  public int actionFreeze(String clustername,
      ActionFreezeArgs freezeArgs) throws YarnException, IOException {
    verifyBindingsDefined();
    SliderUtils.validateClusterName(clustername);
    int waittime = freezeArgs.getWaittime();
    String text = freezeArgs.message;
    boolean forcekill = freezeArgs.force;
    log.debug("actionFreeze({}, reason={}, wait={}, force={})", clustername,
              text,
              waittime,
              forcekill);
    
    //is this actually a known cluster?
    sliderFileSystem.locateInstanceDefinition(clustername);
    ApplicationReport app = findInstance(clustername);
    if (app == null) {
      // exit early
      log.info("Cluster {} not running", clustername);
      // not an error to stop a stopped cluster
      return EXIT_SUCCESS;
    }
    log.debug("App to stop was found: {}:\n{}", clustername,
              new SliderUtils.OnDemandReportStringifier(app));
    if (app.getYarnApplicationState().ordinal() >=
        YarnApplicationState.FINISHED.ordinal()) {
      log.info("Cluster {} is a terminated state {}", clustername,
               app.getYarnApplicationState());
      return EXIT_SUCCESS;
    }

    // IPC request for a managed shutdown is only possible if the app is running.
    // so we need to force kill if the app is accepted or submitted
    if (!forcekill
        && app.getYarnApplicationState().ordinal() < YarnApplicationState.RUNNING.ordinal()) {
      log.info("Cluster {} is in a pre-running state {}. Force killing it", clustername,
          app.getYarnApplicationState());
      forcekill = true;
    }

    LaunchedApplication application = new LaunchedApplication(yarnClient, app);
    applicationId = application.getApplicationId();
    
    if (forcekill) {
      // escalating to forced kill
      application.kill("Forced stop of " + clustername + ": " + text);
    } else {
      try {
        SliderClusterProtocol appMaster = connect(app);
        // protobuf builder setters reject null, so a missing message is
        // sent as an empty string
        Messages.StopClusterRequestProto r =
          Messages.StopClusterRequestProto
                  .newBuilder()
                  .setMessage(text != null ? text : "")
                  .build();
        appMaster.stopCluster(r);

        log.debug("Cluster stop command issued");

      } catch (YarnException e) {
        log.warn("Exception while trying to terminate {}: {}", clustername, e);
        return EXIT_FALSE;
      } catch (IOException e) {
        log.warn("Exception while trying to terminate {}: {}", clustername, e);
        return EXIT_FALSE;
      }
    }

    //wait for completion. We don't currently return an exception during this process
    //as the stop operation has been issued, this is just YARN.
    try {
      if (waittime > 0) {
        // multiply as a long: int arithmetic would overflow for wait times
        // above roughly 24 days' worth of seconds
        ApplicationReport applicationReport =
          application.monitorAppToState(YarnApplicationState.FINISHED,
                                        new Duration(waittime * 1000L));
        if (applicationReport == null) {
          log.info("application did not shut down in time");
          return EXIT_FALSE;
        }
      }

// JDK7    } catch (YarnException | IOException e) {
    } catch (YarnException e) {
      log.warn("Exception while waiting for the application {} to shut down: {}",
               clustername, e);
    } catch ( IOException e) {
      log.warn("Exception while waiting for the application {} to shut down: {}",
               clustername, e);
    }

    return EXIT_SUCCESS;
  }

  /**
   * Implement the thaw (start) action: after validating the name and the
   * bindings, and checking that there are no live clusters of that name,
   * start the cluster.
   * @param clustername name of the application instance
   * @param thaw thaw arguments, passed on to the cluster start
   * @return the exit code of the cluster start
   * @throws YarnException
   * @throws IOException
   */
  @Override
  public int actionThaw(String clustername, ActionThawArgs thaw) throws YarnException, IOException {
    SliderUtils.validateClusterName(clustername);
    verifyBindingsDefined();
    // bail out here if it is actually running
    verifyNoLiveClusters(clustername, "Start");

    final int exitCode = startCluster(clustername, thaw);
    return exitCode;
  }

  /**
   * Implement flexing.
   * <p>
   * The unresolved instance definition is loaded, the instance count of
   * every named role is updated, and the result is validated against both
   * the Slider AM provider and the instance's own provider. The updated
   * definition is then saved (a lock failure while saving is logged, not
   * rethrown) and, if an application report for the instance is found,
   * passed on to the AM.
   * @param clustername name of the cluster
   * @param roleInstances map of new role instances
   * @return EXIT_SUCCESS if the #of nodes in a live cluster changed;
   * EXIT_FALSE if there was no running instance to update
   * @throws YarnException
   * @throws IOException
   */
  public int flex(String clustername, Map<String, Integer> roleInstances)
      throws YarnException, IOException {
    verifyBindingsDefined();
    SliderUtils.validateClusterName(clustername);
    Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
    AggregateConf instanceDefinition =
        loadInstanceDefinitionUnresolved(clustername, clusterDirectory);

    // apply every requested count to the resources tree
    ConfTreeOperations resources =
        instanceDefinition.getResourceOperations();
    for (Map.Entry<String, Integer> requested : roleInstances.entrySet()) {
      String role = requested.getKey();
      int count = requested.getValue();
      resources.getOrAddComponent(role)
          .put(ResourceKeys.COMPONENT_INSTANCES, Integer.toString(count));

      log.debug("Flexed cluster specification ( {} -> {}) : \n{}",
                role,
                count,
                resources);
    }
    SliderAMClientProvider sliderAM = new SliderAMClientProvider(getConfig());
    String providerName = instanceDefinition.getInternalOperations()
        .getGlobalOptions()
        .getMandatoryOption(InternalKeys.INTERNAL_PROVIDER_NAME);
    AbstractClientProvider provider = createClientProvider(providerName);
    // both providers get to validate the updated definition
    validateInstanceDefinition(sliderAM, instanceDefinition, sliderFileSystem);
    validateInstanceDefinition(provider, instanceDefinition, sliderFileSystem);

    // persist the updated specification
    try {
      InstanceIO.updateInstanceDefinition(sliderFileSystem, clusterDirectory, instanceDefinition);
    } catch (LockAcquireFailedException e) {
      // could not get the lock: report it and carry on
      log.debug("Failed to lock dir {}", clusterDirectory, e);
      log.warn("Failed to save new resource definition to {} : {}", clusterDirectory, e);
    }

    // if the instance is actually running, tell it about the update
    ApplicationReport instance = findInstance(clustername);
    if (instance == null) {
      log.info("No running instance to update");
      return EXIT_FALSE;
    }
    log.info("Flexing running cluster");
    SliderClusterProtocol appMaster = connect(instance);
    SliderClusterOperations clusterOps = new SliderClusterOperations(appMaster);
    clusterOps.flex(instanceDefinition.getResources());
    log.info("application instance size updated");
    return EXIT_SUCCESS;
  }

  /**
   * Have a provider validate an instance definition. A validation failure
   * is logged, together with the definition itself, and then rethrown.
   * @param provider the provider performing the validation
   * @param instanceDefinition the instance definition
   * @param fs filesystem passed on to the provider
   * @throws SliderException if invalid.
   */
  protected void validateInstanceDefinition(AbstractClientProvider provider,
      AggregateConf instanceDefinition, SliderFileSystem fs) throws SliderException {
    try {
      provider.validateInstanceDefinition(instanceDefinition, fs);
    } catch (SliderException invalid) {
      // rejected: log the reason and the definition, then pass the failure on
      String reason = invalid.getMessage();
      log.info("Error {} validating application instance definition ", reason);
      log.debug("Error validating application instance definition ", invalid);
      log.info(instanceDefinition.toString());
      throw invalid;
    }
  }


  /**
   * Load the persisted description of a cluster from its cluster directory.
   * @param clustername name of the cluster
   * @return the description in the filesystem
   * @throws IOException any problems loading -including a missing file
   */
  @VisibleForTesting
  public AggregateConf loadPersistedClusterDescription(String clustername)
      throws IOException, SliderException, LockAcquireFailedException {
    Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
    ConfPersister persister =
        new ConfPersister(sliderFileSystem, clusterDirectory);
    // the persister fills in a fresh, empty configuration
    AggregateConf loaded = new AggregateConf();
    persister.load(loaded);
    return loaded;
  }

    /**
     * Bond to the named live cluster and ask it for its current state.
     * @param clustername the cluster name
     * @return its description
     */
  @VisibleForTesting
  public ClusterDescription getClusterDescription(String clustername)
      throws YarnException, IOException {
    return createClusterOperations(clustername).getClusterDescription();
  }

  /**
   * Get the current state of the cluster named by
   * {@code getDeployedClusterName()}.
   * @return its description
   */
  @VisibleForTesting
  public ClusterDescription getClusterDescription()
      throws YarnException, IOException {
    String deployed = getDeployedClusterName();
    return getClusterDescription(deployed);
  }

  /**
   * Ask the active cluster for the UUIDs of all nodes in a role.
   * @param role role name or "" for all
   * @return an array of UUID strings
   * @throws IOException
   * @throws YarnException
   */
  @VisibleForTesting
  public String[] listNodeUUIDsByRole(String role)
      throws IOException, YarnException {
    SliderClusterOperations clusterOps = createClusterOperations();
    return clusterOps.listNodeUUIDsByRole(role);
  }

  /**
   * Ask the active cluster for all nodes in a role. This is a double round
   * trip: once to list the nodes in a role, another to get their details.
   * @param role component/role to look for
   * @return the list of matching cluster nodes
   * @throws IOException
   * @throws YarnException
   */
  @VisibleForTesting
  public List<ClusterNode> listClusterNodesInRole(String role)
      throws IOException, YarnException {
    SliderClusterOperations clusterOps = createClusterOperations();
    return clusterOps.listClusterNodesInRole(role);
  }

  /**
   * Look up the details of a list of node UUIDs. An empty array is
   * answered locally with an empty list, without contacting the cluster.
   * @param uuids uuids to ask for
   * @return a possibly empty list of node details
   * @throws IOException
   * @throws YarnException
   */
  @VisibleForTesting
  public List<ClusterNode> listClusterNodes(String[] uuids)
      throws IOException, YarnException {
    if (uuids.length > 0) {
      return createClusterOperations().listClusterNodes(uuids);
    }
    // nothing was asked for: skip the remote call
    return new LinkedList<ClusterNode>();
  }

  /**
   * Fetch the details of a single node from the AM of the active cluster.
   * @param uuid uuid of node
   * @return deserialized node
   * @throws IOException IO problems
   * @throws NoSuchNodeException if the node isn't found
   */
  @VisibleForTesting
  public ClusterNode getNode(String uuid) throws IOException, YarnException {
    SliderClusterOperations clusterOps = createClusterOperations();
    return clusterOps.getNode(uuid);
  }
  
  /**
   * Fetch the instance definition from the far end, via the cluster
   * operations of the active cluster.
   * @return the instance definition returned by the cluster operations
   */
  @VisibleForTesting
  public AggregateConf getLiveInstanceDefinition() throws IOException, YarnException {
    SliderClusterOperations clusterOps = createClusterOperations();
    return clusterOps.getInstanceDefinition();
  }

  /**
   * Bond to a running cluster: look up its application report and
   * connect to its AM.
   * @param clustername cluster name
   * @return the AM RPC client
   * @throws SliderException if the cluster is unknown, which includes
   * a null cluster name
   */
  private SliderClusterProtocol bondToCluster(String clustername)
      throws YarnException, IOException {
    verifyBindingsDefined();
    if (clustername == null) {
      throw unknownClusterException("(undefined)");
    }
    ApplicationReport located = findInstance(clustername);
    if (located != null) {
      return connect(located);
    }
    throw unknownClusterException(clustername);
  }

  /**
   * Bond to a named cluster and wrap the AM client in a new cluster
   * operations instance.
   * @param clustername cluster name
   * @return a bonded cluster operations instance
   * @throws YarnException YARN issues
   * @throws IOException IO problems
   */
  private SliderClusterOperations createClusterOperations(String clustername)
      throws YarnException, IOException {
    return new SliderClusterOperations(bondToCluster(clustername));
  }

  /**
   * Get a cluster operations instance against the active cluster. The
   * instance is created on the first call and the same one is returned
   * by later calls.
   * @return a bonded cluster operations instance
   * @throws YarnException YARN issues
   * @throws IOException IO problems
   */
  public SliderClusterOperations createClusterOperations()
      throws YarnException, IOException {
    if (sliderClusterOperations != null) {
      // already created: reuse it
      return sliderClusterOperations;
    }
    sliderClusterOperations =
      createClusterOperations(getDeployedClusterName());
    return sliderClusterOperations;
  }

  /**
   * Block until an instance of a named role is live (or past it in the
   * lifecycle). The wait is performed by the cluster operations of the
   * active cluster.
   * @param role role to look for
   * @param timeout time to wait
   * @return the state. If still in CREATED, the cluster didn't come up
   * in the time period. If LIVE, all is well. If >LIVE, it has shut for a reason
   * @throws IOException IO
   * @throws SliderException Slider
   * @throws WaitTimeoutException if the wait timed out
   */
  @VisibleForTesting
  public int waitForRoleInstanceLive(String role, long timeout)
    throws WaitTimeoutException, IOException, YarnException {
    SliderClusterOperations clusterOps = createClusterOperations();
    return clusterOps.waitForRoleInstanceLive(role, timeout);
  }

  /**
   * Create (but do not throw) the exception for an unknown cluster.
   * @param clustername cluster name
   * @return an exception with text and a relevant exit code
   */
  public UnknownApplicationInstanceException unknownClusterException(String clustername) {
    final UnknownApplicationInstanceException unknown =
        UnknownApplicationInstanceException.unknownInstance(clustername);
    return unknown;
  }

  /**
   * Describe this client: its service state and the name of the deployed
   * application instance.
   * @return a string for diagnostics
   */
  @Override
  public String toString() {
    StringBuilder description = new StringBuilder("Slider Client in state ");
    description.append(getServiceState());
    description.append(" and Slider Application Instance ");
    description.append(deployedClusterName);
    return description.toString();
  }

  /**
   * Ask the YARN client for all YARN applications.
   * @return a possibly empty list
   * @throws YarnException
   * @throws IOException
   */
  @VisibleForTesting
  public List<ApplicationReport> getApplications() throws YarnException, IOException {
    final List<ApplicationReport> all = yarnClient.getApplications();
    return all;
  }

  /**
   * Get the report of a specific application, via a
   * {@link LaunchedApplication} bound to the ID and the YARN client.
   * @param appId ID of the application
   * @return the application report
   * @throws YarnException
   * @throws IOException
   */
  @VisibleForTesting
  public ApplicationReport getApplicationReport(ApplicationId appId)
    throws YarnException, IOException {
    LaunchedApplication launched = new LaunchedApplication(appId, yarnClient);
    return launched.getApplicationReport();
  }

  /**
   * The configuration used for deployment (after resolution).
   * @return the launched instance definition currently held by this client
   */
  @VisibleForTesting
  public AggregateConf getLaunchedInstanceDefinition() {
    return this.launchedInstanceDefinition;
  }


  /**
   * Implement the resolve action against the registry.
   * <p>
   * In list mode, the children of the path are enumerated and printed,
   * followed by every service record found among them; records go to
   * the console, or to one ".json" file per record when a destination
   * directory is set. A missing root path ("/") is treated as empty.
   * Otherwise the single record at the path is resolved and written to
   * the output file, a file in the destination directory, or the console.
   * @param args resolve arguments; validated here as this is an API entry
   * point
   * @return {@code EXIT_SUCCESS}
   * @throws NotFoundException if there is no path or no record at the path
   * @throws IOException IO problems, including a destination directory
   * which could not be created
   * @throws YarnException
   */
  @Override
  public int actionResolve(ActionResolveArgs args)
      throws YarnException, IOException {
    // as this is an API entry point, validate
    // the arguments
    args.validate();
    RegistryOperations operations = getRegistryOperations();
    String path = SliderRegistryUtils.resolvePath(args.path);
    ServiceRecordMarshal serviceRecordMarshal = new ServiceRecordMarshal();
    try {
      if (args.list) {
        File destDir = args.destdir;
        // mkdirs() also returns false when the directory already exists,
        // so only fail when there is still no directory afterwards
        if (destDir != null && !destDir.mkdirs() && !destDir.isDirectory()) {
          throw new IOException("Failed to create directory " + destDir);
        }

        Map<String, ServiceRecord> recordMap;
        Map<String, RegistryPathStatus> znodes;
        try {
          // use the registry operations instance obtained above
          znodes = statChildren(operations, path);
          recordMap = extractServiceRecords(operations,
              path,
              znodes.values());
        } catch (PathNotFoundException e) {
          // treat the root directory as if it is always there

          if ("/".equals(path)) {
            znodes = new HashMap<String, RegistryPathStatus>(0);
            recordMap = new HashMap<String, ServiceRecord>(0);
          } else {
            throw e;
          }
        }
        // list every child entry, then every service record
        log.info("Entries: {}", znodes.size());

        for (String name : znodes.keySet()) {
          println("  " + name);
        }
        println("");

        log.info("Service records: {}", recordMap.size());
        for (Entry<String, ServiceRecord> recordEntry : recordMap.entrySet()) {
          String name = recordEntry.getKey();
          ServiceRecord instance = recordEntry.getValue();
          String json = serviceRecordMarshal.toJson(instance);
          if (destDir == null) {
            println(name);
            println(json);
          } else {
            // one file per record, named after the last path element
            String filename = RegistryPathUtils.lastPathEntry(name) + ".json";
            File jsonFile = new File(destDir, filename);
            SliderUtils.write(jsonFile,
                serviceRecordMarshal.toBytes(instance),
                true);
          }
        }
      } else  {
        // resolve single entry
        ServiceRecord instance = resolve(path);
        File outFile = args.out;
        // a destination directory takes priority over the output file
        if (args.destdir != null) {
          outFile = new File(args.destdir, RegistryPathUtils.lastPathEntry(path));
        }
        if (outFile != null) {
          SliderUtils.write(outFile, serviceRecordMarshal.toBytes(instance), true);
        } else {
          println(serviceRecordMarshal.toJson(instance));
        }
      }
//      TODO JDK7
    } catch (PathNotFoundException e) {
      // no record at this path
      throw new NotFoundException(e, path);
    } catch (NoRecordException e) {
      throw new NotFoundException(e, path);
    }
    return EXIT_SUCCESS;
  }

  /**
   * Run the registry operation selected by the arguments: list instances,
   * list configurations, list exports, or fetch a single configuration or
   * export group.
   *
   * @param registryArgs registry arguments; validated before use
   * @return {@code EXIT_SUCCESS} on success, {@code EXIT_USAGE} if no
   * operation was selected, {@code EXIT_NOT_FOUND} if a
   * {@link FileNotFoundException} or {@link PathNotFoundException} was raised
   * @throws YarnException YARN problems
   * @throws IOException Network or other problems
   */
  @Override
  public int actionRegistry(ActionRegistryArgs registryArgs) throws
      YarnException,
      IOException {
    // as this is also a test entry point, validate
    // the arguments
    registryArgs.validate();
    try {
      if (registryArgs.list) {
        actionRegistryList(registryArgs);
      } else if (registryArgs.listConf) {
        // list the configurations
        actionRegistryListConfigsYarn(registryArgs);
      } else if (registryArgs.listExports) {
        // list the exports
        actionRegistryListExports(registryArgs);
      } else if (SliderUtils.isSet(registryArgs.getConf)) {
        // get a configuration
        PublishedConfiguration publishedConfiguration =
            actionRegistryGetConfig(registryArgs);
        outputConfig(publishedConfiguration, registryArgs);
      } else if (SliderUtils.isSet(registryArgs.getExport)) {
        // get an export group
        PublishedExports publishedExports =
            actionRegistryGetExport(registryArgs);
        outputExport(publishedExports, registryArgs);
      } else {
        // it's an unknown command
        // NOTE(review): the usage text is requested for ACTION_DIAGNOSTICS
        // although this is the registry action -- confirm whether the
        // registry action constant was intended here.
        log.info(CommonArgs.usage(serviceArgs, ACTION_DIAGNOSTICS));
        return EXIT_USAGE;
      }
//      JDK7
    } catch (FileNotFoundException e) {
      // pass the text explicitly: a Throwable as the sole argument of a
      // "{}" pattern is not rendered the same way by every SLF4J release
      log.info("{}", e.toString());
      log.debug("{}", e.toString(), e);
      return EXIT_NOT_FOUND;
    } catch (PathNotFoundException e) {
      log.info("{}", e.toString());
      log.debug("{}", e.toString(), e);
      return EXIT_NOT_FOUND;
    }
    return EXIT_SUCCESS;
  }

  /**
   * Registry list operation: log the service records selected by the
   * arguments and return them.
   * <p>
   * If no instance name is given, every record under the current user's
   * service class path for the requested service type is listed; otherwise
   * the single named record is looked up.
   *
   * @param registryArgs registry Arguments
   * @return the instances (for tests)
   * @throws UnknownApplicationInstanceException if no name was given and
   * no records are registered under the service class path
   * @throws NotFoundException if no name was given and the service class
   * path does not exist
   * @throws YarnException YARN problems
   * @throws IOException Network or other problems
   */
  @VisibleForTesting
  public Collection<ServiceRecord> actionRegistryList(
      ActionRegistryArgs registryArgs)
      throws YarnException, IOException {
    String serviceType = registryArgs.serviceType;
    String name = registryArgs.name;
    RegistryOperations operations = getRegistryOperations();
    Collection<ServiceRecord> serviceRecords;
    if (StringUtils.isEmpty(name)) {
      String path =
          serviceclassPath(
              currentUser(),
              serviceType);

      try {
        Map<String, ServiceRecord> recordMap =
            listServiceRecords(operations, path);
        if (recordMap.isEmpty()) {
          throw new UnknownApplicationInstanceException(
              "No applications registered under " + path);
        }
        serviceRecords = recordMap.values();
      } catch (PathNotFoundException e) {
        // cause-first form, as at the other NotFoundException call sites in
        // this file, so the registry exception is attached as the cause
        // rather than being passed as a message argument
        throw new NotFoundException(e, path);
      }
    } else {
      ServiceRecord instance = lookupServiceRecord(registryArgs);
      serviceRecords = new ArrayList<ServiceRecord>(1);
      serviceRecords.add(instance);
    }

    for (ServiceRecord serviceRecord : serviceRecords) {
      logInstance(serviceRecord, registryArgs.verbose);
    }
    return serviceRecords;
  }

  /**
   * Run the diagnostic selected by the arguments. The flags are tested in
   * a fixed order (client, application, yarn, credentials, all, level) and
   * only the first one that is set is acted on.
   *
   * @param diagnosticArgs diagnostic arguments
   * @return {@code EXIT_SUCCESS} if the diagnostic completed,
   * {@code EXIT_USAGE} if no diagnostic was selected, {@code EXIT_FALSE}
   * if any exception was raised (its string value is logged as an error)
   */
  @Override
  public int actionDiagnostic(ActionDiagnosticArgs diagnosticArgs) {
    int exitCode = EXIT_SUCCESS;
    try {
      if (diagnosticArgs.client) {
        actionDiagnosticClient(diagnosticArgs);
      } else if (diagnosticArgs.application) {
        actionDiagnosticApplication(diagnosticArgs);
      } else if (diagnosticArgs.yarn) {
        actionDiagnosticYarn(diagnosticArgs);
      } else if (diagnosticArgs.credentials) {
        actionDiagnosticCredentials();
      } else if (diagnosticArgs.all) {
        actionDiagnosticAll(diagnosticArgs);
      } else if (diagnosticArgs.level) {
        actionDiagnosticIntelligent(diagnosticArgs);
      } else {
        // no recognised option: show the usage text instead
        log.info(CommonArgs.usage(serviceArgs, ACTION_DIAGNOSTICS));
        exitCode = EXIT_USAGE;
      }
    } catch (Exception failure) {
      log.error(failure.toString());
      exitCode = EXIT_FALSE;
    }
    return exitCode;
  }

  /**
   * Run a chain of checks against a named application, logging the outcome
   * of each and stopping at the first one that fails:
   * <ol>
   *   <li>the client configuration file validates;</li>
   *   <li>the cluster description can be retrieved;</li>
   *   <li>if the cluster state is live, the agent tarball and then the
   *   application package validate on the filesystem.</li>
   * </ol>
   * Validation failures are logged and end the method normally; they are
   * not rethrown.
   *
   * @param diagnosticArgs diagnostic arguments; {@code name} must be set
   * @throws BadCommandArgumentsException if no application name was given
   * @throws YarnException YARN problems
   * @throws IOException Network or other problems
   * @throws URISyntaxException declared by the method signature
   */
  private void actionDiagnosticIntelligent(ActionDiagnosticArgs diagnosticArgs)
      throws YarnException, IOException, URISyntaxException {
    // not using member variable clustername because we want to place
    // application name after --application option and member variable
    // cluster name has to be put behind action
    String clusterName = diagnosticArgs.name;
    if(SliderUtils.isUnset(clusterName)){
      throw new BadCommandArgumentsException("application name must be provided with --name option");
    }

    try {
      SliderUtils.validateClientConfigFile();
      log.info("Slider-client.xml is accessible");
    } catch (IOException e) {
      // we are catching exceptions here because those are indication of
      // validation result, and we need to print them here
      log.error(
          "validation of slider-client.xml fails because: " + e.toString(), e);
      return;
    }
    SliderClusterOperations clusterOperations = createClusterOperations(clusterName);
    // cluster not found exceptions will be thrown upstream
    ClusterDescription clusterDescription = clusterOperations
        .getClusterDescription();
    log.info("Slider AppMaster is accessible");

    if (clusterDescription.state == ClusterDescription.STATE_LIVE) {
      AggregateConf instanceDefinition = clusterOperations
          .getInstanceDefinition();
      String imagePath = instanceDefinition.getInternalOperations().get(
          InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH);
      // if null, that means slider uploaded the agent tarball for the user
      // and we need to use where slider has put
      if (imagePath == null) {
        ApplicationReport appReport = findInstance(clusterName);
        Path path1 = sliderFileSystem.getTempPathForCluster(clusterName);

        Path subPath = new Path(path1, appReport.getApplicationId().toString()
            + "/agent");
        imagePath = subPath.toString();
      }
      try {
        SliderUtils.validateHDFSFile(sliderFileSystem, imagePath + "/" + AGENT_TAR);
        log.info("Slider agent package is properly installed");
      } catch (FileNotFoundException e) {
        log.error("can not find agent package: " + e.toString());
        return;
      } catch (IOException e) {
        log.error("can not open agent package: " + e.toString());
        return;
      }
      String pkgTarballPath = instanceDefinition.getAppConfOperations()
          .getGlobalOptions().getMandatoryOption(AgentKeys.APP_DEF);
      try {
        SliderUtils.validateHDFSFile(sliderFileSystem, pkgTarballPath);
        log.info("Application package is properly installed");
      } catch (FileNotFoundException e) {
        // pass the text, not the Throwable, so the placeholder is always
        // filled, matching the agent package messages above
        log.error("can not find application package: {}", e.toString());
        return;
      } catch (IOException e) {
        log.error("can not open application package: {} ", e.toString());
        return;
      }
    }
  }

  /**
   * Run every individual diagnostic in turn with the same arguments:
   * client, application, slider, YARN, then credentials. An exception from
   * any step propagates and the remaining steps do not run.
   *
   * @param diagnosticArgs diagnostic arguments handed to each step
   * @throws IOException Network or other problems
   * @throws YarnException YARN problems
   */
  private void actionDiagnosticAll(ActionDiagnosticArgs diagnosticArgs)
      throws IOException, YarnException {
    // client-side environment first
    actionDiagnosticClient(diagnosticArgs);

    // then the named application and its slider-side settings
    actionDiagnosticApplication(diagnosticArgs);
    actionDiagnosticSlider(diagnosticArgs);

    // finally the cluster itself and the credentials
    actionDiagnosticYarn(diagnosticArgs);
    actionDiagnosticCredentials();
  }

  /**
   * Report on the credential cache file when the client configuration
   * describes a secure Hadoop cluster; otherwise just log that the cluster
   * is not in secure mode.
   *
   * @throws BadConfigException if the credential check reports an invalid
   * configuration (logged, then rethrown)
   * @throws IOException if the credential check fails with an IO problem
   * (logged, then rethrown)
   */
  private void actionDiagnosticCredentials() throws BadConfigException,
      IOException {
    boolean secure = SliderUtils.isHadoopClusterSecure(
        SliderUtils.loadClientConfigurationResource());
    if (!secure) {
      log.info("the cluster is not in secure mode");
      return;
    }

    String cacheFileDescription;
    try {
      cacheFileDescription = SliderUtils.checkCredentialCacheFile();
    } catch (BadConfigException badConfig) {
      log.error("The credential config is not valid: " + badConfig.toString());
      throw badConfig;
    } catch (IOException ioe) {
      log.error("Unable to read the credential file: " + ioe.toString());
      throw ioe;
    }
    log.info("Credential cache file for the current user: "
        + cacheFileDescription);
  }

  /**
   * Log the report of every node in the RUNNING state and, in verbose
   * mode, the YARN client configuration rendered as indented JSON.
   * A failure to parse the dumped configuration as JSON is logged and
   * otherwise ignored.
   *
   * @param diagnosticArgs diagnostic arguments; {@code verbose} enables
   * the configuration dump
   * @throws IOException Network or other problems (logged, then rethrown)
   * @throws YarnException YARN problems (logged, then rethrown)
   */
  private void actionDiagnosticYarn(ActionDiagnosticArgs diagnosticArgs)
      throws IOException, YarnException {
    log.info("the node in the YARN cluster has below state: ");
    List<NodeReport> runningNodes;
    try {
      runningNodes = yarnClient.getNodeReports(NodeState.RUNNING);
    } catch (YarnException yarnFailure) {
      log.error("Exception happened when fetching node report from the YARN cluster: "
          + yarnFailure.toString());
      throw yarnFailure;
    } catch (IOException ioFailure) {
      log.error("Network problem happened when fetching node report YARN cluster: "
          + ioFailure.toString());
      throw ioFailure;
    }
    for (NodeReport node : runningNodes) {
      log.info(node.toString());
    }

    if (!diagnosticArgs.verbose) {
      // nothing more to report
      return;
    }

    // dump the configuration into memory ...
    Writer configDump = new StringWriter();
    try {
      Configuration.dumpConfiguration(yarnClient.getConfig(), configDump);
    } catch (IOException ioFailure) {
      log.error("Network problem happened when retrieving YARN config from YARN: "
          + ioFailure.toString());
      throw ioFailure;
    }

    // ... then re-parse it so that it can be logged with indentation
    try {
      JSONObject yarnConfig = new JSONObject(configDump.toString());
      log.info("the configuration of the YARN cluster is: "
          + yarnConfig.toString(2));
    } catch (JSONException parseFailure) {
      log.error("JSONException happened during parsing response from YARN: "
          + parseFailure.toString());
    }
  }

  /**
   * Log the path of the slider agent tarball for a named application.
   * The path is read from the instance definition's internal options; if
   * it is not set there, it is derived from the cluster's temp path and
   * the application ID.
   *
   * @param diagnosticArgs diagnostic arguments; {@code name} must be set
   * @throws BadCommandArgumentsException if no application name was given
   * @throws YarnException YARN problems (logged, then rethrown)
   * @throws IOException Network or other problems (logged, then rethrown)
   */
  private void actionDiagnosticSlider(ActionDiagnosticArgs diagnosticArgs)
      throws YarnException, IOException {
    // the application name comes from the arguments rather than from the
    // member variable, as here it follows an option instead of the action
    String appName = diagnosticArgs.name;
    if (SliderUtils.isUnset(appName)) {
      throw new BadCommandArgumentsException("application name must be provided with --name option");
    }

    AggregateConf definition;
    try {
      SliderClusterOperations operations = createClusterOperations(appName);
      definition = operations.getInstanceDefinition();
    } catch (YarnException yarnFailure) {
      log.error("Exception happened when retrieving instance definition from YARN: "
          + yarnFailure.toString());
      throw yarnFailure;
    } catch (IOException ioFailure) {
      log.error("Network problem happened when retrieving instance definition from YARN: "
          + ioFailure.toString());
      throw ioFailure;
    }

    String agentPath = definition.getInternalOperations().get(
        InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH);
    if (agentPath == null) {
      // no explicit image path: Slider uploads the agent itself, so the
      // tarball lives under Slider's own per-application directory
      ApplicationReport report = findInstance(appName);
      Path clusterTempPath = sliderFileSystem.getTempPathForCluster(appName);
      Path agentDir = new Path(clusterTempPath,
          report.getApplicationId().toString() + "/agent");
      agentPath = agentDir.toString();
    }
    log.info("The path of slider agent tarball on HDFS is: " + agentPath);
  }

  /**
   * Log key settings of a named application taken from the global options
   * of its application configuration: the cluster instance directory, the
   * application package tarball and the run-as user. In verbose mode the
   * application and resource configurations are also logged as JSON.
   *
   * @param diagnosticArgs diagnostic arguments; {@code name} must be set
   * @throws BadCommandArgumentsException if no application name was given
   * @throws YarnException YARN problems (logged, then rethrown)
   * @throws IOException Network or other problems (logged, then rethrown)
   */
  private void actionDiagnosticApplication(ActionDiagnosticArgs diagnosticArgs)
      throws YarnException, IOException {
    // the application name comes from the arguments rather than from the
    // member variable, as here it follows an option instead of the action
    String appName = diagnosticArgs.name;
    if (SliderUtils.isUnset(appName)) {
      throw new BadCommandArgumentsException("application name must be provided with --name option");
    }

    AggregateConf definition;
    try {
      SliderClusterOperations operations = createClusterOperations(appName);
      definition = operations.getInstanceDefinition();
    } catch (YarnException yarnFailure) {
      log.error("Exception happened when retrieving instance definition from YARN: "
          + yarnFailure.toString());
      throw yarnFailure;
    } catch (IOException ioFailure) {
      log.error("Network problem happened when retrieving instance definition from YARN: "
          + ioFailure.toString());
      throw ioFailure;
    }

    // read all three options before logging any of them
    String instanceDir = definition.getAppConfOperations()
        .getGlobalOptions().get(AgentKeys.APP_ROOT);
    String packageTarball = definition.getAppConfOperations()
        .getGlobalOptions().get(AgentKeys.APP_DEF);
    String runAs = definition.getAppConfOperations()
        .getGlobalOptions().get(AgentKeys.RUNAS_USER);

    log.info("The location of the cluster instance directory in HDFS is: "
        + instanceDir);
    log.info("The name of the application package tarball on HDFS is: "
        + packageTarball);
    log.info("The runas user of the application in the cluster is: "
        + runAs);

    if (!diagnosticArgs.verbose) {
      return;
    }
    log.info("App config of the application: "
        + definition.getAppConf().toJson());
    log.info("Resource config of the application: "
        + definition.getResources().toJson());
  }

  /**
   * Print client-side diagnostics: the command path, version information,
   * the client configuration path, JDK information and whether the Hadoop
   * cluster is secure. In verbose mode the environment variables, JVM
   * properties and the client configuration are printed too. Finally the
   * client environment is validated.
   *
   * @param diagnosticArgs diagnostic arguments; {@code verbose} enables
   * the extended output
   * @throws SliderException on a slider failure (logged, then rethrown)
   * @throws IOException on an IO failure (logged, then rethrown)
   */
  private void actionDiagnosticClient(ActionDiagnosticArgs diagnosticArgs)
      throws SliderException, IOException {
    try {
      // gather everything first, then print
      String commandPath = SliderUtils.getCurrentCommandPath();
      SliderVersionInfo.loadAndPrintVersionInfo(log);
      String configPath = SliderUtils.getClientConfigPath();
      String jdk = SliderUtils.getJDKInfo();
      println("The slider command path: %s", commandPath);
      println("The slider-client.xml used by current running command path: %s",
          configPath);
      println(jdk);

      // security info
      Configuration conf = getConfig();
      if (!SliderUtils.isHadoopClusterSecure(conf)) {
        println("Hadoop Cluster is insecure");
      } else {
        println("Hadoop Cluster is secure");
        println("Login user is %s", UserGroupInformation.getLoginUser());
        println("Current user is %s", UserGroupInformation.getCurrentUser());
      }

      if (diagnosticArgs.verbose) {
        // environment variables, in sorted key order
        Map<String, String> environment = System.getenv();
        Set<String> sortedKeys =
            ConfigHelper.sortedConfigKeys(environment.entrySet());
        StringBuilder envText = new StringBuilder("Environment variables:\n");
        for (String key : sortedKeys) {
          envText.append(key)
                 .append("=")
                 .append(environment.get(key))
                 .append("\n");
        }
        println(envText.toString());

        // JVM system properties, sorted
        StringBuilder propText = new StringBuilder("JVM Properties\n");
        Map<String, String> sortedProps =
            SliderUtils.sortedMap(SliderUtils.toMap(System.getProperties()));
        for (Entry<String, String> prop : sortedProps.entrySet()) {
          propText.append(prop.getKey())
                  .append("=")
                  .append(prop.getValue())
                  .append("\n");
        }
        println(propText.toString());

        // and the client configuration itself
        println("Slider client configuration:\n"
                + ConfigHelper.dumpConfigToString(conf));
      }

      SliderUtils.validateSliderClientEnvironment(log);
    } catch (SliderException sliderFailure) {
      log.error(sliderFailure.toString());
      throw sliderFailure;
    } catch (IOException ioFailure) {
      log.error(ioFailure.toString());
      throw ioFailure;
    }
  }

  /**
   * Log a service record by its YARN ID attribute (an empty string if the
   * attribute is absent); in verbose mode the external endpoints follow.
   * @param instance record to log
   * @param verbose if true, also log all external endpoints
   */
  private void logInstance(ServiceRecord instance,
      boolean verbose) {
    String yarnId = instance.get(YarnRegistryAttributes.YARN_ID, "");
    if (verbose) {
      log.info("{}: ", yarnId);
      logEndpoints(instance);
    } else {
      log.info("{}", yarnId);
    }
  }

  /**
   * Log each external endpoint of a service record at info level, one
   * endpoint per log entry.
   * @param instance service record whose {@code external} list is logged
   */
  private void logEndpoints(ServiceRecord instance) {
    for (Endpoint external : instance.external) {
      log.info(external.toString());
    }
  }

 /**
   * List the names of the configurations published by an instance.
   * Output goes to the file named in the arguments if there is one,
   * otherwise to standard out. In verbose mode each name is followed by
   * the description of the configuration.
   *
   * @param registryArgs registry Arguments
   * @throws YarnException YARN problems
   * @throws IOException Network or other problems
   */
  public void actionRegistryListConfigsYarn(ActionRegistryArgs registryArgs)
      throws YarnException, IOException {

    ServiceRecord record = lookupServiceRecord(registryArgs);
    RegistryRetriever registryRetriever = new RegistryRetriever(record);
    PublishedConfigSet configSet =
        registryRetriever.getConfigurations(!registryArgs.internal);

    // only a stream opened here is closed here; System.out is left alone
    boolean toFile = registryArgs.out != null;
    PrintStream out = null;
    try {
      out = toFile
          ? new PrintStream(new FileOutputStream(registryArgs.out))
          : System.out;
      for (String configName : configSet.keys()) {
        if (registryArgs.verbose) {
          PublishedConfiguration config = configSet.get(configName);
          out.printf("%s: %s\n",
              configName,
              config.description);
        } else {
          out.println(configName);
        }
      }
    } finally {
      if (toFile && out != null) {
        out.flush();
        out.close();
      }
    }
  }

  /**
   * List the names of the export groups published by an instance.
   * Output goes to the file named in the arguments if there is one,
   * otherwise to standard out. In verbose mode each name is followed by
   * the description of the export group.
   *
   * @param registryArgs registry Arguments
   * @throws YarnException YARN problems
   * @throws IOException Network or other problems
   */
  public void actionRegistryListExports(ActionRegistryArgs registryArgs)
      throws YarnException, IOException {
    ServiceRecord record = lookupServiceRecord(registryArgs);
    RegistryRetriever registryRetriever = new RegistryRetriever(record);
    PublishedExportsSet exportSet =
        registryRetriever.getExports(!registryArgs.internal);

    PrintStream out = null;
    // true once a file stream has been opened, and so must be closed
    boolean toFile = false;
    try {
      if (registryArgs.out == null) {
        out = System.out;
      } else {
        out = new PrintStream(new FileOutputStream(registryArgs.out));
        toFile = true;
        log.debug("Saving output to {}", registryArgs.out);
      }
      log.debug("Number of exports: {}", exportSet.keys().size());
      for (String exportName : exportSet.keys()) {
        if (toFile) {
          // echo to the log what is being written to the file
          log.debug(exportName);
        }
        if (registryArgs.verbose) {
          PublishedExports group = exportSet.get(exportName);
          out.printf("%s: %s\n",
              exportName,
              group.description);
        } else {
          out.println(exportName);
        }
      }
    } finally {
      if (toFile) {
        out.flush();
        out.close();
      }
    }
  }

  /**
   * Retrieve a single published configuration of an instance; the
   * configuration to fetch is named by {@code registryArgs.getConf}.
   *
   * @param registryArgs registry Arguments
   * @return the published configuration
   * @throws YarnException YARN problems
   * @throws IOException Network or other problems
   * @throws FileNotFoundException if the config is not found
   */
  @VisibleForTesting
  public PublishedConfiguration actionRegistryGetConfig(ActionRegistryArgs registryArgs)
      throws YarnException, IOException {
    ServiceRecord record = lookupServiceRecord(registryArgs);
    RegistryRetriever registryRetriever = new RegistryRetriever(record);

    // external view unless the internal one was asked for
    boolean external = !registryArgs.internal;
    PublishedConfigSet configSet = registryRetriever.getConfigurations(external);
    return registryRetriever.retrieveConfiguration(
        configSet,
        registryArgs.getConf,
        external);
  }

  /**
   * Retrieve a single published export group of an instance; the group to
   * fetch is named by {@code registryArgs.getExport}.
   *
   * @param registryArgs registry Arguments
   * @return the published export group
   * @throws YarnException         YARN problems
   * @throws IOException           Network or other problems
   * @throws FileNotFoundException if the export group is not found
   */
  @VisibleForTesting
  public PublishedExports actionRegistryGetExport(ActionRegistryArgs registryArgs)
      throws YarnException, IOException {
    ServiceRecord record = lookupServiceRecord(registryArgs);
    RegistryRetriever registryRetriever = new RegistryRetriever(record);

    // external view unless the internal one was asked for
    boolean external = !registryArgs.internal;
    PublishedExportsSet exportSet = registryRetriever.getExports(external);
    return registryRetriever.retrieveExports(
        exportSet,
        registryArgs.getExport,
        external);
  }

  /**
   * Write out a published configuration in the requested format.
   * With no output path the text is printed to standard out. With an
   * output path that is a directory, the file is created inside it and
   * named after the configuration entry with the format as its extension;
   * any other output path is used as the file itself.
   * @param published published config
   * @param registryArgs registry Arguments
   * @throws BadCommandArgumentsException if the format is not recognised
   * @throws IOException on a failure to save the output
   */
  private void outputConfig(PublishedConfiguration published,
      ActionRegistryArgs registryArgs) throws
      BadCommandArgumentsException,
      IOException {
    String entry = registryArgs.getConf;
    String format = registryArgs.format;
    ConfigFormat configFormat = ConfigFormat.resolve(format);
    if (configFormat == null) {
      throw new BadCommandArgumentsException(
          "Unknown/Unsupported format %s ", format);
    }
    PublishedConfigurationOutputter outputter =
        PublishedConfigurationOutputter.createOutputter(configFormat,
            published);

    File destination = registryArgs.out;
    if (destination == null) {
      // no destination: print
      print(outputter.asString());
      return;
    }
    if (destination.isDirectory()) {
      // creating it under a directory
      destination = new File(destination, entry + "." + format);
    }
    log.debug("Destination path: {}", destination);
    outputter.save(destination);
  }

  /**
   * Write out a published export group; the format is always JSON.
   * With no output path the text is printed to standard out. With an
   * output path that is a directory, the file is created inside it and
   * named after the export entry with the format as its extension; any
   * other output path is used as the file itself.
   * @param published published exports
   * @param registryArgs registry Arguments
   * @throws BadCommandArgumentsException if the format does not resolve
   * to JSON
   * @throws IOException on a failure to save the output
   */
  private void outputExport(PublishedExports published,
                            ActionRegistryArgs registryArgs) throws
      BadCommandArgumentsException,
      IOException {
    String entry = registryArgs.getExport;
    String format = ConfigFormat.JSON.toString();
    ConfigFormat configFormat = ConfigFormat.resolve(format);
    // a null result also fails this comparison
    if (configFormat != ConfigFormat.JSON) {
      throw new BadCommandArgumentsException(
          "Unknown/Unsupported format %s . Only JSON is supported.", format);
    }

    PublishedExportsOutputter outputter =
        PublishedExportsOutputter.createOutputter(configFormat,
                                                  published);

    File destination = registryArgs.out;
    if (destination == null) {
      // no destination: print
      print(outputter.asString());
      return;
    }
    if (destination.isDirectory()) {
      // creating it under a directory
      destination = new File(destination, entry + "." + format);
    }
    log.info("Destination path: {}", destination);
    outputter.save(destination);
  }

  /**
   * Look up the service record of the instance described by the registry
   * arguments. The record's owner is the user named in the arguments
   * (encoded for the registry) or, if none is named, the current user.
   * @param registryArgs registry arguments supplying user, service type
   * and instance name
   * @return instance data
   * @throws SliderException other failures
   * @throws IOException IO problems or wrapped exceptions
   */
  private ServiceRecord lookupServiceRecord(ActionRegistryArgs registryArgs) throws
      SliderException,
      IOException {
    String owner = StringUtils.isNotEmpty(registryArgs.user)
        ? RegistryPathUtils.encodeForRegistry(registryArgs.user)
        : currentUser();
    return resolve(
        servicePath(owner, registryArgs.serviceType, registryArgs.name));
  }

  /**
   * Look up a service record belonging to the current user.
   * @param serviceType service type
   * @param id instance ID
   * @return instance data
   * @throws UnknownApplicationInstanceException no path or service record
   * at the end of the path
   * @throws SliderException other failures
   * @throws IOException IO problems or wrapped exceptions
   */
  public ServiceRecord lookupServiceRecord(String serviceType, String id)
      throws IOException, SliderException {
    return resolve(servicePath(currentUser(), serviceType, id));
  }

  /**
   * Resolve the service record at a registry path.
   * A missing path or a missing record at that path is reported as a
   * {@link NotFoundException} whose message is the path carried by the
   * registry exception.
   * @param path path
   * @return instance data
   * @throws NotFoundException no path/no service record
   * at the end of the path
   * @throws SliderException other failures
   * @throws IOException IO problems or wrapped exceptions
   */
  public ServiceRecord resolve(String path)
      throws IOException, SliderException {
    try {
      return getRegistryOperations().resolve(path);
    } catch (PathNotFoundException e) {
      // cause-first form, as at the other NotFoundException call sites in
      // this file, so the registry exception is attached as the cause
      // rather than being passed as a message argument
      throw new NotFoundException(e, e.getPath().toString());
    } catch (NoRecordException e) {
      throw new NotFoundException(e, e.getPath().toString());
    }
  }

  /**
   * List the slider service records registered for the current user.
   * @return the result of listing the service records under the current
   * user's slider service class path
   * @throws IOException Any IO problem ... including no path in the registry
   * to slider service classes for this user
   * @throws SliderException other failures
   */
  public Map<String, ServiceRecord> listRegistryInstances()
      throws IOException, SliderException {
    // obtain the registry before building the path, as the original does
    RegistryOperations operations = getRegistryOperations();
    String sliderClassPath =
        serviceclassPath(currentUser(), SliderKeys.APP_TYPE);
    return listServiceRecords(operations, sliderClassPath);
  }
  
  /**
   * List the keys of the slider service records registered for the
   * current user.
   * @return a new list holding the keys of the service record map
   * @throws IOException IO problems; any exception that is neither an
   * {@code IOException} nor a {@code YarnException} is wrapped in one
   * @throws YarnException YARN problems
   */
  public List<String> listRegisteredSliderInstances() throws
      IOException,
      YarnException {
    try {
      // obtain the registry before building the path, as the original does
      RegistryOperations operations = getRegistryOperations();
      String sliderClassPath =
          serviceclassPath(currentUser(), SliderKeys.APP_TYPE);
      Map<String, ServiceRecord> records =
          listServiceRecords(operations, sliderClassPath);
      return new ArrayList<String>(records.keySet());
/// JDK7    } catch (YarnException | IOException e) {
    } catch (IOException ioe) {
      // declared exceptions pass through unchanged
      throw ioe;
    } catch (YarnException ye) {
      throw ye;
    } catch (Exception other) {
      // everything else is wrapped
      throw new IOException(other);
    }
  }

  /**
   * Return the registry operations service, starting it first if the
   * {@code registryOperations} field is still null. The method is
   * synchronized on this instance.
   * @return the registry service
   * @throws SliderException on a slider failure while starting the service
   * @throws IOException on an IO failure while starting the service
   */
  private synchronized RegistryOperations maybeStartYarnRegistry()
      throws SliderException, IOException {
    if (registryOperations != null) {
      // already started
      return registryOperations;
    }
    registryOperations = startRegistryOperationsService();
    return registryOperations;
  }

  /**
   * Get the registry operations service, starting it on demand.
   * @return the registry service returned by
   * {@code maybeStartYarnRegistry()}
   * @throws SliderException on a slider failure while starting the service
   * @throws IOException on an IO failure while starting the service
   */
  @Override
  public RegistryOperations getRegistryOperations()
      throws SliderException, IOException {
    RegistryOperations operations = maybeStartYarnRegistry();
    return operations;
  }

  /**
   * Append text to {@code System.out}; no newline is added.
   * @param src source text
   */
  @SuppressWarnings("UseOfSystemOutOrSystemErr")
  private static void print(CharSequence src) {
    PrintStream stdout = System.out;
    stdout.append(src);
  }

  /**
   * Print a message followed by a newline character. The message is
   * printed as is; no formatting is applied to it.
   * @param message message
   */
  private static void println(String message) {
    // the text first, then the line terminator as a separate write
    print(message);
    print("\n");
  }
  /**
   * Format a message with {@link String#format(String, Object...)} and
   * print the result followed by a newline character.
   * @param message format string
   * @param args arguments for string formatting
   */
  private static void println(String message, Object ... args) {
    String formatted = String.format(message, args);
    print(formatted);
    print("\n");
  }

  /**
   * Implement the lookup action: fetch the YARN application report for
   * the application ID in the arguments and emit it in serialized form,
   * either saved to the output file in the arguments or printed as JSON.
   * @param args Action arguments
   * @return 0 if the entry was found
   * @throws BadCommandArgumentsException if an
   * {@code IllegalArgumentException} is raised while processing the ID
   * @throws NotFoundException if YARN reports the application or the
   * application attempt as not found
   * @throws IOException Network or other problems
   * @throws YarnException YARN problems
   */
  @VisibleForTesting
  public int actionLookup(ActionLookupArgs args)
      throws IOException, YarnException {
    verifyBindingsDefined();
    try {
      ApplicationId appId = ConverterUtils.toApplicationId(args.id);
      SerializedApplicationReport serialized =
          new SerializedApplicationReport(yarnClient.getApplicationReport(appId));
      ApplicationReportSerDeser marshal = new ApplicationReportSerDeser();
      if (args.outputFile == null) {
        // no file requested: print the JSON
        println(marshal.toJson(serialized));
      } else {
        marshal.save(serialized, args.outputFile);
      }
    } catch (IllegalArgumentException badArgument) {
      throw new BadCommandArgumentsException(badArgument, "%s : %s", args, badArgument);
    } catch (ApplicationAttemptNotFoundException noAttempt) {
      throw new NotFoundException(noAttempt, noAttempt.toString());
    } catch (ApplicationNotFoundException noApplication) {
      throw new NotFoundException(noApplication, noApplication.toString());
    }
    return EXIT_SUCCESS;
  }
}


