/*
 * Licensed to the Apache Software Foundation (ASF) under one
 *  or more contributor license agreements.  See the NOTICE file
 *  distributed with this work for additional information
 *  regarding copyright ownership.  The ASF licenses this file
 *  to you under the Apache License, Version 2.0 (the
 *  "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package org.apache.slider.server.appmaster;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.health.HealthCheckRegistry;
import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
import com.google.common.base.Preconditions;
import com.google.protobuf.BlockingService;

import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.service.ServiceStateChangeListener;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.*;
import static org.apache.slider.common.Constants.HADOOP_JAAS_DEBUG;

import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
import org.apache.hadoop.registry.server.integration.RMRegistryOperationsService;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.webapp.WebAppException;
import org.apache.hadoop.yarn.webapp.WebApps;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.apache.slider.api.ClusterDescription;
import org.apache.slider.api.InternalKeys;
import org.apache.slider.api.ResourceKeys;
import org.apache.slider.api.RoleKeys;
import org.apache.slider.api.SliderExitReason;
import org.apache.slider.api.StatusKeys;
import org.apache.slider.api.proto.SliderClusterAPI;
import org.apache.slider.api.types.ApplicationDiagnostics;
import org.apache.slider.client.SliderYarnClientImpl;
import org.apache.slider.common.SliderExitCodes;
import org.apache.slider.common.SliderKeys;
import org.apache.slider.common.params.AbstractActionArgs;
import org.apache.slider.common.params.SliderAMArgs;
import org.apache.slider.common.params.SliderAMCreateAction;
import org.apache.slider.common.params.SliderActions;
import org.apache.slider.common.tools.ConfigHelper;
import org.apache.slider.common.tools.PortScanner;
import org.apache.slider.common.tools.SliderFileSystem;
import org.apache.slider.common.tools.SliderUtils;
import org.apache.slider.common.tools.SliderVersionInfo;
import org.apache.slider.core.build.InstanceIO;
import org.apache.slider.core.conf.AggregateConf;
import org.apache.slider.core.conf.ConfTree;
import org.apache.slider.core.conf.ConfTreeOperations;
import org.apache.slider.core.conf.MapOperations;
import org.apache.slider.core.exceptions.BadConfigException;
import org.apache.slider.core.exceptions.SliderException;
import org.apache.slider.core.exceptions.SliderInternalStateException;
import org.apache.slider.core.exceptions.TriggerClusterTeardownException;
import org.apache.slider.core.launch.CredentialUtils;
import org.apache.slider.core.main.ExitCodeProvider;
import org.apache.slider.core.main.LauncherExitCodes;
import org.apache.slider.core.main.RunService;
import org.apache.slider.core.main.ServiceLauncher;
import org.apache.slider.core.registry.info.CustomRegistryConstants;
import org.apache.slider.providers.ProviderCompleted;
import org.apache.slider.providers.ProviderRole;
import org.apache.slider.providers.ProviderService;
import org.apache.slider.providers.SliderProviderFactory;
import org.apache.slider.providers.agent.AgentKeys;
import org.apache.slider.providers.agent.AgentProviderService;
import org.apache.slider.providers.slideram.SliderAMClientProvider;
import org.apache.slider.providers.slideram.SliderAMProviderService;
import org.apache.slider.server.appmaster.actions.ActionHalt;
import org.apache.slider.server.appmaster.actions.ActionRegisterServiceInstance;
import org.apache.slider.server.appmaster.actions.EscalateOutstandingRequests;
import org.apache.slider.server.appmaster.actions.RegisterComponentInstance;
import org.apache.slider.server.appmaster.actions.QueueExecutor;
import org.apache.slider.server.appmaster.actions.QueueService;
import org.apache.slider.server.appmaster.actions.ActionStopSlider;
import org.apache.slider.server.appmaster.actions.ActionUpgradeContainers;
import org.apache.slider.server.appmaster.actions.AsyncAction;
import org.apache.slider.server.appmaster.actions.RenewingAction;
import org.apache.slider.server.appmaster.actions.ResetFailureWindow;
import org.apache.slider.server.appmaster.actions.ReviewAndFlexApplicationSize;
import org.apache.slider.server.appmaster.actions.UnregisterComponentInstance;
import org.apache.slider.server.appmaster.management.MetricsAndMonitoring;
import org.apache.slider.server.appmaster.management.YarnServiceHealthCheck;
import org.apache.slider.server.appmaster.monkey.ChaosKillAM;
import org.apache.slider.server.appmaster.monkey.ChaosKillContainer;
import org.apache.slider.server.appmaster.monkey.ChaosMonkeyService;
import org.apache.slider.server.appmaster.operations.AsyncRMOperationHandler;
import org.apache.slider.server.appmaster.operations.ProviderNotifyingOperationHandler;
import org.apache.slider.server.appmaster.rpc.RpcBinder;
import org.apache.slider.server.appmaster.rpc.SliderAMPolicyProvider;
import org.apache.slider.server.appmaster.rpc.SliderClusterProtocolPBImpl;
import org.apache.slider.server.appmaster.operations.AbstractRMOperation;
import org.apache.slider.server.appmaster.rpc.SliderIPCService;
import org.apache.slider.server.appmaster.security.SecurityConfiguration;
import org.apache.slider.server.appmaster.state.AppState;
import org.apache.slider.server.appmaster.state.AppStateBindingInfo;
import org.apache.slider.server.appmaster.state.ContainerAssignment;
import org.apache.slider.server.appmaster.state.ProviderAppState;
import org.apache.slider.server.appmaster.state.RMClientAccessForAppState;
import org.apache.slider.server.appmaster.operations.RMOperationHandler;
import org.apache.slider.server.appmaster.state.RoleInstance;
import org.apache.slider.server.appmaster.web.AgentService;
import org.apache.slider.server.appmaster.web.rest.InsecureAmFilterInitializer;
import org.apache.slider.server.appmaster.web.rest.agent.AgentWebApp;
import org.apache.slider.server.appmaster.web.SliderAMWebApp;
import org.apache.slider.server.appmaster.web.WebAppApi;
import org.apache.slider.server.appmaster.web.WebAppApiImpl;
import org.apache.slider.server.appmaster.web.rest.RestPaths;
import org.apache.slider.server.appmaster.web.rest.application.ApplicationResouceContentCacheFactory;
import org.apache.slider.server.appmaster.web.rest.application.resources.ContentCache;
import org.apache.slider.server.services.security.CertificateManager;
import org.apache.slider.server.services.utility.AbstractSliderLaunchedService;
import org.apache.slider.server.services.utility.WebAppService;
import org.apache.slider.server.services.workflow.ServiceThreadFactory;
import org.apache.slider.server.services.workflow.WorkflowExecutorService;
import org.apache.slider.server.services.workflow.WorkflowRpcService;
import org.apache.slider.server.services.yarnregistry.YarnRegistryViewForProviders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * This is the AM, which directly implements the callbacks from the RM
 * (resource manager) and the NM (node manager).
 */
