SLIDER-481. Exports should allow a multiple line items per export and a more hierarchical structure
diff --git a/app-packages/memcached/metainfo.xml b/app-packages/memcached/metainfo.xml
index 5801ad2..0984dc9 100644
--- a/app-packages/memcached/metainfo.xml
+++ b/app-packages/memcached/metainfo.xml
@@ -23,17 +23,23 @@
     <comment>Memcache is a network accessible key/value storage system, often used as a distributed cache.</comment>
     <version>1.0.0</version>
     <exportedConfigs>None</exportedConfigs>
+    <exportGroups>
+      <exportGroup>
+        <name>Servers</name>
+        <exports>
+          <export>
+            <name>host_port</name>
+            <value>${MEMCACHED_HOST}:${site.global.listen_port}</value>
+          </export>
+        </exports>
+      </exportGroup>
+    </exportGroups>
 
     <components>
       <component>
         <name>MEMCACHED</name>
         <category>MASTER</category>
-        <componentExports>
-          <componentExport>
-            <name>host_port</name>
-            <value>${THIS_HOST}:${site.global.listen_port}</value>
-          </componentExport>
-        </componentExports>
+        <compExports>Servers-host_port</compExports>
         <commandScript>
           <script>scripts/memcached.py</script>
           <scriptType>PYTHON</scriptType>
diff --git a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
index 50a7097..eeeee26 100644
--- a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
+++ b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
@@ -115,6 +115,9 @@
 import org.apache.slider.core.registry.docstore.PublishedConfigSet;
 import org.apache.slider.core.registry.docstore.PublishedConfiguration;
 import org.apache.slider.core.registry.docstore.PublishedConfigurationOutputter;
+import org.apache.slider.core.registry.docstore.PublishedExports;
+import org.apache.slider.core.registry.docstore.PublishedExportsOutputter;
+import org.apache.slider.core.registry.docstore.PublishedExportsSet;
 import org.apache.slider.core.registry.retrieve.RegistryRetriever;
 import org.apache.slider.core.zk.BlockingZKWatcher;
 import org.apache.slider.core.zk.ZKIntegration;
