SLIDER-481. Exports should allow a multiple line items per export and a more hierarchical structure
diff --git a/app-packages/memcached/metainfo.xml b/app-packages/memcached/metainfo.xml
index 5801ad2..0984dc9 100644
--- a/app-packages/memcached/metainfo.xml
+++ b/app-packages/memcached/metainfo.xml
@@ -23,17 +23,23 @@
<comment>Memcache is a network accessible key/value storage system, often used as a distributed cache.</comment>
<version>1.0.0</version>
<exportedConfigs>None</exportedConfigs>
+ <exportGroups>
+ <exportGroup>
+ <name>Servers</name>
+ <exports>
+ <export>
+ <name>host_port</name>
+ <value>${MEMCACHED_HOST}:${site.global.listen_port}</value>
+ </export>
+ </exports>
+ </exportGroup>
+ </exportGroups>
<components>
<component>
<name>MEMCACHED</name>
<category>MASTER</category>
- <componentExports>
- <componentExport>
- <name>host_port</name>
- <value>${THIS_HOST}:${site.global.listen_port}</value>
- </componentExport>
- </componentExports>
+ <compExports>Servers-host_port</compExports>
<commandScript>
<script>scripts/memcached.py</script>
<scriptType>PYTHON</scriptType>
diff --git a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
index 50a7097..eeeee26 100644
--- a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
+++ b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
@@ -115,6 +115,9 @@
import org.apache.slider.core.registry.docstore.PublishedConfigSet;
import org.apache.slider.core.registry.docstore.PublishedConfiguration;
import org.apache.slider.core.registry.docstore.PublishedConfigurationOutputter;
+import org.apache.slider.core.registry.docstore.PublishedExports;
+import org.apache.slider.core.registry.docstore.PublishedExportsOutputter;
+import org.apache.slider.core.registry.docstore.PublishedExportsSet;
import org.apache.slider.core.registry.retrieve.RegistryRetriever;
import org.apache.slider.core.zk.BlockingZKWatcher;
import org.apache.slider.core.zk.ZKIntegration;
@@ -2285,11 +2288,19 @@
} else if (registryArgs.listConf) {
// list the configurations
actionRegistryListConfigsYarn(registryArgs);
+ } else if (registryArgs.listExports) {
+ // list the exports
+ actionRegistryListExports(registryArgs);
} else if (SliderUtils.isSet(registryArgs.getConf)) {
// get a configuration
PublishedConfiguration publishedConfiguration =
actionRegistryGetConfig(registryArgs);
outputConfig(publishedConfiguration, registryArgs);
+ } else if (SliderUtils.isSet(registryArgs.getExport)) {
+ // get a export group
+ PublishedExports publishedExports =
+ actionRegistryGetExport(registryArgs);
+ outputExport(publishedExports, registryArgs);
} else {
// it's an unknown command
log.info(ActionRegistryArgs.USAGE);
@@ -2698,6 +2709,34 @@
}
/**
+ * list exports available for an instance
+ *
+ * @param registryArgs registry Arguments
+ * @throws YarnException YARN problems
+ * @throws IOException Network or other problems
+ */
+ public void actionRegistryListExports(ActionRegistryArgs registryArgs)
+ throws YarnException, IOException {
+ ServiceRecord instance = lookupServiceRecord(registryArgs);
+
+ RegistryRetriever retriever = new RegistryRetriever(instance);
+ PublishedExportsSet exports =
+ retriever.getExports(!registryArgs.internal);
+
+ for (String exportName : exports.keys()) {
+ if (!registryArgs.verbose) {
+ log.info("{}", exportName);
+ } else {
+ PublishedExports published =
+ exports.get(exportName);
+ log.info("{} : {}",
+ exportName,
+ published.description);
+ }
+ }
+ }
+
+ /**
* list configs available for an instance
*
* @param registryArgs registry Arguments
@@ -2722,6 +2761,31 @@
}
/**
+ * get a specific export group
+ *
+ * @param registryArgs registry Arguments
+ *
+ * @throws YarnException YARN problems
+ * @throws IOException Network or other problems
+ * @throws FileNotFoundException if the config is not found
+ */
+ @VisibleForTesting
+ public PublishedExports actionRegistryGetExport(ActionRegistryArgs registryArgs)
+ throws YarnException, IOException {
+ ServiceRecord instance = lookupServiceRecord(registryArgs);
+
+ RegistryRetriever retriever = new RegistryRetriever(instance);
+ boolean external = !registryArgs.internal;
+ PublishedExportsSet exports =
+ retriever.getExports(external);
+
+ PublishedExports published = retriever.retrieveExports(exports,
+ registryArgs.getExport,
+ external);
+ return published;
+ }
+
+ /**
* write out the config. If a destination is provided and that dir is a
* directory, the entry is written to it with the name provided + extension,
* else it is printed to standard out.
@@ -2761,6 +2825,44 @@
}
/**
+ * write out the config
+ * @param published
+ * @param registryArgs
+ * @throws BadCommandArgumentsException
+ * @throws IOException
+ */
+ private void outputExport(PublishedExports published,
+ ActionRegistryArgs registryArgs) throws
+ BadCommandArgumentsException,
+ IOException {
+ // decide whether or not to print
+ String entry = registryArgs.getExport;
+ String format = ConfigFormat.JSON.toString();
+ ConfigFormat configFormat = ConfigFormat.resolve(format);
+ if (configFormat == null || configFormat != ConfigFormat.JSON) {
+ throw new BadCommandArgumentsException(
+ "Unknown/Unsupported format %s . Only JSON is supported.", format);
+ }
+
+ PublishedExportsOutputter outputter =
+ PublishedExportsOutputter.createOutputter(configFormat,
+ published);
+ boolean print = registryArgs.out == null;
+ if (!print) {
+ File destFile;
+ destFile = registryArgs.out;
+ if (destFile.isDirectory()) {
+ // creating it under a directory
+ destFile = new File(destFile, entry + "." + format);
+ }
+ log.info("Destination path: {}", destFile);
+ outputter.save(destFile);
+ } else {
+ print(outputter.asString());
+ }
+ }
+
+ /**
* Look up an instance
* @return instance data
* @throws SliderException other failures
diff --git a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java.orig b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java.orig
new file mode 100644
index 0000000..50a7097
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java.orig
@@ -0,0 +1,2913 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.slider.client;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathNotFoundException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
+import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.alias.CredentialProvider;
+import org.apache.hadoop.security.alias.CredentialProviderFactory;
+import org.apache.hadoop.security.alias.CredentialShell;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.registry.client.api.RegistryConstants;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+import static org.apache.hadoop.registry.client.binding.RegistryUtils.*;
+
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.registry.client.exceptions.NoRecordException;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
+import org.apache.slider.api.ClusterDescription;
+import org.apache.slider.api.ClusterNode;
+import org.apache.slider.api.InternalKeys;
+import org.apache.slider.api.OptionKeys;
+import org.apache.slider.api.ResourceKeys;
+import org.apache.slider.api.SliderClusterProtocol;
+import org.apache.slider.api.proto.Messages;
+import org.apache.slider.common.Constants;
+import org.apache.slider.common.SliderExitCodes;
+import org.apache.slider.common.SliderKeys;
+import org.apache.slider.common.params.AbstractActionArgs;
+import org.apache.slider.common.params.AbstractClusterBuildingActionArgs;
+import org.apache.slider.common.params.ActionDiagnosticArgs;
+import org.apache.slider.common.params.ActionInstallPackageArgs;
+import org.apache.slider.common.params.ActionAMSuicideArgs;
+import org.apache.slider.common.params.ActionCreateArgs;
+import org.apache.slider.common.params.ActionEchoArgs;
+import org.apache.slider.common.params.ActionFlexArgs;
+import org.apache.slider.common.params.ActionFreezeArgs;
+import org.apache.slider.common.params.ActionKillContainerArgs;
+import org.apache.slider.common.params.ActionRegistryArgs;
+import org.apache.slider.common.params.ActionResolveArgs;
+import org.apache.slider.common.params.ActionStatusArgs;
+import org.apache.slider.common.params.ActionThawArgs;
+import org.apache.slider.common.params.Arguments;
+import org.apache.slider.common.params.ClientArgs;
+import org.apache.slider.common.params.LaunchArgsAccessor;
+import org.apache.slider.common.tools.ConfigHelper;
+import org.apache.slider.common.tools.Duration;
+import org.apache.slider.common.tools.SliderFileSystem;
+import org.apache.slider.common.tools.SliderUtils;
+import org.apache.slider.common.tools.SliderVersionInfo;
+import org.apache.slider.core.build.InstanceBuilder;
+import org.apache.slider.core.build.InstanceIO;
+import org.apache.slider.core.conf.AggregateConf;
+import org.apache.slider.core.conf.ConfTree;
+import org.apache.slider.core.conf.ConfTreeOperations;
+import org.apache.slider.core.conf.MapOperations;
+import org.apache.slider.core.conf.ResourcesInputPropertiesValidator;
+import org.apache.slider.core.conf.TemplateInputPropertiesValidator;
+import org.apache.slider.core.exceptions.BadClusterStateException;
+import org.apache.slider.core.exceptions.BadCommandArgumentsException;
+import org.apache.slider.core.exceptions.BadConfigException;
+import org.apache.slider.core.exceptions.ErrorStrings;
+import org.apache.slider.core.exceptions.NoSuchNodeException;
+import org.apache.slider.core.exceptions.SliderException;
+import org.apache.slider.core.exceptions.UnknownApplicationInstanceException;
+import org.apache.slider.core.exceptions.WaitTimeoutException;
+import org.apache.slider.core.launch.AppMasterLauncher;
+import org.apache.slider.core.launch.ClasspathConstructor;
+import org.apache.slider.core.launch.CommandLineBuilder;
+import org.apache.slider.core.launch.JavaCommandLineBuilder;
+import org.apache.slider.core.launch.LaunchedApplication;
+import org.apache.slider.core.launch.RunningApplication;
+import org.apache.slider.core.main.RunService;
+import org.apache.slider.core.persist.ConfPersister;
+import org.apache.slider.core.persist.LockAcquireFailedException;
+import org.apache.slider.core.registry.SliderRegistryUtils;
+import org.apache.slider.core.registry.YarnAppListClient;
+import org.apache.slider.core.registry.docstore.ConfigFormat;
+import org.apache.slider.core.registry.docstore.PublishedConfigSet;
+import org.apache.slider.core.registry.docstore.PublishedConfiguration;
+import org.apache.slider.core.registry.docstore.PublishedConfigurationOutputter;
+import org.apache.slider.core.registry.retrieve.RegistryRetriever;
+import org.apache.slider.core.zk.BlockingZKWatcher;
+import org.apache.slider.core.zk.ZKIntegration;
+import org.apache.slider.core.zk.ZKPathBuilder;
+import org.apache.slider.providers.AbstractClientProvider;
+import org.apache.slider.providers.SliderProviderFactory;
+import org.apache.slider.providers.agent.AgentKeys;
+import org.apache.slider.providers.slideram.SliderAMClientProvider;
+import org.apache.slider.server.appmaster.SliderAppMaster;
+import org.apache.slider.server.appmaster.rpc.RpcBinder;
+import org.apache.slider.server.services.utility.AbstractSliderLaunchedService;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static org.apache.slider.common.params.SliderActions.*;
+
+/**
+ * Client service for Slider
+ */
+
+public class SliderClient extends AbstractSliderLaunchedService implements RunService,
+ SliderExitCodes, SliderKeys, ErrorStrings, SliderClientAPI {
+ private static final Logger log = LoggerFactory.getLogger(SliderClient.class);
+
+ private ClientArgs serviceArgs;
+ public ApplicationId applicationId;
+
+ private String deployedClusterName;
+ /**
+ * Cluster opaerations against the deployed cluster -will be null
+ * if no bonding has yet taken place
+ */
+ private SliderClusterOperations sliderClusterOperations;
+
+ private SliderFileSystem sliderFileSystem;
+
+ /**
+ * Yarn client service
+ */
+ private SliderYarnClientImpl yarnClient;
+ private YarnAppListClient YarnAppListClient;
+ private AggregateConf launchedInstanceDefinition;
+// private SliderRegistryService registry;
+
+
+ /**
+ * The YARN registry service
+ */
+ private RegistryOperations registryOperations;
+
+ /**
+ * Constructor
+ */
+ public SliderClient() {
+ super("Slider Client");
+ new HdfsConfiguration();
+ new YarnConfiguration();
+ }
+
+ /**
+ * This is called <i>Before serviceInit is called</i>
+ * @param config the initial configuration build up by the
+ * service launcher.
+ * @param args argument list list of arguments passed to the command line
+ * after any launcher-specific commands have been stripped.
+ * @return the post-binding configuration to pass to the <code>init()</code>
+ * operation.
+ * @throws Exception
+ */
+ @Override
+ public Configuration bindArgs(Configuration config, String... args) throws Exception {
+ config = super.bindArgs(config, args);
+ serviceArgs = new ClientArgs(args);
+ serviceArgs.parse();
+ // yarn-ify
+ YarnConfiguration yarnConfiguration = new YarnConfiguration(config);
+ return SliderUtils.patchConfiguration(yarnConfiguration);
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ Configuration clientConf = SliderUtils.loadClientConfigurationResource();
+ ConfigHelper.mergeConfigurations(conf, clientConf, CLIENT_RESOURCE);
+ serviceArgs.applyDefinitions(conf);
+ serviceArgs.applyFileSystemBinding(conf);
+ // init security with our conf
+ if (SliderUtils.isHadoopClusterSecure(conf)) {
+ SliderUtils.forceLogin();
+ SliderUtils.initProcessSecurity(conf);
+ }
+ AbstractActionArgs coreAction = serviceArgs.getCoreAction();
+ if (coreAction.getHadoopServicesRequired()) {
+ initHadoopBinding();
+ }
+ super.serviceInit(conf);
+ }
+
+ /**
+ * this is where the work is done.
+ * @return the exit code
+ * @throws Throwable anything that went wrong
+ */
+/* JDK7
+
+ @Override
+ public int runService() throws Throwable {
+
+ // choose the action
+ String action = serviceArgs.getAction();
+ int exitCode = EXIT_SUCCESS;
+ String clusterName = serviceArgs.getClusterName();
+ // actions
+ switch (action) {
+ case ACTION_BUILD:
+ exitCode = actionBuild(clusterName, serviceArgs.getActionBuildArgs());
+ break;
+ case ACTION_UPDATE:
+ exitCode = actionUpdate(clusterName, serviceArgs.getActionUpdateArgs());
+ break;
+ case ACTION_CREATE:
+ exitCode = actionCreate(clusterName, serviceArgs.getActionCreateArgs());
+ break;
+ case ACTION_FREEZE:
+ exitCode = actionFreeze(clusterName, serviceArgs.getActionFreezeArgs());
+ break;
+ case ACTION_THAW:
+ exitCode = actionThaw(clusterName, serviceArgs.getActionThawArgs());
+ break;
+ case ACTION_DESTROY:
+ exitCode = actionDestroy(clusterName);
+ break;
+ case ACTION_EXISTS:
+ exitCode = actionExists(clusterName,
+ serviceArgs.getActionExistsArgs().live);
+ break;
+ case ACTION_FLEX:
+ exitCode = actionFlex(clusterName, serviceArgs.getActionFlexArgs());
+ break;
+ case ACTION_GETCONF:
+ exitCode =
+ actionGetConf(clusterName, serviceArgs.getActionGetConfArgs());
+ break;
+ case ACTION_HELP:
+ case ACTION_USAGE:
+ log.info(serviceArgs.usage());
+ break;
+ case ACTION_KILL_CONTAINER:
+ exitCode = actionKillContainer(clusterName,
+ serviceArgs.getActionKillContainerArgs());
+ break;
+ case ACTION_AM_SUICIDE:
+ exitCode = actionAmSuicide(clusterName,
+ serviceArgs.getActionAMSuicideArgs());
+ break;
+ case ACTION_LIST:
+ exitCode = actionList(clusterName);
+ break;
+ case ACTION_REGISTRY:
+ exitCode = actionRegistry(
+ serviceArgs.getActionRegistryArgs());
+ break;
+ case ACTION_STATUS:
+ exitCode = actionStatus(clusterName,
+ serviceArgs.getActionStatusArgs());
+ break;
+ case ACTION_VERSION:
+ exitCode = actionVersion();
+ break;
+ default:
+ throw new SliderException(EXIT_UNIMPLEMENTED,
+ "Unimplemented: " + action);
+ }
+
+ return exitCode;
+ }
+
+*/
+ @Override
+ public int runService() throws Throwable {
+
+ // choose the action
+ String action = serviceArgs.getAction();
+
+ int exitCode = EXIT_SUCCESS;
+ String clusterName = serviceArgs.getClusterName();
+ // actions
+ if (ACTION_INSTALL_PACKAGE.equals(action)) {
+ exitCode = actionInstallPkg(serviceArgs.getActionInstallPackageArgs());
+ } else if (ACTION_BUILD.equals(action)) {
+ exitCode = actionBuild(clusterName, serviceArgs.getActionBuildArgs());
+ } else if (ACTION_CREATE.equals(action)) {
+ exitCode = actionCreate(clusterName, serviceArgs.getActionCreateArgs());
+ } else if (ACTION_FREEZE.equals(action)) {
+ exitCode = actionFreeze(clusterName,
+ serviceArgs.getActionFreezeArgs());
+ } else if (ACTION_THAW.equals(action)) {
+ exitCode = actionThaw(clusterName, serviceArgs.getActionThawArgs());
+ } else if (ACTION_DESTROY.equals(action)) {
+ exitCode = actionDestroy(clusterName);
+ } else if (ACTION_DIAGNOSTIC.equals(action)) {
+ exitCode = actionDiagnostic(serviceArgs.getActionDiagnosticArgs());
+ } else if (ACTION_EXISTS.equals(action)) {
+ exitCode = actionExists(clusterName,
+ serviceArgs.getActionExistsArgs().live);
+ } else if (ACTION_FLEX.equals(action)) {
+ exitCode = actionFlex(clusterName, serviceArgs.getActionFlexArgs());
+ } else if (ACTION_HELP.equals(action) ||
+ ACTION_USAGE.equals(action)) {
+ log.info(serviceArgs.usage());
+ } else if (ACTION_KILL_CONTAINER.equals(action)) {
+ exitCode = actionKillContainer(clusterName,
+ serviceArgs.getActionKillContainerArgs());
+ } else if (ACTION_AM_SUICIDE.equals(action)) {
+ exitCode = actionAmSuicide(clusterName,
+ serviceArgs.getActionAMSuicideArgs());
+ } else if (ACTION_LIST.equals(action)) {
+ exitCode = actionList(clusterName);
+ } else if (ACTION_REGISTRY.equals(action)) {
+ exitCode = actionRegistry(serviceArgs.getActionRegistryArgs());
+ } else if (ACTION_RESOLVE.equals(action)) {
+ exitCode = actionResolve(serviceArgs.getActionResolveArgs());
+ } else if (ACTION_STATUS.equals(action)) {
+ exitCode = actionStatus(clusterName,
+ serviceArgs.getActionStatusArgs());
+ } else if (ACTION_UPDATE.equals(action)) {
+ exitCode = actionUpdate(clusterName, serviceArgs.getActionUpdateArgs());
+ } else if (ACTION_VERSION.equals(action)) {
+ exitCode = actionVersion();
+ } else {
+ throw new SliderException(EXIT_UNIMPLEMENTED,
+ "Unimplemented: " + action);
+ }
+
+ return exitCode;
+ }
+
+/**
+ * Perform everything needed to init the hadoop binding.
+ * This assumes that the service is already in inited or started state
+ * @throws IOException
+ * @throws SliderException
+ */
+ protected void initHadoopBinding() throws IOException, SliderException {
+ // validate the client
+ SliderUtils.validateSliderClientEnvironment(null);
+ //create the YARN client
+ yarnClient = new SliderYarnClientImpl();
+ yarnClient.init(getConfig());
+ if (getServiceState() == STATE.STARTED) {
+ yarnClient.start();
+ }
+ addService(yarnClient);
+ YarnAppListClient =
+ new YarnAppListClient(yarnClient, getUsername(), getConfig());
+ // create the filesystem
+ sliderFileSystem = new SliderFileSystem(getConfig());
+ }
+
+ /**
+ * Delete the zookeeper node associated with the calling user and the cluster
+ * TODO: YARN registry operations
+ **/
+ @Deprecated
+ @VisibleForTesting
+ public boolean deleteZookeeperNode(String clusterName) throws YarnException, IOException {
+ String user = getUsername();
+ String zkPath = ZKIntegration.mkClusterPath(user, clusterName);
+ Exception e = null;
+ try {
+ Configuration config = getConfig();
+ if (!SliderUtils.isHadoopClusterSecure(config)) {
+ ZKIntegration client = getZkClient(clusterName, user);
+ if (client != null) {
+ if (client.exists(zkPath)) {
+ log.info("Deleting zookeeper path {}", zkPath);
+ }
+ client.deleteRecursive(zkPath);
+ return true;
+ }
+ } else {
+ log.warn("Default zookeeper node is not available for secure cluster");
+ }
+ } catch (InterruptedException ignored) {
+ e = ignored;
+ } catch (KeeperException ignored) {
+ e = ignored;
+ } catch (BadConfigException ignored) {
+ e = ignored;
+ }
+ if (e != null) {
+ log.warn("Unable to recursively delete zk node {}", zkPath);
+ log.debug("Reason: ", e);
+ }
+
+ return false;
+ }
+
+ /**
+ * Create the zookeeper node associated with the calling user and the cluster
+ */
+ @VisibleForTesting
+ @Deprecated
+ public String createZookeeperNode(String clusterName, Boolean nameOnly) throws YarnException, IOException {
+ String user = getUsername();
+ String zkPath = ZKIntegration.mkClusterPath(user, clusterName);
+ if(nameOnly) {
+ return zkPath;
+ }
+ Configuration config = getConfig();
+ if (!SliderUtils.isHadoopClusterSecure(config)) {
+ ZKIntegration client = getZkClient(clusterName, user);
+ if (client != null) {
+ try {
+ client.createPath(zkPath, "", ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ return zkPath;
+
+ //JDK7
+// } catch (InterruptedException | KeeperException e) {
+ } catch (InterruptedException e) {
+ log.warn("Unable to create zk node {}", zkPath, e);
+ } catch ( KeeperException e) {
+ log.warn("Unable to create zk node {}", zkPath, e);
+ }
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Gets a zookeeper client, returns null if it cannot connect to zookeeper
+ **/
+ protected ZKIntegration getZkClient(String clusterName, String user) throws YarnException {
+ String registryQuorum = lookupZKQuorum();
+ ZKIntegration client = null;
+ try {
+ BlockingZKWatcher watcher = new BlockingZKWatcher();
+ client = ZKIntegration.newInstance(registryQuorum, user, clusterName, true, false, watcher);
+ client.init();
+ watcher.waitForZKConnection(2 * 1000);
+ } catch (InterruptedException e) {
+ client = null;
+ log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e);
+ } catch (IOException e) {
+ log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e);
+ }
+ return client;
+ }
+
+ @Override
+ public int actionDestroy(String clustername) throws YarnException,
+ IOException {
+ // verify that a live cluster isn't there
+ SliderUtils.validateClusterName(clustername);
+ //no=op, it is now mandatory.
+ verifyBindingsDefined();
+ verifyNoLiveClusters(clustername);
+
+ // create the directory path
+ Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
+ // delete the directory;
+ boolean exists = sliderFileSystem.getFileSystem().exists(clusterDirectory);
+ if (exists) {
+ log.info("Application Instance {} found at {}: destroying", clustername, clusterDirectory);
+ } else {
+ log.info("Application Instance {} already destroyed", clustername);
+ }
+ boolean deleted =
+ sliderFileSystem.getFileSystem().delete(clusterDirectory, true);
+ if (!deleted) {
+ log.warn("Filesystem returned false from delete() operation");
+ }
+
+ // rm the registry entry —do not let this block the destroy operations
+ String registryPath = SliderRegistryUtils.registryPathForInstance(
+ clustername);
+ try {
+ getRegistryOperations().delete(registryPath, true);
+ } catch (IOException e) {
+ log.warn("Error deleting {}: {} ", registryPath, e, e);
+ } catch (SliderException e) {
+ log.warn("Error binding to registry {} ", e, e);
+ }
+
+ List<ApplicationReport> instances = findAllLiveInstances(clustername);
+ // detect any race leading to cluster creation during the check/destroy process
+ // and report a problem.
+ if (!instances.isEmpty()) {
+ throw new SliderException(EXIT_APPLICATION_IN_USE,
+ clustername + ": "
+ + E_DESTROY_CREATE_RACE_CONDITION
+ + " :" +
+ instances.get(0));
+ }
+ log.info("Destroyed cluster {}", clustername);
+ return EXIT_SUCCESS;
+ }
+
+ @Override
+ public int actionAmSuicide(String clustername,
+ ActionAMSuicideArgs args) throws YarnException, IOException {
+ SliderClusterOperations clusterOperations =
+ createClusterOperations(clustername);
+ clusterOperations.amSuicide(args.message, args.exitcode, args.waittime);
+ return EXIT_SUCCESS;
+ }
+
+ @Override
+ public AbstractClientProvider createClientProvider(String provider)
+ throws SliderException {
+ SliderProviderFactory factory =
+ SliderProviderFactory.createSliderProviderFactory(provider);
+ return factory.createClientProvider();
+ }
+
+ /**
+ * Create the cluster -saving the arguments to a specification file first
+ * @param clustername cluster name
+ * @return the status code
+ * @throws YarnException Yarn problems
+ * @throws IOException other problems
+ * @throws BadCommandArgumentsException bad arguments.
+ */
+ public int actionCreate(String clustername, ActionCreateArgs createArgs) throws
+ YarnException,
+ IOException {
+
+ actionBuild(clustername, createArgs);
+ Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
+ AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
+ clustername, clusterDirectory);
+ try {
+ checkForCredentials(getConfig(), instanceDefinition.getAppConf());
+ } catch (Exception e) {
+ throw new IOException("problem setting credentials", e);
+ }
+ return startCluster(clustername, createArgs);
+ }
+
+ private void checkForCredentials(Configuration conf,
+ ConfTree tree) throws Exception {
+ if (tree.credentials == null || tree.credentials.size()==0) {
+ log.info("No credentials requested");
+ return;
+ }
+ CredentialShell credentialShell = null;
+ for (Entry<String, List<String>> cred : tree.credentials.entrySet()) {
+ String provider = cred.getKey();
+ List<String> aliases = cred.getValue();
+ if (aliases == null || aliases.size()==0) {
+ continue;
+ }
+ if (credentialShell == null) {
+ credentialShell = new CredentialShell();
+ credentialShell.setConf(conf);
+ }
+ Configuration c = new Configuration(conf);
+ c.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, provider);
+ CredentialProvider credentialProvider =
+ CredentialProviderFactory.getProviders(c).get(0);
+ Set<String> existingAliases = new HashSet<String>(credentialProvider.getAliases());
+ for (String alias : aliases) {
+ if (existingAliases.contains(alias.toLowerCase())) {
+ log.warn("Skipping creation of credentials for {}, " +
+ "alias already exists in {}", alias, provider);
+ continue;
+ }
+ String[] csarg = new String[]{
+ "create", alias, "-provider", provider};
+ log.info("Creating credentials for {} in {}", alias, provider);
+ credentialShell.run(csarg);
+ }
+ }
+ }
+
+ @Override
+ public int actionBuild(String clustername,
+ AbstractClusterBuildingActionArgs buildInfo) throws
+ YarnException,
+ IOException {
+
+ buildInstanceDefinition(clustername, buildInfo, false, false);
+ return EXIT_SUCCESS;
+ }
+
+ @Override
+ public int actionInstallPkg(ActionInstallPackageArgs installPkgInfo) throws
+ YarnException,
+ IOException {
+
+ Path srcFile = null;
+ if (StringUtils.isEmpty(installPkgInfo.name )) {
+ throw new BadCommandArgumentsException("A valid application type name is required (e.g. HBASE).");
+ }
+
+ if (StringUtils.isEmpty(installPkgInfo.packageURI)) {
+ throw new BadCommandArgumentsException("A valid application package location required.");
+ } else {
+ File pkgFile = new File(installPkgInfo.packageURI);
+ if (!pkgFile.exists() || pkgFile.isDirectory()) {
+ throw new BadCommandArgumentsException("Unable to access supplied pkg file at " +
+ pkgFile.getAbsolutePath());
+ } else {
+ srcFile = new Path(pkgFile.toURI());
+ }
+ }
+
+ Path pkgPath = sliderFileSystem.buildPackageDirPath(installPkgInfo.name);
+ sliderFileSystem.getFileSystem().mkdirs(pkgPath);
+
+ Path fileInFs = new Path(pkgPath, srcFile.getName());
+ log.info("Installing package {} at {} and overwrite is {}.", srcFile, fileInFs, installPkgInfo.replacePkg);
+ if (sliderFileSystem.getFileSystem().exists(fileInFs) && !installPkgInfo.replacePkg) {
+ throw new BadCommandArgumentsException("Pkg exists at " +
+ fileInFs.toUri().toString() +
+ ". Use --replacepkg to overwrite.");
+ }
+
+ sliderFileSystem.getFileSystem().copyFromLocalFile(false, installPkgInfo.replacePkg, srcFile, fileInFs);
+ return EXIT_SUCCESS;
+ }
+
+ @Override
+ public int actionUpdate(String clustername,
+ AbstractClusterBuildingActionArgs buildInfo) throws
+ YarnException, IOException {
+ buildInstanceDefinition(clustername, buildInfo, true, true);
+ return EXIT_SUCCESS;
+ }
+
+ /**
+ * Build up the AggregateConfiguration for an application instance then
+ * persists it
+ * @param clustername name of the cluster
+ * @param buildInfo the arguments needed to build the cluster
+ * @param overwrite true if existing cluster directory can be overwritten
+ * @param liveClusterAllowed true if live cluster can be modified
+ * @throws YarnException
+ * @throws IOException
+ */
+
+ public void buildInstanceDefinition(String clustername,
+ AbstractClusterBuildingActionArgs buildInfo, boolean overwrite, boolean liveClusterAllowed)
+ throws YarnException, IOException {
+ // verify that a live cluster isn't there
+ SliderUtils.validateClusterName(clustername);
+ verifyBindingsDefined();
+ if (!liveClusterAllowed) {
+ verifyNoLiveClusters(clustername);
+ }
+
+ Configuration conf = getConfig();
+ String registryQuorum = lookupZKQuorum();
+
+ Path appconfdir = buildInfo.getConfdir();
+ // Provider
+ String providerName = buildInfo.getProvider();
+ requireArgumentSet(Arguments.ARG_PROVIDER, providerName);
+ log.debug("Provider is {}", providerName);
+ SliderAMClientProvider sliderAM = new SliderAMClientProvider(conf);
+ AbstractClientProvider provider =
+ createClientProvider(providerName);
+ InstanceBuilder builder =
+ new InstanceBuilder(sliderFileSystem,
+ getConfig(),
+ clustername);
+
+ AggregateConf instanceDefinition = new AggregateConf();
+ ConfTreeOperations appConf = instanceDefinition.getAppConfOperations();
+ ConfTreeOperations resources = instanceDefinition.getResourceOperations();
+ ConfTreeOperations internal = instanceDefinition.getInternalOperations();
+ //initial definition is set by the providers
+ sliderAM.prepareInstanceConfiguration(instanceDefinition);
+ provider.prepareInstanceConfiguration(instanceDefinition);
+
+ //load in any specified on the command line
+ if (buildInfo.resources != null) {
+ try {
+ resources.mergeFile(buildInfo.resources,
+ new ResourcesInputPropertiesValidator());
+
+ } catch (IOException e) {
+ throw new BadConfigException(e,
+ "incorrect argument to %s: \"%s\" : %s ",
+ Arguments.ARG_RESOURCES,
+ buildInfo.resources,
+ e.toString());
+ }
+ }
+ if (buildInfo.template != null) {
+ try {
+ appConf.mergeFile(buildInfo.template,
+ new TemplateInputPropertiesValidator());
+ } catch (IOException e) {
+ throw new BadConfigException(e,
+ "incorrect argument to %s: \"%s\" : %s ",
+ Arguments.ARG_TEMPLATE,
+ buildInfo.template,
+ e.toString());
+ }
+ }
+
+ //get the command line options
+ ConfTree cmdLineAppOptions = buildInfo.buildAppOptionsConfTree();
+ ConfTree cmdLineResourceOptions = buildInfo.buildResourceOptionsConfTree();
+
+ appConf.merge(cmdLineAppOptions);
+
+ // put the role counts into the resources file
+ Map<String, String> argsRoleMap = buildInfo.getComponentMap();
+ for (Map.Entry<String, String> roleEntry : argsRoleMap.entrySet()) {
+ String count = roleEntry.getValue();
+ String key = roleEntry.getKey();
+ log.debug("{} => {}", key, count);
+ resources.getOrAddComponent(key)
+ .put(ResourceKeys.COMPONENT_INSTANCES, count);
+ }
+
+ //all CLI role options
+ Map<String, Map<String, String>> appOptionMap =
+ buildInfo.getCompOptionMap();
+ appConf.mergeComponents(appOptionMap);
+
+ //internal picks up core. values only
+ internal.propagateGlobalKeys(appConf, "slider.");
+ internal.propagateGlobalKeys(appConf, "internal.");
+
+ //copy over role. and yarn. values ONLY to the resources
+ if (PROPAGATE_RESOURCE_OPTION) {
+ resources.propagateGlobalKeys(appConf, "component.");
+ resources.propagateGlobalKeys(appConf, "role.");
+ resources.propagateGlobalKeys(appConf, "yarn.");
+ resources.mergeComponentsPrefix(appOptionMap, "component.", true);
+ resources.mergeComponentsPrefix(appOptionMap, "yarn.", true);
+ resources.mergeComponentsPrefix(appOptionMap, "role.", true);
+ }
+
+ // resource component args
+ appConf.merge(cmdLineResourceOptions);
+ resources.merge(cmdLineResourceOptions);
+ resources.mergeComponents(buildInfo.getResourceCompOptionMap());
+
+ builder.init(providerName, instanceDefinition);
+ builder.propagateFilename();
+ builder.propagatePrincipals();
+ builder.setImageDetailsIfAvailable(buildInfo.getImage(),
+ buildInfo.getAppHomeDir());
+ builder.setQueue(buildInfo.queue);
+
+ String quorum = buildInfo.getZKhosts();
+ if (SliderUtils.isUnset(quorum)) {
+ quorum = registryQuorum;
+ }
+ if (isUnset(quorum)) {
+ throw new BadConfigException("No Zookeeper quorum defined");
+ }
+ ZKPathBuilder zkPaths = new ZKPathBuilder(getAppName(),
+ getUsername(),
+ clustername,
+ registryQuorum,
+ quorum);
+ String zookeeperRoot = buildInfo.getAppZKPath();
+
+ if (isSet(zookeeperRoot)) {
+ zkPaths.setAppPath(zookeeperRoot);
+ } else {
+ String createDefaultZkNode = appConf.getGlobalOptions().getOption(AgentKeys.CREATE_DEF_ZK_NODE, "false");
+ if (createDefaultZkNode.equals("true")) {
+ String defaultZKPath = createZookeeperNode(clustername, false);
+ log.info("ZK node created for application instance: {}.", defaultZKPath);
+ if (defaultZKPath != null) {
+ zkPaths.setAppPath(defaultZKPath);
+ }
+ } else {
+ // create AppPath if default is being used
+ String defaultZKPath = createZookeeperNode(clustername, true);
+ log.info("ZK node assigned to application instance: {}.", defaultZKPath);
+ zkPaths.setAppPath(defaultZKPath);
+ }
+ }
+
+ builder.addZKBinding(zkPaths);
+
+ //then propagate any package URI
+ if (buildInfo.packageURI != null) {
+ appConf.set(AgentKeys.PACKAGE_PATH, buildInfo.packageURI);
+ }
+
+ // make any substitutions needed at this stage
+ replaceTokens(appConf.getConfTree(), getUsername(), clustername);
+
+ // providers to validate what there is
+ AggregateConf instanceDescription = builder.getInstanceDescription();
+ validateInstanceDefinition(sliderAM, instanceDescription, sliderFileSystem);
+ validateInstanceDefinition(provider, instanceDescription, sliderFileSystem);
+ try {
+ persistInstanceDefinition(overwrite, appconfdir, builder);
+ } catch (LockAcquireFailedException e) {
+ log.warn("Failed to get a Lock on {} : {}", builder, e);
+ throw new BadClusterStateException("Failed to save " + clustername
+ + ": " + e);
+ }
+ }
+
+ protected void persistInstanceDefinition(boolean overwrite,
+ Path appconfdir,
+ InstanceBuilder builder)
+ throws IOException, SliderException, LockAcquireFailedException {
+ builder.persist(appconfdir, overwrite);
+ }
+
+ @VisibleForTesting
+ public static void replaceTokens(ConfTree conf,
+ String userName, String clusterName) throws IOException {
+ Map<String,String> newglobal = new HashMap<String,String>();
+ for (Entry<String,String> entry : conf.global.entrySet()) {
+ newglobal.put(entry.getKey(), replaceTokens(entry.getValue(),
+ userName, clusterName));
+ }
+ conf.global.putAll(newglobal);
+
+ Map<String,List<String>> newcred = new HashMap<String,List<String>>();
+ for (Entry<String,List<String>> entry : conf.credentials.entrySet()) {
+ List<String> resultList = new ArrayList<String>();
+ for (String v : entry.getValue()) {
+ resultList.add(replaceTokens(v, userName, clusterName));
+ }
+ newcred.put(replaceTokens(entry.getKey(), userName, clusterName),
+ resultList);
+ }
+ conf.credentials.clear();
+ conf.credentials.putAll(newcred);
+ }
+
+ private static String replaceTokens(String s, String userName,
+ String clusterName) throws IOException {
+ return s.replaceAll(Pattern.quote("${USER}"), userName)
+ .replaceAll(Pattern.quote("${CLUSTER_NAME}"), clusterName);
+ }
+
+ public FsPermission getClusterDirectoryPermissions(Configuration conf) {
+ String clusterDirPermsOct =
+ conf.get(CLUSTER_DIRECTORY_PERMISSIONS,
+ DEFAULT_CLUSTER_DIRECTORY_PERMISSIONS);
+ return new FsPermission(clusterDirPermsOct);
+ }
+
+ /**
+ * Verify that the Resource MAnager is configured, if not fail
+ * with a useful error message
+ * @throws BadCommandArgumentsException the exception raised on an invalid config
+ */
+ public void verifyBindingsDefined() throws BadCommandArgumentsException {
+ InetSocketAddress rmAddr = SliderUtils.getRmAddress(getConfig());
+ if (!SliderUtils.isAddressDefined(rmAddr)) {
+ throw new BadCommandArgumentsException(
+ "No valid Resource Manager address provided in the argument "
+ + Arguments.ARG_MANAGER
+ + " or the configuration property "
+ + YarnConfiguration.RM_ADDRESS
+ + " value :" + rmAddr);
+ }
+
+ }
+
+ /**
+ * Load and start a cluster specification.
+ * This assumes that all validation of args and cluster state
+ * have already taken place
+ *
+ * @param clustername name of the cluster.
+ * @param launchArgs launch arguments
+ * @return the exit code
+ * @throws YarnException
+ * @throws IOException
+ */
+ private int startCluster(String clustername,
+ LaunchArgsAccessor launchArgs) throws
+ YarnException,
+ IOException {
+ Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
+ AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
+ clustername,
+ clusterDirectory);
+
+ LaunchedApplication launchedApplication =
+ launchApplication(clustername, clusterDirectory, instanceDefinition,
+ serviceArgs.isDebug());
+ applicationId = launchedApplication.getApplicationId();
+
+ return waitForAppAccepted(launchedApplication, launchArgs.getWaittime());
+ }
+
+ /**
+ * Load the instance definition. It is not resolved at this point
+ * @param name cluster name
+ * @param clusterDirectory cluster dir
+ * @return the loaded configuration
+ * @throws IOException
+ * @throws SliderException
+ * @throws UnknownApplicationInstanceException if the file is not found
+ */
+ public AggregateConf loadInstanceDefinitionUnresolved(String name,
+ Path clusterDirectory) throws
+ IOException,
+ SliderException {
+
+ try {
+ AggregateConf definition =
+ InstanceIO.loadInstanceDefinitionUnresolved(sliderFileSystem,
+ clusterDirectory);
+ return definition;
+ } catch (FileNotFoundException e) {
+ throw UnknownApplicationInstanceException.unknownInstance(name, e);
+ }
+ }
+ /**
+ * Load the instance definition.
+ * @param name cluster name
+ * @param resolved flag to indicate the cluster should be resolved
+ * @return the loaded configuration
+ * @throws IOException IO problems
+ * @throws SliderException slider explicit issues
+ * @throws UnknownApplicationInstanceException if the file is not found
+ */
+ public AggregateConf loadInstanceDefinition(String name,
+ boolean resolved) throws
+ IOException,
+ SliderException {
+
+ Path clusterDirectory = sliderFileSystem.buildClusterDirPath(name);
+ AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
+ name,
+ clusterDirectory);
+ if (resolved) {
+ instanceDefinition.resolve();
+ }
+ return instanceDefinition;
+
+ }
+
+ /**
+ *
+ * @param clustername name of the cluster
+ * @param clusterDirectory cluster dir
+ * @param instanceDefinition the instance definition
+ * @param debugAM enable debug AM options
+ * @return the launched application
+ * @throws YarnException
+ * @throws IOException
+ */
+ public LaunchedApplication launchApplication(String clustername,
+ Path clusterDirectory,
+ AggregateConf instanceDefinition,
+ boolean debugAM)
+ throws YarnException, IOException {
+
+
+ deployedClusterName = clustername;
+ SliderUtils.validateClusterName(clustername);
+ verifyNoLiveClusters(clustername);
+ Configuration config = getConfig();
+ lookupZKQuorum();
+ boolean clusterSecure = SliderUtils.isHadoopClusterSecure(config);
+ //create the Slider AM provider -this helps set up the AM
+ SliderAMClientProvider sliderAM = new SliderAMClientProvider(config);
+
+ instanceDefinition.resolve();
+ launchedInstanceDefinition = instanceDefinition;
+
+ ConfTreeOperations internalOperations =
+ instanceDefinition.getInternalOperations();
+ MapOperations internalOptions = internalOperations.getGlobalOptions();
+ ConfTreeOperations resourceOperations =
+ instanceDefinition.getResourceOperations();
+ ConfTreeOperations appOperations =
+ instanceDefinition.getAppConfOperations();
+ Path generatedConfDirPath =
+ createPathThatMustExist(internalOptions.getMandatoryOption(
+ InternalKeys.INTERNAL_GENERATED_CONF_PATH));
+ Path snapshotConfPath =
+ createPathThatMustExist(internalOptions.getMandatoryOption(
+ InternalKeys.INTERNAL_SNAPSHOT_CONF_PATH));
+
+
+ // cluster Provider
+ AbstractClientProvider provider = createClientProvider(
+ internalOptions.getMandatoryOption(
+ InternalKeys.INTERNAL_PROVIDER_NAME));
+ // make sure the conf dir is valid;
+
+ if (log.isDebugEnabled()) {
+ log.debug(instanceDefinition.toString());
+ }
+ MapOperations sliderAMResourceComponent =
+ resourceOperations.getOrAddComponent(SliderKeys.COMPONENT_AM);
+ MapOperations resourceGlobalOptions = resourceOperations.getGlobalOptions();
+
+ // add the tags if available
+ Set<String> applicationTags = provider.getApplicationTags(sliderFileSystem,
+ appOperations.getGlobalOptions().get(AgentKeys.APP_DEF));
+ AppMasterLauncher amLauncher = new AppMasterLauncher(clustername,
+ SliderKeys.APP_TYPE,
+ config,
+ sliderFileSystem,
+ yarnClient,
+ clusterSecure,
+ sliderAMResourceComponent,
+ resourceGlobalOptions,
+ applicationTags);
+
+ ApplicationId appId = amLauncher.getApplicationId();
+ // set the application name;
+ amLauncher.setKeepContainersOverRestarts(true);
+
+ int maxAppAttempts = config.getInt(KEY_AM_RESTART_LIMIT, 0);
+ amLauncher.setMaxAppAttempts(maxAppAttempts);
+
+ sliderFileSystem.purgeAppInstanceTempFiles(clustername);
+ Path tempPath = sliderFileSystem.createAppInstanceTempPath(
+ clustername,
+ appId.toString() + "/am");
+ String libdir = "lib";
+ Path libPath = new Path(tempPath, libdir);
+ sliderFileSystem.getFileSystem().mkdirs(libPath);
+ log.debug("FS={}, tempPath={}, libdir={}", sliderFileSystem.toString(),
+ tempPath, libPath);
+ // set local resources for the application master
+ // local files or archives as needed
+ // In this scenario, the jar file for the application master is part of the local resources
+ Map<String, LocalResource> localResources = amLauncher.getLocalResources();
+ // conf directory setup
+ Path remoteConfPath = null;
+ String relativeConfDir = null;
+ String confdirProp =
+ System.getProperty(SliderKeys.PROPERTY_CONF_DIR);
+ if (confdirProp == null || confdirProp.isEmpty()) {
+ log.debug("No local configuration directory provided as system property");
+ } else {
+ File confDir = new File(confdirProp);
+ if (!confDir.exists()) {
+ throw new BadConfigException(E_CONFIGURATION_DIRECTORY_NOT_FOUND,
+ confDir);
+ }
+ Path localConfDirPath = SliderUtils.createLocalPath(confDir);
+ log.debug("Copying AM configuration data from {}", localConfDirPath);
+ remoteConfPath = new Path(clusterDirectory,
+ SliderKeys.SUBMITTED_CONF_DIR);
+ SliderUtils.copyDirectory(config, localConfDirPath, remoteConfPath,
+ null);
+ }
+ // the assumption here is that minimr cluster => this is a test run
+ // and the classpath can look after itself
+
+ boolean usingMiniMRCluster = getUsingMiniMRCluster();
+ if (!usingMiniMRCluster) {
+
+ log.debug("Destination is not a MiniYARNCluster -copying full classpath");
+
+ // insert conf dir first
+ if (remoteConfPath != null) {
+ relativeConfDir = SliderKeys.SUBMITTED_CONF_DIR;
+ Map<String, LocalResource> submittedConfDir =
+ sliderFileSystem.submitDirectory(remoteConfPath,
+ relativeConfDir);
+ SliderUtils.mergeMaps(localResources, submittedConfDir);
+ }
+ }
+ // build up the configuration
+ // IMPORTANT: it is only after this call that site configurations
+ // will be valid.
+
+ propagatePrincipals(config, instanceDefinition);
+ // validate security data
+/*
+ // turned off until tested
+ SecurityConfiguration securityConfiguration =
+ new SecurityConfiguration(config,
+ instanceDefinition, clustername);
+
+*/
+ Configuration clientConfExtras = new Configuration(false);
+ // then build up the generated path.
+ FsPermission clusterPerms = getClusterDirectoryPermissions(config);
+ SliderUtils.copyDirectory(config, snapshotConfPath, generatedConfDirPath,
+ clusterPerms);
+
+
+ // standard AM resources
+ sliderAM.prepareAMAndConfigForLaunch(sliderFileSystem,
+ config,
+ amLauncher,
+ instanceDefinition,
+ snapshotConfPath,
+ generatedConfDirPath,
+ clientConfExtras,
+ libdir,
+ tempPath,
+ usingMiniMRCluster);
+ //add provider-specific resources
+ provider.prepareAMAndConfigForLaunch(sliderFileSystem,
+ config,
+ amLauncher,
+ instanceDefinition,
+ snapshotConfPath,
+ generatedConfDirPath,
+ clientConfExtras,
+ libdir,
+ tempPath,
+ usingMiniMRCluster);
+
+ // now that the site config is fully generated, the provider gets
+ // to do a quick review of them.
+ log.debug("Preflight validation of cluster configuration");
+
+
+ sliderAM.preflightValidateClusterConfiguration(sliderFileSystem,
+ clustername,
+ config,
+ instanceDefinition,
+ clusterDirectory,
+ generatedConfDirPath,
+ clusterSecure
+ );
+
+ provider.preflightValidateClusterConfiguration(sliderFileSystem,
+ clustername,
+ config,
+ instanceDefinition,
+ clusterDirectory,
+ generatedConfDirPath,
+ clusterSecure
+ );
+
+
+ // TODO: consider supporting apps that don't have an image path
+ Path imagePath =
+ SliderUtils.extractImagePath(sliderFileSystem, internalOptions);
+ if (sliderFileSystem.maybeAddImagePath(localResources, imagePath)) {
+ log.debug("Registered image path {}", imagePath);
+ }
+
+ // build the environment
+ amLauncher.putEnv(
+ SliderUtils.buildEnvMap(sliderAMResourceComponent));
+ ClasspathConstructor classpath = SliderUtils.buildClasspath(relativeConfDir,
+ libdir,
+ getConfig(),
+ usingMiniMRCluster);
+ amLauncher.setClasspath(classpath);
+ if (log.isDebugEnabled()) {
+ log.debug("AM classpath={}", classpath);
+ log.debug("Environment Map:\n{}",
+ SliderUtils.stringifyMap(amLauncher.getEnv()));
+ log.debug("Files in lib path\n{}", sliderFileSystem.listFSDir(libPath));
+ }
+
+ // rm address
+
+ InetSocketAddress rmSchedulerAddress;
+ try {
+ rmSchedulerAddress = SliderUtils.getRmSchedulerAddress(config);
+ } catch (IllegalArgumentException e) {
+ throw new BadConfigException("%s Address invalid: %s",
+ YarnConfiguration.RM_SCHEDULER_ADDRESS,
+ config.get(
+ YarnConfiguration.RM_SCHEDULER_ADDRESS)
+ );
+
+ }
+ String rmAddr = NetUtils.getHostPortString(rmSchedulerAddress);
+
+ JavaCommandLineBuilder commandLine = new JavaCommandLineBuilder();
+ // insert any JVM options);
+ sliderAM.addJVMOptions(instanceDefinition, commandLine);
+ // enable asserts if the text option is set
+ commandLine.enableJavaAssertions();
+ // add the AM sevice entry point
+ commandLine.add(SliderAppMaster.SERVICE_CLASSNAME);
+
+ // create action and the cluster name
+ commandLine.add(ACTION_CREATE, clustername);
+
+ // debug
+ if (debugAM) {
+ commandLine.add(Arguments.ARG_DEBUG);
+ }
+
+ // set the cluster directory path
+ commandLine.add(Arguments.ARG_CLUSTER_URI, clusterDirectory.toUri());
+
+ if (!isUnset(rmAddr)) {
+ commandLine.add(Arguments.ARG_RM_ADDR, rmAddr);
+ }
+
+ if (serviceArgs.getFilesystemBinding() != null) {
+ commandLine.add(Arguments.ARG_FILESYSTEM, serviceArgs.getFilesystemBinding());
+ }
+
+ addConfOptionToCLI(commandLine, config, REGISTRY_PATH,
+ DEFAULT_REGISTRY_PATH);
+ addMandatoryConfOptionToCLI(commandLine, config,
+ RegistryConstants.KEY_REGISTRY_ZK_QUORUM);
+
+ if (clusterSecure) {
+ // if the cluster is secure, make sure that
+ // the relevant security settings go over
+/*
+ addConfOptionToCLI(commandLine, config, KEY_SECURITY);
+*/
+ addConfOptionToCLI(commandLine,
+ config,
+ DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY);
+ }
+ // write out the path output
+ commandLine.addOutAndErrFiles(STDOUT_AM, STDERR_AM);
+
+ String cmdStr = commandLine.build();
+ log.debug("Completed setting up app master command {}", cmdStr);
+
+ amLauncher.addCommandLine(commandLine);
+
+ // the Slider AM gets to configure the AM requirements, not the custom provider
+ sliderAM.prepareAMResourceRequirements(sliderAMResourceComponent,
+ amLauncher.getResource());
+
+
+ // Set the priority for the application master
+
+ int amPriority = config.getInt(KEY_YARN_QUEUE_PRIORITY,
+ DEFAULT_YARN_QUEUE_PRIORITY);
+
+
+ amLauncher.setPriority(amPriority);
+
+ // Set the queue to which this application is to be submitted in the RM
+ // Queue for App master
+ String amQueue = config.get(KEY_YARN_QUEUE, DEFAULT_YARN_QUEUE);
+ String suppliedQueue = internalOperations.getGlobalOptions().get(InternalKeys.INTERNAL_QUEUE);
+ if(!SliderUtils.isUnset(suppliedQueue)) {
+ amQueue = suppliedQueue;
+ log.info("Using queue {} for the application instance.", amQueue);
+ }
+
+ if (amQueue != null) {
+ amLauncher.setQueue(amQueue);
+ }
+
+ // Submit the application to the applications manager
+ // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
+ // Ignore the response as either a valid response object is returned on success
+ // or an exception thrown to denote some form of a failure
+
+
+ // submit the application
+ LaunchedApplication launchedApplication = amLauncher.submitApplication();
+ return launchedApplication;
+ }
+
+
+ /**
+ * Wait for the launched app to be accepted
+ * @param waittime time in millis
+ * @return exit code
+ * @throws YarnException
+ * @throws IOException
+ */
+ public int waitForAppAccepted(LaunchedApplication launchedApplication,
+ int waittime) throws
+ YarnException,
+ IOException {
+ assert launchedApplication != null;
+ int exitCode;
+ // wait for the submit state to be reached
+ ApplicationReport report = launchedApplication.monitorAppToState(
+ YarnApplicationState.ACCEPTED,
+ new Duration(Constants.ACCEPT_TIME));
+
+
+ // may have failed, so check that
+ if (SliderUtils.hasAppFinished(report)) {
+ exitCode = buildExitCode(report);
+ } else {
+ // exit unless there is a wait
+ exitCode = EXIT_SUCCESS;
+
+ if (waittime != 0) {
+ // waiting for state to change
+ Duration duration = new Duration(waittime * 1000);
+ duration.start();
+ report = launchedApplication.monitorAppToState(
+ YarnApplicationState.RUNNING, duration);
+ if (report != null &&
+ report.getYarnApplicationState() == YarnApplicationState.RUNNING) {
+ exitCode = EXIT_SUCCESS;
+ } else {
+
+ launchedApplication.kill("");
+ exitCode = buildExitCode(report);
+ }
+ }
+ }
+ return exitCode;
+ }
+
+
+ /**
+ * Propagate any critical principals from the current site config down to the HBase one.
+ * @param config config to read from
+ * @param clusterSpec cluster spec
+ */
+ private void propagatePrincipals(Configuration config,
+ AggregateConf clusterSpec) {
+ String dfsPrincipal = config.get(
+ DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY);
+ if (dfsPrincipal != null) {
+ String siteDfsPrincipal = OptionKeys.SITE_XML_PREFIX +
+ DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
+ clusterSpec.getAppConfOperations().getGlobalOptions().putIfUnset(
+ siteDfsPrincipal,
+ dfsPrincipal);
+ }
+ }
+
+
+ private boolean addConfOptionToCLI(CommandLineBuilder cmdLine,
+ Configuration conf,
+ String key) {
+ String val = conf.get(key);
+ return defineIfSet(cmdLine, key, val);
+ }
+
+ private String addConfOptionToCLI(CommandLineBuilder cmdLine,
+ Configuration conf,
+ String key,
+ String defVal) {
+ String val = conf.get(key, defVal);
+ define(cmdLine, key, val);
+ return val;
+ }
+
+ /**
+ * Add a <code>-D key=val</code> command to the CLI
+ * @param cmdLine command line
+ * @param key key
+ * @param val value
+ */
+ private void define(CommandLineBuilder cmdLine, String key, String val) {
+ Preconditions.checkArgument(key != null, "null key");
+ Preconditions.checkArgument(val != null, "null value");
+ cmdLine.add(Arguments.ARG_DEFINE, key + "=" + val);
+ }
+
+ /**
+ * Add a <code>-D key=val</code> command to the CLI if <code>val</code>
+ * is not null
+ * @param cmdLine command line
+ * @param key key
+ * @param val value
+ */
+ private boolean defineIfSet(CommandLineBuilder cmdLine, String key, String val) {
+ Preconditions.checkArgument(key != null, "null key");
+ if (val != null) {
+ define(cmdLine, key, val);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private void addMandatoryConfOptionToCLI(CommandLineBuilder cmdLine,
+ Configuration conf,
+ String key) throws BadConfigException {
+ if (!addConfOptionToCLI(cmdLine, conf, key)) {
+ throw new BadConfigException("Missing configuration option: " + key);
+ }
+ }
+
+ /**
+ * Create a path that must exist in the cluster fs
+ * @param uri uri to create
+ * @return the path
+ * @throws FileNotFoundException if the path does not exist
+ */
+ public Path createPathThatMustExist(String uri) throws
+ SliderException,
+ IOException {
+ return sliderFileSystem.createPathThatMustExist(uri);
+ }
+
+ /**
+ * verify that a live cluster isn't there
+ * @param clustername cluster name
+ * @throws SliderException with exit code EXIT_CLUSTER_LIVE
+ * if a cluster of that name is either live or starting up.
+ */
+ public void verifyNoLiveClusters(String clustername) throws
+ IOException,
+ YarnException {
+ List<ApplicationReport> existing = findAllLiveInstances(clustername);
+
+ if (!existing.isEmpty()) {
+ throw new SliderException(EXIT_APPLICATION_IN_USE,
+ clustername + ": " + E_CLUSTER_RUNNING + " :" +
+ existing.get(0));
+ }
+ }
+
+ public String getUsername() throws IOException {
+ return RegistryUtils.currentUser();
+ }
+
+ /**
+ * Get the name of any deployed cluster
+ * @return the cluster name
+ */
+ public String getDeployedClusterName() {
+ return deployedClusterName;
+ }
+
+ @VisibleForTesting
+ public void setDeployedClusterName(String deployedClusterName) {
+ this.deployedClusterName = deployedClusterName;
+ }
+
+ /**
+ * ask if the client is using a mini MR cluster
+ * @return true if they are
+ */
+ private boolean getUsingMiniMRCluster() {
+ return getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER,
+ false);
+ }
+
+ /**
+ * Get the application name used in the zookeeper root paths
+ * @return an application-specific path in ZK
+ */
+ private String getAppName() {
+ return "slider";
+ }
+
+ /**
+ * Wait for the app to start running (or go past that state)
+ * @param duration time to wait
+ * @return the app report; null if the duration turned out
+ * @throws YarnException YARN or app issues
+ * @throws IOException IO problems
+ */
+ @VisibleForTesting
+ public ApplicationReport monitorAppToRunning(Duration duration)
+ throws YarnException, IOException {
+ return monitorAppToState(YarnApplicationState.RUNNING, duration);
+ }
+
+ /**
+ * Build an exit code for an application Id and its report.
+ * If the report parameter is null, the app is killed
+ * @param report report
+ * @return the exit code
+ */
+ private int buildExitCode(ApplicationReport report) throws
+ IOException,
+ YarnException {
+ if (null == report) {
+ forceKillApplication("Reached client specified timeout for application");
+ return EXIT_TIMED_OUT;
+ }
+
+ YarnApplicationState state = report.getYarnApplicationState();
+ FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
+ switch (state) {
+ case FINISHED:
+ if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
+ log.info("Application has completed successfully");
+ return EXIT_SUCCESS;
+ } else {
+ log.info("Application finished unsuccessfully." +
+ "YarnState = {}, DSFinalStatus = {} Breaking monitoring loop",
+ state, dsStatus);
+ return EXIT_YARN_SERVICE_FINISHED_WITH_ERROR;
+ }
+
+ case KILLED:
+ log.info("Application did not finish. YarnState={}, DSFinalStatus={}",
+ state, dsStatus);
+ return EXIT_YARN_SERVICE_KILLED;
+
+ case FAILED:
+ log.info("Application Failed. YarnState={}, DSFinalStatus={}", state,
+ dsStatus);
+ return EXIT_YARN_SERVICE_FAILED;
+ default:
+ //not in any of these states
+ return EXIT_SUCCESS;
+ }
+ }
+
+ /**
+ * Monitor the submitted application for reaching the requested state.
+ * Will also report if the app reaches a later state (failed, killed, etc)
+ * Kill application if duration!= null & time expires.
+ * Prerequisite: the applicatin was launched.
+ * @param desiredState desired state.
+ * @param duration how long to wait -must be more than 0
+ * @return the application report -null on a timeout
+ * @throws YarnException
+ * @throws IOException
+ */
+ @VisibleForTesting
+ public ApplicationReport monitorAppToState(
+ YarnApplicationState desiredState,
+ Duration duration)
+ throws YarnException, IOException {
+ LaunchedApplication launchedApplication =
+ new LaunchedApplication(applicationId, yarnClient);
+ return launchedApplication.monitorAppToState(desiredState, duration);
+ }
+
+ @Override
+ public ApplicationReport getApplicationReport() throws
+ IOException,
+ YarnException {
+ return getApplicationReport(applicationId);
+ }
+
+ @Override
+ public boolean forceKillApplication(String reason)
+ throws YarnException, IOException {
+ if (applicationId != null) {
+ new LaunchedApplication(applicationId, yarnClient).forceKill(reason);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * List Slider instances belonging to a specific user
+ * @param user user: "" means all users
+ * @return a possibly empty list of Slider AMs
+ */
+ @VisibleForTesting
+ public List<ApplicationReport> listSliderInstances(String user)
+ throws YarnException, IOException {
+ return YarnAppListClient.listInstances();
+ }
+
+ @Override
+ @VisibleForTesting
+ public int actionList(String clustername) throws IOException, YarnException {
+ verifyBindingsDefined();
+
+ String user = UserGroupInformation.getCurrentUser().getUserName();
+ List<ApplicationReport> instances = listSliderInstances(user);
+
+ if (isUnset(clustername)) {
+ log.info("Instances for {}: {}",
+ (user != null ? user : "all users"),
+ instances.size());
+ for (ApplicationReport report : instances) {
+ logAppReport(report);
+ }
+ return EXIT_SUCCESS;
+ } else {
+ SliderUtils.validateClusterName(clustername);
+ log.debug("Listing cluster named {}", clustername);
+ ApplicationReport report =
+ findClusterInInstanceList(instances, clustername);
+ if (report != null) {
+ logAppReport(report);
+ return EXIT_SUCCESS;
+ } else {
+ throw unknownClusterException(clustername);
+ }
+ }
+ }
+
+ /**
+ * Log the application report at INFO
+ * @param report report to log
+ */
+ public void logAppReport(ApplicationReport report) {
+ log.info(SliderUtils.appReportToString(report, "\n"));
+ }
+
+ @Override
+ @VisibleForTesting
+ public int actionFlex(String name, ActionFlexArgs args) throws YarnException, IOException {
+ verifyBindingsDefined();
+ SliderUtils.validateClusterName(name);
+ log.debug("actionFlex({})", name);
+ Map<String, Integer> roleInstances = new HashMap<String, Integer>();
+ Map<String, String> roleMap = args.getComponentMap();
+ for (Map.Entry<String, String> roleEntry : roleMap.entrySet()) {
+ String key = roleEntry.getKey();
+ String val = roleEntry.getValue();
+ try {
+ roleInstances.put(key, Integer.valueOf(val));
+ } catch (NumberFormatException e) {
+ throw new BadCommandArgumentsException("Requested count of role %s" +
+ " is not a number: \"%s\"",
+ key, val);
+ }
+ }
+ return flex(name, roleInstances);
+ }
+
+ @Override
+ @VisibleForTesting
+ public int actionExists(String name, boolean checkLive) throws YarnException, IOException {
+ verifyBindingsDefined();
+ SliderUtils.validateClusterName(name);
+ log.debug("actionExists({}, {})", name, checkLive);
+
+ //initial probe for a cluster in the filesystem
+ Path clusterDirectory = sliderFileSystem.buildClusterDirPath(name);
+ if (!sliderFileSystem.getFileSystem().exists(clusterDirectory)) {
+ throw unknownClusterException(name);
+ }
+
+ //test for liveness if desired
+
+ if (checkLive) {
+ ApplicationReport instance = findInstance(name);
+ if (instance == null) {
+ log.info("Cluster {} not running", name);
+ return EXIT_FALSE;
+ } else {
+ // the app exists, but it may be in a terminated state
+ SliderUtils.OnDemandReportStringifier report =
+ new SliderUtils.OnDemandReportStringifier(instance);
+ YarnApplicationState state =
+ instance.getYarnApplicationState();
+ if (state.ordinal() >= YarnApplicationState.FINISHED.ordinal()) {
+ //cluster in the list of apps but not running
+ log.info("Cluster {} found but is in state {}", name, state);
+ log.debug("State {}", report);
+ return EXIT_FALSE;
+ }
+ log.info("Cluster {} is live:\n{}", name, report);
+ }
+ } else {
+ log.info("Cluster {} exists", name);
+ }
+ return EXIT_SUCCESS;
+ }
+
+
+ @Override
+ public int actionKillContainer(String name,
+ ActionKillContainerArgs args) throws YarnException, IOException {
+ String id = args.id;
+ if (SliderUtils.isUnset(id)) {
+ throw new BadCommandArgumentsException("Missing container id");
+ }
+ log.info("killingContainer {}:{}", name, id);
+ SliderClusterOperations clusterOps =
+ new SliderClusterOperations(bondToCluster(name));
+ try {
+ clusterOps.killContainer(id);
+ } catch (NoSuchNodeException e) {
+ throw new BadClusterStateException("Container %s not found in cluster %s",
+ id, name);
+ }
+ return EXIT_SUCCESS;
+ }
+
+ @Override
+ public String actionEcho(String name, ActionEchoArgs args) throws
+ YarnException,
+ IOException {
+ String message = args.message;
+ if (message == null) {
+ throw new BadCommandArgumentsException("missing message");
+ }
+ SliderClusterOperations clusterOps =
+ new SliderClusterOperations(bondToCluster(name));
+ return clusterOps.echo(message);
+ }
+
+ /**
+ * Get at the service registry operations
+ * @return registry client -valid after the service is inited.
+ */
+ public YarnAppListClient getYarnAppListClient() {
+ return YarnAppListClient;
+ }
+
+ /**
+ * Find an instance of an application belonging to the current user
+ * @param appname application name
+ * @return the app report or null if none is found
+ * @throws YarnException YARN issues
+ * @throws IOException IO problems
+ */
+ private ApplicationReport findInstance(String appname)
+ throws YarnException, IOException {
+ return YarnAppListClient.findInstance(appname);
+ }
+
+ private RunningApplication findApplication(String appname)
+ throws YarnException, IOException {
+ ApplicationReport applicationReport = findInstance(appname);
+ return applicationReport != null ? new RunningApplication(yarnClient, applicationReport): null;
+
+ }
+
+ /**
+ * find all live instances of a specific app -if there is >1 in the cluster,
+ * this returns them all. State should be running or less
+ * @param appname application name
+ * @return the list of all matching application instances
+ */
+ private List<ApplicationReport> findAllLiveInstances(String appname)
+ throws YarnException, IOException {
+
+ return YarnAppListClient.findAllLiveInstances(appname);
+ }
+
+
+ public ApplicationReport findClusterInInstanceList(List<ApplicationReport> instances,
+ String appname) {
+ return yarnClient.findClusterInInstanceList(instances, appname);
+ }
+
+ /**
+ * Connect to a Slider AM
+ * @param app application report providing the details on the application
+ * @return an instance
+ * @throws YarnException
+ * @throws IOException
+ */
+ private SliderClusterProtocol connect(ApplicationReport app)
+ throws YarnException, IOException {
+
+ try {
+ return RpcBinder.getProxy(getConfig(),
+ yarnClient.getRmClient(),
+ app,
+ Constants.CONNECT_TIMEOUT,
+ Constants.RPC_TIMEOUT);
+ } catch (InterruptedException e) {
+ throw new SliderException(SliderExitCodes.EXIT_TIMED_OUT,
+ e,
+ "Interrupted waiting for communications with the Slider AM");
+ }
+ }
+
+ @Override
+ @VisibleForTesting
+ public int actionStatus(String clustername, ActionStatusArgs statusArgs) throws
+ YarnException,
+ IOException {
+ verifyBindingsDefined();
+ SliderUtils.validateClusterName(clustername);
+ String outfile = statusArgs.getOutput();
+ ClusterDescription status = getClusterDescription(clustername);
+ String text = status.toJsonString();
+ if (outfile == null) {
+ log.info(text);
+ } else {
+ status.save(new File(outfile).getAbsoluteFile());
+ }
+ return EXIT_SUCCESS;
+ }
+
+ @Override
+ public int actionVersion() {
+ SliderVersionInfo.loadAndPrintVersionInfo(log);
+ return EXIT_SUCCESS;
+ }
+
+ @Override
+ public int actionFreeze(String clustername,
+ ActionFreezeArgs freezeArgs) throws YarnException, IOException {
+ verifyBindingsDefined();
+ SliderUtils.validateClusterName(clustername);
+ int waittime = freezeArgs.getWaittime();
+ String text = freezeArgs.message;
+ boolean forcekill = freezeArgs.force;
+ log.debug("actionFreeze({}, reason={}, wait={}, force={})", clustername,
+ text,
+ waittime,
+ forcekill);
+
+ //is this actually a known cluster?
+ sliderFileSystem.locateInstanceDefinition(clustername);
+ ApplicationReport app = findInstance(clustername);
+ if (app == null) {
+ // exit early
+ log.info("Cluster {} not running", clustername);
+ // not an error to stop a stopped cluster
+ return EXIT_SUCCESS;
+ }
+ log.debug("App to stop was found: {}:\n{}", clustername,
+ new SliderUtils.OnDemandReportStringifier(app));
+ if (app.getYarnApplicationState().ordinal() >=
+ YarnApplicationState.FINISHED.ordinal()) {
+ log.info("Cluster {} is a terminated state {}", clustername,
+ app.getYarnApplicationState());
+ return EXIT_SUCCESS;
+ }
+ LaunchedApplication application = new LaunchedApplication(yarnClient, app);
+ applicationId = application.getApplicationId();
+
+ if (forcekill) {
+ //escalating to forced kill
+ application.kill("Forced stop of " + clustername +
+ ": " + text);
+ } else {
+ try {
+ SliderClusterProtocol appMaster = connect(app);
+ Messages.StopClusterRequestProto r =
+ Messages.StopClusterRequestProto
+ .newBuilder()
+ .setMessage(text)
+ .build();
+ appMaster.stopCluster(r);
+
+ log.debug("Cluster stop command issued");
+
+ } catch (YarnException e) {
+ log.warn("Exception while trying to terminate {}: {}", clustername, e);
+ return EXIT_FALSE;
+ } catch (IOException e) {
+ log.warn("Exception while trying to terminate {}: {}", clustername, e);
+ return EXIT_FALSE;
+ }
+ }
+
+ //wait for completion. We don't currently return an exception during this process
+ //as the stop operation has been issued, this is just YARN.
+ try {
+ if (waittime > 0) {
+ ApplicationReport applicationReport =
+ application.monitorAppToState(YarnApplicationState.FINISHED,
+ new Duration(waittime * 1000));
+ if (applicationReport == null) {
+ log.info("application did not shut down in time");
+ return EXIT_FALSE;
+ }
+ }
+
+// JDK7 } catch (YarnException | IOException e) {
+ } catch (YarnException e) {
+ log.warn("Exception while waiting for the cluster {} to shut down: {}",
+ clustername, e);
+ } catch ( IOException e) {
+ log.warn("Exception while waiting for the cluster {} to shut down: {}",
+ clustername, e);
+ }
+
+ return EXIT_SUCCESS;
+ }
+
+ @Override
+ public int actionThaw(String clustername, ActionThawArgs thaw) throws YarnException, IOException {
+ SliderUtils.validateClusterName(clustername);
+ // see if it is actually running and bail out;
+ verifyBindingsDefined();
+ verifyNoLiveClusters(clustername);
+
+
+ //start the cluster
+ return startCluster(clustername, thaw);
+ }
+
+ /**
+ * Implement flexing
+ * @param clustername name of the cluster
+ * @param roleInstances map of new role instances
+ * @return EXIT_SUCCESS if the #of nodes in a live cluster changed
+ * @throws YarnException
+ * @throws IOException
+ */
+ public int flex(String clustername, Map<String, Integer> roleInstances)
+ throws YarnException, IOException {
+ verifyBindingsDefined();
+ SliderUtils.validateClusterName(clustername);
+ Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
+ AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
+ clustername,
+ clusterDirectory);
+
+ ConfTreeOperations resources =
+ instanceDefinition.getResourceOperations();
+ for (Map.Entry<String, Integer> entry : roleInstances.entrySet()) {
+ String role = entry.getKey();
+ int count = entry.getValue();
+ resources.getOrAddComponent(role).put(ResourceKeys.COMPONENT_INSTANCES,
+ Integer.toString(count));
+
+ log.debug("Flexed cluster specification ( {} -> {}) : \n{}",
+ role,
+ count,
+ resources);
+ }
+ SliderAMClientProvider sliderAM = new SliderAMClientProvider(getConfig());
+ AbstractClientProvider provider = createClientProvider(
+ instanceDefinition.getInternalOperations().getGlobalOptions().getMandatoryOption(
+ InternalKeys.INTERNAL_PROVIDER_NAME));
+ // slider provider to validate what there is
+ validateInstanceDefinition(sliderAM, instanceDefinition, sliderFileSystem);
+ validateInstanceDefinition(provider, instanceDefinition, sliderFileSystem);
+
+ int exitCode = EXIT_FALSE;
+ // save the specification
+ try {
+ InstanceIO.updateInstanceDefinition(sliderFileSystem, clusterDirectory, instanceDefinition);
+ } catch (LockAcquireFailedException e) {
+ // lock failure
+ log.debug("Failed to lock dir {}", clusterDirectory, e);
+ log.warn("Failed to save new resource definition to {} : {}", clusterDirectory, e);
+ }
+
+ // now see if it is actually running and tell it about the update if it is
+ ApplicationReport instance = findInstance(clustername);
+ if (instance != null) {
+ log.info("Flexing running cluster");
+ SliderClusterProtocol appMaster = connect(instance);
+ SliderClusterOperations clusterOps = new SliderClusterOperations(appMaster);
+ clusterOps.flex(instanceDefinition.getResources());
+ log.info("application instance size updated");
+ exitCode = EXIT_SUCCESS;
+ } else {
+ log.info("No running instance to update");
+ }
+ return exitCode;
+ }
+
+ /**
+ * Validate an instance definition against a provider.
+ * @param provider the provider performing the validation
+ * @param instanceDefinition the instance definition
+ * @throws SliderException if invalid.
+ */
+ protected void validateInstanceDefinition(AbstractClientProvider provider,
+ AggregateConf instanceDefinition, SliderFileSystem fs) throws SliderException {
+ try {
+ provider.validateInstanceDefinition(instanceDefinition, fs);
+ } catch (SliderException e) {
+ //problem, reject it
+ log.info("Error {} validating application instance definition ", e.getMessage());
+ log.debug("Error validating application instance definition ", e);
+ log.info(instanceDefinition.toString());
+ throw e;
+ }
+ }
+
+
+ /**
+ * Load the persistent cluster description
+ * @param clustername name of the cluster
+ * @return the description in the filesystem
+ * @throws IOException any problems loading -including a missing file
+ */
+ @VisibleForTesting
+ public AggregateConf loadPersistedClusterDescription(String clustername)
+ throws IOException, SliderException, LockAcquireFailedException {
+ Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
+ ConfPersister persister = new ConfPersister(sliderFileSystem, clusterDirectory);
+ AggregateConf instanceDescription = new AggregateConf();
+ persister.load(instanceDescription);
+ return instanceDescription;
+ }
+
+ /**
+ * Connect to a live cluster and get its current state
+ * @param clustername the cluster name
+ * @return its description
+ */
+ @VisibleForTesting
+ public ClusterDescription getClusterDescription(String clustername) throws
+ YarnException,
+ IOException {
+ SliderClusterOperations clusterOperations =
+ createClusterOperations(clustername);
+ return clusterOperations.getClusterDescription();
+ }
+
+ /**
+ * Connect to the cluster and get its current state
+ * @return its description
+ */
+ @VisibleForTesting
+ public ClusterDescription getClusterDescription() throws
+ YarnException,
+ IOException {
+ return getClusterDescription(getDeployedClusterName());
+ }
+
+ /**
+ * List all node UUIDs in a role
+ * @param role role name or "" for all
+ * @return an array of UUID strings
+ * @throws IOException
+ * @throws YarnException
+ */
+ @VisibleForTesting
+ public String[] listNodeUUIDsByRole(String role) throws
+ IOException,
+ YarnException {
+ return createClusterOperations()
+ .listNodeUUIDsByRole(role);
+ }
+
+ /**
+ * List all nodes in a role. This is a double round trip: once to list
+ * the nodes in a role, another to get their details
+ * @param role component/role to look for
+ * @return an array of ContainerNode instances
+ * @throws IOException
+ * @throws YarnException
+ */
+ @VisibleForTesting
+ public List<ClusterNode> listClusterNodesInRole(String role) throws
+ IOException,
+ YarnException {
+ return createClusterOperations().listClusterNodesInRole(role);
+ }
+
+ /**
+ * Get the details on a list of uuids
+ * @param uuids uuids to ask for
+ * @return a possibly empty list of node details
+ * @throws IOException
+ * @throws YarnException
+ */
+ @VisibleForTesting
+ public List<ClusterNode> listClusterNodes(String[] uuids) throws
+ IOException,
+ YarnException {
+
+ if (uuids.length == 0) {
+ // short cut on an empty list
+ return new LinkedList<ClusterNode>();
+ }
+ return createClusterOperations().listClusterNodes(uuids);
+ }
+
+ /**
+ * Get a node from the AM
+ * @param uuid uuid of node
+ * @return deserialized node
+ * @throws IOException IO problems
+ * @throws NoSuchNodeException if the node isn't found
+ */
+ @VisibleForTesting
+ public ClusterNode getNode(String uuid) throws IOException, YarnException {
+ return createClusterOperations().getNode(uuid);
+ }
+
+ /**
+ * Get the instance definition from the far end
+ */
+ @VisibleForTesting
+ public AggregateConf getLiveInstanceDefinition() throws IOException, YarnException {
+ return createClusterOperations().getInstanceDefinition();
+ }
+
+ /**
+ * Bond to a running cluster
+ * @param clustername cluster name
+ * @return the AM RPC client
+ * @throws SliderException if the cluster is unkown
+ */
+ private SliderClusterProtocol bondToCluster(String clustername) throws
+ YarnException,
+ IOException {
+ verifyBindingsDefined();
+ if (clustername == null) {
+ throw unknownClusterException("(undefined)");
+ }
+ ApplicationReport instance = findInstance(clustername);
+ if (null == instance) {
+ throw unknownClusterException(clustername);
+ }
+ return connect(instance);
+ }
+
+ /**
+ * Create a cluster operations instance against a given cluster
+ * @param clustername cluster name
+ * @return a bonded cluster operations instance
+ * @throws YarnException YARN issues
+ * @throws IOException IO problems
+ */
+ private SliderClusterOperations createClusterOperations(String clustername) throws
+ YarnException,
+ IOException {
+ SliderClusterProtocol sliderAM = bondToCluster(clustername);
+ return new SliderClusterOperations(sliderAM);
+ }
+
+ /**
+ * Create a cluster operations instance against the active cluster
+ * -returning any previous created one if held.
+ * @return a bonded cluster operations instance
+ * @throws YarnException YARN issues
+ * @throws IOException IO problems
+ */
+ public SliderClusterOperations createClusterOperations() throws
+ YarnException,
+ IOException {
+ if (sliderClusterOperations == null) {
+ sliderClusterOperations =
+ createClusterOperations(getDeployedClusterName());
+ }
+ return sliderClusterOperations;
+ }
+
+ /**
+ * Wait for an instance of a named role to be live (or past it in the lifecycle)
+ * @param role role to look for
+ * @param timeout time to wait
+ * @return the state. If still in CREATED, the cluster didn't come up
+ * in the time period. If LIVE, all is well. If >LIVE, it has shut for a reason
+ * @throws IOException IO
+ * @throws SliderException Slider
+ * @throws WaitTimeoutException if the wait timed out
+ */
+ @VisibleForTesting
+ public int waitForRoleInstanceLive(String role, long timeout)
+ throws WaitTimeoutException, IOException, YarnException {
+ return createClusterOperations().waitForRoleInstanceLive(role, timeout);
+ }
+
+ /**
+ * Generate an exception for an unknown cluster
+ * @param clustername cluster name
+ * @return an exception with text and a relevant exit code
+ */
+ public UnknownApplicationInstanceException unknownClusterException(String clustername) {
+ return UnknownApplicationInstanceException.unknownInstance(clustername);
+ }
+
+ @Override
+ public String toString() {
+ return "Slider Client in state " + getServiceState()
+ + " and Slider Application Instance " + deployedClusterName;
+ }
+
+ /**
+ * Get all YARN applications
+ * @return a possibly empty list
+ * @throws YarnException
+ * @throws IOException
+ */
+ @VisibleForTesting
+ public List<ApplicationReport> getApplications() throws YarnException, IOException {
+ return yarnClient.getApplications();
+ }
+
+ @VisibleForTesting
+ public ApplicationReport getApplicationReport(ApplicationId appId)
+ throws YarnException, IOException {
+ return new LaunchedApplication(appId, yarnClient).getApplicationReport();
+
+ }
+
+ /**
+ * The configuration used for deployment (after resolution)
+ * @return
+ */
+ @VisibleForTesting
+ public AggregateConf getLaunchedInstanceDefinition() {
+ return launchedInstanceDefinition;
+ }
+
+
+ @Override
+ public int actionResolve(ActionResolveArgs args)
+ throws YarnException, IOException {
+ // as this is an API entry point, validate
+ // the arguments
+ args.validate();
+ RegistryOperations operations = getRegistryOperations();
+ String path = SliderRegistryUtils.resolvePath(args.path);
+ ServiceRecordMarshal serviceRecordMarshal = new ServiceRecordMarshal();
+ try {
+ if (args.list) {
+ File destDir = args.destdir;
+ if (destDir != null) {
+ destDir.mkdirs();
+ }
+
+ Map<String, ServiceRecord> recordMap =
+ listServiceRecords(operations, path);
+
+ for (Entry<String, ServiceRecord> recordEntry : recordMap.entrySet()) {
+ String name = recordEntry.getKey();
+ ServiceRecord instance = recordEntry.getValue();
+ String json = serviceRecordMarshal.toJson(instance);
+ if (destDir == null) {
+ print(name);
+ print(json);
+ } else {
+ String filename = RegistryPathUtils.lastPathEntry(name) + ".json";
+ File jsonFile = new File(destDir, filename);
+ SliderUtils.write(jsonFile,
+ serviceRecordMarshal.toBytes(instance), true);
+ }
+ }
+ } else {
+ // resolve single entry
+ ServiceRecord instance = resolve(path);
+ File outFile = args.out;
+ if (args.destdir != null) {
+ outFile = new File(args.destdir, RegistryPathUtils.lastPathEntry(path));
+ }
+ if (outFile != null) {
+ SliderUtils.write(outFile, serviceRecordMarshal.toBytes(instance), true);
+ } else {
+ print(serviceRecordMarshal.toJson(instance));
+ }
+ }
+// TODO JDK7
+ } catch (PathNotFoundException e) {
+ // no record at this path
+ return EXIT_NOT_FOUND;
+ } catch (NoRecordException e) {
+ return EXIT_NOT_FOUND;
+ } catch (UnknownApplicationInstanceException e) {
+ return EXIT_NOT_FOUND;
+ } catch (InvalidRecordException e) {
+ // it is not a record
+ log.error("{}", e);
+ log.debug("{}", e, e);
+ return EXIT_EXCEPTION_THROWN;
+ }
+ return EXIT_SUCCESS;
+ }
+
+ @Override
+ public int actionRegistry(ActionRegistryArgs registryArgs) throws
+ YarnException,
+ IOException {
+ // as this is also a test entry point, validate
+ // the arguments
+ registryArgs.validate();
+ try {
+ if (registryArgs.list) {
+ actionRegistryList(registryArgs);
+ } else if (registryArgs.listConf) {
+ // list the configurations
+ actionRegistryListConfigsYarn(registryArgs);
+ } else if (SliderUtils.isSet(registryArgs.getConf)) {
+ // get a configuration
+ PublishedConfiguration publishedConfiguration =
+ actionRegistryGetConfig(registryArgs);
+ outputConfig(publishedConfiguration, registryArgs);
+ } else {
+ // it's an unknown command
+ log.info(ActionRegistryArgs.USAGE);
+ return EXIT_USAGE;
+ }
+// JDK7
+ } catch (FileNotFoundException e) {
+ log.info("{}", e);
+ log.debug("{}", e, e);
+ return EXIT_NOT_FOUND;
+ } catch (PathNotFoundException e) {
+ log.info("{}", e);
+ log.debug("{}", e, e);
+ return EXIT_NOT_FOUND;
+ }
+ return EXIT_SUCCESS;
+ }
+
+ /**
+ * Registry operation
+ *
+ * @param registryArgs registry Arguments
+ * @return the instances (for tests)
+ * @throws YarnException YARN problems
+ * @throws IOException Network or other problems
+ */
+ @VisibleForTesting
+ public Collection<ServiceRecord> actionRegistryList(
+ ActionRegistryArgs registryArgs)
+ throws YarnException, IOException {
+ String serviceType = registryArgs.serviceType;
+ String name = registryArgs.name;
+ RegistryOperations operations = getRegistryOperations();
+ Collection<ServiceRecord> serviceRecords;
+ if (StringUtils.isEmpty(name)) {
+ String path =
+ serviceclassPath(
+ currentUser(),
+ serviceType);
+
+ try {
+ Map<String, ServiceRecord> recordMap =
+ listServiceRecords(operations, path);
+ if (recordMap.isEmpty()) {
+ throw new UnknownApplicationInstanceException(
+ "No applications registered under " + path);
+ }
+ serviceRecords = recordMap.values();
+ } catch (PathNotFoundException e) {
+ throw new UnknownApplicationInstanceException(path, e);
+ }
+ } else {
+ ServiceRecord instance = lookupServiceRecord(registryArgs);
+ serviceRecords = new ArrayList<ServiceRecord>(1);
+ serviceRecords.add(instance);
+ }
+
+ for (ServiceRecord serviceRecord : serviceRecords) {
+ logInstance(serviceRecord, registryArgs.verbose);
+ }
+ return serviceRecords;
+ }
+
+ @Override
+ public int actionDiagnostic(ActionDiagnosticArgs diagnosticArgs) {
+ try {
+ if (diagnosticArgs.client) {
+ actionDiagnosticClient(diagnosticArgs);
+ } else if (SliderUtils.isSet(diagnosticArgs.application)) {
+ actionDiagnosticApplication(diagnosticArgs);
+ } else if (SliderUtils.isSet(diagnosticArgs.slider)) {
+ actionDiagnosticSlider(diagnosticArgs);
+ } else if (diagnosticArgs.yarn) {
+ actionDiagnosticYarn(diagnosticArgs);
+ } else if (diagnosticArgs.credentials) {
+ actionDiagnosticCredentials();
+ } else if (SliderUtils.isSet(diagnosticArgs.all)) {
+ actionDiagnosticAll(diagnosticArgs);
+ } else if (SliderUtils.isSet(diagnosticArgs.level)) {
+ actionDiagnosticIntelligent(diagnosticArgs);
+ } else {
+ // it's an unknown command
+ log.info(ActionDiagnosticArgs.USAGE);
+ return EXIT_USAGE;
+ }
+ } catch (Exception e) {
+ log.error(e.toString());
+ return EXIT_FALSE;
+ }
+ return EXIT_SUCCESS;
+ }
+
+ private void actionDiagnosticIntelligent(ActionDiagnosticArgs diagnosticArgs)
+ throws YarnException, IOException, URISyntaxException {
+ // not using member variable clustername because we want to place
+ // application name after --application option and member variable
+ // cluster name has to be put behind action
+ String clusterName = diagnosticArgs.level;
+
+ try {
+ SliderUtils.validateClientConfigFile();
+ log.info("Slider-client.xml is accessible");
+ } catch (IOException e) {
+ // we are catching exceptions here because those are indication of
+ // validation result, and we need to print them here
+ log.error("validation of slider-client.xml fails because: "
+ + e.toString(), e);
+ return;
+ }
+ SliderClusterOperations clusterOperations = createClusterOperations(clusterName);
+ // cluster not found exceptions will be thrown upstream
+ ClusterDescription clusterDescription = clusterOperations
+ .getClusterDescription();
+ log.info("Slider AppMaster is accessible");
+
+ if (clusterDescription.state == ClusterDescription.STATE_LIVE) {
+ AggregateConf instanceDefinition = clusterOperations
+ .getInstanceDefinition();
+ String imagePath = instanceDefinition.getInternalOperations().get(
+ InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH);
+ //if null, that means slider uploaded the agent tarball for the user
+ //and we need to use where slider has put
+ if(imagePath == null){
+ ApplicationReport appReport = findInstance(clusterName);
+ Path path1 = sliderFileSystem.getTempPathForCluster(clusterName);
+ Path subPath = new Path(path1, appReport.getApplicationId().toString() + "/am");
+ imagePath = subPath.toString();
+ }
+ try {
+ SliderUtils.validateHDFSFile(sliderFileSystem, imagePath);
+ log.info("Slider agent package is properly installed");
+ } catch (FileNotFoundException e) {
+ log.error("can not find agent package: {}", e);
+ return;
+ } catch (IOException e) {
+ log.error("can not open agent package: {}", e, e);
+ return;
+ }
+ String pkgTarballPath = instanceDefinition.getAppConfOperations()
+ .getGlobalOptions().getMandatoryOption(AgentKeys.APP_DEF);
+ try {
+ SliderUtils.validateHDFSFile(sliderFileSystem, pkgTarballPath);
+ log.info("Application package is properly installed");
+ } catch (FileNotFoundException e) {
+ log.error("can not find application package: {}", e);
+ return;
+ } catch (IOException e) {
+ log.error("can not open application package: {} ", e);
+ return;
+ }
+ }
+ }
+
+ private void actionDiagnosticAll(ActionDiagnosticArgs diagnosticArgs)
+ throws IOException, YarnException {
+ //assign application name from param to each sub diagnostic function
+ diagnosticArgs.application = diagnosticArgs.all;
+ diagnosticArgs.slider = diagnosticArgs.all;
+ actionDiagnosticClient(diagnosticArgs);
+ actionDiagnosticApplication(diagnosticArgs);
+ actionDiagnosticSlider(diagnosticArgs);
+ actionDiagnosticYarn(diagnosticArgs);
+ actionDiagnosticCredentials();
+ }
+
+ private void actionDiagnosticCredentials() throws BadConfigException, IOException
+ {
+ if (SliderUtils.isHadoopClusterSecure(SliderUtils
+ .loadClientConfigurationResource())) {
+ String credentialCacheFileDescription = null;
+ try {
+ credentialCacheFileDescription = SliderUtils
+ .checkCredentialCacheFile();
+ } catch (BadConfigException e) {
+ log.error("The credential config is not valid: " + e.toString());
+ throw e;
+ } catch (IOException e) {
+ log.error("Unable to read the credential file: " + e.toString());
+ throw e;
+ }
+ log.info("Credential cache file for the current user: "
+ + credentialCacheFileDescription);
+ } else {
+ log.info("the cluster is not in secure mode");
+ }
+ }
+
+ private void actionDiagnosticYarn(ActionDiagnosticArgs diagnosticArgs) throws IOException, YarnException {
+ JSONObject converter = null;
+ log.info("the node in the YARN cluster has below state: ");
+ List<NodeReport> yarnClusterInfo;
+ try {
+ yarnClusterInfo = yarnClient.getNodeReports(NodeState.RUNNING);
+ } catch (YarnException e1) {
+ log.error("Exception happened when fetching node report from the YARN cluster: " + e1.toString());
+ throw e1;
+ } catch (IOException e1) {
+ log.error("Network problem happened when fetching node report YARN cluster: " + e1.toString());
+ throw e1;
+ }
+ for(NodeReport nodeReport : yarnClusterInfo){
+ log.info(nodeReport.toString());
+ }
+
+ if (diagnosticArgs.verbose) {
+ Writer configWriter = new StringWriter();
+ try {
+ Configuration.dumpConfiguration(yarnClient.getConfig(), configWriter);
+ } catch (IOException e1) {
+ log.error("Network problem happened when retrieving YARN config from YARN: " + e1.toString());
+ throw e1;
+ }
+ try {
+ converter = new JSONObject(configWriter.toString());
+ log.info("the configuration of the YARN cluster is: "
+ + converter.toString(2));
+
+ } catch (JSONException e) {
+ log.error("JSONException happened during parsing response from YARN: " + e.toString());
+ }
+ }
+ }
+
+ private void actionDiagnosticSlider(ActionDiagnosticArgs diagnosticArgs) throws YarnException, IOException
+ {
+ // not using member variable clustername because we want to place
+ // application name after --application option and member variable
+ // cluster name has to be put behind action
+ String clusterName = diagnosticArgs.slider;
+ SliderClusterOperations clusterOperations;
+ AggregateConf instanceDefinition = null;
+ try {
+ clusterOperations = createClusterOperations(clusterName);
+ instanceDefinition = clusterOperations
+ .getInstanceDefinition();
+ } catch (YarnException e) {
+ log.error("Exception happened when retrieving instance definition from YARN: " + e.toString());
+ throw e;
+ } catch (IOException e) {
+ log.error("Network problem happened when retrieving instance definition from YARN: " + e.toString());
+ throw e;
+ }
+ String imagePath = instanceDefinition.getInternalOperations().get(
+ InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH);
+ //if null, it will be uploaded by Slider and thus at slider's path
+ if(imagePath == null){
+ ApplicationReport appReport = findInstance(clusterName);
+ Path path1 = sliderFileSystem.getTempPathForCluster(clusterName);
+ Path subPath = new Path(path1, appReport.getApplicationId().toString() + "/am");
+ imagePath = subPath.toString();
+ }
+ log.info("The path of slider agent tarball on HDFS is: " + imagePath);
+ }
+
+ private void actionDiagnosticApplication(ActionDiagnosticArgs diagnosticArgs) throws YarnException, IOException
+ {
+ // not using member variable clustername because we want to place
+ // application name after --application option and member variable
+ // cluster name has to be put behind action
+ String clusterName = diagnosticArgs.application;
+ SliderClusterOperations clusterOperations;
+ AggregateConf instanceDefinition = null;
+ try {
+ clusterOperations = createClusterOperations(clusterName);
+ instanceDefinition = clusterOperations
+ .getInstanceDefinition();
+ } catch (YarnException e) {
+ log.error("Exception happened when retrieving instance definition from YARN: " + e.toString());
+ throw e;
+ } catch (IOException e) {
+ log.error("Network problem happened when retrieving instance definition from YARN: " + e.toString());
+ throw e;
+ }
+ String clusterDir = instanceDefinition.getAppConfOperations()
+ .getGlobalOptions().get(AgentKeys.APP_ROOT);
+ String pkgTarball = instanceDefinition.getAppConfOperations()
+ .getGlobalOptions().get(AgentKeys.APP_DEF);
+ String runAsUser = instanceDefinition.getAppConfOperations()
+ .getGlobalOptions().get(AgentKeys.RUNAS_USER);
+
+ log.info("The location of the cluster instance directory in HDFS is: "
+ + clusterDir);
+ log.info("The name of the application package tarball on HDFS is: "
+ + pkgTarball);
+ log.info("The runas user of the application in the cluster is: "
+ + runAsUser);
+
+ if (diagnosticArgs.verbose) {
+ log.info("App config of the application: "
+ + instanceDefinition.getAppConf().toJson());
+ log.info("Resource config of the application: "
+ + instanceDefinition.getResources().toJson());
+ }
+ }
+
+ private void actionDiagnosticClient(ActionDiagnosticArgs diagnosticArgs)
+ throws SliderException, IOException {
+ try {
+ String currentCommandPath = SliderUtils.getCurrentCommandPath();
+ SliderVersionInfo.loadAndPrintVersionInfo(log);
+ String clientConfigPath = SliderUtils.getClientConfigPath();
+ String jdkInfo = SliderUtils.getJDKInfo();
+ println("The slider command path: %s", currentCommandPath);
+ println("The slider-client.xml used by current running command path: %s",
+ clientConfigPath);
+ println(jdkInfo);
+
+ // security info
+ Configuration config = getConfig();
+ if (SliderUtils.isHadoopClusterSecure(config)) {
+ println("Hadoop Cluster is secure");
+ println("Login user is %s", UserGroupInformation.getLoginUser());
+ println("Current user is %s", UserGroupInformation.getCurrentUser());
+
+
+ } else {
+ println("Hadoop Cluster is insecure");
+ }
+
+
+ // verbose?
+ if (diagnosticArgs.verbose) {
+ // do the environment
+ Map<String, String> env = System.getenv();
+ Set<String> envList = ConfigHelper.sortedConfigKeys(env.entrySet());
+ StringBuilder builder = new StringBuilder("Environment variables:\n");
+ for (String key : envList) {
+ builder.append(key)
+ .append("=")
+ .append(env.get(key))
+ .append("\n");
+ }
+ println(builder.toString());
+
+ // then the config
+ println("Slider client configuration:\n" +
+ ConfigHelper.dumpConfigToString(config));
+ }
+
+
+ SliderUtils.validateSliderClientEnvironment(log);
+ } catch (SliderException e) {
+ log.error(e.toString());
+ throw e;
+ } catch (IOException e) {
+ log.error(e.toString());
+ throw e;
+ }
+
+ }
+
+
+ /**
+ * Log a service record instance
+ * @param instance record
+ * @param verbose verbose logging of all external endpoints
+ */
+ private void logInstance(ServiceRecord instance,
+ boolean verbose) {
+ if (!verbose) {
+ log.info("{}", instance.get(YarnRegistryAttributes.YARN_ID, ""));
+ } else {
+ log.info("{}: ", instance.get(YarnRegistryAttributes.YARN_ID, ""));
+ logEndpoints(instance);
+ }
+ }
+
+ /**
+ * Log the external endpoints of a service record
+ * @param instance service record instance
+ */
+ private void logEndpoints(ServiceRecord instance) {
+ List<Endpoint> endpoints = instance.external;
+ for (Endpoint endpoint : endpoints) {
+ log.info(endpoint.toString());
+ }
+ }
+
+ /**
+ * list configs available for an instance
+ *
+ * @param registryArgs registry Arguments
+ * @throws YarnException YARN problems
+ * @throws IOException Network or other problems
+ */
+ public void actionRegistryListConfigsYarn(ActionRegistryArgs registryArgs)
+ throws YarnException, IOException {
+
+ ServiceRecord instance = lookupServiceRecord(registryArgs);
+
+ RegistryRetriever retriever = new RegistryRetriever(instance);
+ PublishedConfigSet configurations =
+ retriever.getConfigurations(!registryArgs.internal);
+
+ for (String configName : configurations.keys()) {
+ if (!registryArgs.verbose) {
+ log.info("{}", configName);
+ } else {
+ PublishedConfiguration published =
+ configurations.get(configName);
+ log.info("{} : {}",
+ configName,
+ published.description);
+ }
+ }
+ }
+
+ /**
+ * list configs available for an instance
+ *
+ * @param registryArgs registry Arguments
+ * @throws YarnException YARN problems
+ * @throws IOException Network or other problems
+ * @throws FileNotFoundException if the config is not found
+ */
+ @VisibleForTesting
+ public PublishedConfiguration actionRegistryGetConfig(ActionRegistryArgs registryArgs)
+ throws YarnException, IOException {
+ ServiceRecord instance = lookupServiceRecord(registryArgs);
+
+ RegistryRetriever retriever = new RegistryRetriever(instance);
+ boolean external = !registryArgs.internal;
+ PublishedConfigSet configurations =
+ retriever.getConfigurations(external);
+
+ PublishedConfiguration published = retriever.retrieveConfiguration(configurations,
+ registryArgs.getConf,
+ external);
+ return published;
+ }
+
+ /**
+ * write out the config. If a destination is provided and that dir is a
+ * directory, the entry is written to it with the name provided + extension,
+ * else it is printed to standard out.
+ * @param published published config
+ * @param registryArgs registry Arguments
+ * @throws BadCommandArgumentsException
+ * @throws IOException
+ */
+ private void outputConfig(PublishedConfiguration published,
+ ActionRegistryArgs registryArgs) throws
+ BadCommandArgumentsException,
+ IOException {
+ // decide whether or not to print
+ String entry = registryArgs.getConf;
+ String format = registryArgs.format;
+ ConfigFormat configFormat = ConfigFormat.resolve(format);
+ if (configFormat == null) {
+ throw new BadCommandArgumentsException(
+ "Unknown/Unsupported format %s ", format);
+ }
+ PublishedConfigurationOutputter outputter =
+ PublishedConfigurationOutputter.createOutputter(configFormat,
+ published);
+ boolean print = registryArgs.out == null;
+ if (!print) {
+ File outputPath = registryArgs.out;
+ if (outputPath.isDirectory()) {
+ // creating it under a directory
+ outputPath = new File(outputPath, entry + "." + format);
+ }
+ log.debug("Destination path: {}", outputPath);
+ outputter.save(outputPath);
+ } else {
+ print(outputter.asString());
+ }
+
+ }
+
+ /**
+ * Look up an instance
+ * @return instance data
+ * @throws SliderException other failures
+ * @throws IOException IO problems or wrapped exceptions
+ */
+ private ServiceRecord lookupServiceRecord(ActionRegistryArgs registryArgs) throws
+ SliderException,
+ IOException {
+ String user;
+ if (StringUtils.isNotEmpty(registryArgs.user)) {
+ user = RegistryPathUtils.encodeForRegistry(registryArgs.user);
+ } else {
+ user = currentUser();
+ }
+
+ String path = servicePath(user, registryArgs.serviceType,
+ registryArgs.name);
+ return resolve(path);
+ }
+
+ /**
+ * Look up an instance
+ * @param serviceType service type
+ * @param id instance ID
+ * @return instance data
+ * @throws UnknownApplicationInstanceException no path or service record
+ * at the end of the path
+ * @throws SliderException other failures
+ * @throws IOException IO problems or wrapped exceptions
+ */
+ public ServiceRecord lookupServiceRecord(String serviceType, String id)
+ throws IOException, SliderException {
+ String path = servicePath(currentUser(), serviceType, id);
+ return resolve(path);
+ }
+
+ /**
+ *
+ * Look up an instance
+ * @param path path
+ * @return instance data
+ * @throws UnknownApplicationInstanceException no path or service record
+ * at the end of the path
+ * @throws SliderException other failures
+ * @throws IOException IO problems or wrapped exceptions
+ */
+ public ServiceRecord resolve(String path)
+ throws IOException, SliderException {
+ try {
+ return getRegistryOperations().resolve(
+ path);
+ // TODO JDK7 SWITCH
+ } catch (PathNotFoundException e) {
+ throw new UnknownApplicationInstanceException(e.getPath().toString(), e);
+ } catch (NoRecordException e) {
+ throw new UnknownApplicationInstanceException(e.getPath().toString(), e);
+ }
+ }
+
+ /**
+ * List instances in the registry for the current user
+ * @return a list of slider registry instances
+ * @throws IOException Any IO problem ... including no path in the registry
+ * to slider service classes for this user
+ * @throws YarnException
+ */
+
+ public Map<String, ServiceRecord> listRegistryInstances()
+ throws IOException, YarnException {
+ Map<String, ServiceRecord> recordMap = listServiceRecords(
+ getRegistryOperations(),
+ serviceclassPath(currentUser(), SliderKeys.APP_TYPE));
+ return recordMap;
+ }
+
+ /**
+ * List instances in the registry
+ * @return the instance IDs
+ * @throws IOException
+ * @throws YarnException
+ */
+ public List<String> listRegisteredSliderInstances() throws
+ IOException,
+ YarnException {
+ try {
+ Map<String, ServiceRecord> recordMap = listServiceRecords(
+ getRegistryOperations(),
+ serviceclassPath(currentUser(), SliderKeys.APP_TYPE));
+ return new ArrayList<String>(recordMap.keySet());
+/// JDK7 } catch (YarnException | IOException e) {
+ } catch (IOException e) {
+ throw e;
+ } catch (YarnException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Start the registry if it is not there yet
+ * @return the registry service
+ * @throws SliderException
+ * @throws IOException
+ */
+ private synchronized RegistryOperations maybeStartYarnRegistry()
+ throws SliderException, IOException {
+
+ if (registryOperations == null) {
+ registryOperations = startRegistryOperationsService();
+ }
+ return registryOperations;
+ }
+
+ @Override
+ public RegistryOperations getRegistryOperations()
+ throws SliderException, IOException {
+ return maybeStartYarnRegistry();
+ }
+
+ /**
+ * Output to standard out/stderr (implementation specific detail)
+ * @param src source
+ */
+ @SuppressWarnings("UseOfSystemOutOrSystemErr")
+ private static void print(CharSequence src) {
+ System.out.append(src);
+ }
+
+ /**
+ * Output to standard out/stderr with a newline after
+ * @param message message
+ */
+ private static void println(String message) {
+ print(message);
+ print("\n");
+ }
+ /**
+ * Output to standard out/stderr with a newline after, formatted
+ * @param message message
+ * @param args arguments for string formatting
+ */
+ private static void println(String message, Object ... args) {
+ print(String.format(message, args));
+ print("\n");
+ }
+
+}
+
+
diff --git a/slider-core/src/main/java/org/apache/slider/common/params/ActionRegistryArgs.java b/slider-core/src/main/java/org/apache/slider/common/params/ActionRegistryArgs.java
index deae4eb..1e4aba5 100644
--- a/slider-core/src/main/java/org/apache/slider/common/params/ActionRegistryArgs.java
+++ b/slider-core/src/main/java/org/apache/slider/common/params/ActionRegistryArgs.java
@@ -47,8 +47,10 @@
+ " ("
+ Arguments.ARG_LIST + "|"
+ Arguments.ARG_LISTCONF + "|"
+ + Arguments.ARG_LISTEXP + "|"
+ Arguments.ARG_LISTFILES + "|"
- + Arguments.ARG_GETCONF + "> "
+ + Arguments.ARG_GETCONF + "|"
+ + Arguments.ARG_GETEXP + "> "
+ Arguments.ARG_NAME + " <name> "
+ " )"
+ "[" + Arguments.ARG_VERBOSE + "] "
@@ -56,6 +58,8 @@
+ "[" + Arguments.ARG_OUTPUT + " <filename> ] "
+ "[" + Arguments.ARG_SERVICETYPE + " <servicetype> ] "
+ "[" + Arguments.ARG_FORMAT + " <xml|json|properties>] "
+ + System.getProperty("line.separator")
+ + "Arguments.ARG_GETEXP only supports " + Arguments.ARG_FORMAT + " json"
;
public ActionRegistryArgs() {
}
@@ -90,7 +94,14 @@
description = "get configuration")
public String getConf;
- @Parameter(names = {ARG_LISTFILES},
+ @Parameter(names = {ARG_LISTEXP},
+ description = "list exports")
+ public boolean listExports;
+ @Parameter(names = {ARG_GETEXP},
+ description = "get export")
+ public String getExport;
+
+ @Parameter(names = {ARG_LISTFILES},
description = "list files")
public String listFiles;
@@ -135,8 +146,8 @@
super.validate();
//verify that at most one of the operations is set
- int gets = s(getConf) + s(getFiles);
- int lists = s(list) + s(listConf) + s(listFiles);
+ int gets = s(getConf) + s(getFiles) + s(getExport);
+ int lists = s(list) + s(listConf) + s(listFiles) + s(listExports);
int set = lists + gets;
if (set > 1) {
throw new UsageException(USAGE);
diff --git a/slider-core/src/main/java/org/apache/slider/common/params/Arguments.java b/slider-core/src/main/java/org/apache/slider/common/params/Arguments.java
index ff064c8..2c536d9 100644
--- a/slider-core/src/main/java/org/apache/slider/common/params/Arguments.java
+++ b/slider-core/src/main/java/org/apache/slider/common/params/Arguments.java
@@ -52,6 +52,7 @@
String ARG_FORMAT = "--format";
String ARG_FORCE = "--force";
String ARG_GETCONF = "--getconf";
+ String ARG_GETEXP = "--getexp";
String ARG_GETFILES = "--getfiles";
String ARG_HELP = "--help";
String ARG_ID = "--id";
@@ -60,6 +61,7 @@
String ARG_LEVEL = "--level";
String ARG_LIST = "--list";
String ARG_LISTCONF = "--listconf";
+ String ARG_LISTEXP = "--listexp";
String ARG_LISTFILES = "--listfiles";
String ARG_LIVE = "--live";
String ARG_MANAGER = "--manager";
diff --git a/slider-core/src/main/java/org/apache/slider/core/registry/docstore/ExportEntry.java b/slider-core/src/main/java/org/apache/slider/core/registry/docstore/ExportEntry.java
new file mode 100644
index 0000000..4bcf6c1
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/core/registry/docstore/ExportEntry.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.core.registry.docstore;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/**
+ * JSON-serializable description of a published key-val configuration.
+ *
+ * The values themselves are not serialized in the external view; they have to be served up by the far end
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+public class ExportEntry {
+
+ /**
+ * The value of the export
+ */
+ private String value;
+ /**
+ * The container id of the container that is responsible for the export
+ */
+ private String containerId;
+ /**
+ * Tag associated with the container - its usually an identifier different than container id
+ * that allows a soft serial id to all containers of a component - e.g. 1, 2, 3, ...
+ */
+ private String tag;
+ /**
+ * An export can be at the level of a component or an application
+ */
+ private String level;
+ /**
+ * The time when the export was updated
+ */
+ private String updatedTime;
+ /**
+ * The time when the export expires
+ */
+ private String validUntil;
+
+ public ExportEntry() {
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public void setValue(String value) {
+ this.value = value;
+ }
+
+ public String getContainerId() {
+ return containerId;
+ }
+
+ public void setContainerId(String containerId) {
+ this.containerId = containerId;
+ }
+
+ public String getTag() {
+ return tag;
+ }
+
+ public void setTag(String tag) {
+ this.tag = tag;
+ }
+
+ public String getLevel() {
+ return level;
+ }
+
+ public void setLevel(String level) {
+ this.level = level;
+ }
+ public String getUpdatedTime() {
+ return updatedTime;
+ }
+
+ public void setUpdatedTime(String updatedTime) {
+ this.updatedTime = updatedTime;
+ }
+
+ public String getValidUntil() {
+ return validUntil;
+ }
+
+ public void setValidUntil(String validUntil) {
+ this.validUntil = validUntil;
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder("ExportEntry{").
+ append("value='").append(value).append("',").
+ append("containerId='").append(containerId).append("',").
+ append("tag='").append(tag).append("',").
+ append("level='").append(level).append("'").
+ append("updatedTime='").append(updatedTime).append("'").
+ append("validUntil='").append(validUntil).append("'").
+ append(" }").toString();
+ }
+}
diff --git a/slider-core/src/main/java/org/apache/slider/core/registry/docstore/PublishedExports.java b/slider-core/src/main/java/org/apache/slider/core/registry/docstore/PublishedExports.java
new file mode 100644
index 0000000..d6d285c
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/core/registry/docstore/PublishedExports.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.core.registry.docstore;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * JSON-serializable description of a published key-val configuration.
+ *
+ * The values themselves are not serialized in the external view; they have to be served up by the far end
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+public class PublishedExports {
+
+ public String description;
+ public long updated;
+ public String updatedTime;
+ public Map<String, List<ExportEntry>> entries = new HashMap<String, List<ExportEntry>>();
+
+ public PublishedExports() {
+ }
+
+ /**
+ * build an empty published configuration
+ *
+ * @param description configuration description
+ */
+ public PublishedExports(String description) {
+ this.description = description;
+ }
+
+ /**
+ * Build a configuration from the entries
+ *
+ * @param description configuration description
+ * @param entries entries to put
+ */
+ public PublishedExports(String description,
+ Iterable<Map.Entry<String, List<ExportEntry>>> entries) {
+ this.description = description;
+ putValues(entries);
+ }
+
+ /**
+ * Is the configuration empty. This means either that it has not been given any values, or it is stripped down copy
+ * set down over the wire.
+ *
+ * @return
+ */
+ public boolean isEmpty() {
+ return entries.isEmpty();
+ }
+
+ public long getUpdated() {
+ return updated;
+ }
+
+ public void setUpdated(long updated) {
+ this.updated = updated;
+ this.updatedTime = new Date(updated).toString();
+ }
+
+ /**
+ * Set the values from an iterable (this includes a Hadoop Configuration and Java properties object). Any existing
+ * value set is discarded
+ *
+ * @param entries entries to put
+ */
+ public void putValues(Iterable<Map.Entry<String, List<ExportEntry>>> entries) {
+ this.entries = new HashMap<String, List<ExportEntry>>();
+ for (Map.Entry<String, List<ExportEntry>> entry : entries) {
+ this.entries.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ /**
+ * Return the values as json string
+ *
+ * @return
+ *
+ * @throws IOException
+ */
+ public String asJson() throws IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ String json = mapper.writeValueAsString(entries);
+ return json;
+ }
+
+ /**
+ * This makes a copy without the nested content -so is suitable for returning as part of the list of a parent's
+ * values
+ *
+ * @return the copy
+ */
+ public PublishedExports shallowCopy() {
+ PublishedExports that = new PublishedExports();
+ that.description = this.description;
+ that.updated = this.updated;
+ that.updatedTime = this.updatedTime;
+ return that;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb =
+ new StringBuilder("PublishedConfiguration{");
+ sb.append("description='").append(description).append('\'');
+ sb.append(" entries = ").append(entries.size());
+ sb.append('}');
+ return sb.toString();
+ }
+}
diff --git a/slider-core/src/main/java/org/apache/slider/core/registry/docstore/PublishedExportsOutputter.java b/slider-core/src/main/java/org/apache/slider/core/registry/docstore/PublishedExportsOutputter.java
new file mode 100644
index 0000000..b21e717
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/core/registry/docstore/PublishedExportsOutputter.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.core.registry.docstore;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/** Output a published configuration */
+public abstract class PublishedExportsOutputter {
+
+ protected final PublishedExports exports;
+
+ protected PublishedExportsOutputter(PublishedExports exports) {
+ this.exports = exports;
+ }
+
+ /**
+ * Create an outputter for the chosen format
+ *
+ * @param format format enumeration
+ * @param exports owning config
+ * @return the outputter
+ */
+
+ public static PublishedExportsOutputter createOutputter(ConfigFormat format,
+ PublishedExports exports) {
+ Preconditions.checkNotNull(exports);
+ switch (format) {
+ case JSON:
+ return new JsonOutputter(exports);
+ default:
+ throw new RuntimeException("Unsupported format :" + format);
+ }
+ }
+
+ public void save(File dest) throws IOException {
+ FileOutputStream out = null;
+ try {
+ out = new FileOutputStream(dest);
+ save(out);
+ out.close();
+ } finally {
+ org.apache.hadoop.io.IOUtils.closeStream(out);
+ }
+ }
+
+ /**
+ * Save the content. The default saves the asString() value to the output stream
+ *
+ * @param out output stream
+ * @throws IOException
+ */
+ public void save(OutputStream out) throws IOException {
+ IOUtils.write(asString(), out, Charsets.UTF_8);
+ }
+
+ /**
+ * Convert to a string
+ *
+ * @return
+ * @throws IOException
+ */
+ public abstract String asString() throws IOException;
+
+ public static class JsonOutputter extends PublishedExportsOutputter {
+
+ public JsonOutputter(PublishedExports exports) {
+ super(exports);
+ }
+
+ @Override
+ public void save(File dest) throws IOException {
+ FileUtils.writeStringToFile(dest, asString(), Charsets.UTF_8);
+ }
+
+ @Override
+ public String asString() throws IOException {
+ return exports.asJson();
+ }
+ }
+}
diff --git a/slider-core/src/main/java/org/apache/slider/core/registry/docstore/PublishedExportsSet.java b/slider-core/src/main/java/org/apache/slider/core/registry/docstore/PublishedExportsSet.java
new file mode 100644
index 0000000..cdd35de
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/core/registry/docstore/PublishedExportsSet.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.core.registry.docstore;
+
+import org.apache.slider.server.appmaster.web.rest.RestPaths;
+import org.apache.slider.server.services.utility.PatternValidator;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Represents a set of configurations for an application, component, etc.
+ * Json serialisable; accessors are synchronized
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+public class PublishedExportsSet {
+
+ private static final PatternValidator validator = new PatternValidator(
+ RestPaths.PUBLISHED_CONFIGURATION_REGEXP);
+
+ public Map<String, PublishedExports> exports =
+ new HashMap<String, PublishedExports>();
+
+ public PublishedExportsSet() {
+ }
+
+ /**
+ * Put a name -it will be converted to lower case before insertion.
+ * Any existing entry will be overwritten (that includes an entry
+ * with a different case in the original name)
+ * @param name name of entry
+ * @param export published export
+ * @throws IllegalArgumentException if not a valid name
+ */
+ public void put(String name, PublishedExports export) {
+ String name1 = name.toLowerCase(Locale.ENGLISH);
+ validateName(name1);
+ exports.put(name1, export);
+ }
+
+ /**
+ * Validate the name -restricting it to the set defined in
+ * {@link RestPaths#PUBLISHED_CONFIGURATION_REGEXP}
+ * @param name name to validate
+ * @throws IllegalArgumentException if not a valid name
+ */
+ public static void validateName(String name) {
+ validator.validate(name);
+
+ }
+
+ public PublishedExports get(String name) {
+ return exports.get(name);
+ }
+
+ public boolean contains(String name) {
+ return exports.containsKey(name);
+ }
+
+ public int size() {
+ return exports.size();
+ }
+
+ public Set<String> keys() {
+ TreeSet<String> keys = new TreeSet<String>();
+ keys.addAll(exports.keySet());
+ return keys;
+ }
+
+ public PublishedExportsSet shallowCopy() {
+ PublishedExportsSet that = new PublishedExportsSet();
+ for (Map.Entry<String, PublishedExports> entry :
+ exports.entrySet()) {
+ that.put(entry.getKey(), entry.getValue().shallowCopy());
+ }
+ return that;
+ }
+}
diff --git a/slider-core/src/main/java/org/apache/slider/core/registry/info/CustomRegistryConstants.java b/slider-core/src/main/java/org/apache/slider/core/registry/info/CustomRegistryConstants.java
index 65c122f..67b9feb 100644
--- a/slider-core/src/main/java/org/apache/slider/core/registry/info/CustomRegistryConstants.java
+++ b/slider-core/src/main/java/org/apache/slider/core/registry/info/CustomRegistryConstants.java
@@ -35,6 +35,9 @@
public static final String PUBLISHER_CONFIGURATIONS_API =
"org.apache.slider.publisher.configurations";
+ public static final String PUBLISHER_EXPORTS_API =
+ "org.apache.slider.publisher.exports";
+
public static final String PUBLISHER_DOCUMENTS_API =
"org.apache.slider.publisher.documents";
diff --git a/slider-core/src/main/java/org/apache/slider/core/registry/retrieve/RegistryRetriever.java b/slider-core/src/main/java/org/apache/slider/core/registry/retrieve/RegistryRetriever.java
index 101efb2..a91f515 100644
--- a/slider-core/src/main/java/org/apache/slider/core/registry/retrieve/RegistryRetriever.java
+++ b/slider-core/src/main/java/org/apache/slider/core/registry/retrieve/RegistryRetriever.java
@@ -33,6 +33,8 @@
import org.apache.slider.core.exceptions.ExceptionConverter;
import org.apache.slider.core.registry.docstore.PublishedConfigSet;
import org.apache.slider.core.registry.docstore.PublishedConfiguration;
+import org.apache.slider.core.registry.docstore.PublishedExports;
+import org.apache.slider.core.registry.docstore.PublishedExportsSet;
import org.apache.slider.core.registry.info.CustomRegistryConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,6 +54,8 @@
private final String externalConfigurationURL;
private final String internalConfigurationURL;
+ private final String externalExportsURL;
+ private final String internalExportsURL;
private static final Client jerseyClient;
static {
@@ -63,9 +67,12 @@
jerseyClient.setFollowRedirects(true);
}
- public RegistryRetriever(String externalConfigurationURL, String internalConfigurationURL) {
- this.externalConfigurationURL = externalConfigurationURL;
- this.internalConfigurationURL = internalConfigurationURL;
+ public RegistryRetriever(String externalConfigurationURL, String internalConfigurationURL,
+ String externalExportsURL, String internalExportsURL) {
+ this.externalConfigurationURL = externalConfigurationURL;
+ this.internalConfigurationURL = internalConfigurationURL;
+ this.externalExportsURL = externalExportsURL;
+ this.internalExportsURL = internalExportsURL;
}
/**
@@ -99,6 +106,29 @@
}
}
externalConfigurationURL = url;
+
+ internal = record.getInternalEndpoint(
+ CustomRegistryConstants.PUBLISHER_EXPORTS_API);
+ url = null;
+ if (internal != null) {
+ List<String> addresses = RegistryTypeUtils.retrieveAddressesUriType(
+ internal);
+ if (addresses != null && !addresses.isEmpty()) {
+ url = addresses.get(0);
+ }
+ }
+ internalExportsURL = url;
+ external = record.getExternalEndpoint(
+ CustomRegistryConstants.PUBLISHER_EXPORTS_API);
+ url = null;
+ if (external != null) {
+ List<String> addresses =
+ RegistryTypeUtils.retrieveAddressesUriType(external);
+ if (addresses != null && !addresses.isEmpty()) {
+ url = addresses.get(0);
+ }
+ }
+ externalExportsURL = url;
}
/**
@@ -138,6 +168,33 @@
return confURL;
}
+ protected String getExportURL(boolean external) throws FileNotFoundException {
+ String confURL = external ? externalExportsURL: internalExportsURL;
+ if (Strings.isStringEmpty(confURL)) {
+ throw new FileNotFoundException("No configuration URL");
+ }
+ return confURL;
+ }
+
+ /**
+ * Get the configurations of the registry
+ * @param external flag to indicate that it is the external entries to fetch
+ * @return the configuration sets
+ */
+ public PublishedExportsSet getExports(boolean external) throws
+ FileNotFoundException, IOException {
+
+ String exportsUrl = getExportURL(external);
+ try {
+ WebResource webResource = jsonResource(exportsUrl);
+ log.debug("GET {}", exportsUrl);
+ PublishedExportsSet exportSet = webResource.get(PublishedExportsSet.class);
+ return exportSet;
+ } catch (UniformInterfaceException e) {
+ throw ExceptionConverter.convertJerseyException(exportsUrl, e);
+ }
+ }
+
private WebResource resource(String url) {
WebResource resource = jerseyClient.resource(url);
return resource;
@@ -174,7 +231,33 @@
throw ExceptionConverter.convertJerseyException(confURL, e);
}
}
-
+
+ /**
+ * Get a complete export, with all values
+ * @param exportSet
+ * @param name name of the configuration
+ * @param external flag to indicate that it is an external configuration
+ * @return the retrieved config
+ * @throws IOException IO problems
+ */
+ public PublishedExports retrieveExports(PublishedExportsSet exportSet,
+ String name,
+ boolean external) throws IOException {
+ if (!exportSet.contains(name)) {
+ throw new FileNotFoundException("Unknown export " + name);
+ }
+ String exportsURL = getExportURL(external);
+ exportsURL = SliderUtils.appendToURL(exportsURL, name);
+ try {
+ WebResource webResource = jsonResource(exportsURL);
+ PublishedExports publishedExports =
+ webResource.get(PublishedExports.class);
+ return publishedExports;
+ } catch (UniformInterfaceException e) {
+ throw ExceptionConverter.convertJerseyException(exportsURL, e);
+ }
+ }
+
@Override
public String toString() {
return super.toString()
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
index 44777c3..8b9b257 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
@@ -52,7 +52,9 @@
import org.apache.slider.core.exceptions.SliderException;
import org.apache.slider.core.launch.CommandLineBuilder;
import org.apache.slider.core.launch.ContainerLauncher;
+import org.apache.slider.core.registry.docstore.ExportEntry;
import org.apache.slider.core.registry.docstore.PublishedConfiguration;
+import org.apache.slider.core.registry.docstore.PublishedExports;
import org.apache.slider.core.registry.info.CustomRegistryConstants;
import org.apache.slider.providers.AbstractProviderService;
import org.apache.slider.providers.ProviderCore;
@@ -95,10 +97,13 @@
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
@@ -106,6 +111,7 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -127,10 +133,15 @@
private static final String CONTAINER_ID = "container_id";
private static final String GLOBAL_CONFIG_TAG = "global";
private static final String LOG_FOLDERS_TAG = "LogFolders";
+ private static final String HOST_FOLDER_FORMAT = "%s:%s";
+ private static final String CONTAINER_LOGS_TAG = "container_log_dirs";
+ private static final String CONTAINER_PWDS_TAG = "container_work_dirs";
+ private static final String COMPONENT_TAG = "component";
+ private static final String APPLICATION_TAG = "application";
private static final String COMPONENT_DATA_TAG = "ComponentInstanceData";
private static final String SHARED_PORT_TAG = "SHARED";
private static final String PER_CONTAINER_TAG = "{PER_CONTAINER}";
- private static final int MAX_LOG_ENTRIES = 20;
+ private static final int MAX_LOG_ENTRIES = 40;
private static final int DEFAULT_HEARTBEAT_MONITOR_INTERVAL = 60 * 1000;
private final Object syncLock = new Object();
@@ -149,16 +160,25 @@
new ConcurrentHashMap<String, ComponentInstanceState>();
private final Map<String, Map<String, String>> componentInstanceData =
new ConcurrentHashMap<String, Map<String, String>>();
- private final Map<String, Map<String, String>> exportGroups =
- new ConcurrentHashMap<String, Map<String, String>>();
+ private final Map<String, Map<String, List<ExportEntry>>> exportGroups =
+ new ConcurrentHashMap<String, Map<String, List<ExportEntry>>>();
private final Map<String, Map<String, String>> allocatedPorts =
new ConcurrentHashMap<String, Map<String, String>>();
- private final Map<String, String> workFolders =
- Collections.synchronizedMap(new LinkedHashMap<String, String>(MAX_LOG_ENTRIES, 0.75f, false) {
+
+ private final Map<String, ExportEntry> logFolderExports =
+ Collections.synchronizedMap(new LinkedHashMap<String, ExportEntry>(MAX_LOG_ENTRIES, 0.75f, false) {
protected boolean removeEldestEntry(Map.Entry eldest) {
return size() > MAX_LOG_ENTRIES;
}
});
+ private final Map<String, ExportEntry> workFolderExports =
+ Collections.synchronizedMap(new LinkedHashMap<String, ExportEntry>(MAX_LOG_ENTRIES, 0.75f, false) {
+ protected boolean removeEldestEntry(Map.Entry eldest) {
+ return size() > MAX_LOG_ENTRIES;
+ }
+ });
+ private final Map<String, Set<String>> containerExportsMap =
+ new HashMap<String, Set<String>>();
/**
* Create an instance of AgentProviderService
@@ -491,7 +511,7 @@
Map<String, String> folders = registration.getLogFolders();
if (folders != null && !folders.isEmpty()) {
- publishLogFolderPaths(folders, containerId, roleName, hostFqdn);
+ publishFolderPaths(folders, containerId, roleName, hostFqdn);
}
} else {
response.setResponseStatus(RegistrationStatus.FAILED);
@@ -558,7 +578,7 @@
log.info("Component operation. Status: {}", result);
if (command == Command.INSTALL && report.getFolders() != null && report.getFolders().size() > 0) {
- publishLogFolderPaths(report.getFolders(), containerId, roleName, heartBeat.getFqdn());
+ publishFolderPaths(report.getFolders(), containerId, roleName, heartBeat.getFqdn());
}
}
@@ -643,6 +663,7 @@
// component specific publishes
processAndPublishComponentSpecificData(ports, containerId, fqdn, roleName);
+ processAndPublishComponentSpecificExports(ports, containerId, fqdn, roleName);
// and update registration entries
if (instance != null) {
@@ -692,7 +713,7 @@
throw new IOException(e);
}
}
-
+
@Override
public void notifyContainerCompleted(ContainerId containerId) {
if (containerId != null) {
@@ -714,6 +735,25 @@
}
}
}
+
+ synchronized (this.containerExportsMap) {
+ Set<String> containerExportSets = containerExportsMap.get(containerIdStr);
+ if (containerExportSets != null) {
+ for (String containerExportStr : containerExportSets) {
+ String[] parts = containerExportStr.split(":");
+ Map<String, List<ExportEntry>> exportGroup = getCurrentExports(parts[0]);
+ List<ExportEntry> exports = exportGroup.get(parts[1]);
+ List<ExportEntry> exportToRemove = new ArrayList<ExportEntry>();
+ for (ExportEntry export : exports) {
+ if (containerIdStr.equals(export.getContainerId())) {
+ exportToRemove.add(export);
+ }
+ }
+ exports.removeAll(exportToRemove);
+ }
+ containerExportsMap.remove(containerIdStr);
+ }
+ }
}
}
@@ -749,6 +789,16 @@
}
@VisibleForTesting
+ protected Map<String, ExportEntry> getLogFolderExports() {
+ return logFolderExports;
+ }
+
+ @VisibleForTesting
+ protected Map<String, ExportEntry> getWorkFolderExports() {
+ return workFolderExports;
+ }
+
+ @VisibleForTesting
protected Metainfo getMetainfo() {
return this.metainfo;
}
@@ -896,15 +946,59 @@
* @param hostFqdn
* @param roleName
*/
- protected void publishLogFolderPaths(
+ protected void publishFolderPaths(
Map<String, String> folders, String containerId, String roleName, String hostFqdn) {
- for (Map.Entry<String, String> entry: folders.entrySet()) {
- workFolders.put(String.format("%s->%s->%s->%s", roleName, hostFqdn, entry.getKey(), containerId),
- entry.getValue());
+ Date now = new Date();
+ for (Map.Entry<String, String> entry : folders.entrySet()) {
+ ExportEntry exportEntry = new ExportEntry();
+ exportEntry.setValue(String.format(HOST_FOLDER_FORMAT, hostFqdn, entry.getValue()));
+ exportEntry.setContainerId(containerId);
+ exportEntry.setLevel(COMPONENT_TAG);
+ exportEntry.setTag(roleName);
+ exportEntry.setUpdatedTime(now.toString());
+ if (entry.getKey().equals("AGENT_LOG_ROOT")) {
+ synchronized (logFolderExports) {
+ getLogFolderExports().put(containerId, exportEntry);
+ }
+ } else {
+ synchronized (workFolderExports) {
+ getWorkFolderExports().put(containerId, exportEntry);
+ }
+ }
+ log.info("Updating log and pwd folders for container {}", containerId);
}
- publishApplicationInstanceData(LOG_FOLDERS_TAG, LOG_FOLDERS_TAG,
- (new HashMap<String, String>(this.workFolders)).entrySet());
+ PublishedExports exports = new PublishedExports(CONTAINER_LOGS_TAG);
+ exports.setUpdated(now.getTime());
+ synchronized (logFolderExports) {
+ updateExportsFromList(exports, getLogFolderExports());
+ }
+ getAmState().getPublishedExportsSet().put(CONTAINER_LOGS_TAG, exports);
+
+ exports = new PublishedExports(CONTAINER_PWDS_TAG);
+ exports.setUpdated(now.getTime());
+ synchronized (workFolderExports) {
+ updateExportsFromList(exports, getWorkFolderExports());
+ }
+ getAmState().getPublishedExportsSet().put(CONTAINER_PWDS_TAG, exports);
+ }
+
+ /**
+ * Update the export data from the map
+ * @param exports
+ * @param folderExports
+ */
+ private void updateExportsFromList(PublishedExports exports, Map<String, ExportEntry> folderExports) {
+ Map<String, List<ExportEntry>> perComponentList = new HashMap<String, List<ExportEntry>>();
+ for(Map.Entry<String, ExportEntry> logEntry : folderExports.entrySet())
+ {
+ String componentName = logEntry.getValue().getTag();
+ if(!perComponentList.containsKey(componentName)) {
+ perComponentList.put(componentName, new ArrayList<ExportEntry>());
+ }
+ perComponentList.get(componentName).add(logEntry.getValue());
+ }
+ exports.putValues(perComponentList.entrySet());
}
@@ -949,13 +1043,12 @@
}
}
- List<ExportGroup> exportGroups = application.getExportGroups();
- boolean hasExportGroups = exportGroups != null && !exportGroups.isEmpty();
+ List<ExportGroup> appExportGroups = application.getExportGroups();
+ boolean hasExportGroups = appExportGroups != null && !appExportGroups.isEmpty();
Set<String> appExports = new HashSet();
String appExportsStr = getApplicationComponent(roleName).getAppExports();
- boolean hasNoAppExports = appExportsStr == null || appExportsStr.isEmpty();
- if (!hasNoAppExports) {
+ if (SliderUtils.isSet(appExportsStr)) {
for (String appExport : appExportsStr.split(",")) {
if (appExport.trim().length() > 0) {
appExports.add(appExport.trim());
@@ -983,11 +1076,12 @@
}
Set<String> modifiedGroups = new HashSet<String>();
- for (ExportGroup exportGroup : exportGroups) {
+ for (ExportGroup exportGroup : appExportGroups) {
List<Export> exports = exportGroup.getExports();
if (exports != null && !exports.isEmpty()) {
String exportGroupName = exportGroup.getName();
- Map<String, String> map = getCurrentExports(exportGroupName);
+ ConcurrentHashMap<String, List<ExportEntry>> map =
+ (ConcurrentHashMap<String, List<ExportEntry>>)getCurrentExports(exportGroupName);
for (Export export : exports) {
if (canBeExported(exportGroupName, export.getName(), appExports)) {
String value = export.getValue();
@@ -997,7 +1091,12 @@
value = value.replace(token, replaceTokens.get(token));
}
}
- map.put(export.getName(), value);
+ ExportEntry entry = new ExportEntry();
+ entry.setLevel(APPLICATION_TAG);
+ entry.setValue(value);
+ entry.setUpdatedTime(new Date().toString());
+ // over-write, app exports are singletons
+ map.put(export.getName(), new ArrayList(Arrays.asList(entry)));
log.info("Preparing to publish. Key {} and Value {}", export.getName(), value);
}
}
@@ -1019,11 +1118,11 @@
return appExports.contains(String.format("%s-%s", exportGroupName, name));
}
- protected Map<String, String> getCurrentExports(String groupName) {
+ protected Map<String, List<ExportEntry>> getCurrentExports(String groupName) {
if (!this.exportGroups.containsKey(groupName)) {
synchronized (this.exportGroups) {
if (!this.exportGroups.containsKey(groupName)) {
- this.exportGroups.put(groupName, new ConcurrentHashMap<String, String>());
+ this.exportGroups.put(groupName, new ConcurrentHashMap<String, List<ExportEntry>>());
}
}
}
@@ -1032,10 +1131,24 @@
}
private void publishModifiedExportGroups(Set<String> modifiedGroups) {
- synchronized (this.exportGroups) {
- for (String groupName : modifiedGroups) {
- publishApplicationInstanceData(groupName, groupName, this.exportGroups.get(groupName).entrySet());
+ for (String groupName : modifiedGroups) {
+ Map<String, List<ExportEntry>> entries = this.exportGroups.get(groupName);
+
+ // Publish in old format for the time being
+ Map<String, String> simpleEntries = new HashMap<String, String>();
+ for (Map.Entry<String, List<ExportEntry>> entry : entries.entrySet()) {
+ List<ExportEntry> exports = entry.getValue();
+ if(exports != null && exports.size() > 0) {
+ // there is no support for multiple exports per name - so extract only the first one
+ simpleEntries.put(entry.getKey(), entry.getValue().get(0).getValue());
+ }
}
+ publishApplicationInstanceData(groupName, groupName, simpleEntries.entrySet());
+
+ PublishedExports exports = new PublishedExports(groupName);
+ exports.setUpdated(new Date().getTime());
+ exports.putValues(entries.entrySet());
+ getAmState().getPublishedExportsSet().put(groupName, exports);
}
}
@@ -1090,14 +1203,102 @@
}
}
+ /** Publish component instance specific data if the component demands it */
+ protected void processAndPublishComponentSpecificExports(Map<String, String> ports,
+ String containerId,
+ String hostFqdn,
+ String roleName) {
+ String portVarFormat = "${site.%s}";
+ String hostNamePattern = "${" + roleName + "_HOST}";
+
+ List<ExportGroup> appExportGroups = getMetainfo().getApplication().getExportGroups();
+ Component component = getMetainfo().getApplicationComponent(roleName);
+ if (component != null && SliderUtils.isSet(component.getCompExports())
+ && appExportGroups != null && appExportGroups.size() > 0) {
+
+ Set<String> compExports = new HashSet();
+ String compExportsStr = component.getCompExports();
+ for (String appExport : compExportsStr.split(",")) {
+ if (appExport.trim().length() > 0) {
+ compExports.add(appExport.trim());
+ }
+ }
+
+ Date now = new Date();
+ Set<String> modifiedGroups = new HashSet<String>();
+ for (ExportGroup exportGroup : appExportGroups) {
+ List<Export> exports = exportGroup.getExports();
+ if (exports != null && !exports.isEmpty()) {
+ String exportGroupName = exportGroup.getName();
+ ConcurrentHashMap<String, List<ExportEntry>> map =
+ (ConcurrentHashMap<String, List<ExportEntry>>) getCurrentExports(exportGroupName);
+ for (Export export : exports) {
+ if (canBeExported(exportGroupName, export.getName(), compExports)) {
+ log.info("Attempting to publish {} of group {} for component type {}",
+ export.getName(), exportGroupName, roleName);
+ String templateToExport = export.getValue();
+ for (String portName : ports.keySet()) {
+ boolean publishData = false;
+ String portValPattern = String.format(portVarFormat, portName);
+ if (templateToExport.contains(portValPattern)) {
+ templateToExport = templateToExport.replace(portValPattern, ports.get(portName));
+ publishData = true;
+ }
+ if (templateToExport.contains(hostNamePattern)) {
+ templateToExport = templateToExport.replace(hostNamePattern, hostFqdn);
+ publishData = true;
+ }
+ if (publishData) {
+ ExportEntry entryToAdd = new ExportEntry();
+ entryToAdd.setLevel(COMPONENT_TAG);
+ entryToAdd.setValue(templateToExport);
+ entryToAdd.setUpdatedTime(now.toString());
+ entryToAdd.setContainerId(containerId);
+
+ List<ExportEntry> existingList =
+ map.putIfAbsent(export.getName(), new CopyOnWriteArrayList(Arrays.asList(entryToAdd)));
+
+ // in-place edit, no lock needed
+ if (existingList != null) {
+ boolean updatedInPlace = false;
+ for (ExportEntry entry : existingList) {
+ if (containerId.equalsIgnoreCase(entry.getContainerId())) {
+ entryToAdd.setValue(templateToExport);
+ entryToAdd.setUpdatedTime(now.toString());
+ updatedInPlace = true;
+ }
+ }
+ if (!updatedInPlace) {
+ existingList.add(entryToAdd);
+ }
+ }
+
+ log.info("Publishing {} for name {} and container {}",
+ templateToExport, export.getName(), containerId);
+ modifiedGroups.add(exportGroupName);
+ synchronized (containerExportsMap) {
+ if (!containerExportsMap.containsKey(containerId)) {
+ containerExportsMap.put(containerId, new HashSet<String>());
+ }
+ Set<String> containerExportMaps = containerExportsMap.get(containerId);
+ containerExportMaps.add(String.format("%s:%s", exportGroupName, export.getName()));
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ publishModifiedExportGroups(modifiedGroups);
+ }
+ }
+
private void publishComponentInstanceData() {
Map<String, String> dataToPublish = new HashMap<String, String>();
- synchronized (this.componentInstanceData) {
- for (String container : getComponentInstanceData().keySet()) {
- for (String prop : getComponentInstanceData().get(container).keySet()) {
- dataToPublish.put(
- container + "." + prop, getComponentInstanceData().get(container).get(prop));
- }
+ for (String container : getComponentInstanceData().keySet()) {
+ for (String prop : getComponentInstanceData().get(container).keySet()) {
+ dataToPublish.put(
+ container + "." + prop, getComponentInstanceData().get(container).get(prop));
}
}
publishApplicationInstanceData(COMPONENT_DATA_TAG, COMPONENT_DATA_TAG, dataToPublish.entrySet());
@@ -1580,5 +1781,4 @@
"");
}
}
-
}
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Component.java b/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Component.java
index 9f3dd0f..418868c 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Component.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Component.java
@@ -34,6 +34,7 @@
String maxInstanceCount;
String autoStartOnFailure;
String appExports;
+ String compExports;
CommandScript commandScript;
List<ComponentExport> componentExports;
@@ -82,6 +83,14 @@
this.appExports = appExports;
}
+ public String getCompExports() {
+ return compExports;
+ }
+
+ public void setCompExports(String compExports) {
+ this.compExports = compExports;
+ }
+
public String getMinInstanceCount() {
return minInstanceCount;
}
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/MetainfoParser.java b/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/MetainfoParser.java
index c92c265..1d8403f 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/MetainfoParser.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/MetainfoParser.java
@@ -61,6 +61,7 @@
digester.addBeanPropertySetter("*/component/maxInstanceCount");
digester.addBeanPropertySetter("*/component/autoStartOnFailure");
digester.addBeanPropertySetter("*/component/appExports");
+ digester.addBeanPropertySetter("*/component/compExports");
digester.addObjectCreate("*/componentExport", ComponentExport.class);
digester.addBeanPropertySetter("*/componentExport/name");
digester.addBeanPropertySetter("*/componentExport/value");
diff --git a/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java
index 601c3f9..afe6428 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java
@@ -146,6 +146,8 @@
String configurationsURL = SliderUtils.appendToURL(
publisherURL.toExternalForm(), RestPaths.SLIDER_CONFIGSET);
+ String exportsURL = SliderUtils.appendToURL(
+ publisherURL.toExternalForm(), RestPaths.SLIDER_EXPORTS);
serviceRecord.addExternalEndpoint(
RegistryTypeUtils.webEndpoint(
@@ -166,6 +168,10 @@
RegistryTypeUtils.restEndpoint(
CustomRegistryConstants.PUBLISHER_CONFIGURATIONS_API,
new URI(configurationsURL)));
+ serviceRecord.addExternalEndpoint(
+ RegistryTypeUtils.restEndpoint(
+ CustomRegistryConstants.PUBLISHER_EXPORTS_API,
+ new URI(exportsURL)));
} catch (URISyntaxException e) {
throw new IOException(e);
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java
index a0871ae..9c5da12 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java
@@ -26,6 +26,7 @@
import org.apache.slider.core.conf.ConfTreeOperations;
import org.apache.slider.core.exceptions.NoSuchNodeException;
import org.apache.slider.core.registry.docstore.PublishedConfigSet;
+import org.apache.slider.core.registry.docstore.PublishedExportsSet;
import org.apache.slider.server.appmaster.web.rest.RestPaths;
import org.apache.slider.server.services.utility.PatternValidator;
@@ -40,6 +41,7 @@
private final Map<String, PublishedConfigSet> publishedConfigSets =
new ConcurrentHashMap<String, PublishedConfigSet>(5);
+ private final PublishedExportsSet publishedExportsSets = new PublishedExportsSet();
private static final PatternValidator validator = new PatternValidator(
RestPaths.PUBLISHED_CONFIGURATION_SET_REGEXP);
private String applicationName;
@@ -66,6 +68,11 @@
}
@Override
+ public PublishedExportsSet getPublishedExportsSet() {
+ return publishedExportsSets;
+ }
+
+ @Override
public PublishedConfigSet getPublishedConfigSet(String name) {
return publishedConfigSets.get(name);
}
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/StateAccessForProviders.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/StateAccessForProviders.java
index 1714f75..b907b06 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/StateAccessForProviders.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/StateAccessForProviders.java
@@ -26,6 +26,7 @@
import org.apache.slider.core.conf.ConfTreeOperations;
import org.apache.slider.core.exceptions.NoSuchNodeException;
import org.apache.slider.core.registry.docstore.PublishedConfigSet;
+import org.apache.slider.core.registry.docstore.PublishedExportsSet;
import java.util.Collection;
import java.util.List;
@@ -51,6 +52,12 @@
PublishedConfigSet getPublishedSliderConfigurations();
/**
+ * Get the published exports set
+ * @return
+ */
+ PublishedExportsSet getPublishedExportsSet();
+
+ /**
* Get a named published config set
* @param name name to look up
* @return the instance or null
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/RestPaths.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/RestPaths.java
index 93601ad..94f1e4c 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/RestPaths.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/RestPaths.java
@@ -61,6 +61,7 @@
= "[a-z0-9][a-z0-9_.\\+-]*";
public static final String SLIDER_CONFIGSET = "slider";
+ public static final String SLIDER_EXPORTS = "exports";
public static final String SLIDER_CLASSPATH = "classpath";
}
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/publisher/PublisherResource.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/publisher/PublisherResource.java
index 5d8b657..e47bbb9 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/publisher/PublisherResource.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/publisher/PublisherResource.java
@@ -23,6 +23,8 @@
import org.apache.slider.core.registry.docstore.PublishedConfigSet;
import org.apache.slider.core.registry.docstore.PublishedConfiguration;
import org.apache.slider.core.registry.docstore.PublishedConfigurationOutputter;
+import org.apache.slider.core.registry.docstore.PublishedExports;
+import org.apache.slider.core.registry.docstore.PublishedExportsSet;
import org.apache.slider.core.registry.docstore.UriMap;
import org.apache.slider.server.appmaster.state.StateAccessForProviders;
import org.apache.slider.server.appmaster.web.WebAppApi;
@@ -56,7 +58,10 @@
protected static final Logger log =
LoggerFactory.getLogger(PublisherResource.class);
private final WebAppApi slider;
- public static final String SET_NAME =
+ public static final String EXPORTS_NAME = "exports";
+ public static final String EXPORTS_RESOURCES_PATH = "/" + EXPORTS_NAME;
+ public static final String EXPORT_RESOURCE_PATH = EXPORTS_RESOURCES_PATH + "/{exportname}" ;
+ public static final String SET_NAME =
"{setname: " + PUBLISHED_CONFIGURATION_SET_REGEXP + "}";
private static final String CONFIG =
SET_NAME + "/{config: " + PUBLISHED_CONFIGURATION_REGEXP + "}";
@@ -101,7 +106,9 @@
UriMap uriMap = new UriMap();
for (String name : appState.listConfigSets()) {
uriMap.put(name, baseURL + name);
+ log.info("Tick tack {} and {}", name, baseURL);
}
+ uriMap.put(EXPORTS_NAME, baseURL + EXPORTS_NAME);
return uriMap;
}
@@ -114,6 +121,26 @@
}
@GET
+ @Path(EXPORTS_RESOURCES_PATH)
+ @Produces({MediaType.APPLICATION_JSON})
+ public PublishedExportsSet gePublishedExports() {
+
+ PublishedExportsSet set = appState.getPublishedExportsSet();
+ return set.shallowCopy();
+ }
+
+ @GET
+ @Path(EXPORT_RESOURCE_PATH)
+ @Produces({MediaType.APPLICATION_JSON})
+ public PublishedExports getAMExports2(@PathParam("exportname") String exportname,
+ @Context UriInfo uriInfo,
+ @Context HttpServletResponse res) {
+ init(res, uriInfo);
+ PublishedExportsSet set = appState.getPublishedExportsSet();
+ return set.get(exportname);
+ }
+
+ @GET
@Path("/"+ SET_NAME)
@Produces({MediaType.APPLICATION_JSON})
public PublishedConfigSet getPublishedConfiguration(
@@ -129,7 +156,7 @@
}
private void logRequest(UriInfo uriInfo) {
- log.debug(uriInfo.getRequestUri().toString());
+ log.info(uriInfo.getRequestUri().toString());
}
@GET
diff --git a/slider-core/src/test/groovy/org/apache/slider/client/TestClientBadArgs.groovy b/slider-core/src/test/groovy/org/apache/slider/client/TestClientBadArgs.groovy
index d1f8a8f..7d596d6 100644
--- a/slider-core/src/test/groovy/org/apache/slider/client/TestClientBadArgs.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/client/TestClientBadArgs.groovy
@@ -24,8 +24,10 @@
import org.apache.slider.common.params.ActionRegistryArgs
import org.apache.slider.common.params.Arguments
import org.apache.slider.common.params.SliderActions
+import org.apache.slider.core.exceptions.BadCommandArgumentsException
import org.apache.slider.core.exceptions.ErrorStrings
import org.apache.slider.core.exceptions.UsageException
+import org.apache.slider.core.main.ServiceLauncher
import org.apache.slider.core.main.ServiceLauncherBaseTest
import org.junit.Test
@@ -88,4 +90,45 @@
log.info(exception.toString())
}
+ @Test
+ public void testRegistryExportBadUsage1() throws Throwable {
+ def exception = launchExpectingException(SliderClient,
+ new Configuration(),
+ "Expected a value after parameter --getexp",
+ [SliderActions.ACTION_REGISTRY,
+ Arguments.ARG_NAME,
+ "cl1",
+ Arguments.ARG_GETEXP])
+ assert exception instanceof BadCommandArgumentsException
+ log.info(exception.toString())
+ }
+
+ @Test
+ public void testRegistryExportBadUsage2() throws Throwable {
+ def exception = launchExpectingException(SliderClient,
+ new Configuration(),
+ "Expected a value after parameter --getexp",
+ [SliderActions.ACTION_REGISTRY,
+ Arguments.ARG_NAME,
+ "cl1",
+ Arguments.ARG_LISTEXP,
+ Arguments.ARG_GETEXP])
+ assert exception instanceof BadCommandArgumentsException
+ log.info(exception.toString())
+ }
+
+ @Test
+ public void testRegistryExportBadUsage3() throws Throwable {
+ def exception = launchExpectingException(SliderClient,
+ new Configuration(),
+ "Usage: registry",
+ [SliderActions.ACTION_REGISTRY,
+ Arguments.ARG_NAME,
+ "cl1",
+ Arguments.ARG_LISTEXP,
+ Arguments.ARG_GETEXP,
+ "export1"])
+ assert exception instanceof UsageException
+ log.info(exception.toString())
+ }
}
diff --git a/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java b/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java
index 7eff45a..a20e7a9 100644
--- a/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java
+++ b/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.registry.client.api.RegistryConstants;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -45,6 +46,9 @@
import org.apache.slider.core.conf.MapOperations;
import org.apache.slider.core.exceptions.SliderException;
import org.apache.slider.core.launch.ContainerLauncher;
+import org.apache.slider.core.registry.docstore.ExportEntry;
+import org.apache.slider.core.registry.docstore.PublishedExports;
+import org.apache.slider.core.registry.docstore.PublishedExportsSet;
import org.apache.slider.providers.ProviderRole;
import org.apache.slider.providers.agent.application.metadata.Application;
import org.apache.slider.providers.agent.application.metadata.CommandOrder;
@@ -94,6 +98,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.createNiceMock;
@@ -136,6 +141,10 @@
+ " <name>Master_Status</name>\n"
+ " <value>http://${HBASE_MASTER_HOST}:${site.hbase-site.hbase.master.info.port}/master-status</value>\n"
+ " </export>\n"
+ + " <export>\n"
+ + " <name>Comp_Endpoint</name>\n"
+ + " <value>http://${HBASE_REGIONSERVER_HOST}:${site.global.listen_port}</value>\n"
+ + " </export>\n"
+ " </exports>\n"
+ " </exportGroup>\n"
+ " </exportGroups>\n"
@@ -165,6 +174,7 @@
+ " <publishConfig>true</publishConfig>\n"
+ " <autoStartOnFailure>true</autoStartOnFailure>\n"
+ " <appExports>QuickLinks-JMX_Endpoint,QuickLinks-Master_Status</appExports>\n"
+ + " <compExports>QuickLinks-Comp_Endpoint</compExports>\n"
+ " <minInstanceCount>1</minInstanceCount>\n"
+ " <maxInstanceCount>2</maxInstanceCount>\n"
+ " <commandScript>\n"
@@ -182,6 +192,7 @@
+ " <script>scripts/hbase_regionserver.py</script>\n"
+ " <scriptType>PYTHON</scriptType>\n"
+ " </commandScript>\n"
+ + " <compExports>QuickLinks-Comp_Endpoint</compExports>\n"
+ " <componentExports>\n"
+ " <componentExport>\n"
+ " <name>PropertyA</name>\n"
@@ -322,10 +333,10 @@
anyMap()
);
- doNothing().when(mockAps).publishLogFolderPaths(anyMap(),
- anyString(),
- anyString(),
- anyString()
+ doNothing().when(mockAps).publishFolderPaths(anyMap(),
+ anyString(),
+ anyString(),
+ anyString()
);
expect(access.isApplicationLive()).andReturn(true).anyTimes();
ClusterDescription desc = new ClusterDescription();
@@ -378,7 +389,7 @@
anyMap()
);
- Mockito.verify(mockAps, Mockito.times(1)).publishLogFolderPaths(
+ Mockito.verify(mockAps, Mockito.times(1)).publishFolderPaths(
anyMap(),
anyString(),
anyString(),
@@ -694,6 +705,92 @@
}
}
+
+ @Test
+ public void testComponentSpecificPublishes2() throws Exception {
+ InputStream metainfo_1 = new ByteArrayInputStream(metainfo_1_str.getBytes());
+ Metainfo metainfo = new MetainfoParser().parse(metainfo_1);
+ AgentProviderService aps = createAgentProviderService(new Configuration());
+ AgentProviderService mockAps = Mockito.spy(aps);
+ doNothing().when(mockAps).publishApplicationInstanceData(anyString(), anyString(), anyCollection());
+ doReturn(metainfo).when(mockAps).getMetainfo();
+ StateAccessForProviders access = createNiceMock(StateAccessForProviders.class);
+ doReturn(access).when(mockAps).getAmState();
+ PublishedExportsSet pubExpSet = new PublishedExportsSet();
+ expect(access.getPublishedExportsSet()).andReturn(pubExpSet).anyTimes();
+ replay(access);
+
+ Map<String, String> ports = new HashMap<String, String>();
+ ports.put("global.listen_port", "10010");
+ mockAps.processAndPublishComponentSpecificExports(ports,
+ "mockcontainer_1",
+ "host1",
+ "HBASE_REGIONSERVER");
+ ArgumentCaptor<Collection> entriesCaptor = ArgumentCaptor.
+ forClass(Collection.class);
+ ArgumentCaptor<String> publishNameCaptor = ArgumentCaptor.
+ forClass(String.class);
+ Mockito.verify(mockAps, Mockito.times(1)).publishApplicationInstanceData(
+ anyString(),
+ publishNameCaptor.capture(),
+ entriesCaptor.capture());
+
+ PublishedExports pubExports = pubExpSet.get("QuickLinks".toLowerCase());
+ Assert.assertEquals(1, pubExports.entries.size());
+ Assert.assertEquals("QuickLinks", pubExports.description);
+ List<ExportEntry> expEntries = pubExports.entries.get("Comp_Endpoint");
+ Assert.assertEquals(1, expEntries.size());
+ Assert.assertEquals("mockcontainer_1", expEntries.get(0).getContainerId());
+ Assert.assertEquals("component", expEntries.get(0).getLevel());
+ Assert.assertEquals(null, expEntries.get(0).getTag());
+ Assert.assertEquals("http://host1:10010", expEntries.get(0).getValue());
+ Assert.assertNotNull(expEntries.get(0).getUpdatedTime());
+ Assert.assertNull(expEntries.get(0).getValidUntil());
+
+ assert entriesCaptor.getAllValues().size() == 1;
+ for (Collection coll : entriesCaptor.getAllValues()) {
+ Set<Map.Entry<String, String>> entrySet = (Set<Map.Entry<String, String>>) coll;
+ for (Map.Entry entry : entrySet) {
+ log.info("{}:{}", entry.getKey(), entry.getValue().toString());
+ if (entry.getKey().equals("Comp_Endpoint")) {
+ assert entry.getValue().toString().equals("http://host1:10010");
+ }
+ }
+ }
+ assert publishNameCaptor.getAllValues().size() == 1;
+ for (String coll : publishNameCaptor.getAllValues()) {
+ assert coll.equals("QuickLinks");
+ }
+
+ mockAps.notifyContainerCompleted(new MockContainerId(1));
+ pubExports = pubExpSet.get("QuickLinks".toLowerCase());
+ Assert.assertEquals(1, pubExports.entries.size());
+ Assert.assertEquals("QuickLinks", pubExports.description);
+ expEntries = pubExports.entries.get("Comp_Endpoint");
+ Assert.assertEquals(0, expEntries.size());
+
+ mockAps.notifyContainerCompleted(new MockContainerId(1));
+ mockAps.notifyContainerCompleted(new MockContainerId(2));
+
+ mockAps.processAndPublishComponentSpecificExports(ports,
+ "mockcontainer_1",
+ "host1",
+ "HBASE_REGIONSERVER");
+ mockAps.processAndPublishComponentSpecificExports(ports,
+ "mockcontainer_2",
+ "host1",
+ "HBASE_REGIONSERVER");
+ pubExports = pubExpSet.get("QuickLinks".toLowerCase());
+ Assert.assertEquals(1, pubExports.entries.size());
+ Assert.assertEquals("QuickLinks", pubExports.description);
+ expEntries = pubExports.entries.get("Comp_Endpoint");
+ Assert.assertEquals(2, expEntries.size());
+
+ mockAps.notifyContainerCompleted(new MockContainerId(2));
+ expEntries = pubExports.entries.get("Comp_Endpoint");
+ Assert.assertEquals(1, expEntries.size());
+ }
+
@Test
public void testProcessConfig() throws Exception {
InputStream metainfo_1 = new ByteArrayInputStream(metainfo_1_str.getBytes());
@@ -728,6 +825,11 @@
doNothing().when(mockAps).publishApplicationInstanceData(anyString(), anyString(), anyCollection());
doReturn(metainfo).when(mockAps).getMetainfo();
doReturn(roleClusterNodeMap).when(mockAps).getRoleClusterNodeMapping();
+ StateAccessForProviders access = createNiceMock(StateAccessForProviders.class);
+ doReturn(access).when(mockAps).getAmState();
+ PublishedExportsSet pubExpSet = new PublishedExportsSet();
+ expect(access.getPublishedExportsSet()).andReturn(pubExpSet).anyTimes();
+ replay(access);
mockAps.publishConfigAndExportGroups(hb, componentStatus, "HBASE_MASTER");
Assert.assertTrue(componentStatus.getConfigReported());
@@ -748,15 +850,30 @@
}
}
- Map<String, String> exports = mockAps.getCurrentExports("QuickLinks");
+ Map<String, List<ExportEntry>> exports = mockAps.getCurrentExports("QuickLinks");
Assert.assertEquals(2, exports.size());
- Assert.assertEquals(exports.get("JMX_Endpoint"), "http://HOST1:60012/jmx");
+ Assert.assertEquals(exports.get("JMX_Endpoint").get(0).getValue(), "http://HOST1:60012/jmx");
mockAps.publishConfigAndExportGroups(hb, componentStatus, "HBASE_REST");
Mockito.verify(mockAps, Mockito.times(3)).publishApplicationInstanceData(
anyString(),
anyString(),
entriesCaptor.capture());
+ PublishedExports pubExports = pubExpSet.get("QuickLinks".toLowerCase());
+ Assert.assertEquals(2, pubExports.entries.size());
+ Assert.assertEquals("QuickLinks", pubExports.description);
+ List<ExportEntry> expEntries = pubExports.entries.get("JMX_Endpoint");
+ Assert.assertEquals(1, expEntries.size());
+ Assert.assertEquals(null, expEntries.get(0).getContainerId());
+ Assert.assertEquals("application", expEntries.get(0).getLevel());
+ Assert.assertEquals(null, expEntries.get(0).getTag());
+ Assert.assertEquals("http://HOST1:60012/jmx", expEntries.get(0).getValue());
+ Assert.assertNull(expEntries.get(0).getValidUntil());
+
+ expEntries = pubExports.entries.get("Master_Status");
+ Assert.assertEquals(1, expEntries.size());
+ expEntries = pubExports.entries.get("JMX_Endpoint");
+ Assert.assertEquals("http://HOST1:60012/jmx", expEntries.get(0).getValue());
}
@Test
@@ -781,6 +898,7 @@
Assert.assertEquals(component.getCategory(), "MASTER");
Assert.assertEquals(component.getComponentExports().size(), 0);
Assert.assertEquals(component.getAppExports(), "QuickLinks-JMX_Endpoint,QuickLinks-Master_Status");
+ Assert.assertEquals(component.getCompExports(), "QuickLinks-Comp_Endpoint");
found++;
}
if (component.getName().equals("HBASE_REGIONSERVER")) {
@@ -807,7 +925,7 @@
List<ExportGroup> egs = application.getExportGroups();
ExportGroup eg = egs.get(0);
Assert.assertEquals(eg.getName(), "QuickLinks");
- Assert.assertEquals(eg.getExports().size(), 2);
+ Assert.assertEquals(eg.getExports().size(), 3);
found = 0;
for (Export export : eg.getExports()) {
@@ -975,15 +1093,18 @@
anyString(),
anyString(),
any(HeartBeatResponse.class));
- doNothing().when(mockAps).publishApplicationInstanceData(
+ doNothing().when(mockAps).publishFolderPaths(
+ anyMap(),
anyString(),
anyString(),
- anyCollection());
+ anyString());
doReturn(conf).when(mockAps).getConfig();
} catch (SliderException e) {
}
+ PublishedExportsSet pubExpSet = new PublishedExportsSet();
expect(access.isApplicationLive()).andReturn(true).anyTimes();
+ expect(access.getPublishedExportsSet()).andReturn(pubExpSet).anyTimes();
ClusterDescription desc = new ClusterDescription();
desc.setOption(OptionKeys.ZOOKEEPER_QUORUM, "host1:2181");
desc.setInfo(OptionKeys.APPLICATION_NAME, "HBASE");
@@ -996,6 +1117,7 @@
treeOps.set(OptionKeys.APPLICATION_NAME, "HBASE");
expect(access.getInstanceDefinitionSnapshot()).andReturn(aggConf).anyTimes();
expect(access.getInternalsSnapshot()).andReturn(treeOps).anyTimes();
+ doNothing().when(mockAps).publishApplicationInstanceData(anyString(), anyString(), anyCollection());
replay(access, ctx, container, sliderFileSystem, mockFs);
// build two containers
@@ -1094,6 +1216,7 @@
hb = new HeartBeat();
hb.setResponseId(2);
hb.setHostname("mockcontainer_1___HBASE_MASTER");
+ hb.setFqdn("host1");
cr = new CommandReport();
cr.setRole("HBASE_MASTER");
cr.setRoleCommand("INSTALL");
@@ -1160,10 +1283,11 @@
log.warn(he.getMessage());
}
- Mockito.verify(mockAps, Mockito.times(1)).publishApplicationInstanceData(
+ Mockito.verify(mockAps, Mockito.times(1)).publishFolderPaths(
+ anyMap(),
anyString(),
anyString(),
- anyCollection());
+ anyString());
}
protected AgentProviderService createAgentProviderService(Configuration conf) throws
@@ -1193,6 +1317,44 @@
}
@Test
+ public void testPublishFolderPaths() throws IOException {
+ AgentProviderService aps = createAgentProviderService(new Configuration());
+ StateAccessForProviders access = createNiceMock(StateAccessForProviders.class);
+ AgentProviderService mockAps = Mockito.spy(aps);
+ doReturn(access).when(mockAps).getAmState();
+ PublishedExportsSet pubExpSet = new PublishedExportsSet();
+ expect(access.getPublishedExportsSet()).andReturn(pubExpSet).anyTimes();
+ replay(access);
+
+ Map<String, String> folders = new HashMap<String, String>();
+ folders.put("AGENT_LOG_ROOT", "aFolder");
+ folders.put("AGENT_WORK_ROOT", "folderB");
+ mockAps.publishFolderPaths(folders, "cid", "role", "fqdn");
+
+ PublishedExports exports = pubExpSet.get("container_log_dirs");
+ Assert.assertEquals(1, exports.entries.size());
+ List<ExportEntry> expEntries = exports.entries.get("role");
+ Assert.assertEquals(1, expEntries.size());
+ Assert.assertEquals("cid", expEntries.get(0).getContainerId());
+ Assert.assertEquals("component", expEntries.get(0).getLevel());
+ Assert.assertEquals("role", expEntries.get(0).getTag());
+ Assert.assertEquals("fqdn:aFolder", expEntries.get(0).getValue());
+ Assert.assertNull(expEntries.get(0).getValidUntil());
+ Assert.assertEquals(null, expEntries.get(0).getValidUntil());
+
+ exports = pubExpSet.get("container_work_dirs");
+ Assert.assertEquals(1, exports.entries.size());
+ expEntries = exports.entries.get("role");
+ Assert.assertEquals(1, expEntries.size());
+ Assert.assertEquals("cid", expEntries.get(0).getContainerId());
+ Assert.assertEquals("component", expEntries.get(0).getLevel());
+ Assert.assertEquals("role", expEntries.get(0).getTag());
+ Assert.assertEquals("fqdn:folderB", expEntries.get(0).getValue());
+ Assert.assertNull(expEntries.get(0).getValidUntil());
+ Assert.assertEquals(null, expEntries.get(0).getValidUntil());
+ }
+
+ @Test
public void testNotifyContainerCompleted() throws IOException {
AgentProviderService aps = createAgentProviderService(new Configuration());
AgentProviderService mockAps = Mockito.spy(aps);
diff --git a/slider-funtest/src/main/groovy/org/apache/slider/funtest/framework/AgentCommandTestBase.groovy b/slider-funtest/src/main/groovy/org/apache/slider/funtest/framework/AgentCommandTestBase.groovy
index d5448bf..db9fa6d 100644
--- a/slider-funtest/src/main/groovy/org/apache/slider/funtest/framework/AgentCommandTestBase.groovy
+++ b/slider-funtest/src/main/groovy/org/apache/slider/funtest/framework/AgentCommandTestBase.groovy
@@ -164,6 +164,33 @@
return null;
}
+ public static boolean containsString(SliderShell shell, String lookThisUp, int n = 1) {
+ int count = 0
+ for (String str in shell.out) {
+ int subCount = countString(str, lookThisUp)
+ count = count + subCount
+ if (count == n) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ public static int countString(String str, String search) {
+ int count = 0
+ if (SliderUtils.isUnset(str) || SliderUtils.isUnset(search)) {
+ return count
+ }
+
+ int index = str.indexOf(search, 0)
+ while (index > 0) {
+ index = str.indexOf(search, index + 1)
+ ++count
+ }
+ return count
+ }
+
public static String findLineEntryValue(SliderShell shell, String[] locaters) {
String line = findLineEntry(shell, locaters);
diff --git a/slider-funtest/src/test/groovy/org/apache/slider/funtest/lifecycle/AppsThroughAgentIT.groovy b/slider-funtest/src/test/groovy/org/apache/slider/funtest/lifecycle/AppsThroughAgentIT.groovy
index 234275a..0796355 100644
--- a/slider-funtest/src/test/groovy/org/apache/slider/funtest/lifecycle/AppsThroughAgentIT.groovy
+++ b/slider-funtest/src/test/groovy/org/apache/slider/funtest/lifecycle/AppsThroughAgentIT.groovy
@@ -75,6 +75,46 @@
assertComponentCount(COMMAND_LOGGER, 2, shell)
assertSuccess(shell)
+
+ // get log folders
+ shell = slider(EXIT_SUCCESS,
+ [
+ ACTION_REGISTRY,
+ ARG_NAME,
+ APPLICATION_NAME,
+ ARG_LISTEXP])
+ if(!containsString(shell, "container_log_dirs") ||
+ !containsString(shell, "container_work_dirs")) {
+ logShell(shell)
+ assert fail("Should list default exports container_log_dirs or container_work_dirs")
+ }
+
+ // get log folders
+ shell = slider(EXIT_SUCCESS,
+ [
+ ACTION_REGISTRY,
+ ARG_NAME,
+ APPLICATION_NAME,
+ ARG_GETEXP,
+ "container_log_dirs"])
+ if(!containsString(shell, "\"tag\":\"COMMAND_LOGGER\",\"level\":\"component\"", 2)) {
+ logShell(shell)
+ assert fail("Should list 2 entries for log folders")
+ }
+
+ // get log folders
+ shell = slider(EXIT_SUCCESS,
+ [
+ ACTION_REGISTRY,
+ ARG_NAME,
+ APPLICATION_NAME,
+ ARG_GETEXP,
+ "container_work_dirs"])
+ if(!containsString(shell, "\"tag\":\"COMMAND_LOGGER\",\"level\":\"component\"", 2)) {
+ logShell(shell)
+ assert fail("Should list 2 entries for work folder")
+ }
+
assert isApplicationInState("RUNNING", APPLICATION_NAME), 'App is not running.'
}
}