public class SliderAppMaster extends AbstractSliderLaunchedService 
  implements AMRMClientAsync.CallbackHandler,
    NMClientAsync.CallbackHandler,
    RunService,
    SliderExitCodes,
    SliderKeys,
    ServiceStateChangeListener,
    RoleKeys,
    ProviderCompleted,
    AppMasterActionOperations {

  /** Logger for this class. */
  protected static final Logger log =
    LoggerFactory.getLogger(SliderAppMaster.class);

  /**
   * Log for YARN events. This is an alias of {@link #log}, not a
   * separate logger.
   */
  protected static final Logger LOG_YARN = log;

  /** Short service name; also the name passed to the superclass constructor. */
  public static final String SERVICE_CLASSNAME_SHORT = "SliderAppMaster";
  /** Fully qualified class name of this service. */
  public static final String SERVICE_CLASSNAME =
      "org.apache.slider.server.appmaster." + SERVICE_CLASSNAME_SHORT;

  /**
   * Heartbeat interval handed to
   * {@code AMRMClientAsync.createAMRMClientAsync()}, whose API takes the
   * interval in milliseconds.
   */
  public static final int HEARTBEAT_INTERVAL = 1000;
  /**
   * Number of RPC handlers.
   * NOTE(review): not referenced in the section of this file under review;
   * confirm where (or whether) it is used.
   */
  public static final int NUM_RPC_HANDLERS = 5;

  /**
   * Metrics and monitoring services.
   * Added as a child service in {@link #serviceInit(Configuration)};
   * also handed to the {@code AppState} constructor.
   */
  private final MetricsAndMonitoring metricsAndMonitoring = new MetricsAndMonitoring(); 

  /**
   * Metrics registry, obtained from {@link #metricsAndMonitoring} in
   * {@link #serviceInit(Configuration)}.
   */
  public MetricRegistry metrics;

  /** Error string on chaos monkey launch failure action: {@value} */
  public static final String E_TRIGGERED_LAUNCH_FAILURE =
      "Chaos monkey triggered launch failure";

  /**
   * YARN RPC to communicate with the Resource Manager or Node Manager;
   * created from the service configuration via {@code YarnRPC.create()}.
   */
  private YarnRPC yarnRPC;

  /** Handle to communicate with the Resource Manager (async AM-RM client). */
  @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
  private AMRMClientAsync asyncRMClient;

  /**
   * Operation handler for RM operations.
   * NOTE(review): not assigned in the section of this file under review;
   * confirm where it is set.
   */
  @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
  private RMOperationHandler rmOperationHandler;
  
  /**
   * Operation handler wrapping the provider service in a
   * {@link ProviderNotifyingOperationHandler}.
   */
  private RMOperationHandler providerRMOperationHandler;

  /** Handle to communicate with the Node Manager; relays callbacks to this class. */
  @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
  public NMClientAsync nmClientAsync;

  /**
   * Credentials for propagating down to launched containers
   */
  private Credentials containerCredentials;

  /**
   * Slider IPC: Real service handler
   */
  private SliderIPCService sliderIPCService;
  /**
   * Slider IPC: binding. Its connect address supplies the AM hostname
   * and RPC port.
   */
  private WorkflowRpcService rpcService;

  /**
   * Client-to-AM token secret manager, created for this application attempt.
   */
  @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
  private ClientToAMTokenSecretManager secretManager;
  
  /** Hostname of the AM: canonical host name of the RPC service's connect address. */
  private String appMasterHostname = "";
  /** Port on which the AM's Slider RPC service listens for client requests. */
  private int appMasterRpcPort = 0;
  /**
   * Tracking URL registered with the RM so clients can monitor the AM;
   * it is the HTTP URL of the AM web application.
   */
  @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
  private String appMasterTrackingUrl = "";

  /** Proxied app master URL (as retrieved from AM report at launch time) */
  @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
  private String appMasterProxiedUrl = "";

  /**
   * Application attempt ID, extracted from the AM container ID; it also
   * carries the application ID.
   */
  private ApplicationAttemptId appAttemptID;

  /**
   * App ACLs, keyed by access type.
   */
  protected Map<ApplicationAccessType, String> applicationACLs;

  /**
   * Ongoing state of the cluster: containers, nodes they
   * live on, etc.
   * This object is also used as a monitor while registering with the RM,
   * so that async AM events cannot arrive mid-registration.
   */
  private final AppState appState =
      new AppState(new ProtobufClusterServices(), metricsAndMonitoring);

  /**
   * App state for external objects. This is almost entirely
   * a read-only view of the application state. To change the state,
   * Providers (or anything else) are expected to queue async changes.
   * The application name starts as "undefined" and is replaced with the
   * cluster name once the cluster is being created.
   */
  private final ProviderAppState stateForProviders =
      new ProviderAppState("undefined", appState);

  /**
   * Model the AM execution state using a lock and an associated
   * "AM completed" condition.
   */
  private final ReentrantLock AMExecutionStateLock = new ReentrantLock();
  private final Condition isAMCompleted = AMExecutionStateLock.newCondition();

  /**
   * Flag set if the AM is to be shutdown
   */
  private final AtomicBoolean amCompletionFlag = new AtomicBoolean(false);

  /**
   * Flag set during the init process
   */
  private final AtomicBoolean initCompleted = new AtomicBoolean(false);

  /**
   * Flag to set if the process exit code was set before shutdown started
   */
  @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
  private boolean spawnedProcessExitedBeforeShutdownTriggered;

  /** Raw command-line arguments, parsed in {@code bindArgs()}. */
  @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
  private SliderAMArgs serviceArgs;

  /**
   * ID of the AM container, parsed from the container ID environment
   * variable.
   */
  @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
  private ContainerId appMasterContainerID;

  /**
   * Monkey Service -may be null
   */
  private ChaosMonkeyService monkey;
  
  /**
   * ProviderService of this cluster
   */
  @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
  private ProviderService providerService;

  /**
   * The YARN registry service
   */
  @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
  private RegistryOperations registryOperations;

  /**
   * The stop request received...the exit details are extracted
   * from this
   */
  private volatile ActionStopSlider stopAction;

  /** Service used to launch role instances. */
  @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
  private RoleLaunchService launchService;
  
  //username -null if it is not known/not to be set
  @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
  private String hadoop_user_name;
  private String service_user_name;
  
  private SliderAMWebApp webApp;
  /** Connect address of the Slider RPC service. */
  @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
  private InetSocketAddress rpcServiceAddress;
  /** Provider service for the Slider AM itself (as opposed to the application provider). */
  private SliderAMProviderService sliderAMProvider;
  private CertificateManager certificateManager;

  /**
   * Executor.
   * Assigned in {@link #serviceInit(Configuration)}; the action queue
   * tasks are submitted to it when queue processing starts.
   */
  private WorkflowExecutorService<ExecutorService> executorService;

  /**
   * Action queues. Created at instance creation, but
   * added as a child and inited in {@link #serviceInit(Configuration)}
   */
  private final QueueService actionQueues = new QueueService();
  private String agentOpsUrl;
  private String agentStatusUrl;
  private YarnRegistryViewForProviders yarnRegistryOperations;
  //private FsDelegationTokenManager fsDelegationTokenManager;
  /** Response returned by the RM when this AM registered. */
  private RegisterApplicationMasterResponse amRegistrationData;
  private PortScanner portScanner;
  /** Security configuration built from the service config and the resolved instance. */
  private SecurityConfiguration securityConfiguration;

  /**
   * Is security enabled?
   * Set early on in the {@link #createAndRunCluster(String)} operation.
   */
  private boolean securityEnabled;
  private ContentCache contentCache;

  /**
   * Resource limits: the maximum resource capability reported by the RM
   * in the AM registration response.
   */
  private Resource maximumResourceCapability;

  /**
   * Service Constructor.
   * <p>
   * Passes {@link #SERVICE_CLASSNAME_SHORT} as the service name.
   * The {@code HdfsConfiguration} and {@code YarnConfiguration} instances
   * are created and immediately discarded.
   * NOTE(review): presumably this is done only for the side effects of
   * loading those classes (registering their default XML resources);
   * confirm before removing or reordering these lines.
   */
  public SliderAppMaster() {
    super(SERVICE_CLASSNAME_SHORT);
    new HdfsConfiguration();
    new YarnConfiguration();
  }

/* =================================================================== */
/* service lifecycle methods */
/* =================================================================== */

  /**
   * Initialize the AM service.
   * <p>
   * Steps, in order:
   * <ol>
   *   <li>Build a custom configuration from the slider client XML.</li>
   *   <li>Merge in the slider server XML when that resource is on the
   *   classpath.</li>
   *   <li>Apply the command-line definitions and filesystem binding.</li>
   *   <li>If an RM address was given on the command line, use it as the RM
   *   scheduler address.</li>
   *   <li>Merge the result into {@code conf}.</li>
   *   <li>Configure security when the Hadoop cluster is secure.</li>
   *   <li>Validate the server environment.</li>
   *   <li>Add the child services (metrics and monitoring, the AM executor,
   *   the action queues).</li>
   *   <li>Initialize the child services via the superclass.</li>
   * </ol>
   * @param conf configuration supplied by the service framework; the
   * customizations are merged into it
   * @throws Exception on any failure while loading/merging configuration,
   * validating the environment or initializing the child services
   */
  @Override //AbstractService
  public synchronized void serviceInit(Configuration conf) throws Exception {
    // start from the slider client XML configuration, if found
    
    Configuration customConf = SliderUtils.loadSliderClientXML();
    // Load in the server configuration - if it is actually on the Classpath
    URL serverXmlUrl = ConfigHelper.getResourceUrl(SLIDER_SERVER_XML);
    if (serverXmlUrl != null) {
      log.info("Loading {} at {}", SLIDER_SERVER_XML, serverXmlUrl);
      Configuration serverConf = ConfigHelper.loadFromResource(SLIDER_SERVER_XML);
      ConfigHelper.mergeConfigurations(customConf, serverConf,
          SLIDER_SERVER_XML, true);
    }
    serviceArgs.applyDefinitions(customConf);
    serviceArgs.applyFileSystemBinding(customConf);
    // customConf now contains all customizations

    AbstractActionArgs action = serviceArgs.getCoreAction();
    // NOTE(review): unchecked cast; assumes the core action is always the
    // AM create action (ClassCastException otherwise)
    SliderAMCreateAction createAction = (SliderAMCreateAction) action;

    // an RM address given on the command line overrides the configured
    // RM scheduler address
    String rmAddress = createAction.getRmAddress();
    if (rmAddress != null) {
      log.debug("Setting RM address from the command line: {}", rmAddress);
      SliderUtils.setRmSchedulerAddress(customConf, rmAddress);
    }

    log.info("AM configuration:\n{}",
        ConfigHelper.dumpConfigToString(customConf));
    // NOTE(review): this logs every environment variable at INFO level;
    // confirm no secrets are ever passed to the AM via its environment
    for (Map.Entry<String, String> envs : System.getenv().entrySet()) {
      log.info("System env {}={}", envs.getKey(), envs.getValue());
    }

    // merge all the customizations into the configuration passed in
    ConfigHelper.mergeConfigurations(conf, customConf, SLIDER_CLIENT_XML, true);
    //init security with our conf
    if (SliderUtils.isHadoopClusterSecure(conf)) {
      log.info("Secure mode with kerberos realm {}",
               SliderUtils.getKerberosRealm());
      UserGroupInformation.setConfiguration(conf);
      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
      log.debug("Authenticating as {}", ugi);
      // fail fast if the namenode principal is not configured
      SliderUtils.verifyPrincipalSet(conf, DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY);
    } else {
      log.info("Cluster is insecure");
    }
    log.info("Login user is {}", UserGroupInformation.getLoginUser());

    //look at settings of Hadoop Auth, to pick up a problem seen once
    checkAndWarnForAuthTokenProblems();
    
    // validate server env; dependency checks run unless explicitly
    // disabled in the configuration
    boolean dependencyChecks =
        !conf.getBoolean(KEY_SLIDER_AM_DEPENDENCY_CHECKS_DISABLED,
            false);
    SliderUtils.validateSliderServerEnvironment(log, dependencyChecks);

    // create and register monitoring services
    addService(metricsAndMonitoring);
    metrics = metricsAndMonitoring.getMetrics();
/* TODO: turn these on once the metrics testing is more under control
    metrics.registerAll(new ThreadStatesGaugeSet());
    metrics.registerAll(new MemoryUsageGaugeSet());
    metrics.registerAll(new GarbageCollectorMetricSet());

*/
    contentCache = ApplicationResouceContentCacheFactory.createContentCache(stateForProviders);

    // two daemon threads: startQueueProcessing() submits two tasks here
    // (the action queue service and its queue executor)
    executorService = new WorkflowExecutorService<>("AmExecutor",
        Executors.newFixedThreadPool(2,
            new ServiceThreadFactory("AmExecutor", true)));
    addService(executorService);

    addService(actionQueues);

    //init all child services
    super.serviceInit(conf);
  }

  /**
   * Run the superclass start first. Once that has returned without error,
   * register the "AM Health" check with the metrics and monitoring health
   * registry.
   *
   * @throws Exception propagated from the superclass start
   */
  @Override
  protected void serviceStart() throws Exception {
    super.serviceStart();
    metricsAndMonitoring.getHealth()
        .register("AM Health", new YarnServiceHealthCheck(this));
  }

  /**
   * Begin processing the action queues on the AM executor.
   * <p>
   * Two tasks are submitted, in this order:
   * <ol>
   *   <li>the {@link QueueService} itself;</li>
   *   <li>a {@link QueueExecutor} bound to this AM and that same queue
   *   service.</li>
   * </ol>
   */
  private void startQueueProcessing() {
    log.info("Queue Processing started");
    WorkflowExecutorService<ExecutorService> executor = executorService;
    executor.execute(actionQueues);
    QueueExecutor queueRunner = new QueueExecutor(this, actionQueues);
    executor.execute(queueRunner);
  }
  
/* =================================================================== */
/* RunService methods called from ServiceLauncher */
/* =================================================================== */

  /**
   * Bind the configuration and command-line arguments handed over by the
   * service launcher.
   * <p>
   * The steps are:
   * <ol>
   *   <li>Let the superclass bind first.</li>
   *   <li>Inject the slider XML resource.</li>
   *   <li>Wrap the resulting configuration as a {@link YarnConfiguration}.</li>
   *   <li>Parse the AM arguments into {@code serviceArgs}.</li>
   * </ol>
   *
   * @param config configuration from the launcher
   * @param args raw command-line arguments
   * @return the YARN configuration after {@code SliderUtils.patchConfiguration}
   * @throws Exception propagated from the superclass binding or from
   * argument parsing
   */
  @Override // RunService
  public Configuration bindArgs(Configuration config, String... args) throws Exception {
    Configuration baseConf = super.bindArgs(config, args);
    // the slider XML config must be registered before the YARN view is built
    ConfigHelper.injectSliderXMLResource();
    YarnConfiguration yarnConf = new YarnConfiguration(baseConf);

    // the field is assigned before parsing, so it is set even if parse() fails
    serviceArgs = new SliderAMArgs(args);
    serviceArgs.parse();

    return SliderUtils.patchConfiguration(yarnConf);
  }


  /**
   * Entry point invoked by the service launcher. When it returns, the
   * application finishes.
   * <p>
   * The actions are handled as follows:
   * <ul>
   *   <li>The help action logs the usage and returns the usage exit code.</li>
   *   <li>The create action creates and runs the cluster named by the
   *   first action argument.</li>
   *   <li>Any other action is rejected.</li>
   * </ul>
   *
   * @return the exit code for the application
   * @throws Throwable on any failure; a {@link SliderException} for an
   * unimplemented action
   */
  @Override
  public int runService() throws Throwable {
    SliderVersionInfo.loadAndPrintVersionInfo(log);

    // only render the system properties when debug logging is on
    if (log.isDebugEnabled()) {
      String props = SliderUtils.propertiesToString(System.getProperties());
      log.debug("System properties:\n" + props);
    }

    String requested = serviceArgs.getAction();
    List<String> requestArgs = serviceArgs.getActionArgs();
    final int result;
    if (requested.equals(SliderActions.ACTION_HELP)) {
      log.info("{}: {}", getName(), serviceArgs.usage());
      result = SliderExitCodes.EXIT_USAGE;
    } else if (requested.equals(SliderActions.ACTION_CREATE)) {
      result = createAndRunCluster(requestArgs.get(0));
    } else {
      throw new SliderException("Unimplemented: " + requested);
    }
    log.info("Exiting AM; final exit code = {}", result);
    return result;
  }

  /**
   * Initialize a freshly created service with this AM's configuration, then
   * register it as a child.
   * <p>
   * This method only initializes the service; it does not start it. Call it
   * before the AM itself starts. Otherwise the caller is responsible for
   * starting the service.
   *
   * @param service the service to initialize and add
   * @return the same service instance, for chaining
   */
  public Service initAndAddService(Service service) {
    Configuration amConf = getConfig();
    service.init(amConf);
    addService(service);
    return service;
  }

  /* =================================================================== */

  /**
   * Create and run the cluster.
   * @param clustername cluster name
   * @return exit code
   * @throws Throwable on a failure
   */
  private int createAndRunCluster(String clustername) throws Throwable {

    //load the cluster description from the cd argument
    String sliderClusterDir = serviceArgs.getSliderClusterURI();
    URI sliderClusterURI = new URI(sliderClusterDir);
    Path clusterDirPath = new Path(sliderClusterURI);
    log.info("Application defined at {}", sliderClusterURI);
    SliderFileSystem fs = getClusterFS();

    // build up information about the running application -this
    // will be passed down to the cluster status
    MapOperations appInformation = new MapOperations(); 

    AggregateConf instanceDefinition =
      InstanceIO.loadInstanceDefinitionUnresolved(fs, clusterDirPath);
    instanceDefinition.setName(clustername);

    log.info("Deploying cluster {}:", instanceDefinition);

    // and resolve it
    AggregateConf resolvedInstance = new AggregateConf( instanceDefinition);
    resolvedInstance.resolve();

    stateForProviders.setApplicationName(clustername);

    Configuration serviceConf = getConfig();

    // extend AM configuration with component resource
    MapOperations amConfiguration = resolvedInstance
      .getAppConfOperations().getComponent(COMPONENT_AM);
    // and patch configuration with prefix
    if (amConfiguration != null) {
      Map<String, String> sliderAppConfKeys = amConfiguration.prefixedWith("slider.");
      for (Map.Entry<String, String> entry : sliderAppConfKeys.entrySet()) {
        String k = entry.getKey();
        String v = entry.getValue();
        boolean exists = serviceConf.get(k) != null;
        log.info("{} {} to {}", (exists ? "Overwriting" : "Setting"), k, v);
        serviceConf.set(k, v);
      }
    }

    securityConfiguration = new SecurityConfiguration(serviceConf, resolvedInstance, clustername);
    // obtain security state
    securityEnabled = securityConfiguration.isSecurityEnabled();
    // set the global security flag for the instance definition
    instanceDefinition.getAppConfOperations().set(KEY_SECURITY_ENABLED, securityEnabled);

    // triggers resolution and snapshotting for agent
    appState.setInitialInstanceDefinition(instanceDefinition);

    File confDir = getLocalConfDir();
    if (!confDir.exists() || !confDir.isDirectory()) {
      log.info("Conf dir {} does not exist.", confDir);
      File parentFile = confDir.getParentFile();
      log.info("Parent dir {}:\n{}", parentFile, SliderUtils.listDir(parentFile));
    }
    
    //get our provider
    MapOperations globalInternalOptions = getGlobalInternalOptions();
    String providerType = globalInternalOptions.getMandatoryOption(
      InternalKeys.INTERNAL_PROVIDER_NAME);
    log.info("Cluster provider type is {}", providerType);
    SliderProviderFactory factory =
      SliderProviderFactory.createSliderProviderFactory(providerType);
    providerService = factory.createServerProvider();
    // init the provider BUT DO NOT START IT YET
    initAndAddService(providerService);
    providerRMOperationHandler = new ProviderNotifyingOperationHandler(providerService);
    
    // create a slider AM provider
    sliderAMProvider = new SliderAMProviderService();
    initAndAddService(sliderAMProvider);
    
    InetSocketAddress rmSchedulerAddress = SliderUtils.getRmSchedulerAddress(serviceConf);
    log.info("RM is at {}", rmSchedulerAddress);
    yarnRPC = YarnRPC.create(serviceConf);

    // set up the YARN client. This may require patching in the RM client-API address if it
    // is (somehow) unset server-side.    String clientRMaddr = serviceConf.get(YarnConfiguration.RM_ADDRESS);
    InetSocketAddress clientRpcAddress = SliderUtils.getRmAddress(serviceConf);
    if (!SliderUtils.isAddressDefined(clientRpcAddress)) {
      // client addr is being unset. We can lift it from the other RM APIs
      log.warn("Yarn RM address was unbound; attempting to fix up");
      serviceConf.set(YarnConfiguration.RM_ADDRESS,
          String.format("%s:%d", rmSchedulerAddress.getHostString(), clientRpcAddress.getPort() ));
    }

    /*
     * Extract the container ID. This is then
     * turned into an (incomplete) container
     */
    appMasterContainerID = ConverterUtils.toContainerId(
      SliderUtils.mandatoryEnvVariable(ApplicationConstants.Environment.CONTAINER_ID.name()));
    appAttemptID = appMasterContainerID.getApplicationAttemptId();

    ApplicationId appid = appAttemptID.getApplicationId();
    log.info("AM for ID {}", appid.getId());

    appInformation.put(StatusKeys.INFO_AM_CONTAINER_ID, appMasterContainerID.toString());
    appInformation.put(StatusKeys.INFO_AM_APP_ID, appid.toString());
    appInformation.put(StatusKeys.INFO_AM_ATTEMPT_ID, appAttemptID.toString());

    Map<String, String> envVars;
    List<Container> liveContainers;

    /*
     * It is critical this section is synchronized, to stop async AM events
     * arriving while registering a restarting AM.
     */
    synchronized (appState) {
      int heartbeatInterval = HEARTBEAT_INTERVAL;

      // configure AM to wait forever for RM
      getConfig().setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
          -1);
      getConfig().unset(YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS);

      // add the RM client -this brings the callbacks in
      asyncRMClient = AMRMClientAsync.createAMRMClientAsync(heartbeatInterval, this);
      addService(asyncRMClient);
      //now bring it up
      deployChildService(asyncRMClient);
      RMClientAccessForAppState rmClientAccess = new RMClientAccessForAppState(
          asyncRMClient);
      appState.setRMClientAccessForAppState(rmClientAccess);

      // nmclient relays callbacks back to this class
      nmClientAsync = new NMClientAsyncImpl("nmclient", this);
      deployChildService(nmClientAsync);

      // set up secret manager
      secretManager = new ClientToAMTokenSecretManager(appAttemptID, null);

      if (securityEnabled) {
        // fix up the ACLs if they are not set
        String acls = serviceConf.get(KEY_PROTOCOL_ACL);
        if (acls == null) {
          getConfig().set(KEY_PROTOCOL_ACL, "*");
        }
      }

      certificateManager = new CertificateManager();

      //bring up the Slider RPC service
      buildPortScanner(instanceDefinition);
      startSliderRPCServer(instanceDefinition);

      rpcServiceAddress = rpcService.getConnectAddress();
      appMasterHostname = rpcServiceAddress.getAddress().getCanonicalHostName();
      appMasterRpcPort = rpcServiceAddress.getPort();
      appMasterTrackingUrl = null;
      log.info("AM Server is listening at {}:{}", appMasterHostname, appMasterRpcPort);
      appInformation.put(StatusKeys.INFO_AM_HOSTNAME, appMasterHostname);
      appInformation.set(StatusKeys.INFO_AM_RPC_PORT, appMasterRpcPort);

      log.info("Starting Yarn registry");
      registryOperations = startRegistryOperationsService();
      log.info(registryOperations.toString());

      //build the role map
      List<ProviderRole> providerRoles = new ArrayList<>(providerService.getRoles());
      providerRoles.addAll(SliderAMClientProvider.ROLES);

      // Start up the WebApp and track the URL for it
      MapOperations component = instanceDefinition.getAppConfOperations()
          .getComponent(SliderKeys.COMPONENT_AM);
      certificateManager.initialize(component, appMasterHostname,
                                    appMasterContainerID.toString(),
                                    clustername);
      certificateManager.setPassphrase(instanceDefinition.getPassphrase());
 
      if (component.getOptionBool(
          AgentKeys.KEY_AGENT_TWO_WAY_SSL_ENABLED, false)) {
        uploadServerCertForLocalization(clustername, fs);
      }

      // Web service endpoints: initialize
      WebAppApiImpl webAppApi =
          new WebAppApiImpl(
              stateForProviders,
              providerService,
              certificateManager,
              registryOperations,
              metricsAndMonitoring,
              actionQueues,
              this,
              contentCache);
      initAMFilterOptions(serviceConf);

      // start the agent web app
      startAgentWebApp(appInformation, serviceConf, webAppApi);
      int webAppPort = deployWebApplication(webAppApi);

      String scheme = WebAppUtils.HTTP_PREFIX;
      appMasterTrackingUrl = scheme + appMasterHostname + ":" + webAppPort;

      appInformation.put(StatusKeys.INFO_AM_WEB_URL, appMasterTrackingUrl + "/");
      appInformation.set(StatusKeys.INFO_AM_WEB_PORT, webAppPort);

      // *****************************************************
      // Register self with ResourceManager
      // This will start heartbeating to the RM
      // address = SliderUtils.getRmSchedulerAddress(asyncRMClient.getConfig());
      // *****************************************************
      log.info("Connecting to RM at {}; AM tracking URL={}",
               appMasterRpcPort, appMasterTrackingUrl);
      amRegistrationData = asyncRMClient.registerApplicationMaster(appMasterHostname,
                                   appMasterRpcPort,
                                   appMasterTrackingUrl);
      maximumResourceCapability = amRegistrationData.getMaximumResourceCapability();

      int minMemory = serviceConf.getInt(RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
          DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
       // validate scheduler vcores allocation setting
      int minCores = serviceConf.getInt(RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
          DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
      int maxMemory = maximumResourceCapability.getMemory();
      int maxCores = maximumResourceCapability.getVirtualCores();
      appState.setContainerLimits(minMemory,maxMemory, minCores, maxCores );

      // build the handler for RM request/release operations; this uses
      // the max value as part of its lookup
      rmOperationHandler = new AsyncRMOperationHandler(asyncRMClient, maximumResourceCapability);

      // set the RM-defined maximum cluster values
      appInformation.put(ResourceKeys.YARN_CORES, Integer.toString(maxCores));
      appInformation.put(ResourceKeys.YARN_MEMORY, Integer.toString(maxMemory));

      processAMCredentials(securityConfiguration);

      if (securityEnabled) {
        secretManager.setMasterKey(
            amRegistrationData.getClientToAMTokenMasterKey().array());
        applicationACLs = amRegistrationData.getApplicationACLs();

        //tell the server what the ACLs are
        rpcService.getServer().refreshServiceAcl(serviceConf,
            new SliderAMPolicyProvider());
        if (securityConfiguration.isKeytabProvided()) {
          // perform keytab based login to establish kerberos authenticated
          // principal.  Can do so now since AM registration with RM above required
          // tokens associated to principal
          String principal = securityConfiguration.getPrincipal();
          File localKeytabFile = securityConfiguration.getKeytabFile(instanceDefinition);
          // Now log in...
          login(principal, localKeytabFile);
          // obtain new FS reference that should be kerberos based and different
          // than the previously cached reference
          fs = new SliderFileSystem(serviceConf);
        }
      }

      // YARN client.
      // Important: this is only valid at startup, and must be executed within
      // the right UGI context. Use with care.
      SliderYarnClientImpl yarnClient = null;
      List<NodeReport> nodeReports;
      try {
        yarnClient = new SliderYarnClientImpl();
        yarnClient.init(getConfig());
        yarnClient.start();
        nodeReports = getNodeReports(yarnClient);
        log.info("Yarn node report count: {}", nodeReports.size());
        // look up the application itself -this is needed to get the proxied
        // URL of the AM, for registering endpoints.
        // this call must be made after the AM has registered itself, obviously
        ApplicationAttemptReport report = getApplicationAttemptReport(yarnClient);
        appMasterProxiedUrl = report.getTrackingUrl();
        if (SliderUtils.isUnset(appMasterProxiedUrl)) {
          log.warn("Proxied URL is not set in application report");
          appMasterProxiedUrl = appMasterTrackingUrl;
        }
      } finally {
        // at this point yarnClient is no longer needed.
        // stop it immediately
        ServiceOperations.stop(yarnClient);
        yarnClient = null;
      }

      // extract container list

      liveContainers = amRegistrationData.getContainersFromPreviousAttempts();

      //now validate the installation
      Configuration providerConf =
        providerService.loadProviderConfigurationInformation(confDir);

      providerService.initializeApplicationConfiguration(instanceDefinition,
          fs, null);

      providerService.validateApplicationConfiguration(instanceDefinition,
          confDir,
          securityEnabled);

      //determine the location for the role history data
      Path historyDir = new Path(clusterDirPath, HISTORY_DIR_NAME);

      //build the instance
      AppStateBindingInfo binding = new AppStateBindingInfo();
      binding.instanceDefinition = instanceDefinition;
      binding.serviceConfig = serviceConf;
      binding.publishedProviderConf = providerConf;
      binding.roles = providerRoles;
      binding.fs = fs.getFileSystem();
      binding.historyPath = historyDir;
      binding.liveContainers = liveContainers;
      binding.applicationInfo = appInformation;
      binding.releaseSelector = providerService.createContainerReleaseSelector();
      binding.nodeReports = nodeReports;
      appState.buildInstance(binding);

      providerService.rebuildContainerDetails(liveContainers,
          instanceDefinition.getName(), appState.getRolePriorityMap());

      // add the AM to the list of nodes in the cluster

      appState.buildAppMasterNode(appMasterContainerID,
          appMasterHostname,
          webAppPort,
          appMasterHostname + ":" + webAppPort);

      // build up environment variables that the AM wants set in every container
      // irrespective of provider and role.
      envVars = new HashMap<>();
      if (hadoop_user_name != null) {
        envVars.put(HADOOP_USER_NAME, hadoop_user_name);
      }
      String debug_kerberos = System.getenv(HADOOP_JAAS_DEBUG);
      if (debug_kerberos != null) {
        envVars.put(HADOOP_JAAS_DEBUG, debug_kerberos);
      }
    }
    String rolesTmpSubdir = appMasterContainerID.toString() + "/roles";

    String amTmpDir = globalInternalOptions.getMandatoryOption(InternalKeys.INTERNAL_AM_TMP_DIR);

    Path tmpDirPath = new Path(amTmpDir);
    Path launcherTmpDirPath = new Path(tmpDirPath, rolesTmpSubdir);
    fs.getFileSystem().mkdirs(launcherTmpDirPath);

    //launcher service
    launchService = new RoleLaunchService(actionQueues,
                                          providerService,
                                          fs,
                                          new Path(getGeneratedConfDir()),
                                          envVars,
                                          launcherTmpDirPath);

    deployChildService(launchService);

    appState.noteAMLaunched();


    //Give the provider access to the state, and AM
    providerService.bind(stateForProviders, actionQueues, liveContainers);
    sliderAMProvider.bind(stateForProviders, actionQueues, liveContainers);

    // chaos monkey
    maybeStartMonkey();

    // setup token renewal and expiry handling for long lived apps
//    if (!securityConfiguration.isKeytabProvided() &&
//        SliderUtils.isHadoopClusterSecure(getConfig())) {
//      fsDelegationTokenManager = new FsDelegationTokenManager(actionQueues);
//      fsDelegationTokenManager.acquireDelegationToken(getConfig());
//    }

    // if not a secure cluster, extract the username -it will be
    // propagated to workers
    if (!UserGroupInformation.isSecurityEnabled()) {
      hadoop_user_name = System.getenv(HADOOP_USER_NAME);
      log.info(HADOOP_USER_NAME + "='{}'", hadoop_user_name);
    }
    service_user_name = RegistryUtils.currentUser();
    log.info("Registry service username ={}", service_user_name);


    // declare the cluster initialized
    log.info("Application Master Initialization Completed");
    initCompleted.set(true);

    scheduleFailureWindowResets(instanceDefinition.getResources());
    scheduleEscalation(instanceDefinition.getInternal());

    try {
      // schedule YARN Registry registration
      queue(new ActionRegisterServiceInstance(clustername, appid));

      // log the YARN and web UIs
      log.info("RM Webapp address {}",
          serviceConf.get(YarnConfiguration.RM_WEBAPP_ADDRESS));
      log.info("Slider webapp address {} proxied at {}",
        appMasterTrackingUrl, appMasterProxiedUrl);

      // Start the Slider AM provider
      sliderAMProvider.start();

      // launch the real provider; this is expected to trigger a callback that
      // starts the node review process
      launchProviderService(instanceDefinition, confDir);

      // start handling any scheduled events

      startQueueProcessing();

      //now block waiting to be told to exit the process
      waitForAMCompletionSignal();
    } catch(Exception e) {
      log.error("Exception : {}", e, e);
      // call the AM stop command as if it had been queued (but without
      // going via the queue, which may not have started
      ActionStopSlider stopSlider = new ActionStopSlider(e);
      stopSlider.setExitReason(SliderExitReason.SLIDER_AM_ERROR);
      onAMStop(stopSlider);
    }
    //shutdown time
    return finish();
  }

  /**
   * Fetch the report for this application attempt from the RM.
   * <p>
   * When security is enabled the query is executed inside a {@code doAs}
   * block for the login user; otherwise the client is called directly.
   * @param yarnClient a started YARN client; must not be null
   * @return the attempt report returned by the RM
   * @throws YarnException on RM-side failures
   * @throws IOException on communication failures
   * @throws InterruptedException propagated from {@code UserGroupInformation.doAs}
   */
  private ApplicationAttemptReport getApplicationAttemptReport(
      final SliderYarnClientImpl yarnClient)
      throws YarnException, IOException, InterruptedException {
    Preconditions.checkNotNull(yarnClient, "Null Yarn client");
    if (!securityEnabled) {
      return yarnClient.getApplicationAttemptReport(appAttemptID);
    }
    UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
    PrivilegedExceptionAction<ApplicationAttemptReport> fetchReport =
        new PrivilegedExceptionAction<ApplicationAttemptReport>() {
          @Override
          public ApplicationAttemptReport run() throws Exception {
            return yarnClient.getApplicationAttemptReport(appAttemptID);
          }
        };
    return loginUser.doAs(fetchReport);
  }

  /**
   * List the reports of all nodes the RM considers RUNNING.
   * <p>
   * In a secure cluster the query runs as the login user via {@code doAs}.
   * The number of reports retrieved is logged before returning.
   * @param yarnClient a started YARN client; must not be null
   * @return the node reports for nodes in state {@link NodeState#RUNNING}
   * @throws IOException on communication failures
   * @throws YarnException on RM-side failures
   * @throws InterruptedException propagated from {@code UserGroupInformation.doAs}
   */
  private List<NodeReport> getNodeReports(final SliderYarnClientImpl yarnClient)
      throws IOException, YarnException, InterruptedException {
    Preconditions.checkNotNull(yarnClient, "Null Yarn client");
    final List<NodeReport> reports;
    if (!securityEnabled) {
      reports = yarnClient.getNodeReports(NodeState.RUNNING);
    } else {
      UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
      reports = loginUser.doAs(
          new PrivilegedExceptionAction<List<NodeReport>>() {
            @Override
            public List<NodeReport> run() throws Exception {
              return yarnClient.getNodeReports(NodeState.RUNNING);
            }
          });
    }
    log.info("Yarn node report count: {}", reports.size());
    return reports;
  }

  /**
   * Create, start and register the AM's web application.
   * <p>
   * The web app is bound to {@code 0.0.0.0} on a port obtained from the port
   * scanner, uses the HTTP-only policy, and is wrapped in a
   * {@code WebAppService} child service so that its shutdown is managed
   * with the AM.
   * @param webAppApi web app API instance
   * @return the port the web application is deployed on
   * @throws IOException if the web app fails to start with an underlying
   * {@code IOException} cause (which is unwrapped and rethrown)
   * @throws SliderException propagated from port allocation
   * @throws WebAppException (unchecked) for other web app start failures
   */
  private int deployWebApplication(WebAppApiImpl webAppApi)
      throws IOException, SliderException {
    try {
      webApp = new SliderAMWebApp(webAppApi);
      HttpConfig.Policy httpPolicy = HttpConfig.Policy.HTTP_ONLY;
      int requestedPort = getPortToRequest();
      log.info("Launching web application at port {} with policy {}",
          requestedPort, httpPolicy);

      WebApps.$for(SliderAMWebApp.BASE_PATH, WebAppApi.class, webAppApi,
              RestPaths.WS_CONTEXT)
          .withHttpPolicy(getConfig(), httpPolicy)
          .at("0.0.0.0", requestedPort, true)
          .inDevMode()
          .start(webApp);

      WebAppService<SliderAMWebApp> managedWebApp =
          new WebAppService<>("slider", webApp);
      deployChildService(managedWebApp);
      return webApp.port();
    } catch (WebAppException e) {
      Throwable cause = e.getCause();
      if (!(cause instanceof IOException)) {
        throw e;
      }
      throw (IOException) cause;
    }
  }

  /**
   * Build {@code containerCredentials} from the current user's credentials
   * (the tokens passed in by the client), minus the tokens the AM must not
   * pass on.
   * <p>
   * The AM/RM token and the timeline delegation token are always filtered;
   * when a keytab has been provided the HDFS delegation token is filtered
   * as well. The resulting tokens are logged.
   * @param securityConfig slider security config
   * @throws IOException if the current user cannot be obtained
   */
  private void processAMCredentials(SecurityConfiguration securityConfig)
      throws IOException {
    boolean useKeytab = securityConfig.isKeytabProvided();

    List<Text> tokensToStrip = new ArrayList<>(3);
    tokensToStrip.add(AMRMTokenIdentifier.KIND_NAME);
    tokensToStrip.add(TimelineDelegationTokenIdentifier.KIND_NAME);
    log.info("Slider AM Security Mode: {}", useKeytab ? "KEYTAB" : "TOKEN");
    if (useKeytab) {
      tokensToStrip.add(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
    }

    Credentials currentCredentials =
        UserGroupInformation.getCurrentUser().getCredentials();
    containerCredentials =
        CredentialUtils.filterTokens(currentCredentials, tokensToStrip);
    log.info(CredentialUtils.dumpTokens(containerCredentials, "\n"));
  }

  /**
   * Create the port scanner used to choose RPC and web ports.
   * <p>
   * If the application's global options set
   * {@code SliderKeys.KEY_ALLOWED_PORT_RANGE} to anything other than
   * {@code "0"}, that range is applied to the scanner; otherwise the
   * scanner keeps its default behaviour.
   * @param instanceDefinition instance definition holding the app options
   * @throws BadConfigException if the configured port range is rejected
   */
  private void buildPortScanner(AggregateConf instanceDefinition)
      throws BadConfigException {
    portScanner = new PortScanner();
    String allowedRange = instanceDefinition
        .getAppConfOperations()
        .getGlobalOptions()
        .getOption(SliderKeys.KEY_ALLOWED_PORT_RANGE, "0");
    if ("0".equals(allowedRange)) {
      return;
    }
    portScanner.setPortRange(allowedRange);
  }
  
  /**
   * Pick a port to request for a service such as RPC or web/REST.
   * <p>
   * Delegates to the port scanner, which honours any port range configured
   * in the instance definition. The port is free when it is returned, but it
   * is not reserved: nothing guarantees it stays free until it is bound.
   * @return the port to request
   * @throws SliderException propagated from the port scanner
   * @throws IOException propagated from the port scanner
   */
  private int getPortToRequest() throws SliderException, IOException {
    return this.portScanner.getAvailablePort();
  }

  /**
   * Copy the AM's server certificate into the cluster security directory
   * so it can be localized.
   * <p>
   * The directory is created with owner-only permissions ({@code rwx------})
   * if it does not exist. The certificate is uploaded only if not already
   * present, but its permission is always reset to owner read-only
   * ({@code r--------}).
   * @param clustername name of the cluster
   * @param fs slider filesystem
   * @throws IOException on filesystem failures
   */
  private void uploadServerCertForLocalization(String clustername,
                                               SliderFileSystem fs)
      throws IOException {
    Path securityDir = fs.buildClusterSecurityDirPath(clustername);
    if (!fs.getFileSystem().exists(securityDir)) {
      FsPermission ownerOnly =
          new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
      fs.getFileSystem().mkdirs(securityDir, ownerOnly);
    }

    Path certDest = new Path(securityDir, SliderKeys.CRT_FILE_NAME);
    if (!fs.getFileSystem().exists(certDest)) {
      Path localCert = new Path(
          CertificateManager.getServerCertficateFilePath().getAbsolutePath());
      fs.getFileSystem().copyFromLocalFile(localCert, certDest);
      log.info("Uploaded server cert to localization path {}", certDest);
    }

    FsPermission ownerReadOnly =
        new FsPermission(FsAction.READ, FsAction.NONE, FsAction.NONE);
    fs.getFileSystem().setPermission(certDest, ownerReadOnly);
  }

  /**
   * Log in from a keytab, then validate (and clean up) the new login user.
   * @param principal kerberos principal
   * @param localKeytabFile keytab file on the local filesystem
   * @throws IOException if the keytab login fails
   * @throws SliderException declared by {@code validateLoginUser}
   */
  protected void login(String principal, File localKeytabFile)
      throws IOException, SliderException {
    log.info("Logging in as {} with keytab {}", principal, localKeytabFile);
    String keytabPath = localKeytabFile.getAbsolutePath();
    UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
    UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
    validateLoginUser(loginUser);
  }

  /**
   * Check the login user and remove any HDFS delegation tokens it holds.
   * <p>
   * A user that was not logged in from a keytab is reported as an error but
   * is not rejected. Every token kind is logged; HDFS delegation tokens are
   * removed through the iterator over the user's credential tokens.
   * @param user user to validate
   * @throws SliderException declared for overriding implementations; this
   * implementation does not throw it
   */
  protected void validateLoginUser(UserGroupInformation user)
      throws SliderException {
    if (!user.isFromKeytab()) {
      log.error("User is not holding on a keytab in a secure deployment:" +
          " slider will fail as tokens expire");
    }
    Credentials userCredentials = user.getCredentials();
    for (Iterator<Token<? extends TokenIdentifier>> tokens =
             userCredentials.getAllTokens().iterator(); tokens.hasNext(); ) {
      Token<? extends TokenIdentifier> candidate = tokens.next();
      Text kind = candidate.getKind();
      log.info("Token {}", kind);
      if (!kind.equals(DelegationTokenIdentifier.HDFS_DELEGATION_KIND)) {
        continue;
      }
      log.info("HDFS delegation token {}.  Removing...", candidate);
      tokens.remove();
    }
  }

  /**
   * Set up and start the agent web application, then publish its URLs and
   * ports in the application information.
   * <p>
   * Steps, in order:
   * <ol>
   *   <li>Log the AM classpath at debug level.</li>
   *   <li>Re-apply the AM HTTP filter options to {@code serviceConf}.</li>
   *   <li>Start the agent web app on two ports taken from the port scanner.</li>
   *   <li>Record the ops URL (on {@code getSecuredPort()}) and the status URL
   *   (on {@code getPort()}).</li>
   *   <li>Init and start an {@code AgentService} wrapping the web app, then
   *   pass it to {@code addService}.</li>
   *   <li>Store the URLs and ports under the
   *   {@code StatusKeys.INFO_AM_AGENT_*} keys.</li>
   * </ol>
   * @param appInformation application information to update
   * @param serviceConf service configuration; its HTTP filter initializer
   * setting is (re)written by {@code initAMFilterOptions}
   * @param webAppApi web app API instance to bind to
   * @throws IOException propagated from port allocation
   * @throws SliderException propagated from port allocation
   */
  private void startAgentWebApp(MapOperations appInformation,
      Configuration serviceConf, WebAppApiImpl webAppApi) throws IOException, SliderException {
    // NOTE(review): assumes the class loader is a URLClassLoader. On Java 9+
    // the application class loader is not one, and this cast would throw
    // ClassCastException; confirm the target runtime.
    URL[] urls = ((URLClassLoader) AgentWebApp.class.getClassLoader() ).getURLs();
    StringBuilder sb = new StringBuilder("AM classpath:");
    for (URL url : urls) {
      sb.append("\n").append(url.toString());
    }
    // The classpath string is built even when debug logging is disabled.
    LOG_YARN.debug(sb.append("\n").toString());
    // Sets HADOOP_HTTP_FILTER_INITIALIZERS in serviceConf.
    initAMFilterOptions(serviceConf);


    // Start up the agent web app and track the URL for it
    MapOperations appMasterConfig = getInstanceDefinition()
        .getAppConfOperations().getComponent(SliderKeys.COMPONENT_AM);
    // Two separate scanner calls: the first port goes to withPort(), the
    // second to withSecuredPort(). Neither is reserved by the scanner.
    AgentWebApp agentWebApp = AgentWebApp.$for(AgentWebApp.BASE_PATH,
        webAppApi,
        RestPaths.AGENT_WS_CONTEXT)
        .withComponentConfig(appMasterConfig)
        .withPort(getPortToRequest())
        .withSecuredPort(getPortToRequest())
            .start();
    // NOTE(review): both URLs use https, including the one on getPort();
    // presumably both agent connectors are TLS. Confirm against AgentWebApp.
    agentOpsUrl =
        "https://" + appMasterHostname + ":" + agentWebApp.getSecuredPort();
    agentStatusUrl =
        "https://" + appMasterHostname + ":" + agentWebApp.getPort();
    AgentService agentService =
      new AgentService("slider-agent", agentWebApp);

    // The service is initialized and started here, before addService().
    agentService.init(serviceConf);
    agentService.start();
    addService(agentService);

    // Publish the agent endpoints; URLs are stored with a trailing "/".
    appInformation.put(StatusKeys.INFO_AM_AGENT_OPS_URL, agentOpsUrl + "/");
    appInformation.put(StatusKeys.INFO_AM_AGENT_STATUS_URL, agentStatusUrl + "/");
    appInformation.set(StatusKeys.INFO_AM_AGENT_STATUS_PORT,
                       agentWebApp.getPort());
    appInformation.set(StatusKeys.INFO_AM_AGENT_OPS_PORT,
                       agentWebApp.getSecuredPort());
  }

  /**
   * Choose the HTTP filter initializer for the AM web endpoints and record
   * it in the configuration under {@code HADOOP_HTTP_FILTER_INITIALIZERS}.
   * <p>
   * Normally {@code AM_FILTER_NAME} (IP filtering) is used. Only when the
   * {@code X_DEV_INSECURE_REQUIRED} flag is set and {@code X_DEV_INSECURE_WS}
   * is enabled in the configuration is the insecure filter used instead.
   * That option exists for testing, until YARN supports proxy and redirect
   * for verbs other than GET.
   * @param serviceConf configuration to patch
   */
  private void initAMFilterOptions(Configuration serviceConf) {
    boolean useInsecureFilter = X_DEV_INSECURE_REQUIRED
        && serviceConf.getBoolean(X_DEV_INSECURE_WS, X_DEV_INSECURE_DEFAULT);
    String filterName;
    if (useInsecureFilter) {
      log.warn("Insecure filter enabled: REST operations are unauthenticated");
      filterName = InsecureAmFilterInitializer.NAME;
    } else {
      filterName = AM_FILTER_NAME;
    }
    serviceConf.set(HADOOP_HTTP_FILTER_INITIALIZERS, filterName);
  }

  /**
   * Register this AM's service instance and its external endpoints in the
   * YARN registry.
   * <p>
   * The registration runs in this order:
   * <ol>
   *   <li>Parse the proxied AM web URL and the agent ops/status URLs.</li>
   *   <li>Create {@code yarnRegistryOperations} and bind it to both
   *   providers.</li>
   *   <li>Build an application-persistent service record with the AM IPC
   *   endpoint.</li>
   *   <li>Let the AM provider and the application provider add their
   *   definitions, then apply any attributes configured on the AM
   *   component.</li>
   *   <li>Register the record.</li>
   * </ol>
   * On the first application attempt the children under the
   * self-registration path are then deleted.
   * <p>
   * Until this method assigns {@code yarnRegistryOperations},
   * {@code unregisterComponent} ignores requests, because it checks that
   * field for null.
   * @param instanceName name of this instance
   * @param appId application ID
   * @throws IOException if any of the AM/agent URLs is malformed
   * ({@code MalformedURLException}), or propagated from registry operations
   */
  public void registerServiceInstance(String instanceName,
      ApplicationId appId) throws IOException {
    
    
    // the registry is running, so register services
    // new URL(...) throws MalformedURLException (an IOException) on bad input
    URL amWebURI = new URL(appMasterProxiedUrl);
    URL agentOpsURI = new URL(agentOpsUrl);
    URL agentStatusURI = new URL(agentStatusUrl);

    //Give the provider restricted access to the state, registry
    setupInitialRegistryPaths();
    yarnRegistryOperations = new YarnRegistryViewForProviders(
        registryOperations,
        service_user_name,
        SliderKeys.APP_TYPE,
        instanceName,
        appAttemptID);
    providerService.bindToYarnRegistry(yarnRegistryOperations);
    sliderAMProvider.bindToYarnRegistry(yarnRegistryOperations);

    // Yarn registry
    ServiceRecord serviceRecord = new ServiceRecord();
    serviceRecord.set(YarnRegistryAttributes.YARN_ID, appId.toString());
    serviceRecord.set(YarnRegistryAttributes.YARN_PERSISTENCE,
        PersistencePolicies.APPLICATION);
    serviceRecord.description = "Slider Application Master";

    // external endpoint: the AM's IPC (RPC) address
    serviceRecord.addExternalEndpoint(
        RegistryTypeUtils.ipcEndpoint(
            CustomRegistryConstants.AM_IPC_PROTOCOL,
            rpcServiceAddress));
            
    // internal services
    sliderAMProvider.applyInitialRegistryDefinitions(amWebURI,
        agentOpsURI,
        agentStatusURI,
        serviceRecord);

    // provider service dynamic definitions.
    providerService.applyInitialRegistryDefinitions(amWebURI,
        agentOpsURI,
        agentStatusURI,
        serviceRecord);

    // set any provided attributes
    setProvidedServiceRecordAttributes(
        getInstanceDefinition().getAppConfOperations().getComponent(
            SliderKeys.COMPONENT_AM), serviceRecord);

    // register the service's entry
    // NOTE(review): the meaning of the boolean argument to registerSelf is
    // not visible here; confirm against YarnRegistryViewForProviders.
    log.info("Service Record \n{}", serviceRecord);
    yarnRegistryOperations.registerSelf(serviceRecord, true);
    log.info("Registered service under {}; absolute path {}",
        yarnRegistryOperations.getSelfRegistrationPath(),
        yarnRegistryOperations.getAbsoluteSelfRegistrationPath());
    
    // attempt IDs start at 1; later attempts skip the purge below
    // (presumably so entries for containers from earlier attempts survive)
    boolean isFirstAttempt = 1 == appAttemptID.getAttemptId();
    // delete the children in case there are any and this is an AM startup.
    // just to make sure everything underneath is purged
    // NOTE(review): the trailing 'true' presumably requests recursive
    // deletion; confirm against deleteChildren.
    if (isFirstAttempt) {
      yarnRegistryOperations.deleteChildren(
          yarnRegistryOperations.getSelfRegistrationPath(),
          true);
    }
  }

  /**
   * Initialize the per-user registry paths for the service user, but only
   * when the registry in use is an {@link RMRegistryOperationsService}.
   * Any other registry implementation is left untouched.
   * <p>
   * TODO: purge this once RM is doing the work
   * @throws IOException propagated from the registry user setup
   */
  protected void setupInitialRegistryPaths() throws IOException {
    if (!(registryOperations instanceof RMRegistryOperationsService)) {
      return;
    }
    ((RMRegistryOperationsService) registryOperations)
        .initUserRegistryAsync(service_user_name);
  }

  /**
   * Handler for {@link RegisterComponentInstance action}: register, or
   * re-register, an ephemeral container that is already in the app state.
   * <p>
   * The container ID is encoded with {@code RegistryPathUtils.encodeYarnID}.
   * A container-persistent service record is written under that name,
   * including any service-record attributes declared in the component's
   * configuration. Registry write failures are logged and reported through
   * the return value.
   * @param id the container
   * @param description component description
   * @param type component type, used to look up its configuration
   * @return true if the record was written; false if the container is not
   * owned by the app state or the registry write failed
   * @throws IOException declared; failures from the registry write itself
   * are caught and logged
   */
  public boolean registerComponent(ContainerId id, String description,
      String type) throws IOException {
    if (appState.getOwnedContainer(id) == null) {
      return false;
    }
    // this is where component registrations go
    log.info("Registering component {}", id);
    String encodedId = RegistryPathUtils.encodeYarnID(id.toString());

    ServiceRecord record = new ServiceRecord();
    record.set(YarnRegistryAttributes.YARN_ID, encodedId);
    record.description = description;
    record.set(YarnRegistryAttributes.YARN_PERSISTENCE,
        PersistencePolicies.CONTAINER);
    MapOperations componentOptions =
        getInstanceDefinition().getAppConfOperations().getComponent(type);
    setProvidedServiceRecordAttributes(componentOptions, record);

    try {
      yarnRegistryOperations.putComponent(encodedId, record);
      return true;
    } catch (IOException e) {
      log.warn("Failed to register container {}/{}: {}",
          id, description, e, e);
      return false;
    }
  }

  /**
   * Copy attributes declared in the supplied options into a service record.
   * <p>
   * Every key starting with {@code RoleKeys.SERVICE_RECORD_ATTRIBUTE_PREFIX}
   * is treated as an attribute. The prefix and the single character after it
   * are stripped to form the attribute name; that character is assumed to be
   * a separator such as {@code '.'}. The value is trimmed before it is set.
   * <p>
   * Keys that carry no attribute name after the prefix and separator are
   * skipped with a warning. A key equal to the bare prefix would otherwise
   * throw {@link StringIndexOutOfBoundsException}, and a key ending in the
   * separator would register an empty attribute name.
   * @param ops options to scan
   * @param record service record to update
   */
  protected void setProvidedServiceRecordAttributes(MapOperations ops,
                                                    ServiceRecord record) {
    String prefix = RoleKeys.SERVICE_RECORD_ATTRIBUTE_PREFIX;
    // +1 skips the separator between the prefix and the attribute name
    int nameStart = prefix.length() + 1;
    for (Map.Entry<String, String> entry : ops.entrySet()) {
      String key = entry.getKey();
      if (!key.startsWith(prefix)) {
        continue;
      }
      if (key.length() <= nameStart) {
        log.warn("Ignoring service record attribute key with no name: {}", key);
        continue;
      }
      record.set(key.substring(nameStart), entry.getValue().trim());
    }
  }

  /**
   * Handler for {@link UnregisterComponentInstance}: remove a container's
   * entry from the registry.
   * <p>
   * When this message is received the component may never have been
   * registered. If the registry view has not been created yet, the request
   * is logged and ignored. Registry deletion failures are logged, not thrown.
   * @param id the container
   */
  public void unregisterComponent(ContainerId id) {
    log.info("Unregistering component {}", id);
    if (yarnRegistryOperations != null) {
      String encodedId = RegistryPathUtils.encodeYarnID(id.toString());
      try {
        yarnRegistryOperations.deleteComponent(encodedId);
      } catch (IOException e) {
        log.warn("Failed to delete container {} : {}", id, e, e);
      }
    } else {
      log.warn("Processing unregister component event before initialization " +
               "completed; init flag ={}", initCompleted);
    }
  }

  /**
   * Warn if the token file named by the
   * {@code HADOOP_TOKEN_FILE_LOCATION} environment variable does not exist.
   * <p>
   * This surfaced once in HBase, whose HDFS library looked for the token
   * file and found it missing. The warning makes the cause visible if the
   * problem recurs; the method does not fail the AM. Nothing happens when
   * the variable is unset.
   */
  private void checkAndWarnForAuthTokenProblems() {
    String envVar = UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
    String location = System.getenv(envVar);
    if (location == null) {
      return;
    }
    File tokenFile = new File(location);
    if (!tokenFile.exists()) {
      log.warn("Token file {} specified in {} not found", tokenFile, envVar);
    }
  }

  /**
   * Get the local propagated configuration directory,
   * {@code SliderKeys.PROPAGATED_CONF_DIR_NAME}, as an absolute file.
   * <p>
   * If the name is relative, it is resolved against the process working
   * directory.
   * @return the absolute configuration directory
   */
  public File getLocalConfDir() {
    return new File(SliderKeys.PROPAGATED_CONF_DIR_NAME).getAbsoluteFile();
  }

  /**
   * Get the path of the generated configuration directory, as recorded
   * under {@code InternalKeys.INTERNAL_GENERATED_CONF_PATH} in the global
   * internal options of the cluster specification.
   * @return the generated configuration dir
   */
  public String getGeneratedConfDir() {
    MapOperations internals = getGlobalInternalOptions();
    return internals.get(InternalKeys.INTERNAL_GENERATED_CONF_PATH);
  }

  /**
   * Get the global section of the instance definition's internal options.
   * @return a map to access the internals
   */
  public MapOperations getGlobalInternalOptions() {
    return getInstanceDefinition().getInternalOperations().getGlobalOptions();
  }

  /**
   * Create a Slider filesystem bound to this service's configuration.
   * A new instance is constructed on every call.
   * @return the filesystem for the configuration
   * @throws IOException if the filesystem cannot be created
   */
  public SliderFileSystem getClusterFS() throws IOException {
    Configuration conf = getConfig();
    return new SliderFileSystem(conf);
  }

  /**
   * Access the application master's logger.
   * @return the AM log
   */
  public static Logger getLog() {
    return log;
  }

  /**
   * Access the application state model of this AM.
   * @return the application state
   */
  public AppState getAppState() {
    return this.appState;
  }

  /**
   * Block, uninterruptibly, until {@code onAMStop()} has recorded a stop
   * request by setting the completion flag.
   * <p>
   * The flag is re-checked in a loop because waits on a
   * {@link java.util.concurrent.locks.Condition} may return spuriously.
   * With a single {@code if}, a spurious wakeup would let the AM proceed to
   * {@code finish()} with no stop action recorded, and finish() would then
   * fail its null check on the stop action.
   */
  private void waitForAMCompletionSignal() {
    AMExecutionStateLock.lock();
    try {
      while (!amCompletionFlag.get()) {
        log.debug("blocking until signalled to terminate");
        isAMCompleted.awaitUninterruptibly();
      }
    } finally {
      AMExecutionStateLock.unlock();
    }
  }

  /**
   * Request AM completion asynchronously. The stop request is a queued
   * action, so it is scheduled through the action queues rather than
   * processed in this call.
   * @param stopActionRequest request containing shutdown details
   */
  public synchronized void signalAMComplete(ActionStopSlider stopActionRequest) {
    schedule(stopActionRequest);
  }

  /**
   * Record a stop request and wake the thread blocked in
   * {@code waitForAMCompletionSignal()}.
   * <p>
   * Only the first request is kept. Once the completion flag has been set,
   * later calls change nothing.
   * @param stopActionRequest request containing shutdown details
   */
  public synchronized void onAMStop(ActionStopSlider stopActionRequest) {
    AMExecutionStateLock.lock();
    try {
      boolean firstRequest = amCompletionFlag.compareAndSet(false, true);
      if (!firstRequest) {
        return;
      }
      this.stopAction = stopActionRequest;
      isAMCompleted.signal();
    } finally {
      AMExecutionStateLock.unlock();
    }
  }

  
  /**
   * Trigger the YARN cluster termination process, driven by the stop action
   * recorded by {@code onAMStop()}.
   * <p>
   * The forked process is stopped unless it already exited before shutdown
   * was triggered. If the AM ever registered with the RM, the following
   * also happen:
   * <ul>
   *   <li>in-progress launches are stopped;</li>
   *   <li>all containers are released;</li>
   *   <li>the AM unregisters from the RM with the serialized application
   *   diagnostics as its message.</li>
   * </ul>
   * Unregistration failures are logged, not thrown.
   * @return the exit code from the stop action
   * @throws Exception the exception carried by the stop action, if any;
   * it is rethrown after the shutdown steps have run
   */
  private synchronized int finish() throws Exception {
    Preconditions.checkNotNull(stopAction, "null stop action");
    log.info("Triggering shutdown of the AM: {}", stopAction);

    String finalMessage = stopAction.getMessage();
    //stop the daemon & grab its exit code
    int exitCode = stopAction.getExitCode();
    Exception exception = stopAction.getEx();
    FinalApplicationStatus appStatus = stopAction.getFinalApplicationStatus();

    if (!spawnedProcessExitedBeforeShutdownTriggered) {
      //stopped the forked process but don't worry about its exit code
      int forkedExitCode = stopForkedProcess();
      log.debug("Stopped forked process: exit code={}", forkedExitCode);
    }

    // make sure the AM is actually registered. If not, there's no point
    // trying to unregister it
    if (amRegistrationData == null) {
      log.info("Application attempt not yet registered; skipping unregistration");
      if (exception != null) {
        throw exception;
      }
      return exitCode;
    }

    // stop any launches in progress. ServiceOperations.stop() ignores null,
    // so this cannot NPE if the launch service was never created.
    ServiceOperations.stop(launchService);

    //now release all containers
    String containerReleaseMessage = "Application stop triggered";
    releaseAllContainers(containerReleaseMessage);

    // When the application completes, it should send a finish application
    // signal to the RM
    log.info("Application completed. Signalling finish to RM");

    // Serialize the app diagnostics to app message for rich detailed
    // diagnostics
    ApplicationDiagnostics appDiagnostics = getApplicationDiagnostics();
    appDiagnostics.setFinalStatus(appStatus);
    appDiagnostics.setFinalMessage(finalMessage);
    appDiagnostics.setExitReason(stopAction.getExitReason());
    String appMessage = appDiagnostics.toString();
    try {
      log.info("Unregistering AM status={} message={}", appStatus, appMessage);
      asyncRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
    } catch (InvalidApplicationMasterRequestException e) {
      log.info("Application not found in YARN application list;" +
        " it may have been terminated/YARN shutdown in progress: {}", e, e);
    } catch (YarnException | IOException e) {
      // a genuine failure to unregister: report it above info level
      log.warn("Failed to unregister application: " + e, e);
    }
    if (exception != null) {
      throw exception;
    }
    return exitCode;
  }

  /**
   * Get diagnostic information about the containers, from the app state.
   */
  private String getContainerDiagnosticInfo() {
    String diagnostics = appState.getContainerDiagnosticInfo();
    return diagnostics;
  }

  /**
   * Create an RPC proxy for a protocol at a remote address, using this
   * service's configuration.
   * @param protocol protocol interface
   * @param addr remote address
   * @return the proxy created by {@code yarnRPC.getProxy}
   */
  public Object getProxy(Class protocol, InetSocketAddress addr) {
    Configuration conf = getConfig();
    return yarnRPC.getProxy(protocol, addr, conf);
  }

  /**
   * Start the slider RPC server.
   * <p>
   * The server is started in these steps:
   * <ol>
   *   <li>Check the IPC ACL configuration.</li>
   *   <li>Deploy the {@code SliderIPCService}.</li>
   *   <li>Wrap that service in a reflective protobuf blocking service.</li>
   *   <li>Deploy a {@code WorkflowRpcService} bound to {@code 0.0.0.0} on a
   *   port from the port scanner.</li>
   * </ol>
   * @param instanceDefinition instance definition; not used by this method
   * @throws IOException propagated from port allocation or server creation
   * @throws SliderException if IPC access is misconfigured, or propagated
   * from port allocation
   */
  private void startSliderRPCServer(AggregateConf instanceDefinition)
      throws IOException, SliderException {
    verifyIPCAccess();

    sliderIPCService = new SliderIPCService(this, certificateManager,
        stateForProviders, actionQueues, metricsAndMonitoring, contentCache);
    deployChildService(sliderIPCService);

    SliderClusterProtocolPBImpl relay =
        new SliderClusterProtocolPBImpl(sliderIPCService);
    BlockingService protobufService =
        SliderClusterAPI.SliderClusterProtocolPB.newReflectiveBlockingService(relay);

    InetSocketAddress bindAddress =
        new InetSocketAddress("0.0.0.0", getPortToRequest());
    rpcService = new WorkflowRpcService("SliderRPC",
        RpcBinder.createProtobufServer(bindAddress, getConfig(), secretManager,
            NUM_RPC_HANDLERS, protobufService, null));
    deployChildService(rpcService);
  }

  /**
   * Verify that protocol ACLs are configured whenever IPC authorization is
   * enabled.
   * @throws BadConfigException if authorization is enabled but
   * {@code KEY_PROTOCOL_ACL} is unset
   */
  private void verifyIPCAccess() throws BadConfigException {
    Configuration conf = getConfig();
    boolean authorizationEnabled = conf.getBoolean(
        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
        false);
    String protocolAcls = conf.get(KEY_PROTOCOL_ACL);
    if (!authorizationEnabled || !SliderUtils.isUnset(protocolAcls)) {
      // either authorization is off, or ACLs have been supplied
      return;
    }
    throw new BadConfigException("Application has IPC authorization enabled in " +
        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION +
        " but no ACLs in " + KEY_PROTOCOL_ACL);
  }


/* =================================================================== */
/* AMRMClientAsync callbacks */
/* =================================================================== */

  /**
   * Callback event when a container is allocated.
   * <p>
   * The app state decides what to do with the allocation. It fills in a list
   * of container assignments and a list of RM operations. Each assignment is
   * handed to the launch service, together with freshly built container
   * credentials, which schedules the container launch asynchronously. The RM
   * operations are then executed.
   * <p>
   * The operations are run in sequence; they are expected to be 0 or more
   * release operations (to handle over-allocations).
   *
   * @param allocatedContainers list of containers that are now ready to be
   * given work.
   */
  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
  @Override //AMRMClientAsync
  public void onContainersAllocated(List<Container> allocatedContainers) {
    LOG_YARN.info("onContainersAllocated({})", allocatedContainers.size());
    List<ContainerAssignment> assignments = new ArrayList<>();
    List<AbstractRMOperation> operations = new ArrayList<>();

    // every allocation decision is made by the app state
    appState.onContainersAllocated(allocatedContainers, assignments, operations);

    for (ContainerAssignment toLaunch : assignments) {
      try {
        AggregateConf definition = getInstanceDefinition();
        Credentials credentials = buildContainerCredentials();
        launchService.launchRole(toLaunch, definition, credentials);
      } catch (IOException e) {
        // Can be caused by failure to renew credentials with the remote
        // service. If so, don't launch the application. Container is retained,
        // though YARN will take it away after a timeout.
        log.error("Failed to build credentials to launch container: {}", e, e);
      }
    }

    execute(operations);
    log.info("Diagnostics: {}", getContainerDiagnosticInfo());
  }

  /**
   * Callback event when containers have completed.
   * <p>
   * Each completion is passed to the app state. A failed role instance is
   * logged as an error. For nodes the app state does not report as unknown,
   * the provider is notified and an unregistration of the component instance
   * is queued. Finally, a review of requests and releases is queued.
   *
   * @param completedContainers statuses of the completed containers
   */
  @Override //AMRMClientAsync
  public synchronized void onContainersCompleted(List<ContainerStatus> completedContainers) {
    // same "name(count)" format as the other AMRMClientAsync callbacks
    LOG_YARN.info("onContainersCompleted({})", completedContainers.size());
    for (ContainerStatus status : completedContainers) {
      ContainerId containerId = status.getContainerId();
      LOG_YARN.info("Container Completion for" +
                    " containerID={}," +
                    " state={}," +
                    " exitStatus={}," +
                    " diagnostics={}",
                    containerId, status.getState(),
                    status.getExitStatus(),
                    status.getDiagnostics());

      // non complete containers should not be here
      assert (status.getState() == ContainerState.COMPLETE);
      AppState.NodeCompletionResult result = appState.onCompletedNode(status);
      if (result.containerFailed) {
        RoleInstance ri = result.roleInstance;
        log.error("Role instance {} failed ", ri);
      }

      //  known nodes trigger notifications
      if (!result.unknownNode) {
        getProviderService().notifyContainerCompleted(containerId);
        queue(new UnregisterComponentInstance(containerId, 0,
            TimeUnit.MILLISECONDS));
      }
    }

    reviewRequestAndReleaseNodes("onContainersCompleted");
  }

  /**
   * Signal that containers are being upgraded. Containers specified with
   * --containers option and all containers of all roles specified with
   * --components option are merged and upgraded.
   * <p>
   * The merge happens in a private copy of the request's container set. The
   * request object is therefore never modified, and an unmodifiable set
   * returned by the request cannot cause the merge to fail.
   *
   * @param upgradeContainersRequest
   *          request containing upgrade details
   */
  public synchronized void onUpgradeContainers(
      ActionUpgradeContainers upgradeContainersRequest) throws IOException,
      SliderException {
    LOG_YARN.info("onUpgradeContainers({})",
        upgradeContainersRequest.getMessage());
    // copy: role containers are added to this set below, which must not
    // alter (or fail on) the set held by the request
    Set<String> requestedContainers = upgradeContainersRequest.getContainers();
    Set<String> containers = requestedContainers == null
        ? new HashSet<String>()
        : new HashSet<String>(requestedContainers);
    LOG_YARN.info("  Container list provided (total {}) : {}",
        containers.size(), containers);
    Set<String> requestedComponents = upgradeContainersRequest.getComponents();
    Set<String> components = requestedComponents == null
        ? new HashSet<String>()
        : requestedComponents;
    LOG_YARN.info("  Component list provided (total {}) : {}",
        components.size(), components);
    // If components are specified as well, then grab all the containers of
    // each of the components (roles)
    if (CollectionUtils.isNotEmpty(components)) {
      Map<ContainerId, RoleInstance> liveContainers = appState.getLiveContainers();
      if (CollectionUtils.isNotEmpty(liveContainers.keySet())) {
        Map<String, Set<String>> roleContainerMap = prepareRoleContainerMap(liveContainers);
        for (String component : components) {
          Set<String> roleContainers = roleContainerMap.get(component);
          if (roleContainers != null) {
            containers.addAll(roleContainers);
          }
        }
      }
    }
    LOG_YARN.info("Final list of containers to be upgraded (total {}) : {}",
        containers.size(), containers);
    if (providerService instanceof AgentProviderService) {
      AgentProviderService agentProviderService = (AgentProviderService) providerService;
      agentProviderService.setInUpgradeMode(true);
      agentProviderService.addUpgradeContainers(containers);
    }
  }

  /**
   * Build a reverse map from role name to the string forms of the IDs of
   * that role's live containers.
   * @param liveContainers live containers keyed by container ID; the only
   * caller checks that this map is non-empty
   * @return map of role name to set of container ID strings
   */
  private Map<String, Set<String>> prepareRoleContainerMap(
      Map<ContainerId, RoleInstance> liveContainers) {
    Map<String, Set<String>> containersByRole = new HashMap<>();
    for (Map.Entry<ContainerId, RoleInstance> entry : liveContainers.entrySet()) {
      String roleName = entry.getValue().role;
      Set<String> ids = containersByRole.get(roleName);
      if (ids == null) {
        ids = new HashSet<String>();
        containersByRole.put(roleName, ids);
      }
      ids.add(entry.getKey().toString());
    }
    return containersByRole;
  }

  /**
   * Implementation of cluster flexing.
   * It should be the only way that anything -even the AM itself on startup-
   * asks for nodes.
   * <p>
   * The new resources are applied to a copy of the current instance
   * definition. The copy is validated by both the AM provider and the
   * provider service before the app state is updated. Failure counts are then
   * reset and a review of requests is queued.
   * @param resources the resource tree
   * @throws SliderException slider problems, including invalid configs
   * @throws IOException IO problems
   */
  public void flexCluster(ConfTree resources)
      throws IOException, SliderException {
    // validate a candidate definition before touching the live state
    AggregateConf candidate =
        new AggregateConf(appState.getInstanceDefinitionSnapshot());
    candidate.setResources(resources);
    sliderAMProvider.validateInstanceDefinition(candidate);
    providerService.validateInstanceDefinition(candidate);

    appState.updateResourceDefinitions(resources);
    // the scheduled windows may have changed, so restart the failure counts
    appState.resetFailureCounts();
    // request more containers if needed
    reviewRequestAndReleaseNodes("flexCluster");
  }

  /**
   * Schedule the periodic reset of the container failure window.
   * <p>
   * The interval is read from the global options of {@code resources}. When
   * it is not positive, nothing is scheduled.
   * @param resources the resource tree
   * @throws BadConfigException if the window is out of range
   */
  private void scheduleFailureWindowResets(ConfTree resources) throws
      BadConfigException {
    ResetFailureWindow reset = new ResetFailureWindow(rmOperationHandler);
    MapOperations globalOptions =
        new ConfTreeOperations(resources).getGlobalOptions();
    long windowSeconds = globalOptions.getTimeRange(
        ResourceKeys.CONTAINER_FAILURE_WINDOW,
        ResourceKeys.DEFAULT_CONTAINER_FAILURE_WINDOW_DAYS,
        ResourceKeys.DEFAULT_CONTAINER_FAILURE_WINDOW_HOURS,
        ResourceKeys.DEFAULT_CONTAINER_FAILURE_WINDOW_MINUTES, 0);
    if (windowSeconds <= 0) {
      log.info("Failure window reset interval is not set");
      return;
    }
    log.info(
        "Scheduling the failure window reset interval to every {} seconds",
        windowSeconds);
    RenewingAction<ResetFailureWindow> renewal = new RenewingAction<>(
        reset, windowSeconds, windowSeconds, TimeUnit.SECONDS, 0);
    actionQueues.renewing("failures", renewal);
  }

  /**
   * Schedule the periodic escalation of outstanding requests.
   * @param internal internal configuration tree; its global options supply
   * the escalation check interval, used as seconds
   * @throws BadConfigException on invalid configuration
   */
  private void scheduleEscalation(ConfTree internal) throws BadConfigException {
    EscalateOutstandingRequests escalate = new EscalateOutstandingRequests();
    int interval = new ConfTreeOperations(internal).getGlobalOptions()
        .getOptionInt(InternalKeys.ESCALATION_CHECK_INTERVAL,
            InternalKeys.DEFAULT_ESCALATION_CHECK_INTERVAL);
    RenewingAction<EscalateOutstandingRequests> renewal = new RenewingAction<>(
        escalate, interval, interval, TimeUnit.SECONDS, 0);
    actionQueues.renewing("escalation", renewal);
  }
  
  /**
   * Queue an immediate review of the current node state, to decide whether
   * it should be changed.
   * @param reason reason for operation
   */
  private synchronized void reviewRequestAndReleaseNodes(String reason) {
    log.debug("reviewRequestAndReleaseNodes({})", reason);
    ReviewAndFlexApplicationSize review =
        new ReviewAndFlexApplicationSize(reason, 0, TimeUnit.SECONDS);
    queue(review);
  }

  /**
   * Handle the event requesting a review: look at the queue and decide
   * whether to act or not.
   * <ul>
   *   <li>If the queue already holds an action with the review-app-size or
   *   halts-app attribute (an existing duplicate, or a pending shutdown),
   *   this review is dropped.</li>
   *   <li>If an action which changes the application size is queued, this
   *   action is put back at the end of the queue. The review is deferred
   *   until the action is dequeued again.</li>
   *   <li>Otherwise the node review is executed now.</li>
   * </ul>
   * @param action action triggering the event. It may be put
   * back into the queue
   * @throws SliderInternalStateException from the node review
   */
  public void handleReviewAndFlexApplicationSize(ReviewAndFlexApplicationSize action)
      throws SliderInternalStateException {

    if (actionQueues.hasQueuedActionWithAttribute(
        AsyncAction.ATTR_REVIEWS_APP_SIZE | AsyncAction.ATTR_HALTS_APP)) {
      // this operation isn't needed at all -existing duplicate or shutdown due
      return;
    }
    // if there is an action which changes cluster size, wait
    if (actionQueues.hasQueuedActionWithAttribute(
        AsyncAction.ATTR_CHANGES_APP_SIZE)) {
      // place the action at the back of the queue and stop here: reviewing
      // now would act on a size that is about to change, and the re-queued
      // action performs the review later
      actionQueues.put(action);
      return;
    }

    executeNodeReview(action.name);
  }
  
  /**
   * Look at where the current node state is -and whether it should be changed.
   * <p>
   * Does nothing once AM completion has been flagged. Otherwise the RM
   * operations produced by the app state's review are passed to the provider's
   * operation handler and then executed. If the app state raises a
   * {@code TriggerClusterTeardownException}, a stop action is queued instead.
   * @param reason reason for the review; used only for logging
   * @throws SliderInternalStateException declared by the signature
   */
  public synchronized void executeNodeReview(String reason)
      throws SliderInternalStateException {

    log.debug("in executeNodeReview({})", reason);
    if (amCompletionFlag.get()) {
      log.info("Ignoring node review operation: shutdown in progress");
      // actually ignore it: don't review (and possibly request or release
      // containers) while the AM is shutting down
      return;
    }
    try {
      List<AbstractRMOperation> allOperations = appState.reviewRequestAndReleaseNodes();
      // tell the provider
      providerRMOperationHandler.execute(allOperations);
      //now apply the operations
      execute(allOperations);
    } catch (TriggerClusterTeardownException e) {
      //App state has decided that it is time to exit
      log.error("Cluster teardown triggered {}", e, e);
      ActionStopSlider stopSlider = new ActionStopSlider(e);
      stopSlider.setExitReason(SliderExitReason.SLIDER_AM_ERROR);
      queue(stopSlider);
    }
  }

  /**
   * Escalate operation as triggered by external timer.
   * <p>
   * Gets the list of new operations from the app state, passes them to the
   * provider's operation handler, then executes them.
   */
  public void escalateOutstandingRequests() {
    List<AbstractRMOperation> escalations =
        appState.escalateOutstandingRequests();
    providerRMOperationHandler.execute(escalations);
    execute(escalations);
  }


  /**
   * Shutdown operation: release all containers.
   * <p>
   * An agent provider is first told that the application stop has been
   * initiated. The method then sleeps for the configured release timeout, so
   * applications get time to shut down gracefully. The release operations are
   * issued in a {@code finally} block, so they run even if the sleep is
   * interrupted.
   * @param releaseMessage message passed to the app state for the release
   */
  private void releaseAllContainers(String releaseMessage) {
    if (providerService instanceof AgentProviderService) {
      log.info("Setting stopInitiated flag to true");
      ((AgentProviderService) providerService).setAppStopInitiated(true);
    }
    try {
      long releaseDelay = getContainerReleaseTimeout();
      if (releaseDelay > 0) {
        Thread.sleep(releaseDelay);
      }
    } catch (InterruptedException e) {
      // NOTE(review): the interrupt status is not restored here; confirm this
      // is intentional before later shutdown RPCs rely on it being clear
      log.info("Sleep for container release interrupted");
    } finally {
      List<AbstractRMOperation> releaseOps =
          appState.releaseAllContainers(releaseMessage);
      // the final log link update must follow the release of all containers
      appState.updateAllContainerLogLinks();
      providerRMOperationHandler.execute(releaseOps);
      execute(releaseOps);
    }
  }

  /**
   * Get the delay to wait before releasing containers, in milliseconds.
   * <p>
   * Reads {@code SliderKeys.APP_CONTAINER_RELEASE_TIMEOUT} (in seconds) from
   * the application configuration's global options, defaulting to 0. A
   * positive value is extended by the agent heartbeat interval, since it can
   * take up to that much time for agents to receive the stop command.
   * @return the timeout in milliseconds; 0 when the property is not set
   */
  private long getContainerReleaseTimeout() {
    int seconds = getInstanceDefinition().getAppConfOperations()
        .getGlobalOptions()
        .getOptionInt(SliderKeys.APP_CONTAINER_RELEASE_TIMEOUT, 0);
    if (seconds > 0) {
      seconds += SliderKeys.APP_CONTAINER_HEARTBEAT_INTERVAL_SEC;
    }
    long timeoutInMillis = TimeUnit.SECONDS.toMillis(seconds);
    log.info("Container release timeout in millis = {}", timeoutInMillis);
    return timeoutInMillis;
  }

  /**
   * RM wants to shut down the AM: signal AM completion with a SUCCEEDED
   * final status.
   */
  @Override //AMRMClientAsync
  public void onShutdownRequest() {
    LOG_YARN.info("Shutdown Request received");
    ActionStopSlider shutdown = new ActionStopSlider("stop", EXIT_SUCCESS,
        FinalApplicationStatus.SUCCEEDED, "Shutdown requested from RM");
    shutdown.setExitReason(SliderExitReason.YARN_ERROR);
    signalAMComplete(shutdown);
  }

  /**
   * Monitored nodes have been changed.
   * <p>
   * The app state checks whether any nodes were lost or revived. Any RM
   * operations it returns are executed, and a review is queued if it reports
   * that the cluster changed.
   * @param updatedNodes list of updated nodes
   */
  @Override //AMRMClientAsync
  public void onNodesUpdated(List<NodeReport> updatedNodes) {
    LOG_YARN.info("onNodesUpdated({})", updatedNodes.size());
    log.info("Updated nodes {}", updatedNodes);

    AppState.NodeUpdatedOutcome nodeOutcome = appState.onNodesUpdated(updatedNodes);
    boolean hasOperations = !nodeOutcome.operations.isEmpty();
    if (hasOperations) {
      execute(nodeOutcome.operations);
    }
    if (nodeOutcome.clusterChanged) {
      reviewRequestAndReleaseNodes("nodes updated");
    }
  }

  /**
   * Heartbeat operation: report the application's progress.
   * @return the progress value supplied by the app state
   */
  @Override //AMRMClientAsync
  public float getProgress() {
    float progress = appState.getApplicationProgressPercentage();
    return progress;
  }

  /**
   * AMRMClientAsync error callback.
   * <p>
   * An {@code InvalidResourceRequestException} signals AM completion with a
   * FAILED status. An {@code InvalidApplicationMasterRequestException} queues
   * a halt. Anything else is logged and ignored.
   * @param e the reported error
   */
  @Override //AMRMClientAsync
  public void onError(Throwable e) {
    boolean stopCluster = e instanceof InvalidResourceRequestException;
    boolean haltAM = !stopCluster
        && e instanceof InvalidApplicationMasterRequestException;
    if (!stopCluster && !haltAM) {
      LOG_YARN.info("Ignoring AMRMClientAsync.onError() received {}", e);
      return;
    }
    LOG_YARN.error("AMRMClientAsync.onError() received {}", e, e);
    String firstLine = SliderUtils.extractFirstLine(e.getLocalizedMessage());
    if (stopCluster) {
      ActionStopSlider stopSlider = new ActionStopSlider("stop",
          EXIT_EXCEPTION_THROWN, FinalApplicationStatus.FAILED, firstLine);
      stopSlider.setExitReason(SliderExitReason.APP_ERROR);
      signalAMComplete(stopSlider);
    } else {
      queue(new ActionHalt(EXIT_EXCEPTION_THROWN, firstLine));
    }
  }

/* =================================================================== */
/* RMOperationHandlerActions */
/* =================================================================== */

 
  /**
   * Execute RM operations by delegating to the RM operation handler.
   * @param operations operations to execute
   */
  @Override
  public void execute(List<AbstractRMOperation> operations) {
    this.rmOperationHandler.execute(operations);
  }

  /**
   * Release an assigned container via the RM operation handler.
   * @param containerId container to release
   */
  @Override
  public void releaseAssignedContainer(ContainerId containerId) {
    this.rmOperationHandler.releaseAssignedContainer(containerId);
  }

  /**
   * Add a container request via the RM operation handler.
   * @param req the request to add
   */
  @Override
  public void addContainerRequest(AMRMClient.ContainerRequest req) {
    this.rmOperationHandler.addContainerRequest(req);
  }

  /**
   * Cancel container requests via the RM operation handler.
   * @param priority1 first priority to cancel requests at
   * @param priority2 second priority to cancel requests at
   * @param count number of requests to cancel
   * @return the value returned by the RM operation handler
   */
  @Override
  public int cancelContainerRequests(Priority priority1,
      Priority priority2,
      int count) {
    return this.rmOperationHandler.cancelContainerRequests(
        priority1, priority2, count);
  }

  /**
   * Cancel a single container request via the RM operation handler.
   * @param request the request to cancel
   */
  @Override
  public void cancelSingleRequest(AMRMClient.ContainerRequest request) {
    this.rmOperationHandler.cancelSingleRequest(request);
  }

  /**
   * Update the node blacklist via the RM operation handler.
   * @param blacklistAdditions nodes to add to the blacklist
   * @param blacklistRemovals nodes to remove from the blacklist
   */
  @Override
  public void updateBlacklist(List<String> blacklistAdditions,
      List<String> blacklistRemovals) {
    this.rmOperationHandler.updateBlacklist(blacklistAdditions,
        blacklistRemovals);
  }

/* =================================================================== */
/* END */
/* =================================================================== */

  /**
   * Launch the provider service.
   * <p>
   * If the provider's {@code exec()} reports that it started something, this
   * AM registers as a listener on the provider service before starting it.
   * Otherwise the provider service is started without registration, and the
   * started event is delivered directly via {@code eventCallbackEvent(null)}.
   *
   * @param instanceDefinition definition of the service
   * @param confDir directory of config data
   * @throws IOException
   * @throws SliderException
   */
  protected synchronized void launchProviderService(AggregateConf instanceDefinition,
                                                    File confDir)
    throws IOException, SliderException {
    Map<String, String> environment = new HashMap<>();
    boolean execStarted =
        providerService.exec(instanceDefinition, confDir, environment, this);
    if (execStarted) {
      providerService.registerServiceListener(this);
    }
    providerService.start();
    if (!execStarted) {
      // nothing was started, so send the started event ourselves
      eventCallbackEvent(null);
    }
  }

  /* =================================================================== */
  /* EventCallback  from the child or ourselves directly */
  /* =================================================================== */

  /**
   * Callback signalling that the child process is up. It is also invoked
   * directly when no child process was started.
   * <p>
   * Marks the AM as live, then flexes the cluster to the resources in the
   * instance definition. Any failure during the flex queues a stop action.
   * @param parameter not used
   */
  @Override // ProviderCompleted
  public void eventCallbackEvent(Object parameter) {
    appState.noteAMLive();
    try {
      flexCluster(getInstanceDefinition().getResources());
    } catch (Exception e) {
      log.error("Failed to flex cluster nodes: {}", e, e);
      // a cluster that cannot be flexed is treated as fatal: exit
      ActionStopSlider stop = new ActionStopSlider(e);
      stop.setExitReason(SliderExitReason.SLIDER_AM_ERROR);
      queue(stop);
    }
  }

  /**
   * Report container loss. If the container is owned by the app state, it is
   * released and a review is queued; otherwise the report is ignored.
   *
   * @param containerId       id of the container which has failed
   * @throws SliderException
   */
  public synchronized void providerLostContainer(
      ContainerId containerId)
      throws SliderException {
    log.info("containerLostContactWithProvider: container {} lost",
        containerId);
    if (appState.getOwnedContainer(containerId) == null) {
      log.info("Container not in active set - ignoring");
      return;
    }
    execute(appState.releaseContainer(containerId));
    // request more containers if needed
    log.info("Container released; triggering review");
    reviewRequestAndReleaseNodes("Loss of container");
  }

  /* =================================================================== */
  /* ServiceStateChangeListener */
  /* =================================================================== */

  /**
   * Received on listening service termination.
   * <p>
   * When the provider service has stopped with a non-zero exit code before
   * AM completion was flagged, the early exit is recorded and AM completion
   * is signalled with a FAILED status. Other state changes are passed to the
   * superclass.
   * @param service the service that has changed.
   */
  @Override //ServiceStateChangeListener
  public void stateChanged(Service service) {
    boolean providerStopped =
        service == providerService && service.isInState(STATE.STOPPED);
    if (!providerStopped) {
      super.stateChanged(service);
      return;
    }
    // it's the current master process in play
    int exitCode = providerService.getExitCode();
    int mappedProcessExitCode = exitCode;

    if (amCompletionFlag.get() || mappedProcessExitCode == 0) {
      // shutdown already under way, or a clean exit: nothing to do
      log.info(
        "Process has exited with exit code {} mapped to {} -ignoring",
        exitCode,
        mappedProcessExitCode);
      return;
    }

    String reason =
        "Spawned process failed with raw " + exitCode + " mapped to " +
        mappedProcessExitCode;
    ActionStopSlider stop = new ActionStopSlider("stop",
        mappedProcessExitCode,
        FinalApplicationStatus.FAILED,
        reason);
    stop.setExitReason(SliderExitReason.YARN_ERROR);
    // unexpected: the process finished before shutdown was triggered
    spawnedProcessExitedBeforeShutdownTriggered = true;
    log.info(
      "Process has exited with exit code {} mapped to {} -triggering termination",
      exitCode,
      mappedProcessExitCode);
    signalAMComplete(stop);
  }

  /**
   * Stop the forked process by stopping the provider service.
   * @return the provider service's exit code after it has been stopped
   */
  protected synchronized Integer stopForkedProcess() {
    providerService.stop();
    Integer exitCode = providerService.getExitCode();
    return exitCode;
  }

  /**
   * Async start container request. The app state records that the start was
   * submitted, then the NM client is asked to start the container.
   * @param container container
   * @param ctx launch context
   * @param instance node details
   * @throws IOException declared by the signature
   */
  public void startContainer(Container container,
                             ContainerLaunchContext ctx,
                             RoleInstance instance) throws IOException {
    appState.containerStartSubmitted(container, instance);
    nmClientAsync.startContainerAsync(container, ctx);
  }

  /**
   * Build the credentials needed for containers.
   * <p>
   * Starts from a copy of the container credentials. When the AM is running
   * with a keytab, it adds self-renewable delegation tokens for the cluster
   * filesystem.
   * @return a new credentials instance
   * @throws IOException if the delegation tokens cannot be added
   */
  private Credentials buildContainerCredentials() throws IOException {
    Credentials result = new Credentials(containerCredentials);
    if (!securityConfiguration.isKeytabProvided()) {
      return result;
    }
    CredentialUtils.addSelfRenewableFSDelegationTokens(
        getClusterFS().getFileSystem(),
        result);
    return result;
  }

  /**
   * NM callback: a container has stopped. This is only logged; container
   * events from the RM callbacks are what this AM reacts to.
   * @param containerId the stopped container
   */
  @Override //  NMClientAsync.CallbackHandler
  public void onContainerStopped(ContainerId containerId) {
    log.info("onContainerStopped {} ", containerId);
  }

  /**
   * NM callback: a container has started.
   * <p>
   * If the app state knows the container, an async status query is issued
   * and a component registration is queued. Otherwise the container is
   * released.
   * @param containerId the started container
   * @param allServiceResponse service responses from the NM (not used)
   */
  @Override //  NMClientAsync.CallbackHandler
  public void onContainerStarted(ContainerId containerId,
      Map<String, ByteBuffer> allServiceResponse) {
    LOG_YARN.info("Started Container {} ", containerId);
    RoleInstance instance = appState.onNodeManagerContainerStarted(containerId);
    if (instance == null) {
      // a hypothetical path not seen: warn, then release the container
      log.error("Notified of started container that isn't pending {} - releasing",
                containerId);
      asyncRMClient.releaseAssignedContainer(containerId);
      return;
    }
    LOG_YARN.info("Deployed instance of role {} onto {}",
        instance.role, containerId);
    // trigger an async container status
    nmClientAsync.getContainerStatusAsync(containerId,
        instance.container.getNodeId());
    // push out a registration
    queue(new RegisterComponentInstance(containerId, instance.role,
        instance.group, 0, TimeUnit.MILLISECONDS));
  }

  /**
   * NM callback: a container failed to start. The failure is logged and
   * passed to the app state.
   * @param containerId container that failed to start
   * @param t cause of the failure
   */
  @Override //  NMClientAsync.CallbackHandler
  public void onStartContainerError(ContainerId containerId, Throwable t) {
    String message = "Failed to start Container " + containerId;
    LOG_YARN.error(message, t);
    appState.onNodeManagerContainerStartFailed(containerId, t);
  }

  /**
   * NM callback: a container status report has arrived. It is logged and
   * passed to the app state.
   * @param containerId container the status describes
   * @param containerStatus the reported status
   */
  @Override //  NMClientAsync.CallbackHandler
  public void onContainerStatusReceived(ContainerId containerId,
      ContainerStatus containerStatus) {
    LOG_YARN.info("Container Status: id={}, status={}",
        containerId, containerStatus);
    appState.onContainerStatusReceived(containerId, containerStatus);
  }

  /**
   * NM callback: a container status query failed. The failure is logged only.
   * @param containerId container whose status was requested
   * @param t cause of the failure
   */
  @Override //  NMClientAsync.CallbackHandler
  public void onGetContainerStatusError(
      ContainerId containerId, Throwable t) {
    String message = "Failed to query the status of Container " + containerId;
    LOG_YARN.error(message, t);
  }

  /**
   * NM callback: stopping a container failed. The failure is logged only.
   * @param containerId container that could not be stopped
   * @param t cause of the failure
   */
  @Override //  NMClientAsync.CallbackHandler
  public void onStopContainerError(ContainerId containerId, Throwable t) {
    String message = "Failed to stop Container " + containerId;
    LOG_YARN.error(message, t);
  }

  /**
   * Get the instance definition held by the app state.
   * @return the app state's instance definition
   */
  public AggregateConf getInstanceDefinition() {
    AggregateConf definition = appState.getInstanceDefinition();
    return definition;
  }

  /**
   * Get the cluster status: the live model held by the app state.
   * @return the current cluster description
   */
  public ClusterDescription getClusterDescription() {
    ClusterDescription status = appState.getClusterStatus();
    return status;
  }

  /**
   * Get the application-level diagnostics, with info for each and every
   * container allocated for this application during its entire lifetime.
   * @return the diagnostics field of the current cluster description
   */
  public ApplicationDiagnostics getApplicationDiagnostics() {
    ClusterDescription description = getClusterDescription();
    return description.appDiagnostics;
  }

  /**
   * Get the provider service.
   * @return the provider service
   */
  public ProviderService getProviderService() {
    return this.providerService;
  }

  /**
   * Queue an action for immediate execution in the executor thread.
   * @param action action to execute
   */
  public void queue(AsyncAction action) {
    this.actionQueues.put(action);
  }

  /**
   * Schedule an action for delayed execution.
   * @param action the action to schedule
   */
  public void schedule(AsyncAction action) {
    this.actionQueues.schedule(action);
  }


  /**
   * Handle any exception in a thread.
   * <p>
   * The exception is always logged. If a teardown is already in progress it
   * is otherwise ignored. If not, AM completion is signalled with a FAILED
   * status, using the exception's exit code when it provides one.
   * @param thread thread throwing the exception
   * @param exception exception
   */
  public void onExceptionInThread(Thread thread, Throwable exception) {
    log.error("Exception in {}: {}", thread.getName(), exception, exception);

    if (amCompletionFlag.get()) {
      log.info("Ignoring exception: shutdown in progress");
      return;
    }
    int exitCode = exception instanceof ExitCodeProvider
        ? ((ExitCodeProvider) exception).getExitCode()
        : EXIT_EXCEPTION_THROWN;
    String firstLine =
        SliderUtils.extractFirstLine(exception.getLocalizedMessage());
    ActionStopSlider stopSlider = new ActionStopSlider("stop", exitCode,
        FinalApplicationStatus.FAILED, firstLine);
    stopSlider.setExitReason(SliderExitReason.SLIDER_AM_ERROR);
    signalAMComplete(stopSlider);
  }

  /**
   * Start the chaos monkey, if it is enabled and configured with an interval.
   * <p>
   * If the AM launch failure probability is positive and the monkey's chaos
   * check passes, a launch failure stop action is queued immediately. The
   * AM-kill and container-kill targets are then registered with their
   * configured probabilities before the monkey is scheduled.
   * @return true if it started
   */
  private boolean maybeStartMonkey() {
    MapOperations internals = getGlobalInternalOptions();

    boolean enabled =
        internals.getOptionBool(InternalKeys.CHAOS_MONKEY_ENABLED,
            InternalKeys.DEFAULT_CHAOS_MONKEY_ENABLED);
    if (!enabled) {
      log.debug("Chaos monkey disabled");
      return false;
    }

    long interval = internals.getTimeRange(
        InternalKeys.CHAOS_MONKEY_INTERVAL,
        InternalKeys.DEFAULT_CHAOS_MONKEY_INTERVAL_DAYS,
        InternalKeys.DEFAULT_CHAOS_MONKEY_INTERVAL_HOURS,
        InternalKeys.DEFAULT_CHAOS_MONKEY_INTERVAL_MINUTES,
        0);
    if (interval == 0) {
      log.debug(
          "Chaos monkey not configured with a time interval...not enabling");
      return false;
    }

    // the interval is passed as the default in the seconds position, so the
    // delay presumably defaults to the interval
    long delay = internals.getTimeRange(
        InternalKeys.CHAOS_MONKEY_DELAY, 0, 0, 0, (int) interval);

    log.info("Adding Chaos Monkey scheduled every {} seconds ({} hours -delay {}",
        interval, interval / (60 * 60), delay);
    monkey = new ChaosMonkeyService(metrics, actionQueues);
    initAndAddService(monkey);

    // launch failure: special case with explicit failure triggered now
    int launchFailProbability = internals.getOptionInt(
        InternalKeys.CHAOS_MONKEY_PROBABILITY_AM_LAUNCH_FAILURE, 0);
    if (launchFailProbability > 0 && monkey.chaosCheck(launchFailProbability)) {
      log.info("Chaos Monkey has triggered AM Launch failure");
      ActionStopSlider launchFailure = new ActionStopSlider("stop",
          0, TimeUnit.SECONDS,
          LauncherExitCodes.EXIT_FALSE,
          FinalApplicationStatus.FAILED,
          E_TRIGGERED_LAUNCH_FAILURE);
      launchFailure.setExitReason(SliderExitReason.CHAOS_MONKEY);
      queue(launchFailure);
    }

    // register the kill targets
    int amKillProbability = internals.getOptionInt(
        InternalKeys.CHAOS_MONKEY_PROBABILITY_AM_FAILURE,
        InternalKeys.DEFAULT_CHAOS_MONKEY_PROBABILITY_AM_FAILURE);
    monkey.addTarget("AM killer",
        new ChaosKillAM(actionQueues, -1), amKillProbability);
    int containerKillProbability = internals.getOptionInt(
        InternalKeys.CHAOS_MONKEY_PROBABILITY_CONTAINER_FAILURE,
        InternalKeys.DEFAULT_CHAOS_MONKEY_PROBABILITY_CONTAINER_FAILURE);
    monkey.addTarget("Container killer",
        new ChaosKillContainer(appState, actionQueues, rmOperationHandler),
        containerKillProbability);

    boolean scheduled = monkey.schedule(delay, interval, TimeUnit.SECONDS);
    if (scheduled) {
      log.info("Chaos Monkey is running");
    } else {
      log.info("Chaos monkey not started");
    }
    return scheduled;
  }
  
  /**
   * This is the main entry point for the service launcher.
   * <p>
   * The service class name is prepended to the command line arguments before
   * they are handed to {@code ServiceLauncher.serviceMain()}.
   * @param args command line arguments.
   */
  public static void main(String[] args) {
    // Arrays.asList() returns a fixed-size view, so build a growable list
    // with the service name first
    List<String> extendedArgs = new ArrayList<>(args.length + 1);
    extendedArgs.add(SERVICE_CLASSNAME);
    extendedArgs.addAll(Arrays.asList(args));
    ServiceLauncher.serviceMain(extendedArgs);
  }

}