@@ -2285,11 +2288,19 @@
       } else if (registryArgs.listConf) {
         // list the configurations
         actionRegistryListConfigsYarn(registryArgs);
+      } else if (registryArgs.listExports) {
+        // list the exports
+        actionRegistryListExports(registryArgs);
       } else if (SliderUtils.isSet(registryArgs.getConf)) {
         // get a configuration
         PublishedConfiguration publishedConfiguration =
             actionRegistryGetConfig(registryArgs);
         outputConfig(publishedConfiguration, registryArgs);
+      } else if (SliderUtils.isSet(registryArgs.getExport)) {
+        // get a export group
+        PublishedExports publishedExports =
+            actionRegistryGetExport(registryArgs);
+        outputExport(publishedExports, registryArgs);
       } else {
         // it's an unknown command
         log.info(ActionRegistryArgs.USAGE);
@@ -2698,6 +2709,34 @@
   }
 
   /**
+   * list exports available for an instance
+   *
+   * @param registryArgs registry Arguments
+   * @throws YarnException YARN problems
+   * @throws IOException Network or other problems
+   */
+  public void actionRegistryListExports(ActionRegistryArgs registryArgs)
+      throws YarnException, IOException {
+    ServiceRecord instance = lookupServiceRecord(registryArgs);
+
+    RegistryRetriever retriever = new RegistryRetriever(instance);
+    PublishedExportsSet exports =
+        retriever.getExports(!registryArgs.internal);
+
+    for (String exportName : exports.keys()) {
+      if (!registryArgs.verbose) {
+        log.info("{}", exportName);
+      } else {
+        PublishedExports published =
+            exports.get(exportName);
+        log.info("{} : {}",
+                 exportName,
+                 published.description);
+      }
+    }
+  }
+
+  /**
    * list configs available for an instance
    *
    * @param registryArgs registry Arguments
@@ -2722,6 +2761,31 @@
   }
 
   /**
+   * get a specific export group
+   *
+   * @param registryArgs registry Arguments
+   *
+   * @throws YarnException         YARN problems
+   * @throws IOException           Network or other problems
+   * @throws FileNotFoundException if the config is not found
+   */
+  @VisibleForTesting
+  public PublishedExports actionRegistryGetExport(ActionRegistryArgs registryArgs)
+      throws YarnException, IOException {
+    ServiceRecord instance = lookupServiceRecord(registryArgs);
+
+    RegistryRetriever retriever = new RegistryRetriever(instance);
+    boolean external = !registryArgs.internal;
+    PublishedExportsSet exports =
+        retriever.getExports(external);
+
+    PublishedExports published = retriever.retrieveExports(exports,
+                                                           registryArgs.getExport,
+                                                           external);
+    return published;
+  }
+
+  /**
    * write out the config. If a destination is provided and that dir is a
    * directory, the entry is written to it with the name provided + extension,
    * else it is printed to standard out.
@@ -2761,6 +2825,44 @@
   }
 
   /**
+   * write out the config
+   * @param published
+   * @param registryArgs
+   * @throws BadCommandArgumentsException
+   * @throws IOException
+   */
+  private void outputExport(PublishedExports published,
+                            ActionRegistryArgs registryArgs) throws
+      BadCommandArgumentsException,
+      IOException {
+    // decide whether or not to print
+    String entry = registryArgs.getExport;
+    String format = ConfigFormat.JSON.toString();
+    ConfigFormat configFormat = ConfigFormat.resolve(format);
+    if (configFormat == null || configFormat != ConfigFormat.JSON) {
+      throw new BadCommandArgumentsException(
+          "Unknown/Unsupported format %s . Only JSON is supported.", format);
+    }
+
+    PublishedExportsOutputter outputter =
+        PublishedExportsOutputter.createOutputter(configFormat,
+                                                  published);
+    boolean print = registryArgs.out == null;
+    if (!print) {
+      File destFile;
+      destFile = registryArgs.out;
+      if (destFile.isDirectory()) {
+        // creating it under a directory
+        destFile = new File(destFile, entry + "." + format);
+      }
+      log.info("Destination path: {}", destFile);
+      outputter.save(destFile);
+    } else {
+      print(outputter.asString());
+    }
+  }
+
+  /**
    * Look up an instance
    * @return instance data
    * @throws SliderException other failures
diff --git a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java.orig b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java.orig
new file mode 100644
index 0000000..50a7097
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java.orig
@@ -0,0 +1,2913 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.slider.client;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathNotFoundException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
+import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.alias.CredentialProvider;
+import org.apache.hadoop.security.alias.CredentialProviderFactory;
+import org.apache.hadoop.security.alias.CredentialShell;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.registry.client.api.RegistryConstants;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+import static org.apache.hadoop.registry.client.binding.RegistryUtils.*;
+
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.registry.client.exceptions.NoRecordException;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
+import org.apache.slider.api.ClusterDescription;
+import org.apache.slider.api.ClusterNode;
+import org.apache.slider.api.InternalKeys;
+import org.apache.slider.api.OptionKeys;
+import org.apache.slider.api.ResourceKeys;
+import org.apache.slider.api.SliderClusterProtocol;
+import org.apache.slider.api.proto.Messages;
+import org.apache.slider.common.Constants;
+import org.apache.slider.common.SliderExitCodes;
+import org.apache.slider.common.SliderKeys;
+import org.apache.slider.common.params.AbstractActionArgs;
+import org.apache.slider.common.params.AbstractClusterBuildingActionArgs;
+import org.apache.slider.common.params.ActionDiagnosticArgs;
+import org.apache.slider.common.params.ActionInstallPackageArgs;
+import org.apache.slider.common.params.ActionAMSuicideArgs;
+import org.apache.slider.common.params.ActionCreateArgs;
+import org.apache.slider.common.params.ActionEchoArgs;
+import org.apache.slider.common.params.ActionFlexArgs;
+import org.apache.slider.common.params.ActionFreezeArgs;
+import org.apache.slider.common.params.ActionKillContainerArgs;
+import org.apache.slider.common.params.ActionRegistryArgs;
+import org.apache.slider.common.params.ActionResolveArgs;
+import org.apache.slider.common.params.ActionStatusArgs;
+import org.apache.slider.common.params.ActionThawArgs;
+import org.apache.slider.common.params.Arguments;
+import org.apache.slider.common.params.ClientArgs;
+import org.apache.slider.common.params.LaunchArgsAccessor;
+import org.apache.slider.common.tools.ConfigHelper;
+import org.apache.slider.common.tools.Duration;
+import org.apache.slider.common.tools.SliderFileSystem;
+import org.apache.slider.common.tools.SliderUtils;
+import org.apache.slider.common.tools.SliderVersionInfo;
+import org.apache.slider.core.build.InstanceBuilder;
+import org.apache.slider.core.build.InstanceIO;
+import org.apache.slider.core.conf.AggregateConf;
+import org.apache.slider.core.conf.ConfTree;
+import org.apache.slider.core.conf.ConfTreeOperations;
+import org.apache.slider.core.conf.MapOperations;
+import org.apache.slider.core.conf.ResourcesInputPropertiesValidator;
+import org.apache.slider.core.conf.TemplateInputPropertiesValidator;
+import org.apache.slider.core.exceptions.BadClusterStateException;
+import org.apache.slider.core.exceptions.BadCommandArgumentsException;
+import org.apache.slider.core.exceptions.BadConfigException;
+import org.apache.slider.core.exceptions.ErrorStrings;
+import org.apache.slider.core.exceptions.NoSuchNodeException;
+import org.apache.slider.core.exceptions.SliderException;
+import org.apache.slider.core.exceptions.UnknownApplicationInstanceException;
+import org.apache.slider.core.exceptions.WaitTimeoutException;
+import org.apache.slider.core.launch.AppMasterLauncher;
+import org.apache.slider.core.launch.ClasspathConstructor;
+import org.apache.slider.core.launch.CommandLineBuilder;
+import org.apache.slider.core.launch.JavaCommandLineBuilder;
+import org.apache.slider.core.launch.LaunchedApplication;
+import org.apache.slider.core.launch.RunningApplication;
+import org.apache.slider.core.main.RunService;
+import org.apache.slider.core.persist.ConfPersister;
+import org.apache.slider.core.persist.LockAcquireFailedException;
+import org.apache.slider.core.registry.SliderRegistryUtils;
+import org.apache.slider.core.registry.YarnAppListClient;
+import org.apache.slider.core.registry.docstore.ConfigFormat;
+import org.apache.slider.core.registry.docstore.PublishedConfigSet;
+import org.apache.slider.core.registry.docstore.PublishedConfiguration;
+import org.apache.slider.core.registry.docstore.PublishedConfigurationOutputter;
+import org.apache.slider.core.registry.retrieve.RegistryRetriever;
+import org.apache.slider.core.zk.BlockingZKWatcher;
+import org.apache.slider.core.zk.ZKIntegration;
+import org.apache.slider.core.zk.ZKPathBuilder;
+import org.apache.slider.providers.AbstractClientProvider;
+import org.apache.slider.providers.SliderProviderFactory;
+import org.apache.slider.providers.agent.AgentKeys;
+import org.apache.slider.providers.slideram.SliderAMClientProvider;
+import org.apache.slider.server.appmaster.SliderAppMaster;
+import org.apache.slider.server.appmaster.rpc.RpcBinder;
+import org.apache.slider.server.services.utility.AbstractSliderLaunchedService;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static org.apache.slider.common.params.SliderActions.*;
+
+/**
+ * Client service for Slider
+ */
+
+public class SliderClient extends AbstractSliderLaunchedService implements RunService,
+    SliderExitCodes, SliderKeys, ErrorStrings, SliderClientAPI {
+  private static final Logger log = LoggerFactory.getLogger(SliderClient.class);
+
+  private ClientArgs serviceArgs;
+  public ApplicationId applicationId;
+  
+  private String deployedClusterName;
+  /**
+   * Cluster opaerations against the deployed cluster -will be null
+   * if no bonding has yet taken place
+   */
+  private SliderClusterOperations sliderClusterOperations;
+
+  private SliderFileSystem sliderFileSystem;
+
+  /**
+   * Yarn client service
+   */
+  private SliderYarnClientImpl yarnClient;
+  private YarnAppListClient YarnAppListClient;
+  private AggregateConf launchedInstanceDefinition;
+//  private SliderRegistryService registry;
+
+
+  /**
+   * The YARN registry service
+   */
+  private RegistryOperations registryOperations;
+
+  /**
+   * Constructor
+   */
+  public SliderClient() {
+    super("Slider Client");
+    new HdfsConfiguration();
+    new YarnConfiguration();
+  }
+
+  /**
+   * This is called <i>Before serviceInit is called</i>
+   * @param config the initial configuration build up by the
+   * service launcher.
+   * @param args argument list list of arguments passed to the command line
+   * after any launcher-specific commands have been stripped.
+   * @return the post-binding configuration to pass to the <code>init()</code>
+   * operation.
+   * @throws Exception
+   */
+  @Override
+  public Configuration bindArgs(Configuration config, String... args) throws Exception {
+    config = super.bindArgs(config, args);
+    serviceArgs = new ClientArgs(args);
+    serviceArgs.parse();
+    // yarn-ify
+    YarnConfiguration yarnConfiguration = new YarnConfiguration(config);
+    return SliderUtils.patchConfiguration(yarnConfiguration);
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    Configuration clientConf = SliderUtils.loadClientConfigurationResource();
+    ConfigHelper.mergeConfigurations(conf, clientConf, CLIENT_RESOURCE);
+    serviceArgs.applyDefinitions(conf);
+    serviceArgs.applyFileSystemBinding(conf);
+    // init security with our conf
+    if (SliderUtils.isHadoopClusterSecure(conf)) {
+      SliderUtils.forceLogin();
+      SliderUtils.initProcessSecurity(conf);
+    }
+    AbstractActionArgs coreAction = serviceArgs.getCoreAction();
+    if (coreAction.getHadoopServicesRequired()) {
+      initHadoopBinding();
+    }
+    super.serviceInit(conf);
+  }
+
+  /**
+   * this is where the work is done.
+   * @return the exit code
+   * @throws Throwable anything that went wrong
+   */
+/* JDK7
+
+  @Override
+  public int runService() throws Throwable {
+
+    // choose the action
+    String action = serviceArgs.getAction();
+    int exitCode = EXIT_SUCCESS;
+    String clusterName = serviceArgs.getClusterName();
+    // actions
+    switch (action) {
+      case ACTION_BUILD:
+        exitCode = actionBuild(clusterName, serviceArgs.getActionBuildArgs());
+        break;
+      case ACTION_UPDATE:
+        exitCode = actionUpdate(clusterName, serviceArgs.getActionUpdateArgs());
+        break;
+      case ACTION_CREATE:
+        exitCode = actionCreate(clusterName, serviceArgs.getActionCreateArgs());
+        break;
+      case ACTION_FREEZE:
+        exitCode = actionFreeze(clusterName, serviceArgs.getActionFreezeArgs());
+        break;
+      case ACTION_THAW:
+        exitCode = actionThaw(clusterName, serviceArgs.getActionThawArgs());
+        break;
+      case ACTION_DESTROY:
+        exitCode = actionDestroy(clusterName);
+        break;
+      case ACTION_EXISTS:
+        exitCode = actionExists(clusterName,
+            serviceArgs.getActionExistsArgs().live);
+        break;
+      case ACTION_FLEX:
+        exitCode = actionFlex(clusterName, serviceArgs.getActionFlexArgs());
+        break;
+      case ACTION_GETCONF:
+        exitCode =
+            actionGetConf(clusterName, serviceArgs.getActionGetConfArgs());
+        break;
+      case ACTION_HELP:
+      case ACTION_USAGE:
+        log.info(serviceArgs.usage());
+        break;
+      case ACTION_KILL_CONTAINER:
+        exitCode = actionKillContainer(clusterName,
+            serviceArgs.getActionKillContainerArgs());
+        break;
+      case ACTION_AM_SUICIDE:
+        exitCode = actionAmSuicide(clusterName,
+            serviceArgs.getActionAMSuicideArgs());
+        break;
+      case ACTION_LIST:
+        exitCode = actionList(clusterName);
+        break;
+      case ACTION_REGISTRY:
+        exitCode = actionRegistry(
+            serviceArgs.getActionRegistryArgs());
+        break;
+      case ACTION_STATUS:
+        exitCode = actionStatus(clusterName,
+            serviceArgs.getActionStatusArgs());
+        break;
+      case ACTION_VERSION:
+        exitCode = actionVersion();
+        break;
+      default:
+        throw new SliderException(EXIT_UNIMPLEMENTED,
+            "Unimplemented: " + action);
+    }
+
+    return exitCode;
+  }
+
+*/
+  @Override
+  public int runService() throws Throwable {
+
+    // choose the action
+    String action = serviceArgs.getAction();
+
+    int exitCode = EXIT_SUCCESS;
+    String clusterName = serviceArgs.getClusterName();
+    // actions
+    if (ACTION_INSTALL_PACKAGE.equals(action)) {
+      exitCode = actionInstallPkg(serviceArgs.getActionInstallPackageArgs());
+    } else if (ACTION_BUILD.equals(action)) {
+      exitCode = actionBuild(clusterName, serviceArgs.getActionBuildArgs());
+    } else if (ACTION_CREATE.equals(action)) {
+      exitCode = actionCreate(clusterName, serviceArgs.getActionCreateArgs());
+    } else if (ACTION_FREEZE.equals(action)) {
+      exitCode = actionFreeze(clusterName,
+            serviceArgs.getActionFreezeArgs());
+    } else if (ACTION_THAW.equals(action)) {
+      exitCode = actionThaw(clusterName, serviceArgs.getActionThawArgs());
+    } else if (ACTION_DESTROY.equals(action)) {
+      exitCode = actionDestroy(clusterName);
+    } else if (ACTION_DIAGNOSTIC.equals(action)) {
+      exitCode = actionDiagnostic(serviceArgs.getActionDiagnosticArgs());
+    } else if (ACTION_EXISTS.equals(action)) {
+      exitCode = actionExists(clusterName,
+          serviceArgs.getActionExistsArgs().live);
+    } else if (ACTION_FLEX.equals(action)) {
+      exitCode = actionFlex(clusterName, serviceArgs.getActionFlexArgs());
+    } else if (ACTION_HELP.equals(action) ||
+               ACTION_USAGE.equals(action)) {
+      log.info(serviceArgs.usage());
+    } else if (ACTION_KILL_CONTAINER.equals(action)) {
+      exitCode = actionKillContainer(clusterName,
+          serviceArgs.getActionKillContainerArgs());
+    } else if (ACTION_AM_SUICIDE.equals(action)) {
+      exitCode = actionAmSuicide(clusterName,
+          serviceArgs.getActionAMSuicideArgs());
+    } else if (ACTION_LIST.equals(action)) {
+      exitCode = actionList(clusterName);
+    } else if (ACTION_REGISTRY.equals(action)) {
+      exitCode = actionRegistry(serviceArgs.getActionRegistryArgs());
+    } else if (ACTION_RESOLVE.equals(action)) {
+      exitCode = actionResolve(serviceArgs.getActionResolveArgs());
+    } else if (ACTION_STATUS.equals(action)) {
+      exitCode = actionStatus(clusterName,
+          serviceArgs.getActionStatusArgs());
+    } else if (ACTION_UPDATE.equals(action)) {
+      exitCode = actionUpdate(clusterName, serviceArgs.getActionUpdateArgs());
+    } else if (ACTION_VERSION.equals(action)) {
+      exitCode = actionVersion();
+    } else {
+      throw new SliderException(EXIT_UNIMPLEMENTED,
+          "Unimplemented: " + action);
+    }
+
+    return exitCode;
+  }
+
+/**
+   * Perform everything needed to init the hadoop binding.
+   * This assumes that the service is already  in inited or started state
+   * @throws IOException
+   * @throws SliderException
+   */
+  protected void initHadoopBinding() throws IOException, SliderException {
+    // validate the client
+    SliderUtils.validateSliderClientEnvironment(null);
+    //create the YARN client
+    yarnClient = new SliderYarnClientImpl();
+    yarnClient.init(getConfig());
+    if (getServiceState() == STATE.STARTED) {
+      yarnClient.start();
+    }
+    addService(yarnClient);
+    YarnAppListClient =
+        new YarnAppListClient(yarnClient, getUsername(), getConfig());
+    // create the filesystem
+    sliderFileSystem = new SliderFileSystem(getConfig());    
+  }
+
+  /**
+   * Delete the zookeeper node associated with the calling user and the cluster
+   * TODO: YARN registry operations
+   **/
+  @Deprecated
+  @VisibleForTesting
+  public boolean deleteZookeeperNode(String clusterName) throws YarnException, IOException {
+    String user = getUsername();
+    String zkPath = ZKIntegration.mkClusterPath(user, clusterName);
+    Exception e = null;
+    try {
+      Configuration config = getConfig();
+      if (!SliderUtils.isHadoopClusterSecure(config)) {
+        ZKIntegration client = getZkClient(clusterName, user);
+        if (client != null) {
+          if (client.exists(zkPath)) {
+            log.info("Deleting zookeeper path {}", zkPath);
+          }
+          client.deleteRecursive(zkPath);
+          return true;
+        }
+      } else {
+        log.warn("Default zookeeper node is not available for secure cluster");
+      }
+    } catch (InterruptedException ignored) {
+      e = ignored;
+    } catch (KeeperException ignored) {
+      e = ignored;
+    } catch (BadConfigException ignored) {
+      e = ignored;
+    }
+    if (e != null) {
+      log.warn("Unable to recursively delete zk node {}", zkPath);
+      log.debug("Reason: ", e);
+    }
+
+    return false;
+  }
+
+  /**
+   * Create the zookeeper node associated with the calling user and the cluster
+   */
+  @VisibleForTesting
+  @Deprecated
+  public String createZookeeperNode(String clusterName, Boolean nameOnly) throws YarnException, IOException {
+    String user = getUsername();
+    String zkPath = ZKIntegration.mkClusterPath(user, clusterName);
+    if(nameOnly) {
+      return zkPath;
+    }
+    Configuration config = getConfig();
+    if (!SliderUtils.isHadoopClusterSecure(config)) {
+      ZKIntegration client = getZkClient(clusterName, user);
+      if (client != null) {
+        try {
+          client.createPath(zkPath, "", ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT);
+          return zkPath;
+          
+          //JDK7
+//        } catch (InterruptedException | KeeperException e) {
+        } catch (InterruptedException e) {
+          log.warn("Unable to create zk node {}", zkPath, e);
+        } catch ( KeeperException e) {
+          log.warn("Unable to create zk node {}", zkPath, e);
+        }
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * Gets a zookeeper client, returns null if it cannot connect to zookeeper
+   **/
+  protected ZKIntegration getZkClient(String clusterName, String user) throws YarnException {
+    String registryQuorum = lookupZKQuorum();
+    ZKIntegration client = null;
+    try {
+      BlockingZKWatcher watcher = new BlockingZKWatcher();
+      client = ZKIntegration.newInstance(registryQuorum, user, clusterName, true, false, watcher);
+      client.init();
+      watcher.waitForZKConnection(2 * 1000);
+    } catch (InterruptedException e) {
+      client = null;
+      log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e);
+    } catch (IOException e) {
+      log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e);
+    }
+    return client;
+  }
+
+  @Override
+  public int actionDestroy(String clustername) throws YarnException,
+                                                      IOException {
+    // verify that a live cluster isn't there
+    SliderUtils.validateClusterName(clustername);
+    //no=op, it is now mandatory. 
+    verifyBindingsDefined();
+    verifyNoLiveClusters(clustername);
+
+    // create the directory path
+    Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
+    // delete the directory;
+    boolean exists = sliderFileSystem.getFileSystem().exists(clusterDirectory);
+    if (exists) {
+      log.info("Application Instance {} found at {}: destroying", clustername, clusterDirectory);
+    } else {
+      log.info("Application Instance {} already destroyed", clustername);
+    }
+    boolean deleted =
+      sliderFileSystem.getFileSystem().delete(clusterDirectory, true);
+    if (!deleted) {
+      log.warn("Filesystem returned false from delete() operation");
+    }
+
+    // rm the registry entry —do not let this block the destroy operations
+    String registryPath = SliderRegistryUtils.registryPathForInstance(
+        clustername);
+    try {
+      getRegistryOperations().delete(registryPath, true);
+    } catch (IOException e) {
+      log.warn("Error deleting {}: {} ", registryPath, e, e);
+    } catch (SliderException e) {
+      log.warn("Error binding to registry {} ", e, e);
+    }
+
+    List<ApplicationReport> instances = findAllLiveInstances(clustername);
+    // detect any race leading to cluster creation during the check/destroy process
+    // and report a problem.
+    if (!instances.isEmpty()) {
+      throw new SliderException(EXIT_APPLICATION_IN_USE,
+                              clustername + ": "
+                              + E_DESTROY_CREATE_RACE_CONDITION
+                              + " :" +
+                              instances.get(0));
+    }
+    log.info("Destroyed cluster {}", clustername);
+    return EXIT_SUCCESS;
+  }
+  
+  @Override
+  public int actionAmSuicide(String clustername,
+      ActionAMSuicideArgs args) throws YarnException, IOException {
+    SliderClusterOperations clusterOperations =
+      createClusterOperations(clustername);
+    clusterOperations.amSuicide(args.message, args.exitcode, args.waittime);
+    return EXIT_SUCCESS;
+  }
+
+  @Override
+  public AbstractClientProvider createClientProvider(String provider)
+    throws SliderException {
+    SliderProviderFactory factory =
+      SliderProviderFactory.createSliderProviderFactory(provider);
+    return factory.createClientProvider();
+  }
+
+  /**
+   * Create the cluster -saving the arguments to a specification file first
+   * @param clustername cluster name
+   * @return the status code
+   * @throws YarnException Yarn problems
+   * @throws IOException other problems
+   * @throws BadCommandArgumentsException bad arguments.
+   */
+  public int actionCreate(String clustername, ActionCreateArgs createArgs) throws
+                                               YarnException,
+                                               IOException {
+
+    actionBuild(clustername, createArgs);
+    Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
+    AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
+        clustername, clusterDirectory);
+    try {
+      checkForCredentials(getConfig(), instanceDefinition.getAppConf());
+    } catch (Exception e) {
+      throw new IOException("problem setting credentials", e);
+    }
+    return startCluster(clustername, createArgs);
+  }
+
+  private void checkForCredentials(Configuration conf,
+      ConfTree tree) throws Exception {
+    if (tree.credentials == null || tree.credentials.size()==0) {
+      log.info("No credentials requested");
+      return;
+    }
+    CredentialShell credentialShell = null;
+    for (Entry<String, List<String>> cred : tree.credentials.entrySet()) {
+      String provider = cred.getKey();
+      List<String> aliases = cred.getValue();
+      if (aliases == null || aliases.size()==0) {
+        continue;
+      }
+      if (credentialShell == null) {
+        credentialShell = new CredentialShell();
+        credentialShell.setConf(conf);
+      }
+      Configuration c = new Configuration(conf);
+      c.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, provider);
+      CredentialProvider credentialProvider =
+          CredentialProviderFactory.getProviders(c).get(0);
+      Set<String> existingAliases = new HashSet<String>(credentialProvider.getAliases());
+      for (String alias : aliases) {
+        if (existingAliases.contains(alias.toLowerCase())) {
+          log.warn("Skipping creation of credentials for {}, " +
+              "alias already exists in {}", alias, provider);
+          continue;
+        }
+        String[] csarg = new String[]{
+            "create", alias, "-provider", provider};
+        log.info("Creating credentials for {} in {}", alias, provider);
+        credentialShell.run(csarg);
+      }
+    }
+  }
+
+  @Override
+  public int actionBuild(String clustername,
+      AbstractClusterBuildingActionArgs buildInfo) throws
+                                               YarnException,
+                                               IOException {
+
+    buildInstanceDefinition(clustername, buildInfo, false, false);
+    return EXIT_SUCCESS; 
+  }
+
+  @Override
+  public int actionInstallPkg(ActionInstallPackageArgs installPkgInfo) throws
+      YarnException,
+      IOException {
+
+    Path srcFile = null;
+    if (StringUtils.isEmpty(installPkgInfo.name )) {
+      throw new BadCommandArgumentsException("A valid application type name is required (e.g. HBASE).");
+    }
+
+    if (StringUtils.isEmpty(installPkgInfo.packageURI)) {
+      throw new BadCommandArgumentsException("A valid application package location required.");
+    } else {
+      File pkgFile = new File(installPkgInfo.packageURI);
+      if (!pkgFile.exists() || pkgFile.isDirectory()) {
+        throw new BadCommandArgumentsException("Unable to access supplied pkg file at " +
+                                               pkgFile.getAbsolutePath());
+      } else {
+        srcFile = new Path(pkgFile.toURI());
+      }
+    }
+
+    Path pkgPath = sliderFileSystem.buildPackageDirPath(installPkgInfo.name);
+    sliderFileSystem.getFileSystem().mkdirs(pkgPath);
+
+    Path fileInFs = new Path(pkgPath, srcFile.getName());
+    log.info("Installing package {} at {} and overwrite is {}.", srcFile, fileInFs, installPkgInfo.replacePkg);
+    if (sliderFileSystem.getFileSystem().exists(fileInFs) && !installPkgInfo.replacePkg) {
+      throw new BadCommandArgumentsException("Pkg exists at " +
+                                             fileInFs.toUri().toString() +
+                                             ". Use --replacepkg to overwrite.");
+    }
+
+    sliderFileSystem.getFileSystem().copyFromLocalFile(false, installPkgInfo.replacePkg, srcFile, fileInFs);
+    return EXIT_SUCCESS;
+  }
+
+  @Override
+  public int actionUpdate(String clustername,
+      AbstractClusterBuildingActionArgs buildInfo) throws
+      YarnException, IOException {
+    buildInstanceDefinition(clustername, buildInfo, true, true);
+    return EXIT_SUCCESS; 
+  }
+
+  /**
+   * Build up the AggregateConfiguration for an application instance then
+   * persists it
+   * @param clustername name of the cluster
+   * @param buildInfo the arguments needed to build the cluster
+   * @param overwrite true if existing cluster directory can be overwritten
+   * @param liveClusterAllowed true if live cluster can be modified
+   * @throws YarnException
+   * @throws IOException
+   */
+  
+  public void buildInstanceDefinition(String clustername,
+      AbstractClusterBuildingActionArgs buildInfo, boolean overwrite, boolean liveClusterAllowed)
+        throws YarnException, IOException {
+    // verify that a live cluster isn't there
+    SliderUtils.validateClusterName(clustername);
+    verifyBindingsDefined();
+    if (!liveClusterAllowed) {
+      verifyNoLiveClusters(clustername);
+    }
+
+    Configuration conf = getConfig();
+    String registryQuorum = lookupZKQuorum();
+
+    Path appconfdir = buildInfo.getConfdir();
+    // Provider
+    String providerName = buildInfo.getProvider();
+    requireArgumentSet(Arguments.ARG_PROVIDER, providerName);
+    log.debug("Provider is {}", providerName);
+    SliderAMClientProvider sliderAM = new SliderAMClientProvider(conf);
+    AbstractClientProvider provider =
+      createClientProvider(providerName);
+    InstanceBuilder builder =
+      new InstanceBuilder(sliderFileSystem, 
+                          getConfig(),
+                          clustername);
+    
+    AggregateConf instanceDefinition = new AggregateConf();
+    ConfTreeOperations appConf = instanceDefinition.getAppConfOperations();
+    ConfTreeOperations resources = instanceDefinition.getResourceOperations();
+    ConfTreeOperations internal = instanceDefinition.getInternalOperations();
+    //initial definition is set by the providers 
+    sliderAM.prepareInstanceConfiguration(instanceDefinition);
+    provider.prepareInstanceConfiguration(instanceDefinition);
+
+    //load in any specified on the command line
+    if (buildInfo.resources != null) {
+      try {
+        resources.mergeFile(buildInfo.resources,
+                            new ResourcesInputPropertiesValidator());
+
+      } catch (IOException e) {
+        throw new BadConfigException(e,
+               "incorrect argument to %s: \"%s\" : %s ", 
+                                     Arguments.ARG_RESOURCES,
+                                     buildInfo.resources,
+                                     e.toString());
+      }
+    }
+    if (buildInfo.template != null) {
+      try {
+        appConf.mergeFile(buildInfo.template,
+                          new TemplateInputPropertiesValidator());
+      } catch (IOException e) {
+        throw new BadConfigException(e,
+                                     "incorrect argument to %s: \"%s\" : %s ",
+                                     Arguments.ARG_TEMPLATE,
+                                     buildInfo.template,
+                                     e.toString());
+      }
+    }
+
+    //get the command line options
+    ConfTree cmdLineAppOptions = buildInfo.buildAppOptionsConfTree();
+    ConfTree cmdLineResourceOptions = buildInfo.buildResourceOptionsConfTree();
+
+    appConf.merge(cmdLineAppOptions);
+
+    // put the role counts into the resources file
+    Map<String, String> argsRoleMap = buildInfo.getComponentMap();
+    for (Map.Entry<String, String> roleEntry : argsRoleMap.entrySet()) {
+      String count = roleEntry.getValue();
+      String key = roleEntry.getKey();
+      log.debug("{} => {}", key, count);
+      resources.getOrAddComponent(key)
+                 .put(ResourceKeys.COMPONENT_INSTANCES, count);
+    }
+
+    //all CLI role options
+    Map<String, Map<String, String>> appOptionMap =
+      buildInfo.getCompOptionMap();
+    appConf.mergeComponents(appOptionMap);
+
+    //internal picks up core. values only
+    internal.propagateGlobalKeys(appConf, "slider.");
+    internal.propagateGlobalKeys(appConf, "internal.");
+
+    //copy over role. and yarn. values ONLY to the resources
+    if (PROPAGATE_RESOURCE_OPTION) {
+      resources.propagateGlobalKeys(appConf, "component.");
+      resources.propagateGlobalKeys(appConf, "role.");
+      resources.propagateGlobalKeys(appConf, "yarn.");
+      resources.mergeComponentsPrefix(appOptionMap, "component.", true);
+      resources.mergeComponentsPrefix(appOptionMap, "yarn.", true);
+      resources.mergeComponentsPrefix(appOptionMap, "role.", true);
+    }
+
+    // resource component args
+    appConf.merge(cmdLineResourceOptions);
+    resources.merge(cmdLineResourceOptions);
+    resources.mergeComponents(buildInfo.getResourceCompOptionMap());
+
+    builder.init(providerName, instanceDefinition);
+    builder.propagateFilename();
+    builder.propagatePrincipals();
+    builder.setImageDetailsIfAvailable(buildInfo.getImage(),
+                                       buildInfo.getAppHomeDir());
+    builder.setQueue(buildInfo.queue);
+
+    String quorum = buildInfo.getZKhosts();
+    if (SliderUtils.isUnset(quorum)) {
+      quorum = registryQuorum;
+    }
+    if (isUnset(quorum)) {
+      throw new BadConfigException("No Zookeeper quorum defined");
+    }
+    ZKPathBuilder zkPaths = new ZKPathBuilder(getAppName(),
+        getUsername(),
+        clustername,
+        registryQuorum,
+        quorum);
+    String zookeeperRoot = buildInfo.getAppZKPath();
+
+    if (isSet(zookeeperRoot)) {
+      zkPaths.setAppPath(zookeeperRoot);
+    } else {
+      String createDefaultZkNode = appConf.getGlobalOptions().getOption(AgentKeys.CREATE_DEF_ZK_NODE, "false");
+      if (createDefaultZkNode.equals("true")) {
+        String defaultZKPath = createZookeeperNode(clustername, false);
+        log.info("ZK node created for application instance: {}.", defaultZKPath);
+        if (defaultZKPath != null) {
+          zkPaths.setAppPath(defaultZKPath);
+        }
+      } else {
+        // create AppPath if default is being used
+        String defaultZKPath = createZookeeperNode(clustername, true);
+        log.info("ZK node assigned to application instance: {}.", defaultZKPath);
+        zkPaths.setAppPath(defaultZKPath);
+      }
+    }
+
+    builder.addZKBinding(zkPaths);
+
+    //then propagate any package URI
+    if (buildInfo.packageURI != null) {
+      appConf.set(AgentKeys.PACKAGE_PATH, buildInfo.packageURI);
+    }
+
+    // make any substitutions needed at this stage
+    replaceTokens(appConf.getConfTree(), getUsername(), clustername);
+
+    // providers to validate what there is
+    AggregateConf instanceDescription = builder.getInstanceDescription();
+    validateInstanceDefinition(sliderAM, instanceDescription, sliderFileSystem);
+    validateInstanceDefinition(provider, instanceDescription, sliderFileSystem);
+    try {
+      persistInstanceDefinition(overwrite, appconfdir, builder);
+    } catch (LockAcquireFailedException e) {
+      log.warn("Failed to get a Lock on {} : {}", builder, e);
+      throw new BadClusterStateException("Failed to save " + clustername
+                                         + ": " + e);
+    }
+  }
+
+  protected void persistInstanceDefinition(boolean overwrite,
+                                         Path appconfdir,
+                                         InstanceBuilder builder)
+      throws IOException, SliderException, LockAcquireFailedException {
+    builder.persist(appconfdir, overwrite);
+  }
+
+  @VisibleForTesting
+  public static void replaceTokens(ConfTree conf,
+      String userName, String clusterName) throws IOException {
+    Map<String,String> newglobal = new HashMap<String,String>();
+    for (Entry<String,String> entry : conf.global.entrySet()) {
+      newglobal.put(entry.getKey(), replaceTokens(entry.getValue(),
+          userName, clusterName));
+    }
+    conf.global.putAll(newglobal);
+
+    Map<String,List<String>> newcred = new HashMap<String,List<String>>();
+    for (Entry<String,List<String>> entry : conf.credentials.entrySet()) {
+      List<String> resultList = new ArrayList<String>();
+      for (String v : entry.getValue()) {
+        resultList.add(replaceTokens(v, userName, clusterName));
+      }
+      newcred.put(replaceTokens(entry.getKey(), userName, clusterName),
+          resultList);
+    }
+    conf.credentials.clear();
+    conf.credentials.putAll(newcred);
+  }
+
+  private static String replaceTokens(String s, String userName,
+      String clusterName) throws IOException {
+    return s.replaceAll(Pattern.quote("${USER}"), userName)
+        .replaceAll(Pattern.quote("${CLUSTER_NAME}"), clusterName);
+  }
+
+  public FsPermission getClusterDirectoryPermissions(Configuration conf) {
+    String clusterDirPermsOct =
+      conf.get(CLUSTER_DIRECTORY_PERMISSIONS,
+          DEFAULT_CLUSTER_DIRECTORY_PERMISSIONS);
+    return new FsPermission(clusterDirPermsOct);
+  }
+
+  /**
+   * Verify that the Resource MAnager is configured, if not fail
+   * with a useful error message
+   * @throws BadCommandArgumentsException the exception raised on an invalid config
+   */
+  public void verifyBindingsDefined() throws BadCommandArgumentsException {
+    InetSocketAddress rmAddr = SliderUtils.getRmAddress(getConfig());
+    if (!SliderUtils.isAddressDefined(rmAddr)) {
+      throw new BadCommandArgumentsException(
+        "No valid Resource Manager address provided in the argument "
+        + Arguments.ARG_MANAGER
+        + " or the configuration property "
+        + YarnConfiguration.RM_ADDRESS 
+        + " value :" + rmAddr);
+    }
+
+  }
+
+  /**
+   * Load and start a cluster specification.
+   * This assumes that all validation of args and cluster state
+   * have already taken place
+   *
+   * @param clustername name of the cluster.
+   * @param launchArgs launch arguments
+   * @return the exit code
+   * @throws YarnException
+   * @throws IOException
+   */
+  private int startCluster(String clustername,
+                           LaunchArgsAccessor launchArgs) throws
+                                                          YarnException,
+                                                          IOException {
+    Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
+    AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
+      clustername,
+      clusterDirectory);
+
+    LaunchedApplication launchedApplication =
+      launchApplication(clustername, clusterDirectory, instanceDefinition,
+                        serviceArgs.isDebug());
+    applicationId = launchedApplication.getApplicationId();
+
+    return waitForAppAccepted(launchedApplication, launchArgs.getWaittime());
+  }
+
+  /**
+   * Load the instance definition. It is not resolved at this point
+   * @param name cluster name
+   * @param clusterDirectory cluster dir
+   * @return the loaded configuration
+   * @throws IOException
+   * @throws SliderException
+   * @throws UnknownApplicationInstanceException if the file is not found
+   */
+  public AggregateConf loadInstanceDefinitionUnresolved(String name,
+                                                         Path clusterDirectory) throws
+                                                                      IOException,
+      SliderException {
+
+    try {
+      AggregateConf definition =
+        InstanceIO.loadInstanceDefinitionUnresolved(sliderFileSystem,
+                                                    clusterDirectory);
+      return definition;
+    } catch (FileNotFoundException e) {
+      throw UnknownApplicationInstanceException.unknownInstance(name, e);
+    }
+  }
+    /**
+   * Load the instance definition. 
+   * @param name cluster name
+   * @param resolved flag to indicate the cluster should be resolved
+   * @return the loaded configuration
+   * @throws IOException IO problems
+   * @throws SliderException slider explicit issues
+   * @throws UnknownApplicationInstanceException if the file is not found
+   */
+    public AggregateConf loadInstanceDefinition(String name,
+        boolean resolved) throws
+        IOException,
+        SliderException {
+
+    Path clusterDirectory = sliderFileSystem.buildClusterDirPath(name);
+    AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
+      name,
+      clusterDirectory);
+    if (resolved) {
+      instanceDefinition.resolve();
+    }
+    return instanceDefinition;
+
+  }
+
+  /**
+   *
+   * @param clustername name of the cluster
+   * @param clusterDirectory cluster dir
+   * @param instanceDefinition the instance definition
+   * @param debugAM enable debug AM options
+   * @return the launched application
+   * @throws YarnException
+   * @throws IOException
+   */
+  public LaunchedApplication launchApplication(String clustername,
+                                               Path clusterDirectory,
+                                               AggregateConf instanceDefinition,
+                                               boolean debugAM)
+    throws YarnException, IOException {
+
+
+    deployedClusterName = clustername;
+    SliderUtils.validateClusterName(clustername);
+    verifyNoLiveClusters(clustername);
+    Configuration config = getConfig();
+    lookupZKQuorum();
+    boolean clusterSecure = SliderUtils.isHadoopClusterSecure(config);
+    //create the Slider AM provider -this helps set up the AM
+    SliderAMClientProvider sliderAM = new SliderAMClientProvider(config);
+
+    instanceDefinition.resolve();
+    launchedInstanceDefinition = instanceDefinition;
+
+    ConfTreeOperations internalOperations =
+      instanceDefinition.getInternalOperations();
+    MapOperations internalOptions = internalOperations.getGlobalOptions();
+    ConfTreeOperations resourceOperations =
+      instanceDefinition.getResourceOperations();
+    ConfTreeOperations appOperations =
+      instanceDefinition.getAppConfOperations();
+    Path generatedConfDirPath =
+      createPathThatMustExist(internalOptions.getMandatoryOption(
+        InternalKeys.INTERNAL_GENERATED_CONF_PATH));
+    Path snapshotConfPath =
+      createPathThatMustExist(internalOptions.getMandatoryOption(
+        InternalKeys.INTERNAL_SNAPSHOT_CONF_PATH));
+
+
+    // cluster Provider
+    AbstractClientProvider provider = createClientProvider(
+      internalOptions.getMandatoryOption(
+        InternalKeys.INTERNAL_PROVIDER_NAME));
+    // make sure the conf dir is valid;
+    
+    if (log.isDebugEnabled()) {
+      log.debug(instanceDefinition.toString());
+    }
+    MapOperations sliderAMResourceComponent =
+      resourceOperations.getOrAddComponent(SliderKeys.COMPONENT_AM);
+    MapOperations resourceGlobalOptions = resourceOperations.getGlobalOptions();
+
+    // add the tags if available
+    Set<String> applicationTags = provider.getApplicationTags(sliderFileSystem,
+        appOperations.getGlobalOptions().get(AgentKeys.APP_DEF));
+    AppMasterLauncher amLauncher = new AppMasterLauncher(clustername,
+        SliderKeys.APP_TYPE,
+        config,
+        sliderFileSystem,
+        yarnClient,
+        clusterSecure,
+        sliderAMResourceComponent,
+        resourceGlobalOptions,
+        applicationTags);
+
+    ApplicationId appId = amLauncher.getApplicationId();
+    // set the application name;
+    amLauncher.setKeepContainersOverRestarts(true);
+
+    int maxAppAttempts = config.getInt(KEY_AM_RESTART_LIMIT, 0);
+    amLauncher.setMaxAppAttempts(maxAppAttempts);
+
+    sliderFileSystem.purgeAppInstanceTempFiles(clustername);
+    Path tempPath = sliderFileSystem.createAppInstanceTempPath(
+        clustername,
+        appId.toString() + "/am");
+    String libdir = "lib";
+    Path libPath = new Path(tempPath, libdir);
+    sliderFileSystem.getFileSystem().mkdirs(libPath);
+    log.debug("FS={}, tempPath={}, libdir={}", sliderFileSystem.toString(),
+              tempPath, libPath);
+    // set local resources for the application master
+    // local files or archives as needed
+    // In this scenario, the jar file for the application master is part of the local resources
+    Map<String, LocalResource> localResources = amLauncher.getLocalResources();
+    // conf directory setup
+    Path remoteConfPath = null;
+    String relativeConfDir = null;
+    String confdirProp =
+      System.getProperty(SliderKeys.PROPERTY_CONF_DIR);
+    if (confdirProp == null || confdirProp.isEmpty()) {
+      log.debug("No local configuration directory provided as system property");
+    } else {
+      File confDir = new File(confdirProp);
+      if (!confDir.exists()) {
+        throw new BadConfigException(E_CONFIGURATION_DIRECTORY_NOT_FOUND,
+                                     confDir);
+      }
+      Path localConfDirPath = SliderUtils.createLocalPath(confDir);
+      log.debug("Copying AM configuration data from {}", localConfDirPath);
+      remoteConfPath = new Path(clusterDirectory,
+                                    SliderKeys.SUBMITTED_CONF_DIR);
+      SliderUtils.copyDirectory(config, localConfDirPath, remoteConfPath,
+          null);
+    }
+    // the assumption here is that minimr cluster => this is a test run
+    // and the classpath can look after itself
+
+    boolean usingMiniMRCluster = getUsingMiniMRCluster();
+    if (!usingMiniMRCluster) {
+
+      log.debug("Destination is not a MiniYARNCluster -copying full classpath");
+
+      // insert conf dir first
+      if (remoteConfPath != null) {
+        relativeConfDir = SliderKeys.SUBMITTED_CONF_DIR;
+        Map<String, LocalResource> submittedConfDir =
+          sliderFileSystem.submitDirectory(remoteConfPath,
+                                         relativeConfDir);
+        SliderUtils.mergeMaps(localResources, submittedConfDir);
+      }
+    }
+    // build up the configuration 
+    // IMPORTANT: it is only after this call that site configurations
+    // will be valid.
+
+    propagatePrincipals(config, instanceDefinition);
+    // validate security data
+/*
+    // turned off until tested
+    SecurityConfiguration securityConfiguration =
+        new SecurityConfiguration(config,
+            instanceDefinition, clustername);
+    
+*/
+    Configuration clientConfExtras = new Configuration(false);
+    // then build up the generated path.
+    FsPermission clusterPerms = getClusterDirectoryPermissions(config);
+    SliderUtils.copyDirectory(config, snapshotConfPath, generatedConfDirPath,
+        clusterPerms);
+
+
+    // standard AM resources
+    sliderAM.prepareAMAndConfigForLaunch(sliderFileSystem,
+                                       config,
+                                       amLauncher,
+                                       instanceDefinition,
+                                       snapshotConfPath,
+                                       generatedConfDirPath,
+                                       clientConfExtras,
+                                       libdir,
+                                       tempPath,
+                                       usingMiniMRCluster);
+    //add provider-specific resources
+    provider.prepareAMAndConfigForLaunch(sliderFileSystem,
+                                         config,
+                                         amLauncher,
+                                         instanceDefinition,
+                                         snapshotConfPath,
+                                         generatedConfDirPath,
+                                         clientConfExtras,
+                                         libdir,
+                                         tempPath,
+                                         usingMiniMRCluster);
+
+    // now that the site config is fully generated, the provider gets
+    // to do a quick review of them.
+    log.debug("Preflight validation of cluster configuration");
+
+
+    sliderAM.preflightValidateClusterConfiguration(sliderFileSystem,
+                                                 clustername,
+                                                 config,
+                                                 instanceDefinition,
+                                                 clusterDirectory,
+                                                 generatedConfDirPath,
+                                                 clusterSecure
+                                                );
+
+    provider.preflightValidateClusterConfiguration(sliderFileSystem,
+                                                   clustername,
+                                                   config,
+                                                   instanceDefinition,
+                                                   clusterDirectory,
+                                                   generatedConfDirPath,
+                                                   clusterSecure
+                                                  );
+
+
+    // TODO: consider supporting apps that don't have an image path
+    Path imagePath =
+        SliderUtils.extractImagePath(sliderFileSystem, internalOptions);
+    if (sliderFileSystem.maybeAddImagePath(localResources, imagePath)) {
+      log.debug("Registered image path {}", imagePath);
+    }
+
+    // build the environment
+    amLauncher.putEnv(
+      SliderUtils.buildEnvMap(sliderAMResourceComponent));
+    ClasspathConstructor classpath = SliderUtils.buildClasspath(relativeConfDir,
+        libdir,
+        getConfig(),
+        usingMiniMRCluster);
+    amLauncher.setClasspath(classpath);
+    if (log.isDebugEnabled()) {
+      log.debug("AM classpath={}", classpath);
+      log.debug("Environment Map:\n{}",
+                SliderUtils.stringifyMap(amLauncher.getEnv()));
+      log.debug("Files in lib path\n{}", sliderFileSystem.listFSDir(libPath));
+    }
+
+    // rm address
+
+    InetSocketAddress rmSchedulerAddress;
+    try {
+      rmSchedulerAddress = SliderUtils.getRmSchedulerAddress(config);
+    } catch (IllegalArgumentException e) {
+      throw new BadConfigException("%s Address invalid: %s",
+                                   YarnConfiguration.RM_SCHEDULER_ADDRESS,
+                                   config.get(
+                                     YarnConfiguration.RM_SCHEDULER_ADDRESS)
+      );
+
+    }
+    String rmAddr = NetUtils.getHostPortString(rmSchedulerAddress);
+
+    JavaCommandLineBuilder commandLine = new JavaCommandLineBuilder();
+    // insert any JVM options);
+    sliderAM.addJVMOptions(instanceDefinition, commandLine);
+    // enable asserts if the text option is set
+    commandLine.enableJavaAssertions();
+    // add the AM sevice entry point
+    commandLine.add(SliderAppMaster.SERVICE_CLASSNAME);
+
+    // create action and the cluster name
+    commandLine.add(ACTION_CREATE, clustername);
+
+    // debug
+    if (debugAM) {
+      commandLine.add(Arguments.ARG_DEBUG);
+    }
+
+    // set the cluster directory path
+    commandLine.add(Arguments.ARG_CLUSTER_URI, clusterDirectory.toUri());
+
+    if (!isUnset(rmAddr)) {
+      commandLine.add(Arguments.ARG_RM_ADDR, rmAddr);
+    }
+
+    if (serviceArgs.getFilesystemBinding() != null) {
+      commandLine.add(Arguments.ARG_FILESYSTEM, serviceArgs.getFilesystemBinding());
+    }
+    
+    addConfOptionToCLI(commandLine, config, REGISTRY_PATH,
+        DEFAULT_REGISTRY_PATH);
+    addMandatoryConfOptionToCLI(commandLine, config,
+        RegistryConstants.KEY_REGISTRY_ZK_QUORUM);
+
+    if (clusterSecure) {
+      // if the cluster is secure, make sure that
+      // the relevant security settings go over
+/*
+      addConfOptionToCLI(commandLine, config, KEY_SECURITY);
+*/
+      addConfOptionToCLI(commandLine,
+          config,
+          DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY);
+    }
+    // write out the path output
+    commandLine.addOutAndErrFiles(STDOUT_AM, STDERR_AM);
+
+    String cmdStr = commandLine.build();
+    log.debug("Completed setting up app master command {}", cmdStr);
+
+    amLauncher.addCommandLine(commandLine);
+
+    // the Slider AM gets to configure the AM requirements, not the custom provider
+    sliderAM.prepareAMResourceRequirements(sliderAMResourceComponent,
+        amLauncher.getResource());
+
+
+    // Set the priority for the application master
+
+    int amPriority = config.getInt(KEY_YARN_QUEUE_PRIORITY,
+                                   DEFAULT_YARN_QUEUE_PRIORITY);
+
+
+    amLauncher.setPriority(amPriority);
+
+    // Set the queue to which this application is to be submitted in the RM
+    // Queue for App master
+    String amQueue = config.get(KEY_YARN_QUEUE, DEFAULT_YARN_QUEUE);
+    String suppliedQueue = internalOperations.getGlobalOptions().get(InternalKeys.INTERNAL_QUEUE);
+    if(!SliderUtils.isUnset(suppliedQueue)) {
+      amQueue = suppliedQueue;
+      log.info("Using queue {} for the application instance.", amQueue);
+    }
+
+    if (amQueue != null) {
+      amLauncher.setQueue(amQueue);
+    }
+
+    // Submit the application to the applications manager
+    // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
+    // Ignore the response as either a valid response object is returned on success
+    // or an exception thrown to denote some form of a failure
+    
+
+    // submit the application
+    LaunchedApplication launchedApplication = amLauncher.submitApplication();
+    return launchedApplication;
+  }
+  
+  
+  /**
+   * Wait for the launched app to be accepted
+   * @param waittime time in millis
+   * @return exit code
+   * @throws YarnException
+   * @throws IOException
+   */
+  public int waitForAppAccepted(LaunchedApplication launchedApplication, 
+                                int waittime) throws
+                                              YarnException,
+                                              IOException {
+    assert launchedApplication != null;
+    int exitCode;
+    // wait for the submit state to be reached
+    ApplicationReport report = launchedApplication.monitorAppToState(
+      YarnApplicationState.ACCEPTED,
+      new Duration(Constants.ACCEPT_TIME));
+
+
+    // may have failed, so check that
+    if (SliderUtils.hasAppFinished(report)) {
+      exitCode = buildExitCode(report);
+    } else {
+      // exit unless there is a wait
+      exitCode = EXIT_SUCCESS;
+
+      if (waittime != 0) {
+        // waiting for state to change
+        Duration duration = new Duration(waittime * 1000);
+        duration.start();
+        report = launchedApplication.monitorAppToState(
+          YarnApplicationState.RUNNING, duration);
+        if (report != null &&
+            report.getYarnApplicationState() == YarnApplicationState.RUNNING) {
+          exitCode = EXIT_SUCCESS;
+        } else {
+
+          launchedApplication.kill("");
+          exitCode = buildExitCode(report);
+        }
+      }
+    }
+    return exitCode;
+  }
+
+
+  /**
+   * Propagate any critical principals from the current site config down to the HBase one.
+   * @param config config to read from
+   * @param clusterSpec cluster spec
+   */
+  private void propagatePrincipals(Configuration config,
+                                   AggregateConf clusterSpec) {
+    String dfsPrincipal = config.get(
+        DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY);
+    if (dfsPrincipal != null) {
+      String siteDfsPrincipal = OptionKeys.SITE_XML_PREFIX +
+                                DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
+      clusterSpec.getAppConfOperations().getGlobalOptions().putIfUnset(
+        siteDfsPrincipal,
+        dfsPrincipal);
+    }
+  }
+
+
+  private boolean addConfOptionToCLI(CommandLineBuilder cmdLine,
+      Configuration conf,
+      String key) {
+    String val = conf.get(key);
+    return defineIfSet(cmdLine, key, val);
+  }
+
+  private String addConfOptionToCLI(CommandLineBuilder cmdLine,
+      Configuration conf,
+      String key,
+      String defVal) {
+    String val = conf.get(key, defVal);
+    define(cmdLine, key, val);
+    return val;
+  }
+
+  /**
+   * Add a <code>-D key=val</code> command to the CLI
+   * @param cmdLine command line
+   * @param key key
+   * @param val value
+   */
+  private void define(CommandLineBuilder cmdLine, String key, String val) {
+    Preconditions.checkArgument(key != null, "null key");
+    Preconditions.checkArgument(val != null, "null value");
+    cmdLine.add(Arguments.ARG_DEFINE, key + "=" + val);
+  }
+
+  /**
+   * Add a <code>-D key=val</code> command to the CLI if <code>val</code>
+   * is not null
+   * @param cmdLine command line
+   * @param key key
+   * @param val value
+   */
+  private boolean defineIfSet(CommandLineBuilder cmdLine, String key, String val) {
+    Preconditions.checkArgument(key != null, "null key");
+    if (val != null) {
+      define(cmdLine, key, val);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  private void addMandatoryConfOptionToCLI(CommandLineBuilder cmdLine,
+      Configuration conf,
+      String key) throws BadConfigException {
+    if (!addConfOptionToCLI(cmdLine, conf, key)) {
+      throw new BadConfigException("Missing configuration option: " + key);
+    }
+  }
+  
+  /**
+   * Create a path that must exist in the cluster fs
+   * @param uri uri to create
+   * @return the path
+   * @throws FileNotFoundException if the path does not exist
+   */
+  public Path createPathThatMustExist(String uri) throws
+      SliderException,
+                                                  IOException {
+    return sliderFileSystem.createPathThatMustExist(uri);
+  }
+
+  /**
+   * verify that a live cluster isn't there
+   * @param clustername cluster name
+   * @throws SliderException with exit code EXIT_CLUSTER_LIVE
+   * if a cluster of that name is either live or starting up.
+   */
+  public void verifyNoLiveClusters(String clustername) throws
+                                                       IOException,
+                                                       YarnException {
+    List<ApplicationReport> existing = findAllLiveInstances(clustername);
+
+    if (!existing.isEmpty()) {
+      throw new SliderException(EXIT_APPLICATION_IN_USE,
+                              clustername + ": " + E_CLUSTER_RUNNING + " :" +
+                              existing.get(0));
+    }
+  }
+
+  public String getUsername() throws IOException {
+    return RegistryUtils.currentUser();
+  }
+
+  /**
+   * Get the name of any deployed cluster
+   * @return the cluster name
+   */
+  public String getDeployedClusterName() {
+    return deployedClusterName;
+  }
+
+  @VisibleForTesting
+  public void setDeployedClusterName(String deployedClusterName) {
+    this.deployedClusterName = deployedClusterName;
+  }
+
+  /**
+   * ask if the client is using a mini MR cluster
+   * @return true if they are
+   */
+  private boolean getUsingMiniMRCluster() {
+    return getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER,
+        false);
+  }
+
+  /**
+   * Get the application name used in the zookeeper root paths
+   * @return an application-specific path in ZK
+   */
+  private String getAppName() {
+    return "slider";
+  }
+
+  /**
+   * Wait for the app to start running (or go past that state)
+   * @param duration time to wait
+   * @return the app report; null if the duration turned out
+   * @throws YarnException YARN or app issues
+   * @throws IOException IO problems
+   */
+  @VisibleForTesting
+  public ApplicationReport monitorAppToRunning(Duration duration)
+    throws YarnException, IOException {
+    return monitorAppToState(YarnApplicationState.RUNNING, duration);
+  }
+
+  /**
+   * Build an exit code for an application Id and its report.
+   * If the report parameter is null, the app is killed
+   * @param report report
+   * @return the exit code
+   */
+  private int buildExitCode(ApplicationReport report) throws
+                                                      IOException,
+                                                      YarnException {
+    if (null == report) {
+      forceKillApplication("Reached client specified timeout for application");
+      return EXIT_TIMED_OUT;
+    }
+
+    YarnApplicationState state = report.getYarnApplicationState();
+    FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
+    switch (state) {
+      case FINISHED:
+        if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
+          log.info("Application has completed successfully");
+          return EXIT_SUCCESS;
+        } else {
+          log.info("Application finished unsuccessfully." +
+                   "YarnState = {}, DSFinalStatus = {} Breaking monitoring loop",
+                   state, dsStatus);
+          return EXIT_YARN_SERVICE_FINISHED_WITH_ERROR;
+        }
+
+      case KILLED:
+        log.info("Application did not finish. YarnState={}, DSFinalStatus={}",
+                 state, dsStatus);
+        return EXIT_YARN_SERVICE_KILLED;
+
+      case FAILED:
+        log.info("Application Failed. YarnState={}, DSFinalStatus={}", state,
+                 dsStatus);
+        return EXIT_YARN_SERVICE_FAILED;
+      default:
+        //not in any of these states
+        return EXIT_SUCCESS;
+    }
+  }
+
+  /**
+   * Monitor the submitted application for reaching the requested state.
+   * Will also report if the app reaches a later state (failed, killed, etc)
+   * Kill application if duration!= null & time expires. 
+   * Prerequisite: the applicatin was launched.
+   * @param desiredState desired state.
+   * @param duration how long to wait -must be more than 0
+   * @return the application report -null on a timeout
+   * @throws YarnException
+   * @throws IOException
+   */
+  @VisibleForTesting
+  public ApplicationReport monitorAppToState(
+    YarnApplicationState desiredState,
+    Duration duration)
+    throws YarnException, IOException {
+    LaunchedApplication launchedApplication =
+      new LaunchedApplication(applicationId, yarnClient);
+    return launchedApplication.monitorAppToState(desiredState, duration);
+  }
+
+  @Override
+  public ApplicationReport getApplicationReport() throws
+                                                  IOException,
+                                                  YarnException {
+    return getApplicationReport(applicationId);
+  }
+
+  @Override
+  public boolean forceKillApplication(String reason)
+    throws YarnException, IOException {
+    if (applicationId != null) {
+      new LaunchedApplication(applicationId, yarnClient).forceKill(reason);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * List Slider instances belonging to a specific user
+   * @param user user: "" means all users
+   * @return a possibly empty list of Slider AMs
+   */
+  @VisibleForTesting
+  public List<ApplicationReport> listSliderInstances(String user)
+    throws YarnException, IOException {
+    return YarnAppListClient.listInstances();
+  }
+
+  @Override
+  @VisibleForTesting
+  public int actionList(String clustername) throws IOException, YarnException {
+    verifyBindingsDefined();
+
+    String user = UserGroupInformation.getCurrentUser().getUserName();
+    List<ApplicationReport> instances = listSliderInstances(user);
+
+    if (isUnset(clustername)) {
+      log.info("Instances for {}: {}",
+               (user != null ? user : "all users"),
+               instances.size());
+      for (ApplicationReport report : instances) {
+        logAppReport(report);
+      }
+      return EXIT_SUCCESS;
+    } else {
+      SliderUtils.validateClusterName(clustername);
+      log.debug("Listing cluster named {}", clustername);
+      ApplicationReport report =
+        findClusterInInstanceList(instances, clustername);
+      if (report != null) {
+        logAppReport(report);
+        return EXIT_SUCCESS;
+      } else {
+        throw unknownClusterException(clustername);
+      }
+    }
+  }
+
+  /**
+   * Log the application report at INFO
+   * @param report report to log
+   */
+  public void logAppReport(ApplicationReport report) {
+    log.info(SliderUtils.appReportToString(report, "\n"));
+  }
+
+  @Override
+  @VisibleForTesting
+  public int actionFlex(String name, ActionFlexArgs args) throws YarnException, IOException {
+    verifyBindingsDefined();
+    SliderUtils.validateClusterName(name);
+    log.debug("actionFlex({})", name);
+    Map<String, Integer> roleInstances = new HashMap<String, Integer>();
+    Map<String, String> roleMap = args.getComponentMap();
+    for (Map.Entry<String, String> roleEntry : roleMap.entrySet()) {
+      String key = roleEntry.getKey();
+      String val = roleEntry.getValue();
+      try {
+        roleInstances.put(key, Integer.valueOf(val));
+      } catch (NumberFormatException e) {
+        throw new BadCommandArgumentsException("Requested count of role %s" +
+                                               " is not a number: \"%s\"",
+                                               key, val);
+      }
+    }
+    return flex(name, roleInstances);
+  }
+
+  @Override
+  @VisibleForTesting
+  public int actionExists(String name, boolean checkLive) throws YarnException, IOException {
+    verifyBindingsDefined();
+    SliderUtils.validateClusterName(name);
+    log.debug("actionExists({}, {})", name, checkLive);
+
+    //initial probe for a cluster in the filesystem
+    Path clusterDirectory = sliderFileSystem.buildClusterDirPath(name);
+    if (!sliderFileSystem.getFileSystem().exists(clusterDirectory)) {
+      throw unknownClusterException(name);
+    }
+    
+    //test for liveness if desired
+
+    if (checkLive) {
+      ApplicationReport instance = findInstance(name);
+      if (instance == null) {
+        log.info("Cluster {} not running", name);
+        return EXIT_FALSE;
+      } else {
+        // the app exists, but it may be in a terminated state
+        SliderUtils.OnDemandReportStringifier report =
+          new SliderUtils.OnDemandReportStringifier(instance);
+        YarnApplicationState state =
+          instance.getYarnApplicationState();
+        if (state.ordinal() >= YarnApplicationState.FINISHED.ordinal()) {
+          //cluster in the list of apps but not running
+          log.info("Cluster {} found but is in state {}", name, state);
+          log.debug("State {}", report);
+          return EXIT_FALSE;
+        }
+        log.info("Cluster {} is live:\n{}", name, report);
+      }
+    } else {
+      log.info("Cluster {} exists", name);
+    }
+    return EXIT_SUCCESS;
+  }
+
+
+  @Override
+  public int actionKillContainer(String name,
+      ActionKillContainerArgs args) throws YarnException, IOException {
+    String id = args.id;
+    if (SliderUtils.isUnset(id)) {
+      throw new BadCommandArgumentsException("Missing container id");
+    }
+    log.info("killingContainer {}:{}", name, id);
+    SliderClusterOperations clusterOps =
+      new SliderClusterOperations(bondToCluster(name));
+    try {
+      clusterOps.killContainer(id);
+    } catch (NoSuchNodeException e) {
+      throw new BadClusterStateException("Container %s not found in cluster %s",
+                                         id, name);
+    }
+    return EXIT_SUCCESS;
+  }
+
+  @Override
+  public String actionEcho(String name, ActionEchoArgs args) throws
+                                                             YarnException,
+                                                             IOException {
+    String message = args.message;
+    if (message == null) {
+      throw new BadCommandArgumentsException("missing message");
+    }
+    SliderClusterOperations clusterOps =
+      new SliderClusterOperations(bondToCluster(name));
+    return clusterOps.echo(message);
+  }
+
+  /**
+   * Get at the service registry operations
+   * @return registry client -valid after the service is inited.
+   */
+  public YarnAppListClient getYarnAppListClient() {
+    return YarnAppListClient;
+  }
+
+  /**
+   * Find an instance of an application belonging to the current user
+   * @param appname application name
+   * @return the app report or null if none is found
+   * @throws YarnException YARN issues
+   * @throws IOException IO problems
+   */
+  private ApplicationReport findInstance(String appname)
+      throws YarnException, IOException {
+    return YarnAppListClient.findInstance(appname);
+  }
+  
+  private RunningApplication findApplication(String appname)
+      throws YarnException, IOException {
+    ApplicationReport applicationReport = findInstance(appname);
+    return applicationReport != null ? new RunningApplication(yarnClient, applicationReport): null; 
+      
+  }
+
+  /**
+   * find all live instances of a specific app -if there is >1 in the cluster,
+   * this returns them all. State should be running or less
+   * @param appname application name
+   * @return the list of all matching application instances
+   */
+  private List<ApplicationReport> findAllLiveInstances(String appname)
+    throws YarnException, IOException {
+    
+    return YarnAppListClient.findAllLiveInstances(appname);
+  }
+
+
+  public ApplicationReport findClusterInInstanceList(List<ApplicationReport> instances,
+                                                     String appname) {
+    return yarnClient.findClusterInInstanceList(instances, appname);
+  }
+
+  /**
+   * Connect to a Slider AM
+   * @param app application report providing the details on the application
+   * @return an instance
+   * @throws YarnException
+   * @throws IOException
+   */
+  private SliderClusterProtocol connect(ApplicationReport app)
+      throws YarnException, IOException {
+
+    try {
+      return RpcBinder.getProxy(getConfig(),
+                                yarnClient.getRmClient(),
+                                app,
+                                Constants.CONNECT_TIMEOUT,
+                                Constants.RPC_TIMEOUT);
+    } catch (InterruptedException e) {
+      throw new SliderException(SliderExitCodes.EXIT_TIMED_OUT,
+                              e,
+                              "Interrupted waiting for communications with the Slider AM");
+    }
+  }
+
+  @Override
+  @VisibleForTesting
+  public int actionStatus(String clustername, ActionStatusArgs statusArgs) throws
+                                              YarnException,
+                                              IOException {
+    verifyBindingsDefined();
+    SliderUtils.validateClusterName(clustername);
+    String outfile = statusArgs.getOutput();
+    ClusterDescription status = getClusterDescription(clustername);
+    String text = status.toJsonString();
+    if (outfile == null) {
+      log.info(text);
+    } else {
+      status.save(new File(outfile).getAbsoluteFile());
+    }
+    return EXIT_SUCCESS;
+  }
+
+  @Override
+  public int actionVersion() {
+    SliderVersionInfo.loadAndPrintVersionInfo(log);
+    return EXIT_SUCCESS;
+  }
+
+  @Override
+  public int actionFreeze(String clustername,
+      ActionFreezeArgs freezeArgs) throws YarnException, IOException {
+    verifyBindingsDefined();
+    SliderUtils.validateClusterName(clustername);
+    int waittime = freezeArgs.getWaittime();
+    String text = freezeArgs.message;
+    boolean forcekill = freezeArgs.force;
+    log.debug("actionFreeze({}, reason={}, wait={}, force={})", clustername,
+              text,
+              waittime,
+              forcekill);
+    
+    //is this actually a known cluster?
+    sliderFileSystem.locateInstanceDefinition(clustername);
+    ApplicationReport app = findInstance(clustername);
+    if (app == null) {
+      // exit early
+      log.info("Cluster {} not running", clustername);
+      // not an error to stop a stopped cluster
+      return EXIT_SUCCESS;
+    }
+    log.debug("App to stop was found: {}:\n{}", clustername,
+              new SliderUtils.OnDemandReportStringifier(app));
+    if (app.getYarnApplicationState().ordinal() >=
+        YarnApplicationState.FINISHED.ordinal()) {
+      log.info("Cluster {} is a terminated state {}", clustername,
+               app.getYarnApplicationState());
+      return EXIT_SUCCESS;
+    }
+    LaunchedApplication application = new LaunchedApplication(yarnClient, app);
+    applicationId = application.getApplicationId();
+    
+    if (forcekill) {
+      //escalating to forced kill
+      application.kill("Forced stop of " + clustername +
+                       ": " + text);
+    } else {
+      try {
+        SliderClusterProtocol appMaster = connect(app);
+        Messages.StopClusterRequestProto r =
+          Messages.StopClusterRequestProto
+                  .newBuilder()
+                  .setMessage(text)
+                  .build();
+        appMaster.stopCluster(r);
+
+        log.debug("Cluster stop command issued");
+
+      } catch (YarnException e) {
+        log.warn("Exception while trying to terminate {}: {}", clustername, e);
+        return EXIT_FALSE;
+      } catch (IOException e) {
+        log.warn("Exception while trying to terminate {}: {}", clustername, e);
+        return EXIT_FALSE;
+      }
+    }
+
+    //wait for completion. We don't currently return an exception during this process
+    //as the stop operation has been issued, this is just YARN.
+    try {
+      if (waittime > 0) {
+        ApplicationReport applicationReport =
+          application.monitorAppToState(YarnApplicationState.FINISHED,
+                                        new Duration(waittime * 1000));
+        if (applicationReport == null) {
+          log.info("application did not shut down in time");
+          return EXIT_FALSE;
+        }
+      }
+
+// JDK7    } catch (YarnException | IOException e) {
+    } catch (YarnException e) {
+      log.warn("Exception while waiting for the cluster {} to shut down: {}",
+               clustername, e);
+    } catch ( IOException e) {
+      log.warn("Exception while waiting for the cluster {} to shut down: {}",
+               clustername, e);
+    }
+
+    return EXIT_SUCCESS;
+  }
+
+  @Override
+  public int actionThaw(String clustername, ActionThawArgs thaw) throws YarnException, IOException {
+    SliderUtils.validateClusterName(clustername);
+    // see if it is actually running and bail out;
+    verifyBindingsDefined();
+    verifyNoLiveClusters(clustername);
+
+
+    //start the cluster
+    return startCluster(clustername, thaw);
+  }
+
+  /**
+   * Implement flexing
+   * @param clustername name of the cluster
+   * @param roleInstances map of new role instances
+   * @return EXIT_SUCCESS if the #of nodes in a live cluster changed
+   * @throws YarnException
+   * @throws IOException
+   */
+  public int flex(String clustername, Map<String, Integer> roleInstances)
+      throws YarnException, IOException {
+    verifyBindingsDefined();
+    SliderUtils.validateClusterName(clustername);
+    Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
+    AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
+      clustername,
+      clusterDirectory);
+
+    ConfTreeOperations resources =
+      instanceDefinition.getResourceOperations();
+    for (Map.Entry<String, Integer> entry : roleInstances.entrySet()) {
+      String role = entry.getKey();
+      int count = entry.getValue();
+      resources.getOrAddComponent(role).put(ResourceKeys.COMPONENT_INSTANCES,
+                                            Integer.toString(count));
+
+      log.debug("Flexed cluster specification ( {} -> {}) : \n{}",
+                role,
+                count,
+                resources);
+    }
+    SliderAMClientProvider sliderAM = new SliderAMClientProvider(getConfig());
+    AbstractClientProvider provider = createClientProvider(
+        instanceDefinition.getInternalOperations().getGlobalOptions().getMandatoryOption(
+            InternalKeys.INTERNAL_PROVIDER_NAME));
+    // slider provider to validate what there is
+    validateInstanceDefinition(sliderAM, instanceDefinition, sliderFileSystem);
+    validateInstanceDefinition(provider, instanceDefinition, sliderFileSystem);
+
+    int exitCode = EXIT_FALSE;
+    // save the specification
+    try {
+      InstanceIO.updateInstanceDefinition(sliderFileSystem, clusterDirectory, instanceDefinition);
+    } catch (LockAcquireFailedException e) {
+      // lock failure
+      log.debug("Failed to lock dir {}", clusterDirectory, e);
+      log.warn("Failed to save new resource definition to {} : {}", clusterDirectory, e);
+    }
+
+    // now see if it is actually running and tell it about the update if it is
+    ApplicationReport instance = findInstance(clustername);
+    if (instance != null) {
+      log.info("Flexing running cluster");
+      SliderClusterProtocol appMaster = connect(instance);
+      SliderClusterOperations clusterOps = new SliderClusterOperations(appMaster);
+      clusterOps.flex(instanceDefinition.getResources());
+      log.info("application instance size updated");
+      exitCode = EXIT_SUCCESS;
+    } else {
+      log.info("No running instance to update");
+    }
+    return exitCode;
+  }
+
+  /**
+   * Validate an instance definition against a provider.
+   * @param provider the provider performing the validation
+   * @param instanceDefinition the instance definition
+   * @throws SliderException if invalid.
+   */
+  protected void validateInstanceDefinition(AbstractClientProvider provider,
+      AggregateConf instanceDefinition, SliderFileSystem fs) throws SliderException {
+    try {
+      provider.validateInstanceDefinition(instanceDefinition, fs);
+    } catch (SliderException e) {
+      //problem, reject it
+      log.info("Error {} validating application instance definition ", e.getMessage());
+      log.debug("Error validating application instance definition ", e);
+      log.info(instanceDefinition.toString());
+      throw e;
+    }
+  }
+
+
+  /**
+   * Load the persistent cluster description
+   * @param clustername name of the cluster
+   * @return the description in the filesystem
+   * @throws IOException any problems loading -including a missing file
+   */
+  @VisibleForTesting
+  public AggregateConf loadPersistedClusterDescription(String clustername)
+      throws IOException, SliderException, LockAcquireFailedException {
+    Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
+    ConfPersister persister = new ConfPersister(sliderFileSystem, clusterDirectory);
+    AggregateConf instanceDescription = new AggregateConf();
+    persister.load(instanceDescription);
+    return instanceDescription;
+  }
+
+    /**
+     * Connect to a live cluster and get its current state
+     * @param clustername the cluster name
+     * @return its description
+     */
+  @VisibleForTesting
+  public ClusterDescription getClusterDescription(String clustername) throws
+                                                                 YarnException,
+                                                                 IOException {
+    SliderClusterOperations clusterOperations =
+      createClusterOperations(clustername);
+    return clusterOperations.getClusterDescription();
+  }
+
+  /**
+   * Connect to the cluster and get its current state
+   * @return its description
+   */
+  @VisibleForTesting
+  public ClusterDescription getClusterDescription() throws
+                                               YarnException,
+                                               IOException {
+    return getClusterDescription(getDeployedClusterName());
+  }
+
+  /**
+   * List all node UUIDs in a role
+   * @param role role name or "" for all
+   * @return an array of UUID strings
+   * @throws IOException
+   * @throws YarnException
+   */
+  @VisibleForTesting
+  public String[] listNodeUUIDsByRole(String role) throws
+                                               IOException,
+                                               YarnException {
+    return createClusterOperations()
+              .listNodeUUIDsByRole(role);
+  }
+
+  /**
+   * List all nodes in a role. This is a double round trip: once to list
+   * the nodes in a role, another to get their details
+   * @param role component/role to look for
+   * @return an array of ContainerNode instances
+   * @throws IOException
+   * @throws YarnException
+   */
+  @VisibleForTesting
+  public List<ClusterNode> listClusterNodesInRole(String role) throws
+                                               IOException,
+                                               YarnException {
+    return createClusterOperations().listClusterNodesInRole(role);
+  }
+
+  /**
+   * Get the details on a list of uuids
+   * @param uuids uuids to ask for 
+   * @return a possibly empty list of node details
+   * @throws IOException
+   * @throws YarnException
+   */
+  @VisibleForTesting
+  public List<ClusterNode> listClusterNodes(String[] uuids) throws
+                                               IOException,
+                                               YarnException {
+
+    if (uuids.length == 0) {
+      // short cut on an empty list
+      return new LinkedList<ClusterNode>();
+    }
+    return createClusterOperations().listClusterNodes(uuids);
+  }
+
+  /**
+   * Get a node from the AM
+   * @param uuid uuid of node
+   * @return deserialized node
+   * @throws IOException IO problems
+   * @throws NoSuchNodeException if the node isn't found
+   */
+  @VisibleForTesting
+  public ClusterNode getNode(String uuid) throws IOException, YarnException {
+    return createClusterOperations().getNode(uuid);
+  }
+  
+  /**
+   * Get the instance definition from the far end
+   */
+  @VisibleForTesting
+  public AggregateConf getLiveInstanceDefinition() throws IOException, YarnException {
+    return createClusterOperations().getInstanceDefinition();
+  }
+
+  /**
+   * Bond to a running cluster
+   * @param clustername cluster name
+   * @return the AM RPC client
+   * @throws SliderException if the cluster is unkown
+   */
+  private SliderClusterProtocol bondToCluster(String clustername) throws
+                                                                  YarnException,
+                                                                  IOException {
+    verifyBindingsDefined();
+    if (clustername == null) {
+      throw unknownClusterException("(undefined)");
+    }
+    ApplicationReport instance = findInstance(clustername);
+    if (null == instance) {
+      throw unknownClusterException(clustername);
+    }
+    return connect(instance);
+  }
+
+  /**
+   * Create a cluster operations instance against a given cluster
+   * @param clustername cluster name
+   * @return a bonded cluster operations instance
+   * @throws YarnException YARN issues
+   * @throws IOException IO problems
+   */
+  private SliderClusterOperations createClusterOperations(String clustername) throws
+                                                                            YarnException,
+                                                                            IOException {
+    SliderClusterProtocol sliderAM = bondToCluster(clustername);
+    return new SliderClusterOperations(sliderAM);
+  }
+
+  /**
+   * Create a cluster operations instance against the active cluster
+   * -returning any previous created one if held.
+   * @return a bonded cluster operations instance
+   * @throws YarnException YARN issues
+   * @throws IOException IO problems
+   */
+  public SliderClusterOperations createClusterOperations() throws
+                                                         YarnException,
+                                                         IOException {
+    if (sliderClusterOperations == null) {
+      sliderClusterOperations =
+        createClusterOperations(getDeployedClusterName());
+    }
+    return sliderClusterOperations;
+  }
+
+  /**
+   * Wait for an instance of a named role to be live (or past it in the lifecycle)
+   * @param role role to look for
+   * @param timeout time to wait
+   * @return the state. If still in CREATED, the cluster didn't come up
+   * in the time period. If LIVE, all is well. If >LIVE, it has shut for a reason
+   * @throws IOException IO
+   * @throws SliderException Slider
+   * @throws WaitTimeoutException if the wait timed out
+   */
+  @VisibleForTesting
+  public int waitForRoleInstanceLive(String role, long timeout)
+    throws WaitTimeoutException, IOException, YarnException {
+    return createClusterOperations().waitForRoleInstanceLive(role, timeout);
+  }
+
+  /**
+   * Generate an exception for an unknown cluster
+   * @param clustername cluster name
+   * @return an exception with text and a relevant exit code
+   */
+  public UnknownApplicationInstanceException unknownClusterException(String clustername) {
+    return UnknownApplicationInstanceException.unknownInstance(clustername);
+  }
+
+  @Override
+  public String toString() {
+    return "Slider Client in state " + getServiceState()
+           + " and Slider Application Instance " + deployedClusterName;
+  }
+
+  /**
+   * Get all YARN applications
+   * @return a possibly empty list
+   * @throws YarnException
+   * @throws IOException
+   */
+  @VisibleForTesting
+  public List<ApplicationReport> getApplications() throws YarnException, IOException {
+    return yarnClient.getApplications();
+  }
+
+  @VisibleForTesting
+  public ApplicationReport getApplicationReport(ApplicationId appId)
+    throws YarnException, IOException {
+    return new LaunchedApplication(appId, yarnClient).getApplicationReport();
+
+  }
+
+  /**
+   * The configuration used for deployment (after resolution)
+   * @return
+   */
+  @VisibleForTesting
+  public AggregateConf getLaunchedInstanceDefinition() {
+    return launchedInstanceDefinition;
+  }
+
+
+  @Override
+  public int actionResolve(ActionResolveArgs args)
+      throws YarnException, IOException {
+    // as this is an API entry point, validate
+    // the arguments
+    args.validate();
+    RegistryOperations operations = getRegistryOperations();
+    String path = SliderRegistryUtils.resolvePath(args.path);
+    ServiceRecordMarshal serviceRecordMarshal = new ServiceRecordMarshal();
+    try {
+      if (args.list) {
+        File destDir = args.destdir;
+        if (destDir != null) {
+          destDir.mkdirs();
+        }
+
+        Map<String, ServiceRecord> recordMap =
+            listServiceRecords(operations, path);
+
+        for (Entry<String, ServiceRecord> recordEntry : recordMap.entrySet()) {
+          String name = recordEntry.getKey();
+          ServiceRecord instance = recordEntry.getValue();
+          String json = serviceRecordMarshal.toJson(instance);
+          if (destDir == null) {
+            print(name);
+            print(json);
+          } else {
+            String filename = RegistryPathUtils.lastPathEntry(name) + ".json";
+            File jsonFile = new File(destDir, filename);
+            SliderUtils.write(jsonFile,
+                serviceRecordMarshal.toBytes(instance), true);
+          }
+        }
+      } else  {
+        // resolve single entry
+        ServiceRecord instance = resolve(path);
+        File outFile = args.out;
+        if (args.destdir != null) {
+          outFile = new File(args.destdir, RegistryPathUtils.lastPathEntry(path));
+        }
+        if (outFile != null) {
+          SliderUtils.write(outFile, serviceRecordMarshal.toBytes(instance), true);
+        } else {
+          print(serviceRecordMarshal.toJson(instance));
+        }
+      }
+//      TODO JDK7
+    } catch (PathNotFoundException e) {
+      // no record at this path
+      return EXIT_NOT_FOUND;
+    } catch (NoRecordException e) {
+      return EXIT_NOT_FOUND;
+    } catch (UnknownApplicationInstanceException e) {
+      return EXIT_NOT_FOUND;
+    } catch (InvalidRecordException e) {
+      // it is not a record
+      log.error("{}", e);
+      log.debug("{}", e, e);
+      return EXIT_EXCEPTION_THROWN;
+    }
+    return EXIT_SUCCESS;
+  }
+
+  @Override
+  public int actionRegistry(ActionRegistryArgs registryArgs) throws
+      YarnException,
+      IOException {
+    // as this is also a test entry point, validate
+    // the arguments
+    registryArgs.validate();
+    try {
+      if (registryArgs.list) {
+        actionRegistryList(registryArgs);
+      } else if (registryArgs.listConf) {
+        // list the configurations
+        actionRegistryListConfigsYarn(registryArgs);
+      } else if (SliderUtils.isSet(registryArgs.getConf)) {
+        // get a configuration
+        PublishedConfiguration publishedConfiguration =
+            actionRegistryGetConfig(registryArgs);
+        outputConfig(publishedConfiguration, registryArgs);
+      } else {
+        // it's an unknown command
+        log.info(ActionRegistryArgs.USAGE);
+        return EXIT_USAGE;
+      }
+//      JDK7
+    } catch (FileNotFoundException e) {
+      log.info("{}", e);
+      log.debug("{}", e, e);
+      return EXIT_NOT_FOUND;
+    } catch (PathNotFoundException e) {
+      log.info("{}", e);
+      log.debug("{}", e, e);
+      return EXIT_NOT_FOUND;
+    }
+    return EXIT_SUCCESS;
+  }
+
+  /**
+   * Registry operation
+   *
+   * @param registryArgs registry Arguments
+   * @return the instances (for tests)
+   * @throws YarnException YARN problems
+   * @throws IOException Network or other problems
+   */
+  @VisibleForTesting
+  public Collection<ServiceRecord> actionRegistryList(
+      ActionRegistryArgs registryArgs)
+      throws YarnException, IOException {
+    String serviceType = registryArgs.serviceType;
+    String name = registryArgs.name;
+    RegistryOperations operations = getRegistryOperations();
+    Collection<ServiceRecord> serviceRecords;
+    if (StringUtils.isEmpty(name)) {
+      String path =
+          serviceclassPath(
+              currentUser(),
+              serviceType);
+
+      try {
+        Map<String, ServiceRecord> recordMap =
+            listServiceRecords(operations, path);
+        if (recordMap.isEmpty()) {
+          throw new UnknownApplicationInstanceException(
+              "No applications registered under " + path);
+        }
+        serviceRecords = recordMap.values();
+      } catch (PathNotFoundException e) {
+        throw new UnknownApplicationInstanceException(path, e);
+      }
+    } else {
+      ServiceRecord instance = lookupServiceRecord(registryArgs);
+      serviceRecords = new ArrayList<ServiceRecord>(1);
+      serviceRecords.add(instance);
+    }
+
+    for (ServiceRecord serviceRecord : serviceRecords) {
+      logInstance(serviceRecord, registryArgs.verbose);
+    }
+    return serviceRecords;
+  }
+
+	@Override
+  public int actionDiagnostic(ActionDiagnosticArgs diagnosticArgs) {
+		try {
+			if (diagnosticArgs.client) {
+				actionDiagnosticClient(diagnosticArgs);
+			} else if (SliderUtils.isSet(diagnosticArgs.application)) {
+				actionDiagnosticApplication(diagnosticArgs);
+			} else if (SliderUtils.isSet(diagnosticArgs.slider)) {
+				actionDiagnosticSlider(diagnosticArgs);
+			} else if (diagnosticArgs.yarn) {
+				actionDiagnosticYarn(diagnosticArgs);
+			} else if (diagnosticArgs.credentials) {
+				actionDiagnosticCredentials();
+			} else if (SliderUtils.isSet(diagnosticArgs.all)) {
+				actionDiagnosticAll(diagnosticArgs);
+			} else if (SliderUtils.isSet(diagnosticArgs.level)) {
+				actionDiagnosticIntelligent(diagnosticArgs);
+			} else {
+				// it's an unknown command
+		        log.info(ActionDiagnosticArgs.USAGE);
+		        return EXIT_USAGE;
+			}
+		} catch (Exception e) {
+			log.error(e.toString());
+			return EXIT_FALSE;
+		}
+		return EXIT_SUCCESS;
+	}
+
+	private void actionDiagnosticIntelligent(ActionDiagnosticArgs diagnosticArgs)
+			throws YarnException, IOException, URISyntaxException {
+		// not using member variable clustername because we want to place
+		// application name after --application option and member variable
+		// cluster name has to be put behind action
+		String clusterName = diagnosticArgs.level;
+
+		try {
+			SliderUtils.validateClientConfigFile();
+			log.info("Slider-client.xml is accessible");
+		} catch (IOException e) {
+			// we are catching exceptions here because those are indication of
+			// validation result, and we need to print them here
+			log.error("validation of slider-client.xml fails because: "
+					+ e.toString(), e);
+			return;
+		}
+		SliderClusterOperations clusterOperations = createClusterOperations(clusterName);
+		// cluster not found exceptions will be thrown upstream
+		ClusterDescription clusterDescription = clusterOperations
+				.getClusterDescription();
+		log.info("Slider AppMaster is accessible");
+		
+		if (clusterDescription.state == ClusterDescription.STATE_LIVE) {
+			AggregateConf instanceDefinition = clusterOperations
+					.getInstanceDefinition();
+			String imagePath = instanceDefinition.getInternalOperations().get(
+					InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH);
+			//if null, that means slider uploaded the agent tarball for the user
+			//and we need to use where slider has put
+			if(imagePath == null){
+				ApplicationReport appReport = findInstance(clusterName);
+				Path path1 = sliderFileSystem.getTempPathForCluster(clusterName);
+				Path subPath = new Path(path1, appReport.getApplicationId().toString() + "/am");
+				imagePath = subPath.toString();
+			}
+			try {
+				SliderUtils.validateHDFSFile(sliderFileSystem, imagePath);
+				log.info("Slider agent package is properly installed");
+			} catch (FileNotFoundException e) {
+				log.error("can not find agent package: {}", e);
+				return;
+			} catch (IOException e) {
+				log.error("can not open agent package: {}", e, e);
+				return;
+			}
+			String pkgTarballPath = instanceDefinition.getAppConfOperations()
+					.getGlobalOptions().getMandatoryOption(AgentKeys.APP_DEF);
+			try {
+				SliderUtils.validateHDFSFile(sliderFileSystem, pkgTarballPath);
+				log.info("Application package is properly installed");
+      } catch (FileNotFoundException e) {
+        log.error("can not find application package: {}", e);
+        return;
+ 			} catch (IOException e) {
+				log.error("can not open application package: {} ", e);
+				return;
+			}
+		}
+	}
+
+	private void actionDiagnosticAll(ActionDiagnosticArgs diagnosticArgs)
+			throws IOException, YarnException {
+		//assign application name from param to each sub diagnostic function
+		diagnosticArgs.application = diagnosticArgs.all;
+		diagnosticArgs.slider = diagnosticArgs.all;
+		actionDiagnosticClient(diagnosticArgs);
+		actionDiagnosticApplication(diagnosticArgs);
+		actionDiagnosticSlider(diagnosticArgs);
+		actionDiagnosticYarn(diagnosticArgs);
+		actionDiagnosticCredentials();
+	}
+
+	private void actionDiagnosticCredentials() throws BadConfigException, IOException
+			 {
+		if (SliderUtils.isHadoopClusterSecure(SliderUtils
+				.loadClientConfigurationResource())) {
+			String credentialCacheFileDescription = null;
+			try {
+				credentialCacheFileDescription = SliderUtils
+						.checkCredentialCacheFile();
+			} catch (BadConfigException e) {
+				log.error("The credential config is not valid: " + e.toString());
+				throw e;
+			} catch (IOException e) {
+				log.error("Unable to read the credential file: " + e.toString());
+				throw e;
+			}
+			log.info("Credential cache file for the current user: "
+					+ credentialCacheFileDescription);
+		} else {
+			log.info("the cluster is not in secure mode");
+		}
+	}
+
+	private void actionDiagnosticYarn(ActionDiagnosticArgs diagnosticArgs) throws IOException, YarnException {
+		JSONObject converter = null;
+		log.info("the node in the YARN cluster has below state: ");
+		List<NodeReport> yarnClusterInfo;
+		try {
+			yarnClusterInfo = yarnClient.getNodeReports(NodeState.RUNNING);
+		} catch (YarnException e1) {
+			log.error("Exception happened when fetching node report from the YARN cluster: " + e1.toString());
+			throw e1;
+		} catch (IOException e1) {
+			log.error("Network problem happened when fetching node report YARN cluster: " + e1.toString());
+			throw e1;
+		}
+		for(NodeReport nodeReport : yarnClusterInfo){
+			log.info(nodeReport.toString());
+		}
+		
+		if (diagnosticArgs.verbose) {
+			Writer configWriter = new StringWriter();
+			try {
+				Configuration.dumpConfiguration(yarnClient.getConfig(), configWriter);
+			} catch (IOException e1) {
+				log.error("Network problem happened when retrieving YARN config from YARN: " + e1.toString());
+				throw e1;
+			}
+			try {
+				converter = new JSONObject(configWriter.toString());
+				log.info("the configuration of the YARN cluster is: "
+						+ converter.toString(2));
+				
+			} catch (JSONException e) {
+				log.error("JSONException happened during parsing response from YARN: " + e.toString());
+			}
+		}
+	}
+
+	private void actionDiagnosticSlider(ActionDiagnosticArgs diagnosticArgs) throws YarnException, IOException
+			{
+		// not using member variable clustername because we want to place
+		// application name after --application option and member variable
+		// cluster name has to be put behind action
+		String clusterName = diagnosticArgs.slider;
+		SliderClusterOperations clusterOperations;
+		AggregateConf instanceDefinition = null;
+		try {
+			clusterOperations = createClusterOperations(clusterName);
+			instanceDefinition = clusterOperations
+					.getInstanceDefinition();
+		} catch (YarnException e) {
+			log.error("Exception happened when retrieving instance definition from YARN: " + e.toString());
+			throw e;
+		} catch (IOException e) {
+			log.error("Network problem happened when retrieving instance definition from YARN: " + e.toString());
+			throw e;
+		}
+		String imagePath = instanceDefinition.getInternalOperations().get(
+				InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH);
+		//if null, it will be uploaded by Slider and thus at slider's path
+		if(imagePath == null){
+			ApplicationReport appReport = findInstance(clusterName);
+			Path path1 = sliderFileSystem.getTempPathForCluster(clusterName);
+			Path subPath = new Path(path1, appReport.getApplicationId().toString() + "/am");
+			imagePath = subPath.toString();
+		}
+		log.info("The path of slider agent tarball on HDFS is: " + imagePath);
+	}
+
+	private void actionDiagnosticApplication(ActionDiagnosticArgs diagnosticArgs) throws YarnException, IOException
+			{
+		// not using member variable clustername because we want to place
+		// application name after --application option and member variable
+		// cluster name has to be put behind action
+		String clusterName = diagnosticArgs.application;
+		SliderClusterOperations clusterOperations;
+		AggregateConf instanceDefinition = null;
+		try {
+			clusterOperations = createClusterOperations(clusterName);
+			instanceDefinition = clusterOperations
+					.getInstanceDefinition();
+		} catch (YarnException e) {
+			log.error("Exception happened when retrieving instance definition from YARN: " + e.toString());
+			throw e;
+		} catch (IOException e) {
+			log.error("Network problem happened when retrieving instance definition from YARN: " + e.toString());
+			throw e;
+		}
+		String clusterDir = instanceDefinition.getAppConfOperations()
+				.getGlobalOptions().get(AgentKeys.APP_ROOT);
+		String pkgTarball = instanceDefinition.getAppConfOperations()
+				.getGlobalOptions().get(AgentKeys.APP_DEF);
+		String runAsUser = instanceDefinition.getAppConfOperations()
+				.getGlobalOptions().get(AgentKeys.RUNAS_USER);
+
+		log.info("The location of the cluster instance directory in HDFS is: "
+				+ clusterDir);
+		log.info("The name of the application package tarball on HDFS is: "
+				+ pkgTarball);
+		log.info("The runas user of the application in the cluster is: "
+				+ runAsUser);
+
+		if (diagnosticArgs.verbose) {
+			log.info("App config of the application: "
+					+ instanceDefinition.getAppConf().toJson());
+			log.info("Resource config of the application: "
+					+ instanceDefinition.getResources().toJson());
+		}
+	}
+
+  private void actionDiagnosticClient(ActionDiagnosticArgs diagnosticArgs)
+      throws SliderException, IOException {
+    try {
+      String currentCommandPath = SliderUtils.getCurrentCommandPath();
+      SliderVersionInfo.loadAndPrintVersionInfo(log);
+      String clientConfigPath = SliderUtils.getClientConfigPath();
+      String jdkInfo = SliderUtils.getJDKInfo();
+      println("The slider command path: %s", currentCommandPath);
+      println("The slider-client.xml used by current running command path: %s",
+          clientConfigPath);
+      println(jdkInfo);
+
+      // security info
+      Configuration config = getConfig();
+      if (SliderUtils.isHadoopClusterSecure(config)) { 
+        println("Hadoop Cluster is secure");
+        println("Login user is %s", UserGroupInformation.getLoginUser());
+        println("Current user is %s", UserGroupInformation.getCurrentUser());
+
+
+      } else {
+        println("Hadoop Cluster is insecure");
+      }
+
+
+      // verbose?
+      if (diagnosticArgs.verbose) {
+        // do the environment
+        Map<String, String> env = System.getenv();
+        Set<String> envList = ConfigHelper.sortedConfigKeys(env.entrySet());
+        StringBuilder builder = new StringBuilder("Environment variables:\n");
+        for (String key : envList) {
+          builder.append(key)
+                 .append("=")
+                 .append(env.get(key))
+                 .append("\n");
+        }
+        println(builder.toString());
+
+        // then the config
+        println("Slider client configuration:\n" +
+                ConfigHelper.dumpConfigToString(config));
+      }
+
+
+      SliderUtils.validateSliderClientEnvironment(log);
+    } catch (SliderException e) {
+      log.error(e.toString());
+      throw e;
+    } catch (IOException e) {
+      log.error(e.toString());
+      throw e;
+    }
+    
+	}
+
+
+  /**
+   * Log a service record instance
+   * @param instance record
+   * @param verbose verbose logging of all external endpoints
+   */
+  private void logInstance(ServiceRecord instance,
+      boolean verbose) {
+    if (!verbose) {
+      log.info("{}", instance.get(YarnRegistryAttributes.YARN_ID, ""));
+    } else {
+      log.info("{}: ", instance.get(YarnRegistryAttributes.YARN_ID, ""));
+      logEndpoints(instance);
+    }
+  }
+
+  /**
+   * Log the external endpoints of a service record
+   * @param instance service record instance
+   */
+  private void logEndpoints(ServiceRecord instance) {
+    List<Endpoint> endpoints = instance.external;
+    for (Endpoint endpoint : endpoints) {
+      log.info(endpoint.toString());
+    }
+  }
+
+ /**
+   * list configs available for an instance
+   *
+   * @param registryArgs registry Arguments
+   * @throws YarnException YARN problems
+   * @throws IOException Network or other problems
+   */
+  public void actionRegistryListConfigsYarn(ActionRegistryArgs registryArgs)
+      throws YarnException, IOException {
+
+    ServiceRecord instance = lookupServiceRecord(registryArgs);
+
+    RegistryRetriever retriever = new RegistryRetriever(instance);
+    PublishedConfigSet configurations =
+        retriever.getConfigurations(!registryArgs.internal);
+
+    for (String configName : configurations.keys()) {
+      if (!registryArgs.verbose) {
+        log.info("{}", configName);
+      } else {
+        PublishedConfiguration published =
+            configurations.get(configName);
+        log.info("{} : {}",
+            configName,
+            published.description);
+      }
+    }
+  }
+
+  /**
+   * list configs available for an instance
+   *
+   * @param registryArgs registry Arguments
+   * @throws YarnException YARN problems
+   * @throws IOException Network or other problems
+   * @throws FileNotFoundException if the config is not found
+   */
+  @VisibleForTesting
+  public PublishedConfiguration actionRegistryGetConfig(ActionRegistryArgs registryArgs)
+      throws YarnException, IOException {
+    ServiceRecord instance = lookupServiceRecord(registryArgs);
+
+    RegistryRetriever retriever = new RegistryRetriever(instance);
+    boolean external = !registryArgs.internal;
+    PublishedConfigSet configurations =
+        retriever.getConfigurations(external);
+
+    PublishedConfiguration published = retriever.retrieveConfiguration(configurations,
+            registryArgs.getConf,
+            external);
+    return published;
+  }
+
+  /**
+   * write out the config. If a destination is provided and that dir is a
+   * directory, the entry is written to it with the name provided + extension,
+   * else it is printed to standard out.
+   * @param published published config
+   * @param registryArgs registry Arguments
+   * @throws BadCommandArgumentsException
+   * @throws IOException
+   */
+  private void outputConfig(PublishedConfiguration published,
+      ActionRegistryArgs registryArgs) throws
+      BadCommandArgumentsException,
+      IOException {
+    // decide whether or not to print
+    String entry = registryArgs.getConf;
+    String format = registryArgs.format;
+    ConfigFormat configFormat = ConfigFormat.resolve(format);
+    if (configFormat == null) {
+      throw new BadCommandArgumentsException(
+          "Unknown/Unsupported format %s ", format);
+    }
+    PublishedConfigurationOutputter outputter =
+        PublishedConfigurationOutputter.createOutputter(configFormat,
+            published);
+    boolean print = registryArgs.out == null;
+    if (!print) {
+      File outputPath = registryArgs.out;
+      if (outputPath.isDirectory()) {
+        // creating it under a directory
+        outputPath = new File(outputPath, entry + "." + format);
+      }
+      log.debug("Destination path: {}", outputPath);
+      outputter.save(outputPath);
+    } else {
+      print(outputter.asString());
+    }
+    
+  }
+
+  /**
+   * Look up an instance
+   * @return instance data
+   * @throws SliderException other failures
+   * @throws IOException IO problems or wrapped exceptions
+   */
+  private ServiceRecord lookupServiceRecord(ActionRegistryArgs registryArgs) throws
+      SliderException,
+      IOException {
+    String user;
+    if (StringUtils.isNotEmpty(registryArgs.user)) {
+      user = RegistryPathUtils.encodeForRegistry(registryArgs.user);
+    } else {
+      user = currentUser();
+    }
+
+    String path = servicePath(user, registryArgs.serviceType,
+        registryArgs.name);
+    return resolve(path);
+  }
+
+  /**
+   * Look up an instance
+   * @param serviceType service type
+   * @param id instance ID
+   * @return instance data
+   * @throws UnknownApplicationInstanceException no path or service record
+   * at the end of the path
+   * @throws SliderException other failures
+   * @throws IOException IO problems or wrapped exceptions
+   */
+  public ServiceRecord lookupServiceRecord(String serviceType, String id)
+      throws IOException, SliderException {
+    String path = servicePath(currentUser(), serviceType, id);
+    return resolve(path);
+  }
+
+  /**
+   * 
+   * Look up an instance
+   * @param path path
+   * @return instance data
+   * @throws UnknownApplicationInstanceException no path or service record
+   * at the end of the path
+   * @throws SliderException other failures
+   * @throws IOException IO problems or wrapped exceptions
+   */
+  public ServiceRecord resolve(String path)
+      throws IOException, SliderException {
+    try {
+      return getRegistryOperations().resolve(
+          path);
+      // TODO JDK7 SWITCH
+    } catch (PathNotFoundException e) {
+      throw new UnknownApplicationInstanceException(e.getPath().toString(), e);
+    } catch (NoRecordException e) {
+      throw new UnknownApplicationInstanceException(e.getPath().toString(), e);
+    }
+  }
+
+  /**
+   * List instances in the registry for the current user
+   * @return a list of slider registry instances
+   * @throws IOException Any IO problem ... including no path in the registry
+   * to slider service classes for this user
+   * @throws YarnException
+   */
+
+  public Map<String, ServiceRecord> listRegistryInstances()
+      throws IOException, YarnException {
+    Map<String, ServiceRecord> recordMap = listServiceRecords(
+        getRegistryOperations(),
+        serviceclassPath(currentUser(), SliderKeys.APP_TYPE));
+    return recordMap;
+  }
+  
+  /**
+   * List instances in the registry
+   * @return the instance IDs
+   * @throws IOException
+   * @throws YarnException
+   */
+  public List<String> listRegisteredSliderInstances() throws
+      IOException,
+      YarnException {
+    try {
+      Map<String, ServiceRecord> recordMap = listServiceRecords(
+          getRegistryOperations(),
+          serviceclassPath(currentUser(), SliderKeys.APP_TYPE));
+      return new ArrayList<String>(recordMap.keySet());
+/// JDK7    } catch (YarnException | IOException e) {
+    } catch (IOException e) {
+      throw e;
+    } catch (YarnException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Start the registry if it is not there yet
+   * @return the registry service
+   * @throws SliderException
+   * @throws IOException
+   */
+  private synchronized RegistryOperations maybeStartYarnRegistry()
+      throws SliderException, IOException {
+
+    if (registryOperations == null) {
+      registryOperations = startRegistryOperationsService();
+    }
+    return registryOperations;
+  }
+
+  @Override
+  public RegistryOperations getRegistryOperations()
+      throws SliderException, IOException {
+    return maybeStartYarnRegistry();
+  }
+
+  /**
+   * Output to standard out/stderr (implementation specific detail)
+   * @param src source
+   */
+  @SuppressWarnings("UseOfSystemOutOrSystemErr")
+  private static void print(CharSequence src) {
+    System.out.append(src);
+  }
+
+  /**
+   * Output to standard out/stderr with a newline after
+   * @param message message
+   */
+  private static void println(String message) {
+    print(message);
+    print("\n");
+  }
+  /**
+   * Output to standard out/stderr with a newline after, formatted
+   * @param message message
+   * @param args arguments for string formatting
+   */
+  private static void println(String message, Object ... args) {
+    print(String.format(message, args));
+    print("\n");
+  }
+
+}
+
+
diff --git a/slider-core/src/main/java/org/apache/slider/common/params/ActionRegistryArgs.java b/slider-core/src/main/java/org/apache/slider/common/params/ActionRegistryArgs.java
index deae4eb..1e4aba5 100644
--- a/slider-core/src/main/java/org/apache/slider/common/params/ActionRegistryArgs.java
+++ b/slider-core/src/main/java/org/apache/slider/common/params/ActionRegistryArgs.java
@@ -47,8 +47,10 @@
       + " ("
       + Arguments.ARG_LIST + "|"
       + Arguments.ARG_LISTCONF + "|"
+      + Arguments.ARG_LISTEXP + "|"
       + Arguments.ARG_LISTFILES + "|"
-      + Arguments.ARG_GETCONF + "> "
+      + Arguments.ARG_GETCONF + "|"
+      + Arguments.ARG_GETEXP + "> "
       + Arguments.ARG_NAME + " <name> "
       + " )"
       + "[" + Arguments.ARG_VERBOSE + "] "
@@ -56,6 +58,8 @@
       + "[" + Arguments.ARG_OUTPUT + " <filename> ] "
       + "[" + Arguments.ARG_SERVICETYPE + " <servicetype> ] "
       + "[" + Arguments.ARG_FORMAT + " <xml|json|properties>] "
+      + System.getProperty("line.separator")
+      + "Arguments.ARG_GETEXP only supports " + Arguments.ARG_FORMAT + " json"
       ;
   public ActionRegistryArgs() {
   }
@@ -90,7 +94,14 @@
       description = "get configuration")
   public String getConf;
 
-  @Parameter(names = {ARG_LISTFILES}, 
+  @Parameter(names = {ARG_LISTEXP},
+             description = "list exports")
+  public boolean listExports;
+  @Parameter(names = {ARG_GETEXP},
+             description = "get export")
+  public String getExport;
+
+  @Parameter(names = {ARG_LISTFILES},
       description = "list files")
   public String listFiles;
 
@@ -135,8 +146,8 @@
     super.validate();
 
     //verify that at most one of the operations is set
-    int gets = s(getConf) + s(getFiles);
-    int lists = s(list) + s(listConf) + s(listFiles);
+    int gets = s(getConf) + s(getFiles) + s(getExport);
+    int lists = s(list) + s(listConf) + s(listFiles) + s(listExports);
     int set = lists + gets;
     if (set > 1) {
       throw new UsageException(USAGE);
diff --git a/slider-core/src/main/java/org/apache/slider/common/params/Arguments.java b/slider-core/src/main/java/org/apache/slider/common/params/Arguments.java
index ff064c8..2c536d9 100644
--- a/slider-core/src/main/java/org/apache/slider/common/params/Arguments.java
+++ b/slider-core/src/main/java/org/apache/slider/common/params/Arguments.java
@@ -52,6 +52,7 @@
   String ARG_FORMAT = "--format";
   String ARG_FORCE = "--force";
   String ARG_GETCONF = "--getconf";
+  String ARG_GETEXP = "--getexp";
   String ARG_GETFILES = "--getfiles";
   String ARG_HELP = "--help";
   String ARG_ID = "--id";
@@ -60,6 +61,7 @@
   String ARG_LEVEL = "--level";
   String ARG_LIST = "--list";
   String ARG_LISTCONF = "--listconf";
+  String ARG_LISTEXP = "--listexp";
   String ARG_LISTFILES = "--listfiles";
   String ARG_LIVE = "--live";
   String ARG_MANAGER = "--manager";
diff --git a/slider-core/src/main/java/org/apache/slider/core/registry/docstore/ExportEntry.java b/slider-core/src/main/java/org/apache/slider/core/registry/docstore/ExportEntry.java
new file mode 100644
index 0000000..4bcf6c1
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/core/registry/docstore/ExportEntry.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.core.registry.docstore;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/**
+ * JSON-serializable description of a published key-val configuration.
+ *
+ * The values themselves are not serialized in the external view; they have to be served up by the far end
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+public class ExportEntry {
+
+  /**
+   * The value of the export
+   */
+  private String value;
+  /**
+   * The container id of the container that is responsible for the export
+   */
+  private String containerId;
+  /**
+   * Tag associated with the container - its usually an identifier different than container id
+   * that allows a soft serial id to all containers of a component - e.g. 1, 2, 3, ...
+   */
+  private String tag;
+  /**
+   * An export can be at the level of a component or an application
+   */
+  private String level;
+  /**
+   * The time when the export was updated
+   */
+  private String updatedTime;
+  /**
+   * The time when the export expires
+   */
+  private String validUntil;
+
+  public ExportEntry() {
+  }
+
+  public String getValue() {
+    return value;
+  }
+
+  public void setValue(String value) {
+    this.value = value;
+  }
+
+  public String getContainerId() {
+    return containerId;
+  }
+
+  public void setContainerId(String containerId) {
+    this.containerId = containerId;
+  }
+
+  public String getTag() {
+    return tag;
+  }
+
+  public void setTag(String tag) {
+    this.tag = tag;
+  }
+
+  public String getLevel() {
+    return level;
+  }
+
+  public void setLevel(String level) {
+    this.level = level;
+  }
+  public String getUpdatedTime() {
+    return updatedTime;
+  }
+
+  public void setUpdatedTime(String updatedTime) {
+    this.updatedTime = updatedTime;
+  }
+
+  public String getValidUntil() {
+    return validUntil;
+  }
+
+  public void setValidUntil(String validUntil) {
+    this.validUntil = validUntil;
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder("ExportEntry{").
+        append("value='").append(value).append("',").
+        append("containerId='").append(containerId).append("',").
+        append("tag='").append(tag).append("',").
+        append("level='").append(level).append("'").
+        append("updatedTime='").append(updatedTime).append("'").
+        append("validUntil='").append(validUntil).append("'").
+        append(" }").toString();
+  }
+}
diff --git a/slider-core/src/main/java/org/apache/slider/core/registry/docstore/PublishedExports.java b/slider-core/src/main/java/org/apache/slider/core/registry/docstore/PublishedExports.java
new file mode 100644
index 0000000..d6d285c
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/core/registry/docstore/PublishedExports.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.core.registry.docstore;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * JSON-serializable description of a published key-val configuration.
+ *
+ * The values themselves are not serialized in the external view; they have to be served up by the far end
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+public class PublishedExports {
+
+  public String description;
+  public long updated;
+  public String updatedTime;
+  public Map<String, List<ExportEntry>> entries = new HashMap<String, List<ExportEntry>>();
+
+  public PublishedExports() {
+  }
+
+  /**
+   * build an empty published configuration
+   *
+   * @param description configuration description
+   */
+  public PublishedExports(String description) {
+    this.description = description;
+  }
+
+  /**
+   * Build a configuration from the entries
+   *
+   * @param description configuration description
+   * @param entries     entries to put
+   */
+  public PublishedExports(String description,
+                          Iterable<Map.Entry<String, List<ExportEntry>>> entries) {
+    this.description = description;
+    putValues(entries);
+  }
+
+  /**
+   * Is the configuration empty. This means either that it has not been given any values, or it is stripped down copy
+   * set down over the wire.
+   *
+   * @return
+   */
+  public boolean isEmpty() {
+    return entries.isEmpty();
+  }
+
+  public long getUpdated() {
+    return updated;
+  }
+
+  public void setUpdated(long updated) {
+    this.updated = updated;
+    this.updatedTime = new Date(updated).toString();
+  }
+
+  /**
+   * Set the values from an iterable (this includes a Hadoop Configuration and Java properties object). Any existing
+   * value set is discarded
+   *
+   * @param entries entries to put
+   */
+  public void putValues(Iterable<Map.Entry<String, List<ExportEntry>>> entries) {
+    this.entries = new HashMap<String, List<ExportEntry>>();
+    for (Map.Entry<String, List<ExportEntry>> entry : entries) {
+      this.entries.put(entry.getKey(), entry.getValue());
+    }
+  }
+
+  /**
+   * Return the values as json string
+   *
+   * @return
+   *
+   * @throws IOException
+   */
+  public String asJson() throws IOException {
+    ObjectMapper mapper = new ObjectMapper();
+    String json = mapper.writeValueAsString(entries);
+    return json;
+  }
+
+  /**
+   * This makes a copy without the nested content -so is suitable for returning as part of the list of a parent's
+   * values
+   *
+   * @return the copy
+   */
+  public PublishedExports shallowCopy() {
+    PublishedExports that = new PublishedExports();
+    that.description = this.description;
+    that.updated = this.updated;
+    that.updatedTime = this.updatedTime;
+    return that;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb =
+        new StringBuilder("PublishedConfiguration{");
+    sb.append("description='").append(description).append('\'');
+    sb.append(" entries = ").append(entries.size());
+    sb.append('}');
+    return sb.toString();
+  }
+}
diff --git a/slider-core/src/main/java/org/apache/slider/core/registry/docstore/PublishedExportsOutputter.java b/slider-core/src/main/java/org/apache/slider/core/registry/docstore/PublishedExportsOutputter.java
new file mode 100644
index 0000000..b21e717
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/core/registry/docstore/PublishedExportsOutputter.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.core.registry.docstore;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/** Output a published configuration */
+public abstract class PublishedExportsOutputter {
+
+  protected final PublishedExports exports;
+
+  protected PublishedExportsOutputter(PublishedExports exports) {
+    this.exports = exports;
+  }
+
+  /**
+   * Create an outputter for the chosen format
+   *
+   * @param format  format enumeration
+   * @param exports owning config
+   * @return the outputter
+   */
+
+  public static PublishedExportsOutputter createOutputter(ConfigFormat format,
+                                                         PublishedExports exports) {
+    Preconditions.checkNotNull(exports);
+    switch (format) {
+      case JSON:
+        return new JsonOutputter(exports);
+      default:
+        throw new RuntimeException("Unsupported format :" + format);
+    }
+  }
+
+  public void save(File dest) throws IOException {
+    FileOutputStream out = null;
+    try {
+      out = new FileOutputStream(dest);
+      save(out);
+      out.close();
+    } finally {
+      org.apache.hadoop.io.IOUtils.closeStream(out);
+    }
+  }
+
+  /**
+   * Save the content. The default saves the asString() value to the output stream
+   *
+   * @param out output stream
+   * @throws IOException
+   */
+  public void save(OutputStream out) throws IOException {
+    IOUtils.write(asString(), out, Charsets.UTF_8);
+  }
+
+  /**
+   * Convert to a string
+   *
+   * @return
+   * @throws IOException
+   */
+  public abstract String asString() throws IOException;
+
+  public static class JsonOutputter extends PublishedExportsOutputter {
+
+    public JsonOutputter(PublishedExports exports) {
+      super(exports);
+    }
+
+    @Override
+    public void save(File dest) throws IOException {
+      FileUtils.writeStringToFile(dest, asString(), Charsets.UTF_8);
+    }
+
+    @Override
+    public String asString() throws IOException {
+      return exports.asJson();
+    }
+  }
+}
diff --git a/slider-core/src/main/java/org/apache/slider/core/registry/docstore/PublishedExportsSet.java b/slider-core/src/main/java/org/apache/slider/core/registry/docstore/PublishedExportsSet.java
new file mode 100644
index 0000000..cdd35de
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/core/registry/docstore/PublishedExportsSet.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.core.registry.docstore;
+
+import org.apache.slider.server.appmaster.web.rest.RestPaths;
+import org.apache.slider.server.services.utility.PatternValidator;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Represents a set of configurations for an application, component, etc.
+ * Json serialisable; accessors are synchronized
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+public class PublishedExportsSet {
+
+  private static final PatternValidator validator = new PatternValidator(
+      RestPaths.PUBLISHED_CONFIGURATION_REGEXP);
+  
+  public Map<String, PublishedExports> exports =
+      new HashMap<String, PublishedExports>();
+
+  public PublishedExportsSet() {
+  }
+
+  /**
+   * Put a name -it will be converted to lower case before insertion.
+   * Any existing entry will be overwritten (that includes an entry
+   * with a different case in the original name)
+   * @param name name of entry
+   * @param export published export
+   * @throws IllegalArgumentException if not a valid name
+   */
+  public void put(String name, PublishedExports export) {
+    String name1 = name.toLowerCase(Locale.ENGLISH);
+    validateName(name1);
+    exports.put(name1, export);
+  }
+
+  /**
+   * Validate the name -restricting it to the set defined in 
+   * {@link RestPaths#PUBLISHED_CONFIGURATION_REGEXP}
+   * @param name name to validate
+   * @throws IllegalArgumentException if not a valid name
+   */
+  public static void validateName(String name) {
+    validator.validate(name);
+    
+  }
+
+  public PublishedExports get(String name) {
+    return exports.get(name);
+  }
+  
+  public boolean contains(String name) {
+    return exports.containsKey(name);
+  }
+  
+  public int size() {
+    return exports.size();
+  }
+  
+  public Set<String> keys() {
+    TreeSet<String> keys = new TreeSet<String>();
+    keys.addAll(exports.keySet());
+    return keys;
+  }
+
+  public PublishedExportsSet shallowCopy() {
+    PublishedExportsSet that = new PublishedExportsSet();
+    for (Map.Entry<String, PublishedExports> entry :
+        exports.entrySet()) {
+      that.put(entry.getKey(), entry.getValue().shallowCopy());
+    }
+    return that;
+  }
+}
diff --git a/slider-core/src/main/java/org/apache/slider/core/registry/info/CustomRegistryConstants.java b/slider-core/src/main/java/org/apache/slider/core/registry/info/CustomRegistryConstants.java
index 65c122f..67b9feb 100644
--- a/slider-core/src/main/java/org/apache/slider/core/registry/info/CustomRegistryConstants.java
+++ b/slider-core/src/main/java/org/apache/slider/core/registry/info/CustomRegistryConstants.java
@@ -35,6 +35,9 @@
   public static final String PUBLISHER_CONFIGURATIONS_API =
       "org.apache.slider.publisher.configurations";
 
+  public static final String PUBLISHER_EXPORTS_API =
+      "org.apache.slider.publisher.exports";
+
   public static final String PUBLISHER_DOCUMENTS_API =
       "org.apache.slider.publisher.documents";
 
diff --git a/slider-core/src/main/java/org/apache/slider/core/registry/retrieve/RegistryRetriever.java b/slider-core/src/main/java/org/apache/slider/core/registry/retrieve/RegistryRetriever.java
index 101efb2..a91f515 100644
--- a/slider-core/src/main/java/org/apache/slider/core/registry/retrieve/RegistryRetriever.java
+++ b/slider-core/src/main/java/org/apache/slider/core/registry/retrieve/RegistryRetriever.java
@@ -33,6 +33,8 @@
 import org.apache.slider.core.exceptions.ExceptionConverter;
 import org.apache.slider.core.registry.docstore.PublishedConfigSet;
 import org.apache.slider.core.registry.docstore.PublishedConfiguration;
+import org.apache.slider.core.registry.docstore.PublishedExports;
+import org.apache.slider.core.registry.docstore.PublishedExportsSet;
 import org.apache.slider.core.registry.info.CustomRegistryConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,6 +54,8 @@
 
   private final String externalConfigurationURL;
   private final String internalConfigurationURL;
+  private final String externalExportsURL;
+  private final String internalExportsURL;
   private static final Client jerseyClient;
   
   static {
@@ -63,9 +67,12 @@
     jerseyClient.setFollowRedirects(true);
   }
   
-  public RegistryRetriever(String externalConfigurationURL, String internalConfigurationURL) {
-    this.externalConfigurationURL = externalConfigurationURL; 
-    this.internalConfigurationURL = internalConfigurationURL; 
+  public RegistryRetriever(String externalConfigurationURL, String internalConfigurationURL,
+                           String externalExportsURL, String internalExportsURL) {
+    this.externalConfigurationURL = externalConfigurationURL;
+    this.internalConfigurationURL = internalConfigurationURL;
+    this.externalExportsURL = externalExportsURL;
+    this.internalExportsURL = internalExportsURL;
   }
 
   /**
@@ -99,6 +106,29 @@
       }
     }
     externalConfigurationURL = url;
+
+    internal = record.getInternalEndpoint(
+        CustomRegistryConstants.PUBLISHER_EXPORTS_API);
+    url = null;
+    if (internal != null) {
+      List<String> addresses = RegistryTypeUtils.retrieveAddressesUriType(
+          internal);
+      if (addresses != null && !addresses.isEmpty()) {
+        url = addresses.get(0);
+      }
+    }
+    internalExportsURL = url;
+    external = record.getExternalEndpoint(
+        CustomRegistryConstants.PUBLISHER_EXPORTS_API);
+    url = null;
+    if (external != null) {
+      List<String> addresses =
+          RegistryTypeUtils.retrieveAddressesUriType(external);
+      if (addresses != null && !addresses.isEmpty()) {
+        url = addresses.get(0);
+      }
+    }
+    externalExportsURL = url;
   }
 
   /**
@@ -138,6 +168,33 @@
     return confURL;
   }
 
+  protected String getExportURL(boolean external) throws FileNotFoundException {
+    String confURL = external ? externalExportsURL: internalExportsURL;
+    if (Strings.isStringEmpty(confURL)) {
+      throw new FileNotFoundException("No configuration URL");
+    }
+    return confURL;
+  }
+
+  /**
+   * Get the configurations of the registry
+   * @param external flag to indicate that it is the external entries to fetch
+   * @return the configuration sets
+   */
+  public PublishedExportsSet getExports(boolean external) throws
+      FileNotFoundException, IOException {
+
+    String exportsUrl = getExportURL(external);
+    try {
+      WebResource webResource = jsonResource(exportsUrl);
+      log.debug("GET {}", exportsUrl);
+      PublishedExportsSet exportSet = webResource.get(PublishedExportsSet.class);
+      return exportSet;
+    } catch (UniformInterfaceException e) {
+      throw ExceptionConverter.convertJerseyException(exportsUrl, e);
+    }
+  }
+
   private WebResource resource(String url) {
     WebResource resource = jerseyClient.resource(url);
     return resource;
@@ -174,7 +231,33 @@
       throw ExceptionConverter.convertJerseyException(confURL, e);
     }
   }
-  
+
+  /**
+   * Get a complete export, with all values
+   * @param exportSet
+   * @param name name of the configuration
+   * @param external flag to indicate that it is an external configuration
+   * @return the retrieved config
+   * @throws IOException IO problems
+   */
+  public PublishedExports retrieveExports(PublishedExportsSet exportSet,
+                                                      String name,
+                                                      boolean external) throws IOException {
+    if (!exportSet.contains(name)) {
+      throw new FileNotFoundException("Unknown export " + name);
+    }
+    String exportsURL = getExportURL(external);
+    exportsURL = SliderUtils.appendToURL(exportsURL, name);
+    try {
+      WebResource webResource = jsonResource(exportsURL);
+      PublishedExports publishedExports =
+          webResource.get(PublishedExports.class);
+      return publishedExports;
+    } catch (UniformInterfaceException e) {
+      throw ExceptionConverter.convertJerseyException(exportsURL, e);
+    }
+  }
+
   @Override
   public String toString() {
     return super.toString() 
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
index 44777c3..8b9b257 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
@@ -52,7 +52,9 @@
 import org.apache.slider.core.exceptions.SliderException;
 import org.apache.slider.core.launch.CommandLineBuilder;
 import org.apache.slider.core.launch.ContainerLauncher;
+import org.apache.slider.core.registry.docstore.ExportEntry;
 import org.apache.slider.core.registry.docstore.PublishedConfiguration;
+import org.apache.slider.core.registry.docstore.PublishedExports;
 import org.apache.slider.core.registry.info.CustomRegistryConstants;
 import org.apache.slider.providers.AbstractProviderService;
 import org.apache.slider.providers.ProviderCore;
@@ -95,10 +97,13 @@
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
@@ -106,6 +111,7 @@
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -127,10 +133,15 @@
   private static final String CONTAINER_ID = "container_id";
   private static final String GLOBAL_CONFIG_TAG = "global";
   private static final String LOG_FOLDERS_TAG = "LogFolders";
+  private static final String HOST_FOLDER_FORMAT = "%s:%s";
+  private static final String CONTAINER_LOGS_TAG = "container_log_dirs";
+  private static final String CONTAINER_PWDS_TAG = "container_work_dirs";
+  private static final String COMPONENT_TAG = "component";
+  private static final String APPLICATION_TAG = "application";
   private static final String COMPONENT_DATA_TAG = "ComponentInstanceData";
   private static final String SHARED_PORT_TAG = "SHARED";
   private static final String PER_CONTAINER_TAG = "{PER_CONTAINER}";
-  private static final int MAX_LOG_ENTRIES = 20;
+  private static final int MAX_LOG_ENTRIES = 40;
   private static final int DEFAULT_HEARTBEAT_MONITOR_INTERVAL = 60 * 1000;
 
   private final Object syncLock = new Object();
@@ -149,16 +160,25 @@
       new ConcurrentHashMap<String, ComponentInstanceState>();
   private final Map<String, Map<String, String>> componentInstanceData =
       new ConcurrentHashMap<String, Map<String, String>>();
-  private final Map<String, Map<String, String>> exportGroups =
-      new ConcurrentHashMap<String, Map<String, String>>();
+  private final Map<String, Map<String, List<ExportEntry>>> exportGroups =
+      new ConcurrentHashMap<String, Map<String, List<ExportEntry>>>();
   private final Map<String, Map<String, String>> allocatedPorts =
       new ConcurrentHashMap<String, Map<String, String>>();
-  private final Map<String, String> workFolders =
-      Collections.synchronizedMap(new LinkedHashMap<String, String>(MAX_LOG_ENTRIES, 0.75f, false) {
+
+  private final Map<String, ExportEntry> logFolderExports =
+      Collections.synchronizedMap(new LinkedHashMap<String, ExportEntry>(MAX_LOG_ENTRIES, 0.75f, false) {
         protected boolean removeEldestEntry(Map.Entry eldest) {
           return size() > MAX_LOG_ENTRIES;
         }
       });
+  private final Map<String, ExportEntry> workFolderExports =
+      Collections.synchronizedMap(new LinkedHashMap<String, ExportEntry>(MAX_LOG_ENTRIES, 0.75f, false) {
+        protected boolean removeEldestEntry(Map.Entry eldest) {
+          return size() > MAX_LOG_ENTRIES;
+        }
+      });
+  private final Map<String, Set<String>> containerExportsMap =
+      new HashMap<String, Set<String>>();
 
   /**
    * Create an instance of AgentProviderService
@@ -491,7 +511,7 @@
 
       Map<String, String> folders = registration.getLogFolders();
       if (folders != null && !folders.isEmpty()) {
-        publishLogFolderPaths(folders, containerId, roleName, hostFqdn);
+        publishFolderPaths(folders, containerId, roleName, hostFqdn);
       }
     } else {
       response.setResponseStatus(RegistrationStatus.FAILED);
@@ -558,7 +578,7 @@
       log.info("Component operation. Status: {}", result);
 
       if (command == Command.INSTALL && report.getFolders() != null && report.getFolders().size() > 0) {
-        publishLogFolderPaths(report.getFolders(), containerId, roleName, heartBeat.getFqdn());
+        publishFolderPaths(report.getFolders(), containerId, roleName, heartBeat.getFqdn());
       }
     }
 
@@ -643,6 +663,7 @@
 
     // component specific publishes
     processAndPublishComponentSpecificData(ports, containerId, fqdn, roleName);
+    processAndPublishComponentSpecificExports(ports, containerId, fqdn, roleName);
 
     // and update registration entries
     if (instance != null) {
@@ -692,7 +713,7 @@
       throw new IOException(e);
     }
   }
-  
+
   @Override
   public void notifyContainerCompleted(ContainerId containerId) {
     if (containerId != null) {
@@ -714,6 +735,25 @@
           }
         }
       }
+
+      synchronized (this.containerExportsMap) {
+        Set<String> containerExportSets = containerExportsMap.get(containerIdStr);
+        if (containerExportSets != null) {
+          for (String containerExportStr : containerExportSets) {
+            String[] parts = containerExportStr.split(":");
+            Map<String, List<ExportEntry>> exportGroup = getCurrentExports(parts[0]);
+            List<ExportEntry> exports = exportGroup.get(parts[1]);
+            List<ExportEntry> exportToRemove = new ArrayList<ExportEntry>();
+            for (ExportEntry export : exports) {
+              if (containerIdStr.equals(export.getContainerId())) {
+                exportToRemove.add(export);
+              }
+            }
+            exports.removeAll(exportToRemove);
+          }
+          containerExportsMap.remove(containerIdStr);
+        }
+      }
     }
   }
 
@@ -749,6 +789,16 @@
   }
 
   @VisibleForTesting
+  protected Map<String, ExportEntry> getLogFolderExports() {
+    return logFolderExports;
+  }
+
+  @VisibleForTesting
+  protected Map<String, ExportEntry> getWorkFolderExports() {
+    return workFolderExports;
+  }
+
+  @VisibleForTesting
   protected Metainfo getMetainfo() {
     return this.metainfo;
   }
@@ -896,15 +946,59 @@
    * @param hostFqdn
    * @param roleName
    */
-  protected void publishLogFolderPaths(
+  protected void publishFolderPaths(
       Map<String, String> folders, String containerId, String roleName, String hostFqdn) {
-    for (Map.Entry<String, String> entry: folders.entrySet()) {
-      workFolders.put(String.format("%s->%s->%s->%s", roleName, hostFqdn, entry.getKey(), containerId), 
-        entry.getValue());
+    Date now = new Date();
+    for (Map.Entry<String, String> entry : folders.entrySet()) {
+      ExportEntry exportEntry = new ExportEntry();
+      exportEntry.setValue(String.format(HOST_FOLDER_FORMAT, hostFqdn, entry.getValue()));
+      exportEntry.setContainerId(containerId);
+      exportEntry.setLevel(COMPONENT_TAG);
+      exportEntry.setTag(roleName);
+      exportEntry.setUpdatedTime(now.toString());
+      if (entry.getKey().equals("AGENT_LOG_ROOT")) {
+        synchronized (logFolderExports) {
+          getLogFolderExports().put(containerId, exportEntry);
+        }
+      } else {
+        synchronized (workFolderExports) {
+          getWorkFolderExports().put(containerId, exportEntry);
+        }
+      }
+      log.info("Updating log and pwd folders for container {}", containerId);
     }
 
-    publishApplicationInstanceData(LOG_FOLDERS_TAG, LOG_FOLDERS_TAG,
-                                   (new HashMap<String, String>(this.workFolders)).entrySet());
+    PublishedExports exports = new PublishedExports(CONTAINER_LOGS_TAG);
+    exports.setUpdated(now.getTime());
+    synchronized (logFolderExports) {
+      updateExportsFromList(exports, getLogFolderExports());
+    }
+    getAmState().getPublishedExportsSet().put(CONTAINER_LOGS_TAG, exports);
+
+    exports = new PublishedExports(CONTAINER_PWDS_TAG);
+    exports.setUpdated(now.getTime());
+    synchronized (workFolderExports) {
+      updateExportsFromList(exports, getWorkFolderExports());
+    }
+    getAmState().getPublishedExportsSet().put(CONTAINER_PWDS_TAG, exports);
+  }
+
+  /**
+   * Update the export data from the map
+   * @param exports
+   * @param folderExports
+   */
+  private void updateExportsFromList(PublishedExports exports, Map<String, ExportEntry> folderExports) {
+    Map<String, List<ExportEntry>> perComponentList = new HashMap<String, List<ExportEntry>>();
+    for(Map.Entry<String, ExportEntry> logEntry : folderExports.entrySet())
+    {
+      String componentName = logEntry.getValue().getTag();
+      if(!perComponentList.containsKey(componentName)) {
+        perComponentList.put(componentName, new ArrayList<ExportEntry>());
+      }
+      perComponentList.get(componentName).add(logEntry.getValue());
+    }
+    exports.putValues(perComponentList.entrySet());
   }
 
 
@@ -949,13 +1043,12 @@
             }
           }
 
-          List<ExportGroup> exportGroups = application.getExportGroups();
-          boolean hasExportGroups = exportGroups != null && !exportGroups.isEmpty();
+          List<ExportGroup> appExportGroups = application.getExportGroups();
+          boolean hasExportGroups = appExportGroups != null && !appExportGroups.isEmpty();
 
           Set<String> appExports = new HashSet();
           String appExportsStr = getApplicationComponent(roleName).getAppExports();
-          boolean hasNoAppExports = appExportsStr == null || appExportsStr.isEmpty();
-          if (!hasNoAppExports) {
+          if (SliderUtils.isSet(appExportsStr)) {
             for (String appExport : appExportsStr.split(",")) {
               if (appExport.trim().length() > 0) {
                 appExports.add(appExport.trim());
@@ -983,11 +1076,12 @@
             }
 
             Set<String> modifiedGroups = new HashSet<String>();
-            for (ExportGroup exportGroup : exportGroups) {
+            for (ExportGroup exportGroup : appExportGroups) {
               List<Export> exports = exportGroup.getExports();
               if (exports != null && !exports.isEmpty()) {
                 String exportGroupName = exportGroup.getName();
-                Map<String, String> map = getCurrentExports(exportGroupName);
+                ConcurrentHashMap<String, List<ExportEntry>> map =
+                    (ConcurrentHashMap<String, List<ExportEntry>>)getCurrentExports(exportGroupName);
                 for (Export export : exports) {
                   if (canBeExported(exportGroupName, export.getName(), appExports)) {
                     String value = export.getValue();
@@ -997,7 +1091,12 @@
                         value = value.replace(token, replaceTokens.get(token));
                       }
                     }
-                    map.put(export.getName(), value);
+                    ExportEntry entry = new ExportEntry();
+                    entry.setLevel(APPLICATION_TAG);
+                    entry.setValue(value);
+                    entry.setUpdatedTime(new Date().toString());
+                    // over-write, app exports are singletons
+                    map.put(export.getName(), new ArrayList(Arrays.asList(entry)));
                     log.info("Preparing to publish. Key {} and Value {}", export.getName(), value);
                   }
                 }
@@ -1019,11 +1118,11 @@
     return appExports.contains(String.format("%s-%s", exportGroupName, name));
   }
 
-  protected Map<String, String> getCurrentExports(String groupName) {
+  protected Map<String, List<ExportEntry>> getCurrentExports(String groupName) {
     if (!this.exportGroups.containsKey(groupName)) {
       synchronized (this.exportGroups) {
         if (!this.exportGroups.containsKey(groupName)) {
-          this.exportGroups.put(groupName, new ConcurrentHashMap<String, String>());
+          this.exportGroups.put(groupName, new ConcurrentHashMap<String, List<ExportEntry>>());
         }
       }
     }
@@ -1032,10 +1131,24 @@
   }
 
   private void publishModifiedExportGroups(Set<String> modifiedGroups) {
-    synchronized (this.exportGroups) {
-      for (String groupName : modifiedGroups) {
-        publishApplicationInstanceData(groupName, groupName, this.exportGroups.get(groupName).entrySet());
+    for (String groupName : modifiedGroups) {
+      Map<String, List<ExportEntry>> entries = this.exportGroups.get(groupName);
+
+      // Publish in old format for the time being
+      Map<String, String> simpleEntries = new HashMap<String, String>();
+      for (Map.Entry<String, List<ExportEntry>> entry : entries.entrySet()) {
+        List<ExportEntry> exports = entry.getValue();
+        if(exports != null && exports.size() > 0) {
+          // there is no support for multiple exports per name - so extract only the first one
+          simpleEntries.put(entry.getKey(), entry.getValue().get(0).getValue());
+        }
       }
+      publishApplicationInstanceData(groupName, groupName, simpleEntries.entrySet());
+
+      PublishedExports exports = new PublishedExports(groupName);
+      exports.setUpdated(new Date().getTime());
+      exports.putValues(entries.entrySet());
+      getAmState().getPublishedExportsSet().put(groupName, exports);
     }
   }
 
@@ -1090,14 +1203,102 @@
     }
   }
 
+  /** Publish component instance specific data if the component demands it */
+  protected void processAndPublishComponentSpecificExports(Map<String, String> ports,
+                                                           String containerId,
+                                                           String hostFqdn,
+                                                           String roleName) {
+    String portVarFormat = "${site.%s}";
+    String hostNamePattern = "${" + roleName + "_HOST}";
+
+    List<ExportGroup> appExportGroups = getMetainfo().getApplication().getExportGroups();
+    Component component = getMetainfo().getApplicationComponent(roleName);
+    if (component != null && SliderUtils.isSet(component.getCompExports())
+        && appExportGroups != null && appExportGroups.size() > 0) {
+
+      Set<String> compExports = new HashSet();
+      String compExportsStr = component.getCompExports();
+      for (String appExport : compExportsStr.split(",")) {
+        if (appExport.trim().length() > 0) {
+          compExports.add(appExport.trim());
+        }
+      }
+
+      Date now = new Date();
+      Set<String> modifiedGroups = new HashSet<String>();
+      for (ExportGroup exportGroup : appExportGroups) {
+        List<Export> exports = exportGroup.getExports();
+        if (exports != null && !exports.isEmpty()) {
+          String exportGroupName = exportGroup.getName();
+          ConcurrentHashMap<String, List<ExportEntry>> map =
+              (ConcurrentHashMap<String, List<ExportEntry>>) getCurrentExports(exportGroupName);
+          for (Export export : exports) {
+            if (canBeExported(exportGroupName, export.getName(), compExports)) {
+              log.info("Attempting to publish {} of group {} for component type {}",
+                       export.getName(), exportGroupName, roleName);
+              String templateToExport = export.getValue();
+              for (String portName : ports.keySet()) {
+                boolean publishData = false;
+                String portValPattern = String.format(portVarFormat, portName);
+                if (templateToExport.contains(portValPattern)) {
+                  templateToExport = templateToExport.replace(portValPattern, ports.get(portName));
+                  publishData = true;
+                }
+                if (templateToExport.contains(hostNamePattern)) {
+                  templateToExport = templateToExport.replace(hostNamePattern, hostFqdn);
+                  publishData = true;
+                }
+                if (publishData) {
+                  ExportEntry entryToAdd = new ExportEntry();
+                  entryToAdd.setLevel(COMPONENT_TAG);
+                  entryToAdd.setValue(templateToExport);
+                  entryToAdd.setUpdatedTime(now.toString());
+                  entryToAdd.setContainerId(containerId);
+
+                  List<ExportEntry> existingList =
+                      map.putIfAbsent(export.getName(), new CopyOnWriteArrayList(Arrays.asList(entryToAdd)));
+
+                  // in-place edit, no lock needed
+                  if (existingList != null) {
+                    boolean updatedInPlace = false;
+                    for (ExportEntry entry : existingList) {
+                      if (containerId.equalsIgnoreCase(entry.getContainerId())) {
+                        entryToAdd.setValue(templateToExport);
+                        entryToAdd.setUpdatedTime(now.toString());
+                        updatedInPlace = true;
+                      }
+                    }
+                    if (!updatedInPlace) {
+                      existingList.add(entryToAdd);
+                    }
+                  }
+
+                  log.info("Publishing {} for name {} and container {}",
+                           templateToExport, export.getName(), containerId);
+                  modifiedGroups.add(exportGroupName);
+                  synchronized (containerExportsMap) {
+                    if (!containerExportsMap.containsKey(containerId)) {
+                      containerExportsMap.put(containerId, new HashSet<String>());
+                    }
+                    Set<String> containerExportMaps = containerExportsMap.get(containerId);
+                    containerExportMaps.add(String.format("%s:%s", exportGroupName, export.getName()));
+                  }
+                }
+              }
+            }
+          }
+        }
+      }
+      publishModifiedExportGroups(modifiedGroups);
+    }
+  }
+
   private void publishComponentInstanceData() {
     Map<String, String> dataToPublish = new HashMap<String, String>();
-    synchronized (this.componentInstanceData) {
-      for (String container : getComponentInstanceData().keySet()) {
-        for (String prop : getComponentInstanceData().get(container).keySet()) {
-          dataToPublish.put(
-              container + "." + prop, getComponentInstanceData().get(container).get(prop));
-        }
+    for (String container : getComponentInstanceData().keySet()) {
+      for (String prop : getComponentInstanceData().get(container).keySet()) {
+        dataToPublish.put(
+            container + "." + prop, getComponentInstanceData().get(container).get(prop));
       }
     }
     publishApplicationInstanceData(COMPONENT_DATA_TAG, COMPONENT_DATA_TAG, dataToPublish.entrySet());
@@ -1580,5 +1781,4 @@
                   "");
     }
   }
-
 }
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Component.java b/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Component.java
index 9f3dd0f..418868c 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Component.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Component.java
@@ -34,6 +34,7 @@
   String maxInstanceCount;
   String autoStartOnFailure;
   String appExports;
+  String compExports;
   CommandScript commandScript;
   List<ComponentExport> componentExports;
 
@@ -82,6 +83,14 @@
     this.appExports = appExports;
   }
 
+  public String getCompExports() {
+    return compExports;
+  }
+
+  public void setCompExports(String compExports) {
+    this.compExports = compExports;
+  }
+
   public String getMinInstanceCount() {
     return minInstanceCount;
   }
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/MetainfoParser.java b/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/MetainfoParser.java
index c92c265..1d8403f 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/MetainfoParser.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/MetainfoParser.java
@@ -61,6 +61,7 @@
     digester.addBeanPropertySetter("*/component/maxInstanceCount");
     digester.addBeanPropertySetter("*/component/autoStartOnFailure");
     digester.addBeanPropertySetter("*/component/appExports");
+    digester.addBeanPropertySetter("*/component/compExports");
     digester.addObjectCreate("*/componentExport", ComponentExport.class);
     digester.addBeanPropertySetter("*/componentExport/name");
     digester.addBeanPropertySetter("*/componentExport/value");
diff --git a/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java
index 601c3f9..afe6428 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java
@@ -146,6 +146,8 @@
 
       String configurationsURL = SliderUtils.appendToURL(
           publisherURL.toExternalForm(), RestPaths.SLIDER_CONFIGSET);
+      String exportsURL = SliderUtils.appendToURL(
+          publisherURL.toExternalForm(), RestPaths.SLIDER_EXPORTS);
 
       serviceRecord.addExternalEndpoint(
           RegistryTypeUtils.webEndpoint(
@@ -166,6 +168,10 @@
           RegistryTypeUtils.restEndpoint(
               CustomRegistryConstants.PUBLISHER_CONFIGURATIONS_API,
               new URI(configurationsURL)));
+      serviceRecord.addExternalEndpoint(
+          RegistryTypeUtils.restEndpoint(
+              CustomRegistryConstants.PUBLISHER_EXPORTS_API,
+              new URI(exportsURL)));
 
     } catch (URISyntaxException e) {
       throw new IOException(e);
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java
index a0871ae..9c5da12 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java
@@ -26,6 +26,7 @@
 import org.apache.slider.core.conf.ConfTreeOperations;
 import org.apache.slider.core.exceptions.NoSuchNodeException;
 import org.apache.slider.core.registry.docstore.PublishedConfigSet;
+import org.apache.slider.core.registry.docstore.PublishedExportsSet;
 import org.apache.slider.server.appmaster.web.rest.RestPaths;
 import org.apache.slider.server.services.utility.PatternValidator;
 
@@ -40,6 +41,7 @@
 
   private final Map<String, PublishedConfigSet> publishedConfigSets =
       new ConcurrentHashMap<String, PublishedConfigSet>(5);
+  private final PublishedExportsSet publishedExportsSets = new PublishedExportsSet();
   private static final PatternValidator validator = new PatternValidator(
       RestPaths.PUBLISHED_CONFIGURATION_SET_REGEXP);
   private String applicationName;
@@ -66,6 +68,11 @@
   }
 
   @Override
+  public PublishedExportsSet getPublishedExportsSet() {
+    return publishedExportsSets;
+  }
+
+  @Override
   public PublishedConfigSet getPublishedConfigSet(String name) {
     return publishedConfigSets.get(name);
   }
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/StateAccessForProviders.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/StateAccessForProviders.java
index 1714f75..b907b06 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/StateAccessForProviders.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/StateAccessForProviders.java
@@ -26,6 +26,7 @@
 import org.apache.slider.core.conf.ConfTreeOperations;
 import org.apache.slider.core.exceptions.NoSuchNodeException;
 import org.apache.slider.core.registry.docstore.PublishedConfigSet;
+import org.apache.slider.core.registry.docstore.PublishedExportsSet;
 
 import java.util.Collection;
 import java.util.List;
@@ -51,6 +52,12 @@
   PublishedConfigSet getPublishedSliderConfigurations();
 
   /**
+   * Get the published exports set
+   * @return
+   */
+  PublishedExportsSet getPublishedExportsSet();
+
+  /**
    * Get a named published config set
    * @param name name to look up
    * @return the instance or null
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/RestPaths.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/RestPaths.java
index 93601ad..94f1e4c 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/RestPaths.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/RestPaths.java
@@ -61,6 +61,7 @@
       = "[a-z0-9][a-z0-9_.\\+-]*";
 
   public static final String SLIDER_CONFIGSET = "slider";
+  public static final String SLIDER_EXPORTS = "exports";
 
   public static final String SLIDER_CLASSPATH = "classpath";
 }
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/publisher/PublisherResource.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/publisher/PublisherResource.java
index 5d8b657..e47bbb9 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/publisher/PublisherResource.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/publisher/PublisherResource.java
@@ -23,6 +23,8 @@
 import org.apache.slider.core.registry.docstore.PublishedConfigSet;
 import org.apache.slider.core.registry.docstore.PublishedConfiguration;
 import org.apache.slider.core.registry.docstore.PublishedConfigurationOutputter;
+import org.apache.slider.core.registry.docstore.PublishedExports;
+import org.apache.slider.core.registry.docstore.PublishedExportsSet;
 import org.apache.slider.core.registry.docstore.UriMap;
 import org.apache.slider.server.appmaster.state.StateAccessForProviders;
 import org.apache.slider.server.appmaster.web.WebAppApi;
@@ -56,7 +58,10 @@
   protected static final Logger log =
       LoggerFactory.getLogger(PublisherResource.class);
   private final WebAppApi slider;
-  public static final String SET_NAME = 
+  public static final String EXPORTS_NAME = "exports";
+  public static final String EXPORTS_RESOURCES_PATH = "/" + EXPORTS_NAME;
+  public static final String EXPORT_RESOURCE_PATH = EXPORTS_RESOURCES_PATH + "/{exportname}" ;
+  public static final String SET_NAME =
       "{setname: " + PUBLISHED_CONFIGURATION_SET_REGEXP + "}";
   private static final String CONFIG =
       SET_NAME + "/{config: " + PUBLISHED_CONFIGURATION_REGEXP + "}";
@@ -101,7 +106,9 @@
     UriMap uriMap = new UriMap();
     for (String name : appState.listConfigSets()) {
       uriMap.put(name, baseURL + name);
+      log.info("Tick tack {} and {}", name, baseURL);
     }
+    uriMap.put(EXPORTS_NAME, baseURL + EXPORTS_NAME);
     return uriMap;
   }
 
@@ -114,6 +121,26 @@
   }
 
   @GET
+  @Path(EXPORTS_RESOURCES_PATH)
+  @Produces({MediaType.APPLICATION_JSON})
+  public PublishedExportsSet gePublishedExports() {
+
+    PublishedExportsSet set = appState.getPublishedExportsSet();
+    return set.shallowCopy();
+  }
+
+  @GET
+  @Path(EXPORT_RESOURCE_PATH)
+  @Produces({MediaType.APPLICATION_JSON})
+  public PublishedExports getAMExports2(@PathParam("exportname") String exportname,
+                              @Context UriInfo uriInfo,
+                              @Context HttpServletResponse res) {
+    init(res, uriInfo);
+    PublishedExportsSet set = appState.getPublishedExportsSet();
+    return set.get(exportname);
+  }
+
+  @GET
   @Path("/"+ SET_NAME)
   @Produces({MediaType.APPLICATION_JSON})
   public PublishedConfigSet getPublishedConfiguration(
@@ -129,7 +156,7 @@
   }
 
   private void logRequest(UriInfo uriInfo) {
-    log.debug(uriInfo.getRequestUri().toString());
+    log.info(uriInfo.getRequestUri().toString());
   }
 
   @GET
diff --git a/slider-core/src/test/groovy/org/apache/slider/client/TestClientBadArgs.groovy b/slider-core/src/test/groovy/org/apache/slider/client/TestClientBadArgs.groovy
index d1f8a8f..7d596d6 100644
--- a/slider-core/src/test/groovy/org/apache/slider/client/TestClientBadArgs.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/client/TestClientBadArgs.groovy
@@ -24,8 +24,10 @@
 import org.apache.slider.common.params.ActionRegistryArgs
 import org.apache.slider.common.params.Arguments
 import org.apache.slider.common.params.SliderActions
+import org.apache.slider.core.exceptions.BadCommandArgumentsException
 import org.apache.slider.core.exceptions.ErrorStrings
 import org.apache.slider.core.exceptions.UsageException
+import org.apache.slider.core.main.ServiceLauncher
 import org.apache.slider.core.main.ServiceLauncherBaseTest
 import org.junit.Test
 
@@ -88,4 +90,45 @@
     log.info(exception.toString())
   }
 
+  @Test
+  public void testRegistryExportBadUsage1() throws Throwable {
+    def exception = launchExpectingException(SliderClient,
+        new Configuration(),
+        "Expected a value after parameter --getexp",
+        [SliderActions.ACTION_REGISTRY,
+            Arguments.ARG_NAME,
+            "cl1",
+            Arguments.ARG_GETEXP])
+    assert exception instanceof BadCommandArgumentsException
+    log.info(exception.toString())
+  }
+
+  @Test
+  public void testRegistryExportBadUsage2() throws Throwable {
+    def exception = launchExpectingException(SliderClient,
+        new Configuration(),
+        "Expected a value after parameter --getexp",
+        [SliderActions.ACTION_REGISTRY,
+            Arguments.ARG_NAME,
+            "cl1",
+            Arguments.ARG_LISTEXP,
+        Arguments.ARG_GETEXP])
+    assert exception instanceof BadCommandArgumentsException
+    log.info(exception.toString())
+  }
+
+  @Test
+  public void testRegistryExportBadUsage3() throws Throwable {
+    def exception = launchExpectingException(SliderClient,
+        new Configuration(),
+        "Usage: registry",
+        [SliderActions.ACTION_REGISTRY,
+            Arguments.ARG_NAME,
+            "cl1",
+            Arguments.ARG_LISTEXP,
+            Arguments.ARG_GETEXP,
+            "export1"])
+    assert exception instanceof UsageException
+    log.info(exception.toString())
+  }
 }
diff --git a/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java b/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java
index 7eff45a..a20e7a9 100644
--- a/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java
+++ b/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.registry.client.api.RegistryConstants;
 import org.apache.hadoop.registry.client.api.RegistryOperations;
 import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -45,6 +46,9 @@
 import org.apache.slider.core.conf.MapOperations;
 import org.apache.slider.core.exceptions.SliderException;
 import org.apache.slider.core.launch.ContainerLauncher;
+import org.apache.slider.core.registry.docstore.ExportEntry;
+import org.apache.slider.core.registry.docstore.PublishedExports;
+import org.apache.slider.core.registry.docstore.PublishedExportsSet;
 import org.apache.slider.providers.ProviderRole;
 import org.apache.slider.providers.agent.application.metadata.Application;
 import org.apache.slider.providers.agent.application.metadata.CommandOrder;
@@ -94,6 +98,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.createNiceMock;
@@ -136,6 +141,10 @@
                                                + "              <name>Master_Status</name>\n"
                                                + "              <value>http://${HBASE_MASTER_HOST}:${site.hbase-site.hbase.master.info.port}/master-status</value>\n"
                                                + "            </export>\n"
+                                               + "            <export>\n"
+                                               + "              <name>Comp_Endpoint</name>\n"
+                                               + "              <value>http://${HBASE_REGIONSERVER_HOST}:${site.global.listen_port}</value>\n"
+                                               + "            </export>\n"
                                                + "          </exports>\n"
                                                + "        </exportGroup>\n"
                                                + "      </exportGroups>\n"
@@ -165,6 +174,7 @@
                                                + "          <publishConfig>true</publishConfig>\n"
                                                + "          <autoStartOnFailure>true</autoStartOnFailure>\n"
                                                + "          <appExports>QuickLinks-JMX_Endpoint,QuickLinks-Master_Status</appExports>\n"
+                                               + "          <compExports>QuickLinks-Comp_Endpoint</compExports>\n"
                                                + "          <minInstanceCount>1</minInstanceCount>\n"
                                                + "          <maxInstanceCount>2</maxInstanceCount>\n"
                                                + "          <commandScript>\n"
@@ -182,6 +192,7 @@
                                                + "            <script>scripts/hbase_regionserver.py</script>\n"
                                                + "            <scriptType>PYTHON</scriptType>\n"
                                                + "          </commandScript>\n"
+                                               + "          <compExports>QuickLinks-Comp_Endpoint</compExports>\n"
                                                + "          <componentExports>\n"
                                                + "            <componentExport>\n"
                                                + "              <name>PropertyA</name>\n"
@@ -322,10 +333,10 @@
         anyMap()
     );
 
-    doNothing().when(mockAps).publishLogFolderPaths(anyMap(),
-                                                    anyString(),
-                                                    anyString(),
-                                                    anyString()
+    doNothing().when(mockAps).publishFolderPaths(anyMap(),
+                                                 anyString(),
+                                                 anyString(),
+                                                 anyString()
     );
     expect(access.isApplicationLive()).andReturn(true).anyTimes();
     ClusterDescription desc = new ClusterDescription();
@@ -378,7 +389,7 @@
         anyMap()
     );
 
-    Mockito.verify(mockAps, Mockito.times(1)).publishLogFolderPaths(
+    Mockito.verify(mockAps, Mockito.times(1)).publishFolderPaths(
         anyMap(),
         anyString(),
         anyString(),
@@ -694,6 +705,92 @@
     }
   }
 
+
+  @Test
+  public void testComponentSpecificPublishes2() throws Exception {
+    InputStream metainfo_1 = new ByteArrayInputStream(metainfo_1_str.getBytes());
+    Metainfo metainfo = new MetainfoParser().parse(metainfo_1);
+    AgentProviderService aps = createAgentProviderService(new Configuration());
+    AgentProviderService mockAps = Mockito.spy(aps);
+    doNothing().when(mockAps).publishApplicationInstanceData(anyString(), anyString(), anyCollection());
+    doReturn(metainfo).when(mockAps).getMetainfo();
+    StateAccessForProviders access = createNiceMock(StateAccessForProviders.class);
+    doReturn(access).when(mockAps).getAmState();
+    PublishedExportsSet pubExpSet = new PublishedExportsSet();
+    expect(access.getPublishedExportsSet()).andReturn(pubExpSet).anyTimes();
+    replay(access);
+
+    Map<String, String> ports = new HashMap<String, String>();
+    ports.put("global.listen_port", "10010");
+    mockAps.processAndPublishComponentSpecificExports(ports,
+                                                      "mockcontainer_1",
+                                                      "host1",
+                                                      "HBASE_REGIONSERVER");
+    ArgumentCaptor<Collection> entriesCaptor = ArgumentCaptor.
+        forClass(Collection.class);
+    ArgumentCaptor<String> publishNameCaptor = ArgumentCaptor.
+        forClass(String.class);
+    Mockito.verify(mockAps, Mockito.times(1)).publishApplicationInstanceData(
+        anyString(),
+        publishNameCaptor.capture(),
+        entriesCaptor.capture());
+
+    PublishedExports pubExports = pubExpSet.get("QuickLinks".toLowerCase());
+    Assert.assertEquals(1, pubExports.entries.size());
+    Assert.assertEquals("QuickLinks", pubExports.description);
+    List<ExportEntry> expEntries = pubExports.entries.get("Comp_Endpoint");
+    Assert.assertEquals(1, expEntries.size());
+    Assert.assertEquals("mockcontainer_1", expEntries.get(0).getContainerId());
+    Assert.assertEquals("component", expEntries.get(0).getLevel());
+    Assert.assertEquals(null, expEntries.get(0).getTag());
+    Assert.assertEquals("http://host1:10010", expEntries.get(0).getValue());
+    Assert.assertNotNull(expEntries.get(0).getUpdatedTime());
+    Assert.assertNull(expEntries.get(0).getValidUntil());
+
+    assert entriesCaptor.getAllValues().size() == 1;
+    for (Collection coll : entriesCaptor.getAllValues()) {
+      Set<Map.Entry<String, String>> entrySet = (Set<Map.Entry<String, String>>) coll;
+      for (Map.Entry entry : entrySet) {
+        log.info("{}:{}", entry.getKey(), entry.getValue().toString());
+        if (entry.getKey().equals("Comp_Endpoint")) {
+          assert entry.getValue().toString().equals("http://host1:10010");
+        }
+      }
+    }
+    assert publishNameCaptor.getAllValues().size() == 1;
+    for (String coll : publishNameCaptor.getAllValues()) {
+      assert coll.equals("QuickLinks");
+    }
+
+    mockAps.notifyContainerCompleted(new MockContainerId(1));
+    pubExports = pubExpSet.get("QuickLinks".toLowerCase());
+    Assert.assertEquals(1, pubExports.entries.size());
+    Assert.assertEquals("QuickLinks", pubExports.description);
+    expEntries = pubExports.entries.get("Comp_Endpoint");
+    Assert.assertEquals(0, expEntries.size());
+
+    mockAps.notifyContainerCompleted(new MockContainerId(1));
+    mockAps.notifyContainerCompleted(new MockContainerId(2));
+
+    mockAps.processAndPublishComponentSpecificExports(ports,
+                                                      "mockcontainer_1",
+                                                      "host1",
+                                                      "HBASE_REGIONSERVER");
+    mockAps.processAndPublishComponentSpecificExports(ports,
+                                                      "mockcontainer_2",
+                                                      "host1",
+                                                      "HBASE_REGIONSERVER");
+    pubExports = pubExpSet.get("QuickLinks".toLowerCase());
+    Assert.assertEquals(1, pubExports.entries.size());
+    Assert.assertEquals("QuickLinks", pubExports.description);
+    expEntries = pubExports.entries.get("Comp_Endpoint");
+    Assert.assertEquals(2, expEntries.size());
+
+    mockAps.notifyContainerCompleted(new MockContainerId(2));
+    expEntries = pubExports.entries.get("Comp_Endpoint");
+    Assert.assertEquals(1, expEntries.size());
+  }
+
   @Test
   public void testProcessConfig() throws Exception {
     InputStream metainfo_1 = new ByteArrayInputStream(metainfo_1_str.getBytes());
@@ -728,6 +825,11 @@
     doNothing().when(mockAps).publishApplicationInstanceData(anyString(), anyString(), anyCollection());
     doReturn(metainfo).when(mockAps).getMetainfo();
     doReturn(roleClusterNodeMap).when(mockAps).getRoleClusterNodeMapping();
+    StateAccessForProviders access = createNiceMock(StateAccessForProviders.class);
+    doReturn(access).when(mockAps).getAmState();
+    PublishedExportsSet pubExpSet = new PublishedExportsSet();
+    expect(access.getPublishedExportsSet()).andReturn(pubExpSet).anyTimes();
+    replay(access);
 
     mockAps.publishConfigAndExportGroups(hb, componentStatus, "HBASE_MASTER");
     Assert.assertTrue(componentStatus.getConfigReported());
@@ -748,15 +850,30 @@
       }
     }
 
-    Map<String, String> exports = mockAps.getCurrentExports("QuickLinks");
+    Map<String, List<ExportEntry>> exports = mockAps.getCurrentExports("QuickLinks");
     Assert.assertEquals(2, exports.size());
-    Assert.assertEquals(exports.get("JMX_Endpoint"), "http://HOST1:60012/jmx");
+    Assert.assertEquals(exports.get("JMX_Endpoint").get(0).getValue(), "http://HOST1:60012/jmx");
 
     mockAps.publishConfigAndExportGroups(hb, componentStatus, "HBASE_REST");
     Mockito.verify(mockAps, Mockito.times(3)).publishApplicationInstanceData(
         anyString(),
         anyString(),
         entriesCaptor.capture());
+    PublishedExports pubExports = pubExpSet.get("QuickLinks".toLowerCase());
+    Assert.assertEquals(2, pubExports.entries.size());
+    Assert.assertEquals("QuickLinks", pubExports.description);
+    List<ExportEntry> expEntries = pubExports.entries.get("JMX_Endpoint");
+    Assert.assertEquals(1, expEntries.size());
+    Assert.assertEquals(null, expEntries.get(0).getContainerId());
+    Assert.assertEquals("application", expEntries.get(0).getLevel());
+    Assert.assertEquals(null, expEntries.get(0).getTag());
+    Assert.assertEquals("http://HOST1:60012/jmx", expEntries.get(0).getValue());
+    Assert.assertNull(expEntries.get(0).getValidUntil());
+
+    expEntries = pubExports.entries.get("Master_Status");
+    Assert.assertEquals(1, expEntries.size());
+    expEntries = pubExports.entries.get("JMX_Endpoint");
+    Assert.assertEquals("http://HOST1:60012/jmx", expEntries.get(0).getValue());
   }
 
   @Test
@@ -781,6 +898,7 @@
         Assert.assertEquals(component.getCategory(), "MASTER");
         Assert.assertEquals(component.getComponentExports().size(), 0);
         Assert.assertEquals(component.getAppExports(), "QuickLinks-JMX_Endpoint,QuickLinks-Master_Status");
+        Assert.assertEquals(component.getCompExports(), "QuickLinks-Comp_Endpoint");
         found++;
       }
       if (component.getName().equals("HBASE_REGIONSERVER")) {
@@ -807,7 +925,7 @@
     List<ExportGroup> egs = application.getExportGroups();
     ExportGroup eg = egs.get(0);
     Assert.assertEquals(eg.getName(), "QuickLinks");
-    Assert.assertEquals(eg.getExports().size(), 2);
+    Assert.assertEquals(eg.getExports().size(), 3);
 
     found = 0;
     for (Export export : eg.getExports()) {
@@ -975,15 +1093,18 @@
           anyString(),
           anyString(),
           any(HeartBeatResponse.class));
-      doNothing().when(mockAps).publishApplicationInstanceData(
+      doNothing().when(mockAps).publishFolderPaths(
+          anyMap(),
           anyString(),
           anyString(),
-          anyCollection());
+          anyString());
       doReturn(conf).when(mockAps).getConfig();
     } catch (SliderException e) {
     }
 
+    PublishedExportsSet pubExpSet = new PublishedExportsSet();
     expect(access.isApplicationLive()).andReturn(true).anyTimes();
+    expect(access.getPublishedExportsSet()).andReturn(pubExpSet).anyTimes();
     ClusterDescription desc = new ClusterDescription();
     desc.setOption(OptionKeys.ZOOKEEPER_QUORUM, "host1:2181");
     desc.setInfo(OptionKeys.APPLICATION_NAME, "HBASE");
@@ -996,6 +1117,7 @@
     treeOps.set(OptionKeys.APPLICATION_NAME, "HBASE");
     expect(access.getInstanceDefinitionSnapshot()).andReturn(aggConf).anyTimes();
     expect(access.getInternalsSnapshot()).andReturn(treeOps).anyTimes();
+    doNothing().when(mockAps).publishApplicationInstanceData(anyString(), anyString(), anyCollection());
     replay(access, ctx, container, sliderFileSystem, mockFs);
 
     // build two containers
@@ -1094,6 +1216,7 @@
       hb = new HeartBeat();
       hb.setResponseId(2);
       hb.setHostname("mockcontainer_1___HBASE_MASTER");
+      hb.setFqdn("host1");
       cr = new CommandReport();
       cr.setRole("HBASE_MASTER");
       cr.setRoleCommand("INSTALL");
@@ -1160,10 +1283,11 @@
       log.warn(he.getMessage());
     }
 
-    Mockito.verify(mockAps, Mockito.times(1)).publishApplicationInstanceData(
+    Mockito.verify(mockAps, Mockito.times(1)).publishFolderPaths(
+        anyMap(),
         anyString(),
         anyString(),
-        anyCollection());
+        anyString());
   }
 
   protected AgentProviderService createAgentProviderService(Configuration conf) throws
@@ -1193,6 +1317,44 @@
   }
 
   @Test
+  public void testPublishFolderPaths() throws IOException {
+    AgentProviderService aps = createAgentProviderService(new Configuration());
+    StateAccessForProviders access = createNiceMock(StateAccessForProviders.class);
+    AgentProviderService mockAps = Mockito.spy(aps);
+    doReturn(access).when(mockAps).getAmState();
+    PublishedExportsSet pubExpSet = new PublishedExportsSet();
+    expect(access.getPublishedExportsSet()).andReturn(pubExpSet).anyTimes();
+    replay(access);
+
+    Map<String, String> folders = new HashMap<String, String>();
+    folders.put("AGENT_LOG_ROOT", "aFolder");
+    folders.put("AGENT_WORK_ROOT", "folderB");
+    mockAps.publishFolderPaths(folders, "cid", "role", "fqdn");
+
+    PublishedExports exports = pubExpSet.get("container_log_dirs");
+    Assert.assertEquals(1, exports.entries.size());
+    List<ExportEntry> expEntries = exports.entries.get("role");
+    Assert.assertEquals(1, expEntries.size());
+    Assert.assertEquals("cid", expEntries.get(0).getContainerId());
+    Assert.assertEquals("component", expEntries.get(0).getLevel());
+    Assert.assertEquals("role", expEntries.get(0).getTag());
+    Assert.assertEquals("fqdn:aFolder", expEntries.get(0).getValue());
+    Assert.assertNull(expEntries.get(0).getValidUntil());
+    Assert.assertEquals(null, expEntries.get(0).getValidUntil());
+
+    exports = pubExpSet.get("container_work_dirs");
+    Assert.assertEquals(1, exports.entries.size());
+    expEntries = exports.entries.get("role");
+    Assert.assertEquals(1, expEntries.size());
+    Assert.assertEquals("cid", expEntries.get(0).getContainerId());
+    Assert.assertEquals("component", expEntries.get(0).getLevel());
+    Assert.assertEquals("role", expEntries.get(0).getTag());
+    Assert.assertEquals("fqdn:folderB", expEntries.get(0).getValue());
+    Assert.assertNull(expEntries.get(0).getValidUntil());
+    Assert.assertEquals(null, expEntries.get(0).getValidUntil());
+  }
+
+  @Test
   public void testNotifyContainerCompleted() throws IOException {
     AgentProviderService aps = createAgentProviderService(new Configuration());
     AgentProviderService mockAps = Mockito.spy(aps);
diff --git a/slider-funtest/src/main/groovy/org/apache/slider/funtest/framework/AgentCommandTestBase.groovy b/slider-funtest/src/main/groovy/org/apache/slider/funtest/framework/AgentCommandTestBase.groovy
index d5448bf..db9fa6d 100644
--- a/slider-funtest/src/main/groovy/org/apache/slider/funtest/framework/AgentCommandTestBase.groovy
+++ b/slider-funtest/src/main/groovy/org/apache/slider/funtest/framework/AgentCommandTestBase.groovy
@@ -164,6 +164,33 @@
     return null;
   }
 
+  public static boolean containsString(SliderShell shell, String lookThisUp, int n = 1) {
+    int count = 0
+    for (String str in shell.out) {
+      int subCount = countString(str, lookThisUp)
+      count = count + subCount
+      if (count == n) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  public static int countString(String str, String search) {
+    int count = 0
+    if (SliderUtils.isUnset(str) || SliderUtils.isUnset(search)) {
+      return count
+    }
+
+    int index = str.indexOf(search, 0)
+    while (index > 0) {
+      index = str.indexOf(search, index + 1)
+      ++count
+    }
+    return count
+  }
+
   public static String findLineEntryValue(SliderShell shell, String[] locaters) {
     String line = findLineEntry(shell, locaters);
 
diff --git a/slider-funtest/src/test/groovy/org/apache/slider/funtest/lifecycle/AppsThroughAgentIT.groovy b/slider-funtest/src/test/groovy/org/apache/slider/funtest/lifecycle/AppsThroughAgentIT.groovy
index 234275a..0796355 100644
--- a/slider-funtest/src/test/groovy/org/apache/slider/funtest/lifecycle/AppsThroughAgentIT.groovy
+++ b/slider-funtest/src/test/groovy/org/apache/slider/funtest/lifecycle/AppsThroughAgentIT.groovy
@@ -75,6 +75,46 @@
     assertComponentCount(COMMAND_LOGGER, 2, shell)
 
     assertSuccess(shell)
+
+    // get log folders
+    shell = slider(EXIT_SUCCESS,
+        [
+            ACTION_REGISTRY,
+            ARG_NAME,
+            APPLICATION_NAME,
+            ARG_LISTEXP])
+    if(!containsString(shell, "container_log_dirs") ||
+       !containsString(shell, "container_work_dirs")) {
+      logShell(shell)
+      assert fail("Should list default exports container_log_dirs or container_work_dirs")
+    }
+
+    // get log folders
+    shell = slider(EXIT_SUCCESS,
+        [
+            ACTION_REGISTRY,
+            ARG_NAME,
+            APPLICATION_NAME,
+            ARG_GETEXP,
+            "container_log_dirs"])
+    if(!containsString(shell, "\"tag\":\"COMMAND_LOGGER\",\"level\":\"component\"", 2)) {
+      logShell(shell)
+      assert fail("Should list 2 entries for log folders")
+    }
+
+    // get log folders
+    shell = slider(EXIT_SUCCESS,
+        [
+            ACTION_REGISTRY,
+            ARG_NAME,
+            APPLICATION_NAME,
+            ARG_GETEXP,
+            "container_work_dirs"])
+    if(!containsString(shell, "\"tag\":\"COMMAND_LOGGER\",\"level\":\"component\"", 2)) {
+      logShell(shell)
+      assert fail("Should list 2 entries for work folder")
+    }
+
     assert isApplicationInState("RUNNING", APPLICATION_NAME), 'App is not running.'
   }
 }