blob: fe0e038319e40c0694a566f496db008df53ce178 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.client;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.io.Files;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathNotFoundException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.registry.client.api.RegistryConstants;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.registry.client.exceptions.NoRecordException;
import org.apache.hadoop.registry.client.types.Endpoint;
import org.apache.hadoop.registry.client.types.RegistryPathStatus;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.alias.CredentialProvider;
import org.apache.hadoop.security.alias.CredentialProviderFactory;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.slider.api.ClusterDescription;
import org.apache.slider.api.ClusterNode;
import org.apache.slider.api.InternalKeys;
import org.apache.slider.api.OptionKeys;
import org.apache.slider.api.ResourceKeys;
import org.apache.slider.api.SliderClusterProtocol;
import org.apache.slider.api.StateValues;
import org.apache.slider.api.proto.Messages;
import org.apache.slider.api.types.ContainerInformation;
import org.apache.slider.api.types.SliderInstanceDescription;
import org.apache.slider.client.ipc.SliderClusterOperations;
import org.apache.slider.common.Constants;
import org.apache.slider.common.SliderExitCodes;
import org.apache.slider.common.SliderKeys;
import org.apache.slider.common.SliderXmlConfKeys;
import org.apache.slider.common.params.AbstractActionArgs;
import org.apache.slider.common.params.AbstractClusterBuildingActionArgs;
import org.apache.slider.common.params.ActionAMSuicideArgs;
import org.apache.slider.common.params.ActionClientArgs;
import org.apache.slider.common.params.ActionCreateArgs;
import org.apache.slider.common.params.ActionDependencyArgs;
import org.apache.slider.common.params.ActionDiagnosticArgs;
import org.apache.slider.common.params.ActionEchoArgs;
import org.apache.slider.common.params.ActionExistsArgs;
import org.apache.slider.common.params.ActionFlexArgs;
import org.apache.slider.common.params.ActionFreezeArgs;
import org.apache.slider.common.params.ActionInstallKeytabArgs;
import org.apache.slider.common.params.ActionInstallPackageArgs;
import org.apache.slider.common.params.ActionKeytabArgs;
import org.apache.slider.common.params.ActionKillContainerArgs;
import org.apache.slider.common.params.ActionListArgs;
import org.apache.slider.common.params.ActionLookupArgs;
import org.apache.slider.common.params.ActionPackageArgs;
import org.apache.slider.common.params.ActionRegistryArgs;
import org.apache.slider.common.params.ActionResolveArgs;
import org.apache.slider.common.params.ActionStatusArgs;
import org.apache.slider.common.params.ActionThawArgs;
import org.apache.slider.common.params.ActionUpgradeArgs;
import org.apache.slider.common.params.Arguments;
import org.apache.slider.common.params.ClientArgs;
import org.apache.slider.common.params.CommonArgs;
import org.apache.slider.common.params.LaunchArgsAccessor;
import org.apache.slider.common.params.SliderActions;
import org.apache.slider.common.tools.ConfigHelper;
import org.apache.slider.common.tools.Duration;
import org.apache.slider.common.tools.SliderFileSystem;
import org.apache.slider.common.tools.SliderUtils;
import org.apache.slider.common.tools.SliderVersionInfo;
import org.apache.slider.core.build.InstanceBuilder;
import org.apache.slider.core.build.InstanceIO;
import org.apache.slider.core.conf.AggregateConf;
import org.apache.slider.core.conf.ConfTree;
import org.apache.slider.core.conf.ConfTreeOperations;
import org.apache.slider.core.conf.MapOperations;
import org.apache.slider.core.conf.ResourcesInputPropertiesValidator;
import org.apache.slider.core.conf.TemplateInputPropertiesValidator;
import org.apache.slider.core.exceptions.BadClusterStateException;
import org.apache.slider.core.exceptions.BadCommandArgumentsException;
import org.apache.slider.core.exceptions.BadConfigException;
import org.apache.slider.core.exceptions.ErrorStrings;
import org.apache.slider.core.exceptions.NoSuchNodeException;
import org.apache.slider.core.exceptions.NotFoundException;
import org.apache.slider.core.exceptions.SliderException;
import org.apache.slider.core.exceptions.UnknownApplicationInstanceException;
import org.apache.slider.core.exceptions.UsageException;
import org.apache.slider.core.exceptions.WaitTimeoutException;
import org.apache.slider.core.launch.AppMasterLauncher;
import org.apache.slider.core.launch.ClasspathConstructor;
import org.apache.slider.core.launch.CommandLineBuilder;
import org.apache.slider.core.launch.JavaCommandLineBuilder;
import org.apache.slider.core.launch.LaunchedApplication;
import org.apache.slider.core.launch.RunningApplication;
import org.apache.slider.core.launch.SerializedApplicationReport;
import org.apache.slider.core.main.RunService;
import org.apache.slider.core.persist.AppDefinitionPersister;
import org.apache.slider.core.persist.ApplicationReportSerDeser;
import org.apache.slider.core.persist.ConfPersister;
import org.apache.slider.core.persist.LockAcquireFailedException;
import org.apache.slider.core.registry.SliderRegistryUtils;
import org.apache.slider.core.registry.YarnAppListClient;
import org.apache.slider.core.registry.docstore.ConfigFormat;
import org.apache.slider.core.registry.docstore.PublishedConfigSet;
import org.apache.slider.core.registry.docstore.PublishedConfiguration;
import org.apache.slider.core.registry.docstore.PublishedConfigurationOutputter;
import org.apache.slider.core.registry.docstore.PublishedExports;
import org.apache.slider.core.registry.docstore.PublishedExportsOutputter;
import org.apache.slider.core.registry.docstore.PublishedExportsSet;
import org.apache.slider.core.registry.retrieve.RegistryRetriever;
import org.apache.slider.core.zk.BlockingZKWatcher;
import org.apache.slider.core.zk.ZKIntegration;
import org.apache.slider.core.zk.ZKPathBuilder;
import org.apache.slider.providers.AbstractClientProvider;
import org.apache.slider.providers.SliderProviderFactory;
import org.apache.slider.providers.agent.AgentKeys;
import org.apache.slider.providers.slideram.SliderAMClientProvider;
import org.apache.slider.server.appmaster.SliderAppMaster;
import org.apache.slider.server.appmaster.rpc.RpcBinder;
import org.apache.slider.server.services.security.SecurityStore;
import org.apache.slider.server.services.utility.AbstractSliderLaunchedService;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.FilenameFilter;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.InterruptedIOException;
import java.io.PrintStream;
import java.io.StringWriter;
import java.io.Writer;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.apache.hadoop.registry.client.binding.RegistryUtils.ServiceRecordMarshal;
import static org.apache.hadoop.registry.client.binding.RegistryUtils.currentUser;
import static org.apache.hadoop.registry.client.binding.RegistryUtils.extractServiceRecords;
import static org.apache.hadoop.registry.client.binding.RegistryUtils.listServiceRecords;
import static org.apache.hadoop.registry.client.binding.RegistryUtils.servicePath;
import static org.apache.hadoop.registry.client.binding.RegistryUtils.serviceclassPath;
import static org.apache.hadoop.registry.client.binding.RegistryUtils.statChildren;
import static org.apache.slider.common.params.SliderActions.ACTION_AM_SUICIDE;
import static org.apache.slider.common.params.SliderActions.ACTION_BUILD;
import static org.apache.slider.common.params.SliderActions.ACTION_CLIENT;
import static org.apache.slider.common.params.SliderActions.ACTION_CREATE;
import static org.apache.slider.common.params.SliderActions.ACTION_DEPENDENCY;
import static org.apache.slider.common.params.SliderActions.ACTION_DESTROY;
import static org.apache.slider.common.params.SliderActions.ACTION_DIAGNOSTICS;
import static org.apache.slider.common.params.SliderActions.ACTION_EXISTS;
import static org.apache.slider.common.params.SliderActions.ACTION_FLEX;
import static org.apache.slider.common.params.SliderActions.ACTION_FREEZE;
import static org.apache.slider.common.params.SliderActions.ACTION_HELP;
import static org.apache.slider.common.params.SliderActions.ACTION_INSTALL_KEYTAB;
import static org.apache.slider.common.params.SliderActions.ACTION_INSTALL_PACKAGE;
import static org.apache.slider.common.params.SliderActions.ACTION_KEYTAB;
import static org.apache.slider.common.params.SliderActions.ACTION_KILL_CONTAINER;
import static org.apache.slider.common.params.SliderActions.ACTION_LIST;
import static org.apache.slider.common.params.SliderActions.ACTION_LOOKUP;
import static org.apache.slider.common.params.SliderActions.ACTION_PACKAGE;
import static org.apache.slider.common.params.SliderActions.ACTION_REGISTRY;
import static org.apache.slider.common.params.SliderActions.ACTION_RESOLVE;
import static org.apache.slider.common.params.SliderActions.ACTION_STATUS;
import static org.apache.slider.common.params.SliderActions.ACTION_THAW;
import static org.apache.slider.common.params.SliderActions.ACTION_UPDATE;
import static org.apache.slider.common.params.SliderActions.ACTION_UPGRADE;
import static org.apache.slider.common.params.SliderActions.ACTION_VERSION;
/**
* Client service for Slider
*/
public class SliderClient extends AbstractSliderLaunchedService implements RunService,
SliderExitCodes, SliderKeys, ErrorStrings, SliderClientAPI {
/** Class-wide SLF4J logger. */
private static final Logger log = LoggerFactory.getLogger(SliderClient.class);
// ---------------------------------------------------------------------
// Error text constants. They are public, so code outside this class may
// reference them; treat the wording as part of the API.
// ---------------------------------------------------------------------
public static final String E_MUST_BE_A_VALID_JSON_FILE
= "Invalid configuration. Must be a valid json file.";
public static final String E_INVALID_INSTALL_LOCATION
= "A valid install location must be provided for the client.";
public static final String E_UNABLE_TO_READ_SUPPLIED_PACKAGE_FILE
= "Unable to read supplied package file";
public static final String E_INVALID_APPLICATION_PACKAGE_LOCATION
= "A valid application package location required.";
public static final String E_INVALID_INSTALL_PATH = "Install path is not a valid directory";
public static final String E_INSTALL_PATH_DOES_NOT_EXIST = "Install path does not exist";
public static final String E_INVALID_APPLICATION_TYPE_NAME
= "A valid application type name is required (e.g. HBASE).";
public static final String E_USE_REPLACEPKG_TO_OVERWRITE = "Use --replacepkg to overwrite.";
public static final String E_PACKAGE_DOES_NOT_EXIST = "Package does not exist";
public static final String E_NO_ZOOKEEPER_QUORUM = "No Zookeeper quorum defined";
public static final String E_NO_RESOURCE_MANAGER = "No valid Resource Manager address provided";
public static final String E_PACKAGE_EXISTS = "Package exists";
/** Stream for client output; initially standard output. */
private static PrintStream clientOutputStream = System.out;
// value should not be changed without updating string find in slider.py
private static final String PASSWORD_PROMPT = "Enter password for";
/** Parsed command line arguments; assigned in {@code bindArgs()}. */
private ClientArgs serviceArgs;
// NOTE(review): public and mutable; presumably the ID of the application
// this client launched or bonded to -confirm against its writers.
public ApplicationId applicationId;
private String deployedClusterName;
/**
* Cluster operations against the deployed cluster -will be null
* if no bonding has yet taken place
*/
private SliderClusterOperations sliderClusterOperations;
/** Filesystem binding; created in {@code initHadoopBinding()}. */
protected SliderFileSystem sliderFileSystem;
/**
* Yarn client service; created and registered as a child service
* in {@code initHadoopBinding()}
*/
private SliderYarnClientImpl yarnClient;
/** Application listing client built on top of the YARN client in {@code initHadoopBinding()}. */
private YarnAppListClient yarnAppListClient;
private AggregateConf launchedInstanceDefinition;
/**
* The YARN registry service
*/
private RegistryOperations registryOperations;
/**
* Constructor. Registers the service under the name "Slider Client".
* <p>
* An {@code HdfsConfiguration} and a {@code YarnConfiguration} are created
* and immediately discarded.
* NOTE(review): presumably this forces those classes to load so that their
* default configuration resources are registered before any
* {@code Configuration} is read -confirm against the Hadoop classes.
*/
public SliderClient() {
super("Slider Client");
// the instances are deliberately not retained
new HdfsConfiguration();
new YarnConfiguration();
}
/**
 * Bind the command line to this client: the arguments are parsed into
 * {@link ClientArgs} and the configuration is wrapped as a YARN configuration.
 * This is called <i>before</i> <code>serviceInit()</code> is called.
 * @param config the initial configuration built up by the
 * service launcher.
 * @param args list of arguments passed to the command line
 * after any launcher-specific commands have been stripped.
 * @return the post-binding configuration to pass to the <code>init()</code>
 * operation.
 * @throws Exception on any failure to bind or parse the arguments
 */
@Override
public Configuration bindArgs(Configuration config, String... args) throws Exception {
  Configuration boundConf = super.bindArgs(config, args);
  // the field is assigned before parsing, so it is set even if parse() fails
  serviceArgs = new ClientArgs(args);
  serviceArgs.parse();
  // register the slider XML config resource
  ConfigHelper.injectSliderXMLResource();
  // wrap as a YARN configuration and hand back the patched version
  return SliderUtils.patchConfiguration(new YarnConfiguration(boundConf));
}
/**
 * Service initialization: merges the slider client XML into the
 * configuration, applies the command line definitions and filesystem
 * binding, sets up security on a secure cluster, and brings up the
 * Hadoop bindings when the chosen action requires them.
 * @param conf configuration of the service
 * @throws Exception on any initialization failure
 */
@Override
protected void serviceInit(Configuration conf) throws Exception {
  Configuration sliderClientXml = SliderUtils.loadSliderClientXML();
  ConfigHelper.mergeConfigurations(conf, sliderClientXml, SLIDER_CLIENT_XML, true);
  serviceArgs.applyDefinitions(conf);
  serviceArgs.applyFileSystemBinding(conf);

  // security is initialized with the merged configuration
  boolean secureCluster = SliderUtils.isHadoopClusterSecure(conf);
  if (secureCluster) {
    SliderUtils.forceLogin();
    SliderUtils.initProcessSecurity(conf);
  }

  // only actions that talk to the cluster need the YARN/filesystem bindings
  if (serviceArgs.getCoreAction().getHadoopServicesRequired()) {
    initHadoopBinding();
  }
  super.serviceInit(conf);
}
/**
* this is where the work is done.
* @return the exit code
* @throws Throwable anything that went wrong
*/
/* JDK7
@Override
public int runService() throws Throwable {
// choose the action
String action = serviceArgs.getAction();
int exitCode = EXIT_SUCCESS;
String clusterName = serviceArgs.getClusterName();
// actions
switch (action) {
case ACTION_BUILD:
exitCode = actionBuild(clusterName, serviceArgs.getActionBuildArgs());
break;
case ACTION_UPDATE:
exitCode = actionUpdate(clusterName, serviceArgs.getActionUpdateArgs());
break;
case ACTION_UPGRADE:
exitCode = actionUpgrade(clusterName, serviceArgs.getActionUpgradeArgs());
break;
case ACTION_CREATE:
exitCode = actionCreate(clusterName, serviceArgs.getActionCreateArgs());
break;
case ACTION_FREEZE:
exitCode = actionFreeze(clusterName, serviceArgs.getActionFreezeArgs());
break;
case ACTION_THAW:
exitCode = actionThaw(clusterName, serviceArgs.getActionThawArgs());
break;
case ACTION_DESTROY:
exitCode = actionDestroy(clusterName);
break;
case ACTION_EXISTS:
exitCode = actionExists(clusterName,
serviceArgs.getActionExistsArgs().live);
break;
case ACTION_FLEX:
exitCode = actionFlex(clusterName, serviceArgs.getActionFlexArgs());
break;
case ACTION_GETCONF:
exitCode =
actionGetConf(clusterName, serviceArgs.getActionGetConfArgs());
break;
case ACTION_HELP:
case ACTION_USAGE:
log.info(serviceArgs.usage());
break;
case ACTION_KILL_CONTAINER:
exitCode = actionKillContainer(clusterName,
serviceArgs.getActionKillContainerArgs());
break;
case ACTION_AM_SUICIDE:
exitCode = actionAmSuicide(clusterName,
serviceArgs.getActionAMSuicideArgs());
break;
case ACTION_LIST:
exitCode = actionList(clusterName, serviceArgs.getActionListArgs());
break;
case ACTION_REGISTRY:
exitCode = actionRegistry(
serviceArgs.getActionRegistryArgs());
break;
case ACTION_STATUS:
exitCode = actionStatus(clusterName,
serviceArgs.getActionStatusArgs());
break;
case ACTION_VERSION:
exitCode = actionVersion();
break;
default:
throw new SliderException(EXIT_UNIMPLEMENTED,
"Unimplemented: " + action);
}
return exitCode;
}
*/
/**
 * Launched service execution. This delegates to {@link #exec()} and
 * rethrows "file/path not found" failures as a {@link NotFoundException}
 * wrapping the original exception.
 * @return an exit code
 * @throws Throwable on any failure
 */
@Override
public int runService() throws Throwable {
  try {
    int exitCode = exec();
    return exitCode;
  } catch (FileNotFoundException | PathNotFoundException notFound) {
    throw new NotFoundException(notFound, notFound.toString());
  }
}
/**
 * Execute the command line: look up the action named in the parsed
 * arguments and dispatch to the matching <code>action*()</code> method.
 * @return the exit code of the action; {@code EXIT_SUCCESS} for help
 * @throws SliderException with {@code EXIT_USAGE} if no action was given,
 * or {@code EXIT_UNIMPLEMENTED} if the action is not recognized
 * @throws Throwable on any other failure
 */
public int exec() throws Throwable {
  String action = serviceArgs.getAction();
  if (SliderUtils.isUnset(action)) {
    throw new SliderException(EXIT_USAGE, serviceArgs.usage());
  }

  String clusterName = serviceArgs.getClusterName();

  // every branch either returns the action's exit code or throws
  switch (action) {
    case ACTION_AM_SUICIDE:
      return actionAmSuicide(clusterName, serviceArgs.getActionAMSuicideArgs());

    case ACTION_BUILD:
      return actionBuild(clusterName, serviceArgs.getActionBuildArgs());

    case ACTION_CLIENT:
      return actionClient(serviceArgs.getActionClientArgs());

    case ACTION_CREATE:
      return actionCreate(clusterName, serviceArgs.getActionCreateArgs());

    case ACTION_DEPENDENCY:
      return actionDependency(serviceArgs.getActionDependencyArgs());

    case ACTION_DESTROY:
      return actionDestroy(clusterName);

    case ACTION_DIAGNOSTICS:
      return actionDiagnostic(serviceArgs.getActionDiagnosticArgs());

    case ACTION_EXISTS:
      return actionExists(clusterName, serviceArgs.getActionExistsArgs());

    case ACTION_FLEX:
      return actionFlex(clusterName, serviceArgs.getActionFlexArgs());

    case ACTION_FREEZE:
      return actionFreeze(clusterName, serviceArgs.getActionFreezeArgs());

    case ACTION_HELP:
      log.info(serviceArgs.usage());
      return EXIT_SUCCESS;

    case ACTION_KILL_CONTAINER:
      return actionKillContainer(clusterName, serviceArgs.getActionKillContainerArgs());

    case ACTION_INSTALL_KEYTAB:
      return actionInstallKeytab(serviceArgs.getActionInstallKeytabArgs());

    case ACTION_INSTALL_PACKAGE:
      return actionInstallPkg(serviceArgs.getActionInstallPackageArgs());

    case ACTION_KEYTAB:
      return actionKeytab(serviceArgs.getActionKeytabArgs());

    case ACTION_LIST:
      return actionList(clusterName, serviceArgs.getActionListArgs());

    case ACTION_LOOKUP:
      return actionLookup(serviceArgs.getActionLookupArgs());

    case ACTION_PACKAGE:
      return actionPackage(serviceArgs.getActionPackageArgs());

    case ACTION_REGISTRY:
      return actionRegistry(serviceArgs.getActionRegistryArgs());

    case ACTION_RESOLVE:
      return actionResolve(serviceArgs.getActionResolveArgs());

    case ACTION_STATUS:
      return actionStatus(clusterName, serviceArgs.getActionStatusArgs());

    case ACTION_THAW:
      return actionThaw(clusterName, serviceArgs.getActionThawArgs());

    case ACTION_UPDATE:
      return actionUpdate(clusterName, serviceArgs.getActionUpdateArgs());

    case ACTION_UPGRADE:
      return actionUpgrade(clusterName, serviceArgs.getActionUpgradeArgs());

    case ACTION_VERSION:
      return actionVersion();

    default:
      throw new SliderException(EXIT_UNIMPLEMENTED, "Unimplemented: " + action);
  }
}
/**
 * Perform everything needed to init the hadoop binding: validate the
 * client environment, create and register the YARN client, then build the
 * application list client and the filesystem wrapper.
 * This assumes that the service is already in inited or started state.
 * @throws IOException on IO problems
 * @throws SliderException on slider-specific failures
 */
protected void initHadoopBinding() throws IOException, SliderException {
  // fail fast if the client environment is unusable
  SliderUtils.validateSliderClientEnvironment(null);

  // YARN client: init now, and start too if this service is already live
  yarnClient = new SliderYarnClientImpl();
  yarnClient.init(getConfig());
  if (STATE.STARTED == getServiceState()) {
    yarnClient.start();
  }
  addService(yarnClient);

  yarnAppListClient = new YarnAppListClient(yarnClient, getUsername(), getConfig());

  // filesystem binding
  sliderFileSystem = new SliderFileSystem(getConfig());
}
/**
 * Delete the zookeeper node associated with the calling user and the cluster.
 * Zookeeper-side failures are logged and reported through the return value
 * rather than rethrown, so callers can treat this as best-effort cleanup.
 * TODO: YARN registry operations
 * @param clusterName name of the cluster whose node is to be deleted
 * @return true if a zookeeper client was obtained and the recursive delete
 * completed without an exception; false otherwise
 * @throws YarnException on YARN-level failures other than a bad configuration
 * @throws IOException on IO problems
 **/
@VisibleForTesting
public boolean deleteZookeeperNode(String clusterName) throws YarnException, IOException {
  String user = getUsername();
  String zkPath = ZKIntegration.mkClusterPath(user, clusterName);
  try {
    ZKIntegration client = getZkClient(clusterName, user);
    if (client == null) {
      // no connection: nothing was deleted, and there is nothing to log here
      return false;
    }
    // the existence check only drives the log message; the delete is
    // issued regardless
    if (client.exists(zkPath)) {
      log.info("Deleting zookeeper path {}", zkPath);
    }
    client.deleteRecursive(zkPath);
    return true;
  } catch (InterruptedException e) {
    // the exception is swallowed, so restore the flag for the caller
    Thread.currentThread().interrupt();
    log.warn("Unable to recursively delete zk node {}", zkPath, e);
  } catch (BadConfigException | KeeperException e) {
    log.warn("Unable to recursively delete zk node {}", zkPath, e);
  }
  return false;
}
/**
 * Create the zookeeper node associated with the calling user and the cluster.
 * This wraps {@link #createZookeeperNodeInner(String, Boolean)}, converting
 * zookeeper failures into a null return value.
 * @param clusterName cluster name
 * @param nameOnly create the path, not the node
 * @return the path, or null if the node could not be created (including
 * when it already exists) or no zookeeper client was available
 * @throws YarnException on YARN-level failures
 * @throws IOException on IO problems
 * @throws InterruptedIOException if the operation was interrupted; the
 * thread's interrupt status is restored before this is thrown
 */
@VisibleForTesting
public String createZookeeperNode(String clusterName, Boolean nameOnly) throws YarnException, IOException {
  try {
    return createZookeeperNodeInner(clusterName, nameOnly);
  } catch (KeeperException.NodeExistsException e) {
    // a pre-existing node is an expected outcome: keep it out of the warnings
    log.debug("Zookeeper node for cluster {} already exists", clusterName, e);
    return null;
  } catch (KeeperException e) {
    // still best-effort, but no longer silent
    log.warn("Unable to create zookeeper node for cluster {}", clusterName, e);
    return null;
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    // InterruptedIOException has no cause-taking constructor
    InterruptedIOException interrupted = new InterruptedIOException(e.toString());
    interrupted.initCause(e);
    throw interrupted;
  }
}
/**
 * Create the zookeeper node associated with the calling user and the cluster
 * -throwing exceptions on any failure.
 * @param clusterName cluster name
 * @param nameOnly when true, only compute the path and do not touch zookeeper
 * @return the path (with the node created unless <code>nameOnly</code> was
 * set), or null if no zookeeper client could be obtained
 * @throws YarnException on YARN-level failures
 * @throws IOException on IO problems
 * @throws KeeperException on zookeeper failures
 * @throws InterruptedException if the operation was interrupted
 */
@VisibleForTesting
public String createZookeeperNodeInner(String clusterName, Boolean nameOnly)
    throws YarnException, IOException, KeeperException, InterruptedException {
  String username = getUsername();
  String clusterPath = ZKIntegration.mkClusterPath(username, clusterName);
  if (nameOnly) {
    return clusterPath;
  }

  ZKIntegration zk = getZkClient(clusterName, username);
  if (zk == null) {
    return null;
  }

  // permissions differ between an insecure cluster and a secure one
  List<ACL> acls = new ArrayList<ACL>();
  if (!UserGroupInformation.isSecurityEnabled()) {
    acls.add(new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.ANYONE_ID_UNSAFE));
  } else {
    acls.add(new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.AUTH_IDS));
    acls.add(new ACL(ZooDefs.Perms.READ, ZooDefs.Ids.ANYONE_ID_UNSAFE));
  }
  zk.createPath(clusterPath, "", acls, CreateMode.PERSISTENT);
  return clusterPath;
}
/**
 * Gets a zookeeper client, returns null if it cannot connect to zookeeper.
 * The connection attempt waits up to two seconds. Every failure path
 * returns null, so callers never receive a client whose creation,
 * initialization or connection wait failed.
 * @param clusterName cluster name
 * @param user user name
 * @return a client, or null on any connection failure
 * @throws YarnException if the zookeeper quorum lookup fails
 **/
protected ZKIntegration getZkClient(String clusterName, String user) throws YarnException {
  String registryQuorum = lookupZKQuorum();
  try {
    BlockingZKWatcher watcher = new BlockingZKWatcher();
    ZKIntegration client =
        ZKIntegration.newInstance(registryQuorum, user, clusterName, true, false, watcher);
    client.init();
    watcher.waitForZKConnection(2 * 1000);
    return client;
  } catch (InterruptedException e) {
    // the exception is swallowed, so restore the flag for the caller
    Thread.currentThread().interrupt();
    log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e);
  } catch (IOException e) {
    log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e);
  }
  // never hand back a client from a failed attempt
  return null;
}
/**
 * Destroy an application instance: delete its cluster directory and
 * zookeeper node, then remove its registry entry. Cleanup failures are
 * logged as warnings rather than thrown.
 * @param clustername name of the application instance
 * @return {@code EXIT_SUCCESS}
 * @throws SliderException with {@code EXIT_APPLICATION_IN_USE} if a live
 * instance is found after the cleanup
 * @throws YarnException on YARN-level failures
 * @throws IOException on IO problems
 */
@Override
public int actionDestroy(String clustername) throws YarnException,
    IOException {
  SliderUtils.validateClusterName(clustername);
  // no-op, it is now mandatory.
  verifyBindingsDefined();
  // refuse to destroy while an instance is live
  verifyNoLiveClusters(clustername, "Destroy");

  Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
  FileSystem fs = sliderFileSystem.getFileSystem();
  if (!fs.exists(clusterDirectory)) {
    log.debug("Application Instance {} already destroyed", clustername);
  } else {
    log.debug("Application Instance {} found at {}: destroying", clustername, clusterDirectory);
    if (!fs.delete(clusterDirectory, true)) {
      log.warn("Filesystem returned false from delete() operation");
    }
    if (!deleteZookeeperNode(clustername)) {
      log.warn("Unable to perform node cleanup in Zookeeper.");
    }
    // double check that the directory really went away
    if (fs.exists(clusterDirectory)) {
      log.warn("Failed to delete {}", clusterDirectory);
    }
  }

  // remove the registry entry —this must not block the destroy operation
  String registryPath = SliderRegistryUtils.registryPathForInstance(clustername);
  try {
    getRegistryOperations().delete(registryPath, true);
  } catch (IOException e) {
    log.warn("Error deleting registry entry {}: {} ", registryPath, e, e);
  } catch (SliderException e) {
    log.warn("Error binding to registry {} ", e, e);
  }

  // a live instance at this point means a cluster was created during the
  // check/destroy sequence: report it
  List<ApplicationReport> liveInstances = findAllLiveInstances(clustername);
  if (!liveInstances.isEmpty()) {
    throw new SliderException(EXIT_APPLICATION_IN_USE,
        clustername + ": " + E_DESTROY_CREATE_RACE_CONDITION + " :" + liveInstances.get(0));
  }
  log.info("Destroyed cluster {}", clustername);
  return EXIT_SUCCESS;
}
/**
 * Ask the application master of a cluster to terminate itself.
 * @param clustername name of the cluster
 * @param args supplies the message, exit code and wait time passed to the AM
 * @return {@code EXIT_SUCCESS}
 * @throws YarnException on YARN-level failures
 * @throws IOException on IO problems
 */
@Override
public int actionAmSuicide(String clustername,
    ActionAMSuicideArgs args) throws YarnException, IOException {
  createClusterOperations(clustername)
      .amSuicide(args.message, args.exitcode, args.waittime);
  return EXIT_SUCCESS;
}
/**
 * Create a client provider via the provider factory registered
 * for the given provider name.
 * @param provider provider name
 * @return the client provider created by the factory
 * @throws SliderException if the factory or the provider cannot be created
 */
@Override
public AbstractClientProvider createClientProvider(String provider)
    throws SliderException {
  return SliderProviderFactory
      .createSliderProviderFactory(provider)
      .createClientProvider();
}
/**
 * Create the cluster -building and saving the specification first, then
 * checking credentials, then starting the cluster.
 * If the credential check fails with an IOException, the cluster
 * directory is deleted again before the exception is rethrown.
 * @param clustername cluster name
 * @param createArgs arguments of the create action
 * @return the status code returned by the cluster start
 * @throws YarnException Yarn problems
 * @throws IOException other problems
 * @throws BadCommandArgumentsException bad arguments.
 */
public int actionCreate(String clustername, ActionCreateArgs createArgs) throws
    YarnException,
    IOException {
  // build and persist the specification
  actionBuild(clustername, createArgs);

  Path clusterDir = sliderFileSystem.buildClusterDirPath(clustername);
  AggregateConf unresolvedDefinition =
      loadInstanceDefinitionUnresolved(clustername, clusterDir);
  try {
    checkForCredentials(getConfig(), unresolvedDefinition.getAppConf());
  } catch (IOException credentialFailure) {
    // roll back the just-built cluster directory
    sliderFileSystem.getFileSystem().delete(clusterDir, true);
    throw credentialFailure;
  }
  return startCluster(clustername, createArgs);
}
/**
 * Upgrade an application. There are two flows:
 * <ul>
 * <li>upgrade spec: both --template and --resources are given (and neither
 * of --containers or --components); the instance definition is rebuilt
 * and the AM is asked to restart.</li>
 * <li>upgrade containers: neither --template nor --resources is given; the
 * request is delegated to the container upgrade flow.</li>
 * </ul>
 * @param clustername name of the cluster
 * @param upgradeArgs arguments of the upgrade action
 * @return the exit code
 * @throws BadCommandArgumentsException on an invalid option combination
 * @throws YarnException on YARN-level failures
 * @throws IOException on IO problems
 */
@Override
public int actionUpgrade(String clustername, ActionUpgradeArgs upgradeArgs)
    throws YarnException, IOException {
  boolean hasTemplate = upgradeArgs.template != null;
  boolean hasResources = upgradeArgs.resources != null;

  // Validation is deliberately strict for the upgrade spec: --template and
  // --resources must be given together. Otherwise the internal app config
  // and resources states of the app would be unwantedly modified, and the
  // change would take effect on the running app immediately.
  if (hasTemplate && !hasResources) {
    throw new BadCommandArgumentsException(
        "Option %s must be specified with option %s",
        Arguments.ARG_RESOURCES, Arguments.ARG_TEMPLATE);
  }
  if (hasResources && !hasTemplate) {
    throw new BadCommandArgumentsException(
        "Option %s must be specified with option %s",
        Arguments.ARG_TEMPLATE, Arguments.ARG_RESOURCES);
  }

  // From here on the two options are either both present or both absent.
  if (!hasTemplate) {
    // Upgrade containers flow: any one or both of --containers and
    // --components can be specified. A container named with --containers
    // that also belongs to a component type named with --components is
    // upgraded only once.
    return actionUpgradeContainers(clustername, upgradeArgs);
  }

  // Upgrade spec flow: --containers and --components are not allowed
  if (CollectionUtils.isNotEmpty(upgradeArgs.containers)) {
    throw new BadCommandArgumentsException(
        "Option %s cannot be specified with %s or %s",
        Arguments.ARG_CONTAINERS, Arguments.ARG_TEMPLATE,
        Arguments.ARG_RESOURCES);
  }
  if (CollectionUtils.isNotEmpty(upgradeArgs.components)) {
    throw new BadCommandArgumentsException(
        "Option %s cannot be specified with %s or %s",
        Arguments.ARG_COMPONENTS, Arguments.ARG_TEMPLATE,
        Arguments.ARG_RESOURCES);
  }

  // not an error to try to upgrade a stopped cluster, just return success
  // code, appropriate log messages have already been dumped
  if (!isAppInRunningState(clustername)) {
    return EXIT_SUCCESS;
  }

  // rebuild the definition, then have the AM restart itself
  buildInstanceDefinition(clustername, upgradeArgs, true, true, true);
  createClusterOperations(clustername)
      .amSuicide("AM restarted for application upgrade", 1, 1000);
  return EXIT_SUCCESS;
}
/**
 * Ask a running application instance to upgrade a chosen set of containers
 * and/or whole components.
 * <p>
 * The requested container ids and component names are matched against the
 * live containers of the cluster. Requested names that match nothing are
 * only logged as warnings; the call fails only when nothing at all matches.
 *
 * @param clustername name of the application instance
 * @param upgradeArgs upgrade arguments carrying the requested containers and
 *                    components
 * @return {@code EXIT_SUCCESS} when the request was issued or the application
 *         is not in the running state, {@code EXIT_NOT_FOUND} when none of
 *         the requested containers or components is live
 * @throws YarnException YARN problems
 * @throws IOException IO problems
 */
private int actionUpgradeContainers(String clustername,
    ActionUpgradeArgs upgradeArgs) throws YarnException, IOException {
  verifyBindingsDefined();
  SliderUtils.validateClusterName(clustername);
  int waittime = upgradeArgs.getWaittime(); // ignored for now
  String text = "Upgrade containers";
  log.debug("actionUpgradeContainers({}, reason={}, wait={})", clustername,
      text, waittime);

  // Upgrading a stopped cluster is not an error: report success, the
  // running-state check has already logged the details.
  if (!isAppInRunningState(clustername)) {
    return EXIT_SUCCESS;
  }

  // Sets drop duplicate requests and give cheap lookups. Whatever is still
  // in these two sets after the scan below was not found in the cluster.
  Set<String> pendingContainers = new HashSet<>();
  if (upgradeArgs.containers != null) {
    pendingContainers.addAll(upgradeArgs.containers);
  }
  Set<String> pendingComponents = new HashSet<>();
  if (upgradeArgs.components != null) {
    pendingComponents.addAll(upgradeArgs.components);
  }

  Set<String> validContainers = new HashSet<>();
  Set<String> validComponents = new HashSet<>();
  for (ContainerInformation live : getContainers(clustername)) {
    // everything requested has been accounted for: stop scanning
    if (pendingContainers.isEmpty() && pendingComponents.isEmpty()) {
      break;
    }
    if (pendingContainers.remove(live.containerId)) {
      validContainers.add(live.containerId);
    }
    if (pendingComponents.remove(live.component)) {
      validComponents.add(live.component);
    }
  }

  // Leftovers are invalid; warn about them and carry on with the rest.
  if (!pendingContainers.isEmpty()) {
    log.warn("Invalid set of containers provided {}", pendingContainers);
  }
  if (!pendingComponents.isEmpty()) {
    log.warn("Invalid set of components provided {}", pendingComponents);
  }

  if (validContainers.isEmpty() && validComponents.isEmpty()) {
    log.error("Not a single valid container or component specified. Nothing to do.");
    return EXIT_NOT_FOUND;
  }

  SliderClusterProtocol appMaster = connect(findInstance(clustername));
  Messages.UpgradeContainersRequestProto.Builder request =
      Messages.UpgradeContainersRequestProto.newBuilder();
  request.setMessage(text);
  request.addAllContainer(validContainers);
  request.addAllComponent(validComponents);
  appMaster.upgradeContainers(request.build());

  log.info("Cluster upgrade issued for -");
  if (!validContainers.isEmpty()) {
    log.info(" Containers (total {}): {}", validContainers.size(),
        validContainers);
  }
  if (!validComponents.isEmpty()) {
    log.info(" Components (total {}): {}", validComponents.size(),
        validComponents);
  }
  return EXIT_SUCCESS;
}
/**
 * Tell whether the named application instance is currently in the
 * RUNNING state. Every negative outcome is logged at info level.
 *
 * @param clustername name of the application instance
 * @return true if and only if the application is in the RUNNING state
 * @throws YarnException YARN problems
 * @throws IOException IO problems
 */
private boolean isAppInRunningState(String clustername) throws YarnException,
    IOException {
  // is this actually a known cluster?
  sliderFileSystem.locateInstanceDefinition(clustername);

  ApplicationReport report = findInstance(clustername);
  if (report == null) {
    // exit early
    log.info("Cluster {} not running", clustername);
    return false;
  }
  log.debug("App to upgrade was found: {}:\n{}", clustername,
      new SliderUtils.OnDemandReportStringifier(report));

  YarnApplicationState state = report.getYarnApplicationState();
  int position = state.ordinal();

  // FINISHED and every state declared after it count as terminated
  if (position >= YarnApplicationState.FINISHED.ordinal()) {
    log.info(
        "Cluster {} is in a terminated state {}. Use command '{}' instead.",
        clustername, state,
        SliderActions.ACTION_UPDATE);
    return false;
  }

  // IPC request to upgrade containers is possible only once the app runs
  if (position < YarnApplicationState.RUNNING.ordinal()) {
    log.info("Cluster {} is in a pre-running state {}. To upgrade it needs "
        + "to be RUNNING.", clustername, state);
    return false;
  }
  return true;
}
/**
 * Make sure every credential alias requested in a configuration tree exists
 * in its credential provider, prompting on stdin for any that is missing
 * and storing the answer in the provider.
 *
 * @param conf base configuration; a copy is made per provider with the
 *             credential provider path set to that provider
 * @param tree configuration tree whose {@code credentials} map goes from
 *             provider path to the aliases wanted from that provider
 * @throws IOException on provider or stdin failures
 */
private static void checkForCredentials(Configuration conf,
    ConfTree tree) throws IOException {
  if (tree.credentials == null || tree.credentials.size() == 0) {
    log.info("No credentials requested");
    return;
  }
  // created lazily: stdin is only touched when a password is really needed
  BufferedReader br = null;
  try {
    for (Entry<String, List<String>> cred : tree.credentials.entrySet()) {
      String provider = cred.getKey();
      List<String> aliases = cred.getValue();
      if (aliases == null || aliases.isEmpty()) {
        continue;
      }
      Configuration c = new Configuration(conf);
      c.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, provider);
      CredentialProvider credentialProvider =
          CredentialProviderFactory.getProviders(c).get(0);
      Set<String> existingAliases =
          new HashSet<>(credentialProvider.getAliases());
      for (String alias : aliases) {
        // the lookup is done on the lower-cased alias
        if (existingAliases.contains(alias.toLowerCase(Locale.ENGLISH))) {
          log.info("Credentials for {} found in {}", alias, provider);
        } else {
          if (br == null) {
            br = new BufferedReader(new InputStreamReader(System.in));
          }
          char[] pass = readPassword(alias, br);
          try {
            credentialProvider.createCredentialEntry(alias, pass);
            credentialProvider.flush();
          } finally {
            // wipe the secret even when storing it failed
            Arrays.fill(pass, ' ');
          }
        }
      }
    }
  } finally {
    if (br != null) {
      br.close();
    }
  }
}
/**
 * Prompt on stdin for a single password, using a reader that is opened for
 * this call and closed again afterwards.
 *
 * @param alias name shown in the prompt
 * @return the password that was entered
 * @throws IOException if the password cannot be read
 */
private static char[] readOnePassword(String alias) throws IOException {
  final BufferedReader stdinReader =
      new BufferedReader(new InputStreamReader(System.in));
  try {
    return readPassword(alias, stdinReader);
  } finally {
    stdinReader.close();
  }
}
/**
 * Prompt for a password twice and return it once both entries agree; the
 * prompt is repeated until they do.
 * <p>
 * Using a normal reader instead of a secure one, because stdin is not
 * hooked up to the command line.
 *
 * @param alias name shown in the prompts
 * @param br reader the answers are read from; not closed here
 * @return the confirmed password
 * @throws IOException if the input ends before a password was confirmed, or
 *                     on any other read failure
 */
private static char[] readPassword(String alias, BufferedReader br)
    throws IOException {
  while (true) {
    log.info(String.format("%s %s: ", PASSWORD_PROMPT, alias));
    String firstEntry = br.readLine();
    // readLine() yields null at end of input; fail cleanly instead of NPE
    if (firstEntry == null) {
      throw new IOException("Could not read credentials for " + alias +
          " from stdin");
    }
    char[] newPassword1 = firstEntry.toCharArray();

    log.info(String.format("%s %s again: ", PASSWORD_PROMPT, alias));
    String secondEntry = br.readLine();
    if (secondEntry == null) {
      Arrays.fill(newPassword1, ' ');
      throw new IOException("Could not read credentials for " + alias +
          " from stdin");
    }
    char[] newPassword2 = secondEntry.toCharArray();

    boolean matched = Arrays.equals(newPassword1, newPassword2);
    // the confirmation copy is never handed out, so always wipe it
    Arrays.fill(newPassword2, ' ');
    if (matched) {
      return newPassword1;
    }
    Arrays.fill(newPassword1, ' ');
    log.info("Passwords don't match. Try again.");
  }
}
/**
 * Build and persist an instance definition without allowing an existing
 * definition to be overwritten or a live cluster to be modified.
 *
 * @param clustername name of the cluster
 * @param buildInfo the arguments needed to build the cluster
 * @return {@code EXIT_SUCCESS}
 * @throws YarnException YARN problems
 * @throws IOException IO problems
 */
@Override
public int actionBuild(String clustername,
    AbstractClusterBuildingActionArgs buildInfo)
    throws YarnException, IOException {
  final boolean overwrite = false;
  final boolean liveClusterAllowed = false;
  buildInstanceDefinition(clustername, buildInfo, overwrite,
      liveClusterAllowed);
  return EXIT_SUCCESS;
}
/**
 * Dispatch a keytab command to the install, delete or list operation.
 * The flags are tested in that order and the first one set wins.
 *
 * @param keytabInfo keytab arguments
 * @return the exit code of the selected operation
 * @throws BadCommandArgumentsException if no operation flag is set
 * @throws YarnException YARN problems
 * @throws IOException IO problems
 */
@Override
public int actionKeytab(ActionKeytabArgs keytabInfo)
    throws YarnException, IOException {
  if (keytabInfo.install) {
    return actionInstallKeytab(keytabInfo);
  }
  if (keytabInfo.delete) {
    return actionDeleteKeytab(keytabInfo);
  }
  if (keytabInfo.list) {
    return actionListKeytab(keytabInfo);
  }
  throw new BadCommandArgumentsException(
      "Keytab option specified not found.\n"
      + CommonArgs.usage(serviceArgs, ACTION_KEYTAB));
}
/**
 * Log the path of every file found, recursively, under the keytab
 * installation directory for the requested sub-folder.
 *
 * @param keytabInfo keytab arguments; a null folder is treated as empty
 * @return {@code EXIT_SUCCESS}
 * @throws IOException on filesystem problems
 */
private int actionListKeytab(ActionKeytabArgs keytabInfo) throws IOException {
  String subFolder = keytabInfo.folder;
  if (subFolder == null) {
    subFolder = StringUtils.EMPTY;
  }
  Path keytabDir = sliderFileSystem.buildKeytabInstallationDirPath(subFolder);
  RemoteIterator<LocatedFileStatus> entries =
      sliderFileSystem.getFileSystem().listFiles(keytabDir, true);
  log.info("Keytabs:");
  while (entries.hasNext()) {
    LocatedFileStatus entry = entries.next();
    log.info("\t" + entry.getPath().toString());
  }
  return EXIT_SUCCESS;
}
/**
 * Delete one installed keytab file from the keytab installation directory.
 *
 * @param keytabInfo keytab arguments; both folder and keytab name are
 *                   required
 * @return {@code EXIT_SUCCESS}
 * @throws BadCommandArgumentsException if folder or keytab name is missing,
 *                                      or the keytab file does not exist
 * @throws IOException on filesystem problems
 */
private int actionDeleteKeytab(ActionKeytabArgs keytabInfo)
    throws BadCommandArgumentsException, IOException {
  if (StringUtils.isEmpty(keytabInfo.folder)) {
    throw new BadCommandArgumentsException(
        "A valid destination keytab sub-folder name is required (e.g. 'security').\n"
        + CommonArgs.usage(serviceArgs, ACTION_KEYTAB));
  }
  if (StringUtils.isEmpty(keytabInfo.keytab)) {
    throw new BadCommandArgumentsException("A keytab name is required.");
  }

  Path keytabDir =
      sliderFileSystem.buildKeytabInstallationDirPath(keytabInfo.folder);
  Path target = new Path(keytabDir, keytabInfo.keytab);
  log.info("Deleting keytab {}", target);

  FileSystem fs = sliderFileSystem.getFileSystem();
  boolean present = fs.exists(target);
  if (!present) {
    throw new BadCommandArgumentsException("No keytab to delete found at " +
        target.toUri().toString());
  }
  // non-recursive delete: the target is expected to be a single file
  fs.delete(target, false);
  return EXIT_SUCCESS;
}
/**
 * Copy a local keytab file into the keytab installation directory, with the
 * directory and the installed file accessible to the owner only.
 *
 * @param keytabInfo keytab arguments; folder and local keytab location are
 *                   required
 * @return {@code EXIT_SUCCESS}
 * @throws BadCommandArgumentsException if an argument is missing, the local
 *         file is not a readable plain file, or the destination exists and
 *         overwrite was not requested
 * @throws IOException on filesystem problems
 */
private int actionInstallKeytab(ActionKeytabArgs keytabInfo)
    throws BadCommandArgumentsException, IOException {
  if (StringUtils.isEmpty(keytabInfo.folder)) {
    throw new BadCommandArgumentsException(
        "A valid destination keytab sub-folder name is required (e.g. 'security').\n"
        + CommonArgs.usage(serviceArgs, ACTION_KEYTAB));
  }
  if (StringUtils.isEmpty(keytabInfo.keytab)) {
    throw new BadCommandArgumentsException("A valid local keytab location is required.");
  }
  File localKeytab = new File(keytabInfo.keytab);
  boolean plainFile = localKeytab.exists() && !localKeytab.isDirectory();
  if (!plainFile) {
    throw new BadCommandArgumentsException("Unable to access supplied keytab file at " +
        localKeytab.getAbsolutePath());
  }
  Path srcFile = new Path(localKeytab.toURI());

  Path keytabDir =
      sliderFileSystem.buildKeytabInstallationDirPath(keytabInfo.folder);
  FileSystem fs = sliderFileSystem.getFileSystem();
  fs.mkdirs(keytabDir);
  // directory: owner rwx, nothing for group/other
  fs.setPermission(keytabDir,
      new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));

  Path fileInFs = new Path(keytabDir, srcFile.getName());
  log.info("Installing keytab {} at {} and overwrite is {}.", srcFile, fileInFs, keytabInfo.overwrite);
  if (fs.exists(fileInFs) && !keytabInfo.overwrite) {
    throw new BadCommandArgumentsException("Keytab exists at " +
        fileInFs.toUri().toString() +
        ". Use --overwrite to overwrite.");
  }
  fs.copyFromLocalFile(false, keytabInfo.overwrite, srcFile, fileInFs);
  // file: owner rw, nothing for group/other
  fs.setPermission(fileInFs,
      new FsPermission(FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE));
  return EXIT_SUCCESS;
}
/**
 * Deprecated entry point for keytab installation: logs a deprecation
 * warning, converts the arguments and delegates to
 * {@link #actionKeytab(ActionKeytabArgs)}.
 *
 * @param installKeytabInfo legacy install-keytab arguments
 * @return the exit code of the keytab action
 * @throws YarnException YARN problems
 * @throws IOException IO problems
 */
@Override
public int actionInstallKeytab(ActionInstallKeytabArgs installKeytabInfo)
    throws YarnException, IOException {
  log.warn("The 'install-keytab' option has been deprecated. Please use 'keytab --install'.");
  ActionKeytabArgs converted = new ActionKeytabArgs(installKeytabInfo);
  return actionKeytab(converted);
}
/**
 * Deprecated package installation: copies a local package file into the
 * unversioned package directory of the given application type.
 *
 * @param installPkgInfo install-package arguments; name and package
 *                       location are required
 * @return {@code EXIT_SUCCESS}
 * @throws BadCommandArgumentsException if an argument is missing, the local
 *         file is not a readable plain file, or the package already exists
 *         and replacement was not requested
 * @throws YarnException YARN problems
 * @throws IOException on filesystem problems
 */
@Override
public int actionInstallPkg(ActionInstallPackageArgs installPkgInfo) throws
    YarnException,
    IOException {
  log.warn("The " + SliderActions.ACTION_INSTALL_PACKAGE
      + " option has been deprecated. Please use '"
      + SliderActions.ACTION_PACKAGE + " " + ClientArgs.ARG_INSTALL + "'.");

  if (StringUtils.isEmpty(installPkgInfo.name)) {
    throw new BadCommandArgumentsException(
        E_INVALID_APPLICATION_TYPE_NAME +"\n"
        + CommonArgs.usage(serviceArgs, ACTION_INSTALL_PACKAGE));
  }
  if (StringUtils.isEmpty(installPkgInfo.packageURI)) {
    throw new BadCommandArgumentsException(E_INVALID_APPLICATION_PACKAGE_LOCATION);
  }
  File localPackage = new File(installPkgInfo.packageURI);
  boolean plainFile = localPackage.exists() && !localPackage.isDirectory();
  if (!plainFile) {
    throw new BadCommandArgumentsException(
        E_UNABLE_TO_READ_SUPPLIED_PACKAGE_FILE +": "
        + localPackage.getAbsolutePath());
  }
  Path srcFile = new Path(localPackage.toURI());

  // Do not provide new options to install-package command as it is in
  // deprecated mode. So version is kept null here. Use package --install.
  Path pkgPath = sliderFileSystem.buildPackageDirPath(installPkgInfo.name,
      null);
  FileSystem fs = sliderFileSystem.getFileSystem();
  fs.mkdirs(pkgPath);

  Path fileInFs = new Path(pkgPath, srcFile.getName());
  log.info("Installing package {} at {} and overwrite is {}.", srcFile, fileInFs, installPkgInfo.replacePkg);
  if (fs.exists(fileInFs) && !installPkgInfo.replacePkg) {
    throw new BadCommandArgumentsException(
        "Package exists at " + fileInFs.toUri().toString() +"."
        + E_USE_REPLACEPKG_TO_OVERWRITE);
  }
  fs.copyFromLocalFile(false, installPkgInfo.replacePkg, srcFile, fileInFs);
  return EXIT_SUCCESS;
}
/**
 * Dispatch a client command to client installation or certificate store
 * retrieval. The install flag is tested first.
 *
 * @param clientInfo client arguments
 * @return the exit code of the selected operation
 * @throws BadCommandArgumentsException if neither operation was selected
 * @throws YarnException YARN problems
 * @throws IOException IO problems
 */
@Override
public int actionClient(ActionClientArgs clientInfo) throws
    YarnException,
    IOException {
  if (clientInfo.install) {
    return doClientInstall(clientInfo);
  }
  if (clientInfo.getCertStore) {
    return doCertificateStoreRetrieval(clientInfo);
  }
  throw new BadCommandArgumentsException(
      "Only install, keystore, and truststore commands are supported for the client.\n"
      + CommonArgs.usage(serviceArgs, ACTION_CLIENT));
}
/**
 * Retrieve a keystore or a truststore for an application through the
 * cluster operations API and save it to a new local file.
 * <p>
 * The store password is taken from the command line when given; otherwise
 * from the credential provider/alias pair (prompting for and storing the
 * password if the alias has none); otherwise it is prompted for on stdin.
 *
 * @param clientInfo client arguments; exactly one of keystore/truststore
 *                   and the application name are required
 * @return {@code EXIT_SUCCESS}
 * @throws BadCommandArgumentsException on missing or conflicting arguments,
 *         or when the destination file already exists
 * @throws YarnException YARN problems
 * @throws IOException on IO problems, including reading the password
 */
private int doCertificateStoreRetrieval(ActionClientArgs clientInfo)
    throws YarnException, IOException {
  if (clientInfo.keystore != null && clientInfo.truststore != null) {
    throw new BadCommandArgumentsException(
        "Only one of either keystore or truststore can be retrieved at one time. "
        + "Retrieval of both should be done separately\n"
        + CommonArgs.usage(serviceArgs, ACTION_CLIENT));
  }
  if (clientInfo.name == null) {
    throw new BadCommandArgumentsException("No application name specified\n"
        + CommonArgs.usage(serviceArgs,
        ACTION_CLIENT));
  }
  // Without this guard a missing keystore AND truststore would surface as a
  // NullPointerException on storeFile.exists() below.
  if (clientInfo.keystore == null && clientInfo.truststore == null) {
    throw new BadCommandArgumentsException(
        "One of either keystore or truststore must be specified\n"
        + CommonArgs.usage(serviceArgs, ACTION_CLIENT));
  }

  File storeFile;
  SecurityStore.StoreType type;
  if (clientInfo.keystore != null) {
    storeFile = clientInfo.keystore;
    type = SecurityStore.StoreType.keystore;
  } else {
    storeFile = clientInfo.truststore;
    type = SecurityStore.StoreType.truststore;
  }
  if (storeFile.exists()) {
    throw new BadCommandArgumentsException("File %s already exists. "
        + "Please remove that file or select a different file name.",
        storeFile.getAbsolutePath());
  }

  // a hostname is only resolved for keystores; it stays null for truststores
  String hostname = null;
  if (type == SecurityStore.StoreType.keystore) {
    hostname = clientInfo.hostname;
    if (hostname == null) {
      hostname = InetAddress.getLocalHost().getCanonicalHostName();
      log.info("No hostname specified via command line. Using {}", hostname);
    }
  }

  String password = clientInfo.password;
  if (password == null) {
    String provider = clientInfo.provider;
    String alias = clientInfo.alias;
    if (provider != null && alias != null) {
      Configuration conf = new Configuration(getConfig());
      conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, provider);
      char[] chars = conf.getPassword(alias);
      if (chars == null) {
        CredentialProvider credentialProvider =
            CredentialProviderFactory.getProviders(conf).get(0);
        chars = readOnePassword(alias);
        credentialProvider.createCredentialEntry(alias, chars);
        credentialProvider.flush();
      }
      password = String.valueOf(chars);
      Arrays.fill(chars, ' ');
    } else {
      log.info("No password and no provider/alias pair were provided, " +
          "prompting for password");
      // get a password, wiping the raw characters as the other branch does
      char[] typed = readOnePassword(type.name());
      password = String.valueOf(typed);
      Arrays.fill(typed, ' ');
    }
  }

  byte[] keystore =
      createClusterOperations(clientInfo.name).getClientCertificateStore(
          hostname, "client", password, type.name());

  // persist to file; try-with-resources closes the stream, which the
  // original never did
  try (FileOutputStream out = new FileOutputStream(storeFile)) {
    IOUtils.write(keystore, out);
  }
  return EXIT_SUCCESS;
}
/**
 * Install client components from a local package into a local directory
 * by handing an "INSTALL" operation to the default client provider.
 *
 * @param clientInfo client arguments; install location and package
 *                   location are required, the client config file is
 *                   optional
 * @return {@code EXIT_SUCCESS}
 * @throws BadCommandArgumentsException if the install location or package
 *         argument is missing or does not point at a usable path
 * @throws BadConfigException if the client config file is not valid JSON
 * @throws IOException on IO problems
 * @throws SliderException other Slider failures
 */
private int doClientInstall(ActionClientArgs clientInfo)
    throws IOException, SliderException {
  // --- install location: must be an existing directory
  if (clientInfo.installLocation == null) {
    throw new BadCommandArgumentsException(
        E_INVALID_INSTALL_LOCATION +"\n"
        + CommonArgs.usage(serviceArgs, ACTION_CLIENT));
  }
  if (!clientInfo.installLocation.exists()) {
    throw new BadCommandArgumentsException(E_INSTALL_PATH_DOES_NOT_EXIST
        +": " + clientInfo.installLocation.getAbsolutePath());
  }
  if (!clientInfo.installLocation.isDirectory()) {
    throw new BadCommandArgumentsException(E_INVALID_INSTALL_PATH
        +": " + clientInfo.installLocation.getAbsolutePath());
  }

  // --- package: must be an existing plain file
  if (StringUtils.isEmpty(clientInfo.packageURI)) {
    throw new BadCommandArgumentsException(E_INVALID_APPLICATION_PACKAGE_LOCATION);
  }
  File pkgFile = new File(clientInfo.packageURI);
  boolean plainFile = pkgFile.exists() && !pkgFile.isDirectory();
  if (!plainFile) {
    throw new BadCommandArgumentsException(E_UNABLE_TO_READ_SUPPLIED_PACKAGE_FILE
        +" at " + pkgFile.getAbsolutePath());
  }

  // --- optional client configuration, parsed as JSON
  JSONObject config = null;
  if (clientInfo.clientConfig != null) {
    try {
      byte[] raw = Files.toByteArray(clientInfo.clientConfig);
      String json = new String(raw, Charset.defaultCharset());
      config = new JSONObject(json);
    } catch (JSONException jsonEx) {
      log.error("Unable to read supplied configuration at {}: {}",
          clientInfo.clientConfig, jsonEx);
      log.debug("Unable to read supplied configuration at {}: {}",
          clientInfo.clientConfig, jsonEx, jsonEx);
      throw new BadConfigException(E_MUST_BE_A_VALID_JSON_FILE, jsonEx);
    }
  }

  // Only INSTALL is supported
  AbstractClientProvider installer =
      createClientProvider(SliderProviderFactory.DEFAULT_CLUSTER_TYPE);
  installer.processClientOperation(sliderFileSystem,
      "INSTALL",
      clientInfo.installLocation,
      pkgFile,
      config,
      clientInfo.name);
  return EXIT_SUCCESS;
}
/**
 * Run the package operations selected in the arguments (help, install,
 * delete, list, instances). Every selected operation is run, in that
 * order, and the exit code of the last one is returned.
 * <p>
 * Output is redirected to the file named in the arguments, if any; the
 * redirection is undone even when an operation throws.
 *
 * @param actionPackageInfo package arguments
 * @return the exit code of the last operation that ran
 * @throws BadCommandArgumentsException if no operation was selected
 * @throws YarnException YARN problems
 * @throws IOException IO problems
 */
@Override
public int actionPackage(ActionPackageArgs actionPackageInfo)
    throws YarnException, IOException {
  initializeOutputStream(actionPackageInfo.out);
  int exitCode = -1;
  try {
    if (actionPackageInfo.help) {
      exitCode = actionHelp(ACTION_PACKAGE);
    }
    if (actionPackageInfo.install) {
      exitCode = actionPackageInstall(actionPackageInfo);
    }
    if (actionPackageInfo.delete) {
      exitCode = actionPackageDelete(actionPackageInfo);
    }
    if (actionPackageInfo.list) {
      exitCode = actionPackageList();
    }
    if (actionPackageInfo.instances) {
      exitCode = actionPackageInstances();
    }
  } finally {
    // always close the output file and restore the default stream, also
    // when one of the operations above failed
    finalizeOutputStream(actionPackageInfo.out);
  }
  if (exitCode != -1) {
    return exitCode;
  }
  throw new BadCommandArgumentsException(
      "Select valid package operation option");
}
/**
 * Point the client output stream at the named file, or at standard output
 * when no file is named.
 *
 * @param outFile output file name, may be null
 * @throws FileNotFoundException if the file cannot be opened for writing
 */
private void initializeOutputStream(String outFile)
    throws FileNotFoundException {
  if (outFile == null) {
    clientOutputStream = System.out;
    return;
  }
  clientOutputStream = new PrintStream(new FileOutputStream(outFile));
}
/**
 * Flush and close the client output stream if it was opened on a file,
 * then reset it to standard output.
 *
 * @param outFile the output file name used to open the stream, may be null
 */
private void finalizeOutputStream(String outFile) {
  // the stream is only closed when a file name was given, so System.out
  // itself is never closed here
  boolean openedOnFile = outFile != null && clientOutputStream != null;
  if (openedOnFile) {
    clientOutputStream.flush();
    clientOutputStream.close();
  }
  clientOutputStream = System.out;
}
/**
 * Print, for every persisted application instance whose application
 * definition lives under the package directory, the cluster name, package
 * name, package version and application definition location.
 * Instances whose definition path cannot be determined are skipped.
 *
 * @return {@code EXIT_SUCCESS}
 * @throws YarnException YARN problems
 * @throws IOException IO problems
 */
private int actionPackageInstances() throws YarnException, IOException {
  Map<String, Path> persistentInstances = sliderFileSystem
      .listPersistentInstances();
  if (persistentInstances.isEmpty()) {
    log.info("No slider cluster specification available");
    return EXIT_SUCCESS;
  }

  String pkgPathValue = sliderFileSystem
      .buildPackageDirPath(StringUtils.EMPTY, StringUtils.EMPTY).toUri()
      .getPath();
  FileSystem fs = sliderFileSystem.getFileSystem();

  log.info("List of applications with its package name and path");
  println("%-25s %15s %30s %s", "Cluster Name", "Package Name",
      "Package Version", "Application Location");

  for (Map.Entry<String, Path> instance : persistentInstances.entrySet()) {
    String clusterName = instance.getKey();
    Path clusterPath = instance.getValue();
    AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
        clusterName, clusterPath);

    Path appDefPath;
    try {
      String definitionLocation = SliderUtils.getApplicationDefinitionPath(
          instanceDefinition.getAppConfOperations());
      appDefPath = new Path(definitionLocation);
    } catch (BadConfigException e) {
      // Invalid cluster state, so move on to next. No need to log anything
      // as this is just listing of instances.
      continue;
    }
    // relative definition paths are resolved against the home directory
    if (!appDefPath.isUriPathAbsolute()) {
      appDefPath = new Path(fs.getHomeDirectory(), appDefPath);
    }
    String appDefPathStr = appDefPath.toUri().toString();

    try {
      if (!appDefPathStr.contains(pkgPathValue) || !fs.isFile(appDefPath)) {
        continue;
      }
      // unversioned layout: the parent directory names the package;
      // versioned layout: the parent names the version and the
      // grandparent names the package
      Path parentDir = appDefPath.getParent();
      String packageName = parentDir.getName();
      String packageVersion = StringUtils.EMPTY;
      if (instanceDefinition.isVersioned()) {
        packageVersion = packageName;
        packageName = appDefPath.getParent().getParent().getName();
      }
      println("%-25s %15s %30s %s", clusterName, packageName,
          packageVersion, appDefPathStr);
    } catch (IOException e) {
      if (log.isDebugEnabled()) {
        log.debug(clusterName + " application definition path "
            + appDefPathStr + " is not found.");
      }
    }
  }
  return EXIT_SUCCESS;
}
/**
 * Print the names of all directories found directly under the package
 * installation directory.
 *
 * @return {@code EXIT_SUCCESS}
 * @throws IOException on filesystem problems
 */
private int actionPackageList() throws IOException {
  Path pkgPath = sliderFileSystem.buildPackageDirPath(StringUtils.EMPTY,
      StringUtils.EMPTY);
  log.info("Package install path : " + pkgPath);

  FileSystem fs = sliderFileSystem.getFileSystem();
  if (!fs.isDirectory(pkgPath)) {
    log.info("No package(s) installed");
    return EXIT_SUCCESS;
  }

  StringBuilder listing = new StringBuilder("List of installed packages:\n");
  int packageCount = 0;
  for (FileStatus child : fs.listStatus(pkgPath)) {
    // only directories count as packages; plain files are ignored
    if (!child.isDirectory()) {
      continue;
    }
    listing.append("\t").append(child.getPath().getName()).append("\n");
    packageCount++;
  }

  if (packageCount > 0) {
    println(listing.toString());
  } else {
    log.info("No package(s) installed");
  }
  return EXIT_SUCCESS;
}
/**
 * Copy a local application package into the package directory for the
 * given application type and version, then log the path (relative to the
 * home directory where possible) to put in the app config.
 *
 * @param actionPackageArgs package arguments; name and package location
 *                          are required
 * @return {@code EXIT_SUCCESS}
 * @throws BadCommandArgumentsException if an argument is missing, the local
 *         file is not a readable plain file, or the package already exists
 *         and replacement was not requested
 * @throws YarnException YARN problems
 * @throws IOException on filesystem problems
 */
private int actionPackageInstall(ActionPackageArgs actionPackageArgs) throws
    YarnException,
    IOException {
  if (StringUtils.isEmpty(actionPackageArgs.name)) {
    throw new BadCommandArgumentsException(
        "A valid application type name is required (e.g. HBASE).\n"
        + CommonArgs.usage(serviceArgs, ACTION_PACKAGE));
  }
  if (StringUtils.isEmpty(actionPackageArgs.packageURI)) {
    throw new BadCommandArgumentsException(
        E_INVALID_APPLICATION_PACKAGE_LOCATION);
  }
  File localPackage = new File(actionPackageArgs.packageURI);
  boolean plainFile = localPackage.exists() && !localPackage.isDirectory();
  if (!plainFile) {
    throw new BadCommandArgumentsException(
        E_UNABLE_TO_READ_SUPPLIED_PACKAGE_FILE
        + ": " + localPackage.getAbsolutePath());
  }
  Path srcFile = new Path(localPackage.toURI());

  Path pkgPath = sliderFileSystem.buildPackageDirPath(actionPackageArgs.name,
      actionPackageArgs.version);
  FileSystem fs = sliderFileSystem.getFileSystem();
  if (!fs.exists(pkgPath)) {
    fs.mkdirs(pkgPath);
  }

  Path fileInFs = new Path(pkgPath, srcFile.getName());
  if (fs.exists(fileInFs) && !actionPackageArgs.replacePkg) {
    throw new BadCommandArgumentsException(E_PACKAGE_EXISTS +" at " +
        fileInFs.toUri() + ". Use --replacepkg to overwrite.");
  }
  log.info("Installing package {} to {} (overwrite set to {})", srcFile,
      fileInFs, actionPackageArgs.replacePkg);
  fs.copyFromLocalFile(false, actionPackageArgs.replacePkg, srcFile,
      fileInFs);

  String destPathWithHomeDir = Path
      .getPathWithoutSchemeAndAuthority(fileInFs).toString();
  String destHomeDir = Path.getPathWithoutSchemeAndAuthority(
      fs.getHomeDirectory()).toString();

  // a somewhat contrived approach to stripping out the home directory and
  // any leading separator left behind; designed to work on windows and unix
  String destPathWithoutHomeDir = destPathWithHomeDir;
  if (destPathWithHomeDir.startsWith(destHomeDir)) {
    destPathWithoutHomeDir =
        destPathWithHomeDir.substring(destHomeDir.length());
    boolean leadingSeparator = destPathWithoutHomeDir.startsWith("/")
        || destPathWithoutHomeDir.startsWith("\\");
    if (leadingSeparator) {
      destPathWithoutHomeDir = destPathWithoutHomeDir.substring(1);
    }
  }
  log.info("Set " + AgentKeys.APP_DEF + " in your app config JSON to {}",
      destPathWithoutHomeDir);
  return EXIT_SUCCESS;
}
/**
 * Recursively delete the package directory of the given application type
 * and version.
 *
 * @param actionPackageArgs package arguments; the name is required
 * @return {@code EXIT_SUCCESS} if the directory was deleted,
 *         {@code EXIT_NOT_FOUND} if the filesystem reported that the delete
 *         did not happen
 * @throws BadCommandArgumentsException if the name is missing or the
 *                                      package directory does not exist
 * @throws YarnException YARN problems
 * @throws IOException on filesystem problems
 */
private int actionPackageDelete(ActionPackageArgs actionPackageArgs) throws
    YarnException, IOException {
  if (StringUtils.isEmpty(actionPackageArgs.name)) {
    throw new BadCommandArgumentsException(
        "A valid application type name is required (e.g. HBASE).\n"
        + CommonArgs.usage(serviceArgs, ACTION_PACKAGE));
  }
  Path pkgPath = sliderFileSystem.buildPackageDirPath(actionPackageArgs.name,
      actionPackageArgs.version);
  if (!sliderFileSystem.getFileSystem().exists(pkgPath)) {
    throw new BadCommandArgumentsException(E_PACKAGE_DOES_NOT_EXIST +": "
        + pkgPath.toUri().toString());
  }
  log.info("Deleting package {} at {}.", actionPackageArgs.name, pkgPath);
  if (sliderFileSystem.getFileSystem().delete(pkgPath, true)) {
    // the name is passed as a parameter so that it fills the placeholder;
    // concatenating it left a literal "{}" in the message
    log.info("Deleted package {}", actionPackageArgs.name);
    return EXIT_SUCCESS;
  } else {
    log.warn("Package deletion failed.");
    return EXIT_NOT_FOUND;
  }
}
/**
 * Rebuild and persist an instance definition, allowing an existing
 * definition to be overwritten and a live cluster to be modified.
 *
 * @param clustername name of the cluster
 * @param buildInfo the arguments needed to build the cluster
 * @return {@code EXIT_SUCCESS}
 * @throws YarnException YARN problems
 * @throws IOException IO problems
 */
@Override
public int actionUpdate(String clustername,
    AbstractClusterBuildingActionArgs buildInfo)
    throws YarnException, IOException {
  final boolean overwrite = true;
  final boolean liveClusterAllowed = true;
  buildInstanceDefinition(clustername, buildInfo, overwrite,
      liveClusterAllowed);
  return EXIT_SUCCESS;
}
/**
 * Build the aggregate configuration of an application instance and persist
 * it, outside of the upgrade flow. Delegates to the five-argument overload
 * with {@code isUpgradeFlow} set to false.
 *
 * @param clustername name of the cluster
 * @param buildInfo the arguments needed to build the cluster
 * @param overwrite true if existing cluster directory can be overwritten
 * @param liveClusterAllowed true if live cluster can be modified
 * @throws YarnException YARN problems
 * @throws IOException IO problems
 */
public void buildInstanceDefinition(String clustername,
    AbstractClusterBuildingActionArgs buildInfo, boolean overwrite,
    boolean liveClusterAllowed) throws YarnException, IOException {
  final boolean isUpgradeFlow = false;
  buildInstanceDefinition(clustername, buildInfo, overwrite,
      liveClusterAllowed, isUpgradeFlow);
}
/**
 * Build up the AggregateConf for an application instance from provider
 * defaults, the resources/template files and the command line options,
 * persist it, and then have the Slider AM provider and the named provider
 * validate it.
 * <p>
 * Note that the definition is persisted BEFORE the providers validate it
 * (see the TODOs in the body).
 *
 * @param clustername name of the cluster
 * @param buildInfo the arguments needed to build the cluster; cast to
 *                  ActionUpgradeArgs when isUpgradeFlow is true
 * @param overwrite true if existing cluster directory can be overwritten
 * @param liveClusterAllowed true if live cluster can be modified; when false
 *                           verifyNoLiveClusters is called first
 * @param isUpgradeFlow true when called for an upgrade; unless the upgrade
 *                      arguments set force, the supplied resources are then
 *                      validated against the persisted cluster resources
 * @throws YarnException
 * @throws IOException
 */
public void buildInstanceDefinition(String clustername,
AbstractClusterBuildingActionArgs buildInfo, boolean overwrite,
boolean liveClusterAllowed, boolean isUpgradeFlow) throws YarnException,
IOException {
// validate the name and bindings, then (unless a live cluster is allowed)
// verify that a live cluster isn't there
SliderUtils.validateClusterName(clustername);
verifyBindingsDefined();
if (!liveClusterAllowed) {
verifyNoLiveClusters(clustername, "Create");
}
Configuration conf = getConfig();
String registryQuorum = lookupZKQuorum();
Path appconfdir = buildInfo.getConfdir();
// Provider
String providerName = buildInfo.getProvider();
requireArgumentSet(Arguments.ARG_PROVIDER, providerName);
log.debug("Provider is {}", providerName);
SliderAMClientProvider sliderAM = new SliderAMClientProvider(conf);
AbstractClientProvider provider =
createClientProvider(providerName);
InstanceBuilder builder =
new InstanceBuilder(sliderFileSystem,
getConfig(),
clustername);
// the three configuration trees below are views of instanceDefinition
AggregateConf instanceDefinition = new AggregateConf();
ConfTreeOperations appConf = instanceDefinition.getAppConfOperations();
ConfTreeOperations resources = instanceDefinition.getResourceOperations();
ConfTreeOperations internal = instanceDefinition.getInternalOperations();
//initial definition is set by the providers
sliderAM.prepareInstanceConfiguration(instanceDefinition);
provider.prepareInstanceConfiguration(instanceDefinition);
//load in any specified on the command line
if (buildInfo.resources != null) {
try {
resources.mergeFile(buildInfo.resources,
new ResourcesInputPropertiesValidator());
} catch (IOException e) {
// report a bad --resources file as a configuration error
throw new BadConfigException(e,
"incorrect argument to %s: \"%s\" : %s ",
Arguments.ARG_RESOURCES,
buildInfo.resources,
e.toString());
}
}
if (buildInfo.template != null) {
try {
appConf.mergeFile(buildInfo.template,
new TemplateInputPropertiesValidator());
} catch (IOException e) {
// report a bad --template file as a configuration error
throw new BadConfigException(e,
"incorrect argument to %s: \"%s\" : %s ",
Arguments.ARG_TEMPLATE,
buildInfo.template,
e.toString());
}
}
// upgrade flow only: check the supplied resources against the persisted
// cluster resources unless the check is overridden with force
if (isUpgradeFlow) {
ActionUpgradeArgs upgradeInfo = (ActionUpgradeArgs) buildInfo;
if (!upgradeInfo.force) {
validateClientAndClusterResource(clustername, resources);
}
}
//get the command line options
ConfTree cmdLineAppOptions = buildInfo.buildAppOptionsConfTree();
ConfTree cmdLineResourceOptions = buildInfo.buildResourceOptionsConfTree();
appConf.merge(cmdLineAppOptions);
AppDefinitionPersister appDefinitionPersister = new AppDefinitionPersister(sliderFileSystem);
appDefinitionPersister.processSuppliedDefinitions(clustername, buildInfo, appConf);
// put the role counts into the resources file
Map<String, String> argsRoleMap = buildInfo.getComponentMap();
for (Map.Entry<String, String> roleEntry : argsRoleMap.entrySet()) {
String count = roleEntry.getValue();
String key = roleEntry.getKey();
log.info("{} => {}", key, count);
resources.getOrAddComponent(key)
.put(ResourceKeys.COMPONENT_INSTANCES, count);
}
//all CLI role options
Map<String, Map<String, String>> appOptionMap =
buildInfo.getCompOptionMap();
appConf.mergeComponents(appOptionMap);
//internal picks up slider. and internal. values only
internal.propagateGlobalKeys(appConf, "slider.");
internal.propagateGlobalKeys(appConf, "internal.");
//copy over component., role. and yarn. values ONLY to the resources
if (PROPAGATE_RESOURCE_OPTION) {
resources.propagateGlobalKeys(appConf, "component.");
resources.propagateGlobalKeys(appConf, "role.");
resources.propagateGlobalKeys(appConf, "yarn.");
resources.mergeComponentsPrefix(appOptionMap, "component.", true);
resources.mergeComponentsPrefix(appOptionMap, "yarn.", true);
resources.mergeComponentsPrefix(appOptionMap, "role.", true);
}
// resource component args
// NOTE(review): the command line resource options are merged into BOTH
// appConf and resources here - confirm this is intended
appConf.merge(cmdLineResourceOptions);
resources.merge(cmdLineResourceOptions);
resources.mergeComponents(buildInfo.getResourceCompOptionMap());
builder.init(providerName, instanceDefinition);
builder.propagateFilename();
builder.propagatePrincipals();
builder.setImageDetailsIfAvailable(buildInfo.getImage(),
buildInfo.getAppHomeDir());
builder.setQueue(buildInfo.queue);
// ZooKeeper quorum: the command line value wins, the registry quorum is
// the fallback; having neither is a configuration error
String quorum = buildInfo.getZKhosts();
if (SliderUtils.isUnset(quorum)) {
quorum = registryQuorum;
}
if (isUnset(quorum)) {
throw new BadConfigException(E_NO_ZOOKEEPER_QUORUM);
}
ZKPathBuilder zkPaths = new ZKPathBuilder(getAppName(),
getUsername(),
clustername,
registryQuorum,
quorum);
// ZooKeeper application path: an explicit path from the arguments wins;
// otherwise createZookeeperNode supplies one
String zookeeperRoot = buildInfo.getAppZKPath();
if (isSet(zookeeperRoot)) {
zkPaths.setAppPath(zookeeperRoot);
} else {
String createDefaultZkNode = appConf.getGlobalOptions()
.getOption(AgentKeys.CREATE_DEF_ZK_NODE, "false");
// NOTE(review): presumably the boolean passed to createZookeeperNode
// selects "compute the name only" (true) versus "really create the node"
// (false) - confirm against createZookeeperNode
if (createDefaultZkNode.equals("true")) {
String defaultZKPath = createZookeeperNode(clustername, false);
log.debug("ZK node created for application instance: {}", defaultZKPath);
if (defaultZKPath != null) {
zkPaths.setAppPath(defaultZKPath);
}
} else {
// create AppPath if default is being used
String defaultZKPath = createZookeeperNode(clustername, true);
log.debug("ZK node assigned to application instance: {}", defaultZKPath);
zkPaths.setAppPath(defaultZKPath);
}
}
builder.addZKBinding(zkPaths);
//then propagate any package URI
if (buildInfo.packageURI != null) {
appConf.set(AgentKeys.PACKAGE_PATH, buildInfo.packageURI);
}
propagatePythonExecutable(conf, instanceDefinition);
// make any substitutions needed at this stage
replaceTokens(appConf.getConfTree(), getUsername(), clustername);
// TODO: Refactor the validation code and persistence code
try {
persistInstanceDefinition(overwrite, appconfdir, builder);
appDefinitionPersister.persistPackages();
} catch (LockAcquireFailedException e) {
// a failure to take the lock is reported as a bad cluster state
log.warn("Failed to get a Lock on {} : {}", builder, e, e);
throw new BadClusterStateException("Failed to save " + clustername
+ ": " + e);
}
// providers to validate what there is
// TODO: Validation should be done before persistence
AggregateConf instanceDescription = builder.getInstanceDescription();
validateInstanceDefinition(sliderAM, instanceDescription, sliderFileSystem);
validateInstanceDefinition(provider, instanceDescription, sliderFileSystem);
}
/**
 * Check that the resource definition supplied for an upgrade declares
 * exactly the same components, with the same instance counts, as the
 * persisted definition of the cluster. The component named by
 * {@code SliderKeys.COMPONENT_AM} is ignored on both sides.
 *
 * @param clustername name of the cluster
 * @param clientResources resource definition supplied for the upgrade
 * @throws BadClusterStateException if the persisted definition cannot be
 *                                  loaded because its lock is not available
 * @throws BadConfigException if components or instance counts differ
 * @throws SliderException other Slider failures
 * @throws IOException IO problems
 */
private void validateClientAndClusterResource(String clustername,
    ConfTreeOperations clientResources) throws BadClusterStateException,
    SliderException, IOException {
  log.info("Validating upgrade resource definition with current cluster "
      + "state (components and instance count)");
  Map<String, Integer> clientComponentInstances = new HashMap<>();
  for (String componentName : clientResources.getComponentNames()) {
    if (!SliderKeys.COMPONENT_AM.equals(componentName)) {
      clientComponentInstances.put(componentName, clientResources
          .getComponentOptInt(componentName,
              ResourceKeys.COMPONENT_INSTANCES, -1));
    }
  }

  AggregateConf clusterConf = null;
  try {
    clusterConf = loadPersistedClusterDescription(clustername);
  } catch (LockAcquireFailedException e) {
    log.warn("Failed to get a Lock on cluster resource : {}", e, e);
    throw new BadClusterStateException(
        "Failed to load client resource definition " + clustername + ": "
            + e);
  }
  Map<String, Integer> clusterComponentInstances = new HashMap<>();
  for (Map.Entry<String, Map<String, String>> component : clusterConf
      .getResources().components.entrySet()) {
    if (!SliderKeys.COMPONENT_AM.equals(component.getKey())) {
      clusterComponentInstances.put(
          component.getKey(),
          Integer.decode(component.getValue().get(
              ResourceKeys.COMPONENT_INSTANCES)));
    }
  }

  // client and cluster should be an exact match: matching entries are
  // removed from both maps, so whatever is left over is a mismatch
  Iterator<Map.Entry<String, Integer>> clientComponentInstanceIt =
      clientComponentInstances.entrySet().iterator();
  while (clientComponentInstanceIt.hasNext()) {
    Map.Entry<String, Integer> clientComponentInstanceEntry =
        clientComponentInstanceIt.next();
    Integer clusterCount =
        clusterComponentInstances.get(clientComponentInstanceEntry.getKey());
    // equals(), not ==: the counts are boxed Integers and reference
    // comparison only happens to work inside the small-value cache
    if (clusterCount != null
        && clusterCount.equals(clientComponentInstanceEntry.getValue())) {
      clusterComponentInstances
          .remove(clientComponentInstanceEntry.getKey());
      clientComponentInstanceIt.remove();
    }
  }

  if (!clientComponentInstances.isEmpty()
      || !clusterComponentInstances.isEmpty()) {
    log.error("Mismatch found in upgrade resource definition and cluster "
        + "resource state");
    if (!clientComponentInstances.isEmpty()) {
      log.info("The upgrade resource definitions that do not match are:");
      for (Map.Entry<String, Integer> clientComponentInstanceEntry
          : clientComponentInstances.entrySet()) {
        log.info(" Component Name: {}, Instance count: {}",
            clientComponentInstanceEntry.getKey(),
            clientComponentInstanceEntry.getValue());
      }
    }
    if (!clusterComponentInstances.isEmpty()) {
      log.info("The cluster resources that do not match are:");
      for (Map.Entry<String, Integer> clusterComponentInstanceEntry
          : clusterComponentInstances.entrySet()) {
        log.info(" Component Name: {}, Instance count: {}",
            clusterComponentInstanceEntry.getKey(),
            clusterComponentInstanceEntry.getValue());
      }
    }
    throw new BadConfigException("Resource definition provided for "
        + "upgrade does not match with that of the currently running "
        + "cluster.\nIf you are aware of what you are doing, rerun the "
        + "command with " + Arguments.ARG_FORCE + " option.");
  }
}
/**
* Persist an instance definition by delegating to the supplied builder.
* @param overwrite forwarded to the builder as the second argument of
* <code>persist</code>
* @param appconfdir forwarded to the builder as the first argument of
* <code>persist</code>
* @param builder builder whose <code>persist</code> method is invoked
* @throws IOException declared by this method; raised by the builder
* @throws SliderException declared by this method; raised by the builder
* @throws LockAcquireFailedException declared by this method; raised by the builder
*/
protected void persistInstanceDefinition(boolean overwrite,
Path appconfdir,
InstanceBuilder builder)
throws IOException, SliderException, LockAcquireFailedException {
builder.persist(appconfdir, overwrite);
}
/**
 * Apply user/cluster token substitution throughout a configuration tree.
 * <p>
 * Values of the global section are substituted and written back under their
 * existing keys. The credentials section is rebuilt: both its keys and every
 * entry of its value lists go through substitution, after which the old
 * content is cleared and replaced.
 *
 * @param conf configuration tree to update in place
 * @param userName value substituted for the user tokens
 * @param clusterName value substituted for the cluster name token
 * @throws IOException declared by the string-level substitution
 */
@VisibleForTesting
public static void replaceTokens(ConfTree conf,
    String userName, String clusterName) throws IOException {
  // global options: keys are retained, only values are substituted
  Map<String, String> substitutedGlobal = new HashMap<>();
  for (Entry<String, String> option : conf.global.entrySet()) {
    String substitutedValue =
        replaceTokens(option.getValue(), userName, clusterName);
    substitutedGlobal.put(option.getKey(), substitutedValue);
  }
  conf.global.putAll(substitutedGlobal);

  // credentials: keys may change, so the map is rebuilt rather than patched
  Map<String, List<String>> substitutedCredentials = new HashMap<>();
  for (Entry<String, List<String>> credential : conf.credentials.entrySet()) {
    List<String> substitutedAliases = new ArrayList<>();
    for (String alias : credential.getValue()) {
      substitutedAliases.add(replaceTokens(alias, userName, clusterName));
    }
    String substitutedKey =
        replaceTokens(credential.getKey(), userName, clusterName);
    substitutedCredentials.put(substitutedKey, substitutedAliases);
  }
  conf.credentials.clear();
  conf.credentials.putAll(substitutedCredentials);
}
/**
 * Expand the <code>${USER}</code>, <code>${USER_NAME}</code> and
 * <code>${CLUSTER_NAME}</code> tokens in a string.
 * <p>
 * Substitution is literal on both sides: the tokens are matched as plain text
 * and the user/cluster names are inserted verbatim, so names containing
 * <code>$</code> or <code>\</code> (for example a Windows domain user) are
 * preserved instead of being interpreted as regex replacement syntax.
 *
 * @param s string to expand
 * @param userName value inserted for <code>${USER}</code> and
 * <code>${USER_NAME}</code>
 * @param clusterName value inserted for <code>${CLUSTER_NAME}</code>
 * @return the expanded string
 * @throws IOException never thrown by this implementation; retained in the
 * signature for callers
 */
private static String replaceTokens(String s, String userName,
    String clusterName) throws IOException {
  return s.replace("${USER}", userName)
      .replace("${USER_NAME}", userName)
      .replace("${CLUSTER_NAME}", clusterName);
}
/**
 * Build the permissions to apply to cluster directories.
 * The permission string is read from the
 * <code>CLUSTER_DIRECTORY_PERMISSIONS</code> configuration key, falling back
 * to <code>DEFAULT_CLUSTER_DIRECTORY_PERMISSIONS</code> when unset.
 *
 * @param conf configuration to read the permission string from
 * @return the permissions built from that string
 */
public FsPermission getClusterDirectoryPermissions(Configuration conf) {
  return new FsPermission(
      conf.get(CLUSTER_DIRECTORY_PERMISSIONS,
          DEFAULT_CLUSTER_DIRECTORY_PERMISSIONS));
}
/**
 * Check that a Resource Manager address is available, failing with a
 * descriptive message when it is not.
 * <p>
 * The check passes when RM HA is enabled in the configuration, or when the
 * configured RM address is defined.
 *
 * @throws BadCommandArgumentsException if HA is disabled and no RM address
 * is defined
 */
public void verifyBindingsDefined() throws BadCommandArgumentsException {
  InetSocketAddress rmAddr = SliderUtils.getRmAddress(getConfig());
  boolean haEnabled =
      getConfig().getBoolean(YarnConfiguration.RM_HA_ENABLED, false);
  if (haEnabled || SliderUtils.isAddressDefined(rmAddr)) {
    // nothing to complain about
    return;
  }
  throw new BadCommandArgumentsException(
      E_NO_RESOURCE_MANAGER
          + " in the argument "
          + Arguments.ARG_MANAGER
          + " or the configuration property "
          + YarnConfiguration.RM_ADDRESS
          + " value :" + rmAddr);
}
/**
 * Load a persisted cluster specification and launch it.
 * All validation of arguments and cluster state is assumed to have been
 * done by the caller.
 * <p>
 * When the launch arguments name an output file, the application report is
 * serialized to it, stamped with the current time as the submit time.
 *
 * @param clustername name of the cluster.
 * @param launchArgs launch arguments
 * @return the exit code: that of the wait when a positive wait time was
 * requested, otherwise success
 * @throws YarnException
 * @throws IOException
 */
private int startCluster(String clustername,
    LaunchArgsAccessor launchArgs) throws
    YarnException,
    IOException {
  Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
  AggregateConf instanceDefinition =
      loadInstanceDefinitionUnresolved(clustername, clusterDirectory);
  LaunchedApplication launched = launchApplication(
      clustername, clusterDirectory, instanceDefinition,
      serviceArgs.isDebug());
  applicationId = launched.getApplicationId();

  if (launchArgs.getOutputFile() != null) {
    // the caller asked for the application report to be saved
    SerializedApplicationReport serialized =
        new SerializedApplicationReport(launched.getApplicationReport());
    serialized.submitTime = System.currentTimeMillis();
    new ApplicationReportSerDeser()
        .save(serialized, launchArgs.getOutputFile());
  }

  int waittime = launchArgs.getWaittime();
  if (waittime <= 0) {
    // no waiting
    return EXIT_SUCCESS;
  }
  return waitForAppRunning(launched, waittime, waittime);
}
/**
 * Load an instance definition from a cluster directory without resolving
 * it, and stamp it with the cluster name.
 *
 * @param name cluster name
 * @param clusterDirectory cluster dir
 * @return the loaded, unresolved configuration
 * @throws IOException
 * @throws SliderException
 * @throws UnknownApplicationInstanceException if the file is not found
 */
public AggregateConf loadInstanceDefinitionUnresolved(String name,
    Path clusterDirectory) throws
    IOException,
    SliderException {
  try {
    AggregateConf loaded = InstanceIO.loadInstanceDefinitionUnresolved(
        sliderFileSystem, clusterDirectory);
    loaded.setName(name);
    return loaded;
  } catch (FileNotFoundException notFound) {
    // a missing file means the instance is not known
    throw UnknownApplicationInstanceException.unknownInstance(name, notFound);
  }
}
/**
 * Load the instance definition of a named cluster, optionally resolving it.
 *
 * @param name cluster name
 * @param resolved flag to indicate the cluster should be resolved
 * @return the loaded configuration
 * @throws IOException IO problems
 * @throws SliderException slider explicit issues
 * @throws UnknownApplicationInstanceException if the file is not found
 */
public AggregateConf loadInstanceDefinition(String name,
    boolean resolved) throws
    IOException,
    SliderException {
  AggregateConf definition = loadInstanceDefinitionUnresolved(
      name, sliderFileSystem.buildClusterDirPath(name));
  if (resolved) {
    definition.resolve();
  }
  return definition;
}
/**
* Launch the application master for a cluster instance.
* <p>
* The method verifies that no live instance of the same name exists,
* resolves the instance definition, prepares the AM launcher (local
* resources, environment, classpath, command line, resource requirements,
* priority and queue) with help from the Slider AM provider and the
* cluster's own provider, and finally submits the application.
* <p>
* Side effects visible in this method: the fields
* <code>deployedClusterName</code> and <code>launchedInstanceDefinition</code>
* are set, the instance definition passed in is resolved, temporary files
* for the instance are purged and a new temp/lib directory is created.
*
* @param clustername name of the cluster
* @param clusterDirectory cluster dir
* @param instanceDefinition the instance definition
* @param debugAM enable debug AM options
* @return the launched application
* @throws YarnException
* @throws IOException
*/
public LaunchedApplication launchApplication(String clustername,
Path clusterDirectory,
AggregateConf instanceDefinition,
boolean debugAM)
throws YarnException, IOException {
deployedClusterName = clustername;
SliderUtils.validateClusterName(clustername);
verifyNoLiveClusters(clustername, "Launch");
Configuration config = getConfig();
lookupZKQuorum();
boolean clusterSecure = SliderUtils.isHadoopClusterSecure(config);
//create the Slider AM provider -this helps set up the AM
SliderAMClientProvider sliderAM = new SliderAMClientProvider(config);
instanceDefinition.resolve();
launchedInstanceDefinition = instanceDefinition;
ConfTreeOperations internalOperations =
instanceDefinition.getInternalOperations();
MapOperations internalOptions = internalOperations.getGlobalOptions();
ConfTreeOperations resourceOperations =
instanceDefinition.getResourceOperations();
ConfTreeOperations appOperations =
instanceDefinition.getAppConfOperations();
// both configuration paths are mandatory options and must already exist
Path generatedConfDirPath =
createPathThatMustExist(internalOptions.getMandatoryOption(
InternalKeys.INTERNAL_GENERATED_CONF_PATH));
Path snapshotConfPath =
createPathThatMustExist(internalOptions.getMandatoryOption(
InternalKeys.INTERNAL_SNAPSHOT_CONF_PATH));
// cluster Provider
AbstractClientProvider provider = createClientProvider(
internalOptions.getMandatoryOption(
InternalKeys.INTERNAL_PROVIDER_NAME));
// dump the resolved instance definition when debug logging is on
if (log.isDebugEnabled()) {
log.debug(instanceDefinition.toString());
}
MapOperations sliderAMResourceComponent =
resourceOperations.getOrAddComponent(SliderKeys.COMPONENT_AM);
MapOperations resourceGlobalOptions = resourceOperations.getGlobalOptions();
// add the tags if available
Set<String> applicationTags = provider.getApplicationTags(sliderFileSystem,
SliderUtils.getApplicationDefinitionPath(appOperations));
AppMasterLauncher amLauncher = new AppMasterLauncher(clustername,
SliderKeys.APP_TYPE,
config,
sliderFileSystem,
yarnClient,
clusterSecure,
sliderAMResourceComponent,
resourceGlobalOptions,
applicationTags);
ApplicationId appId = amLauncher.getApplicationId();
// keep containers over AM restarts and set the maximum number of AM attempts
amLauncher.setKeepContainersOverRestarts(true);
int maxAppAttempts = config.getInt(KEY_AM_RESTART_LIMIT, 0);
amLauncher.setMaxAppAttempts(maxAppAttempts);
// start from a clean temp area, then create <temp>/<appId>/am/lib
sliderFileSystem.purgeAppInstanceTempFiles(clustername);
Path tempPath = sliderFileSystem.createAppInstanceTempPath(
clustername,
appId.toString() + "/am");
String libdir = "lib";
Path libPath = new Path(tempPath, libdir);
sliderFileSystem.getFileSystem().mkdirs(libPath);
log.debug("FS={}, tempPath={}, libdir={}",
sliderFileSystem, tempPath, libPath);
// set local resources for the application master
// local files or archives as needed
// In this scenario, the jar file for the application master is part of the local resources
Map<String, LocalResource> localResources = amLauncher.getLocalResources();
// look for the configuration directory named by the system property
boolean hasServerLog4jProperties = false;
Path remoteConfPath = null;
String relativeConfDir = null;
String confdirProp =
System.getProperty(SliderKeys.PROPERTY_CONF_DIR);
if (confdirProp == null || confdirProp.isEmpty()) {
log.debug("No local configuration directory provided as system property");
} else {
File confDir = new File(confdirProp);
if (!confDir.exists()) {
throw new BadConfigException(E_CONFIGURATION_DIRECTORY_NOT_FOUND,
confDir);
}
// copy the local conf dir into the cluster directory
Path localConfDirPath = SliderUtils.createLocalPath(confDir);
remoteConfPath = new Path(clusterDirectory, SliderKeys.SUBMITTED_CONF_DIR);
log.debug("Slider configuration directory is {}; remote to be {}",
localConfDirPath, remoteConfPath);
SliderUtils.copyDirectory(config, localConfDirPath, remoteConfPath, null);
File log4jserver =
new File(confDir, SliderKeys.LOG4J_SERVER_PROP_FILENAME);
hasServerLog4jProperties = log4jserver.isFile();
}
// the assumption here is that minimr cluster => this is a test run
// and the classpath can look after itself
boolean usingMiniMRCluster = getUsingMiniMRCluster();
if (!usingMiniMRCluster) {
log.debug("Destination is not a MiniYARNCluster -copying full classpath");
// insert conf dir first
if (remoteConfPath != null) {
relativeConfDir = SliderKeys.SUBMITTED_CONF_DIR;
Map<String, LocalResource> submittedConfDir =
sliderFileSystem.submitDirectory(remoteConfPath,
relativeConfDir);
SliderUtils.mergeMaps(localResources, submittedConfDir);
}
}
// build up the configuration
// IMPORTANT: it is only after this call that site configurations
// will be valid.
propagatePrincipals(config, instanceDefinition);
// validate security data
/*
// turned off until tested
SecurityConfiguration securityConfiguration =
new SecurityConfiguration(config,
instanceDefinition, clustername);
*/
Configuration clientConfExtras = new Configuration(false);
// then build up the generated path.
FsPermission clusterPerms = getClusterDirectoryPermissions(config);
SliderUtils.copyDirectory(config, snapshotConfPath, generatedConfDirPath,
clusterPerms);
// standard AM resources
sliderAM.prepareAMAndConfigForLaunch(sliderFileSystem,
config,
amLauncher,
instanceDefinition,
snapshotConfPath,
generatedConfDirPath,
clientConfExtras,
libdir,
tempPath,
usingMiniMRCluster);
//add provider-specific resources
provider.prepareAMAndConfigForLaunch(sliderFileSystem,
config,
amLauncher,
instanceDefinition,
snapshotConfPath,
generatedConfDirPath,
clientConfExtras,
libdir,
tempPath,
usingMiniMRCluster);
// now that the site config is fully generated, the provider gets
// to do a quick review of them.
log.debug("Preflight validation of cluster configuration");
sliderAM.preflightValidateClusterConfiguration(sliderFileSystem,
clustername,
config,
instanceDefinition,
clusterDirectory,
generatedConfDirPath,
clusterSecure
);
provider.preflightValidateClusterConfiguration(sliderFileSystem,
clustername,
config,
instanceDefinition,
clusterDirectory,
generatedConfDirPath,
clusterSecure
);
// TODO: consider supporting apps that don't have an image path
Path imagePath =
SliderUtils.extractImagePath(sliderFileSystem, internalOptions);
if (sliderFileSystem.maybeAddImagePath(localResources, imagePath)) {
log.debug("Registered image path {}", imagePath);
}
// build the environment
amLauncher.putEnv(
SliderUtils.buildEnvMap(sliderAMResourceComponent));
ClasspathConstructor classpath = SliderUtils.buildClasspath(relativeConfDir,
libdir,
getConfig(),
sliderFileSystem,
usingMiniMRCluster);
amLauncher.setClasspath(classpath);
//add english env
amLauncher.setEnv("LANG", "en_US.UTF-8");
amLauncher.setEnv("LC_ALL", "en_US.UTF-8");
amLauncher.setEnv("LANGUAGE", "en_US.UTF-8");
// add the launch environment variables defined in the configuration
amLauncher.putEnv(getAmLaunchEnv(config));
for (Map.Entry<String, String> envs : SliderUtils.getSystemEnv().entrySet()) {
log.debug("System env {}={}", envs.getKey(), envs.getValue());
}
if (log.isDebugEnabled()) {
log.debug("AM classpath={}", classpath);
log.debug("Environment Map:\n{}",
SliderUtils.stringifyMap(amLauncher.getEnv()));
log.debug("Files in lib path\n{}", sliderFileSystem.listFSDir(libPath));
}
// rm scheduler address; an unparseable value is reported as a bad configuration
InetSocketAddress rmSchedulerAddress;
try {
rmSchedulerAddress = SliderUtils.getRmSchedulerAddress(config);
} catch (IllegalArgumentException e) {
throw new BadConfigException("%s Address invalid: %s",
YarnConfiguration.RM_SCHEDULER_ADDRESS,
config.get(
YarnConfiguration.RM_SCHEDULER_ADDRESS)
);
}
String rmAddr = NetUtils.getHostPortString(rmSchedulerAddress);
JavaCommandLineBuilder commandLine = new JavaCommandLineBuilder();
// insert any JVM options
sliderAM.addJVMOptions(instanceDefinition, commandLine);
// enable asserts
commandLine.enableJavaAssertions();
// if the conf dir has a log4j-server.properties, switch to that
if (hasServerLog4jProperties) {
commandLine.sysprop(SYSPROP_LOG4J_CONFIGURATION, LOG4J_SERVER_PROP_FILENAME);
commandLine.sysprop(SYSPROP_LOG_DIR, ApplicationConstants.LOG_DIR_EXPANSION_VAR);
}
// add the AM service entry point
commandLine.add(SliderAppMaster.SERVICE_CLASSNAME);
// create action and the cluster name
commandLine.add(ACTION_CREATE, clustername);
// debug
if (debugAM) {
commandLine.add(Arguments.ARG_DEBUG);
}
// set the cluster directory path
commandLine.add(Arguments.ARG_CLUSTER_URI, clusterDirectory.toUri());
if (!isUnset(rmAddr)) {
commandLine.add(Arguments.ARG_RM_ADDR, rmAddr);
}
if (serviceArgs.getFilesystemBinding() != null) {
commandLine.add(Arguments.ARG_FILESYSTEM, serviceArgs.getFilesystemBinding());
}
/**
* pass the registry binding: the ZK root (with a default) and the
* mandatory ZK quorum
*/
addConfOptionToCLI(commandLine, config,
RegistryConstants.KEY_REGISTRY_ZK_ROOT,
RegistryConstants.DEFAULT_ZK_REGISTRY_ROOT);
addMandatoryConfOptionToCLI(commandLine, config,
RegistryConstants.KEY_REGISTRY_ZK_QUORUM);
if (clusterSecure) {
// if the cluster is secure, make sure that
// the relevant security settings go over
/*
addConfOptionToCLI(commandLine, config, KEY_SECURITY);
*/
addConfOptionToCLI(commandLine,
config,
DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY);
}
// write out the path output
commandLine.addOutAndErrFiles(STDOUT_AM, STDERR_AM);
String cmdStr = commandLine.build();
log.debug("Completed setting up app master command {}", cmdStr);
amLauncher.addCommandLine(commandLine);
// the Slider AM gets to configure the AM requirements, not the custom provider
sliderAM.prepareAMResourceRequirements(sliderAMResourceComponent,
amLauncher.getResource());
// Set the priority for the application master
int amPriority = config.getInt(KEY_YARN_QUEUE_PRIORITY,
DEFAULT_YARN_QUEUE_PRIORITY);
amLauncher.setPriority(amPriority);
// Set the queue to which this application is to be submitted in the RM
// Queue for App master: a queue set in the internal options overrides
// the one from the configuration
String amQueue = config.get(KEY_YARN_QUEUE, DEFAULT_YARN_QUEUE);
String suppliedQueue = internalOperations.getGlobalOptions().get(InternalKeys.INTERNAL_QUEUE);
if(!SliderUtils.isUnset(suppliedQueue)) {
amQueue = suppliedQueue;
log.info("Using queue {} for the application instance.", amQueue);
}
if (amQueue != null) {
amLauncher.setQueue(amQueue);
}
// submit the application
LaunchedApplication launchedApplication = amLauncher.submitApplication();
return launchedApplication;
}
/**
 * Build the extra environment variables for the AM launch from the
 * <code>SliderXmlConfKeys.KEY_AM_LAUNCH_ENV</code> configuration value.
 * <p>
 * The value is a comma separated list of <code>NAME=value</code> pairs.
 * Entries that do not split into exactly two parts on <code>=</code> are
 * ignored. Placeholders found in an entry are replaced by their non-empty
 * resolved values; the substitution is literal, so resolved values that
 * contain <code>\</code> or <code>$</code> (for example Windows paths) are
 * inserted unchanged. Each resulting value is prefixed with a reference to
 * the existing variable of the same name, using the platform's syntax and
 * path separator.
 *
 * @param config configuration to read the launch environment from
 * @return a map of variable name to value; empty if nothing is configured
 */
protected Map<String, String> getAmLaunchEnv(Configuration config) {
  String sliderAmLaunchEnv = config.get(SliderXmlConfKeys.KEY_AM_LAUNCH_ENV);
  log.debug("{} = {}", SliderXmlConfKeys.KEY_AM_LAUNCH_ENV, sliderAmLaunchEnv);
  // Multiple env variables can be specified with a comma (,) separator
  String[] envs = StringUtils.isEmpty(sliderAmLaunchEnv) ? null
      : sliderAmLaunchEnv.split(",");
  if (ArrayUtils.isEmpty(envs)) {
    return Collections.emptyMap();
  }
  Map<String, String> amLaunchEnv = new HashMap<>();
  for (String env : envs) {
    if (StringUtils.isEmpty(env)) {
      continue;
    }
    // Each env name/value is separated by equals sign (=)
    String[] tokens = env.split("=");
    if (tokens.length != 2) {
      continue;
    }
    String envKey = tokens[0];
    String envValue = tokens[1];
    for (Map.Entry<String, String> placeholder
        : generatePlaceholderKeyValueMap(env).entrySet()) {
      if (StringUtils.isNotEmpty(placeholder.getValue())) {
        // literal replace: the resolved value must not be interpreted as
        // a regex replacement string
        envValue = envValue.replace(placeholder.getKey(),
            placeholder.getValue());
      }
    }
    // prepend a reference to the variable's current value
    if (Shell.WINDOWS) {
      envValue = "%" + envKey + "%;" + envValue;
    } else {
      envValue = "$" + envKey + ":" + envValue;
    }
    log.info("Setting AM launch env {}={}", envKey, envValue);
    amLaunchEnv.put(envKey, envValue);
  }
  return amLaunchEnv;
}
/**
 * Find every <code>${...}</code> placeholder in a string and resolve each
 * one against the system environment.
 * <p>
 * The name inside the braces is upper-cased (locale independently) and its
 * dots are turned into underscores to form the environment variable name
 * that is looked up.
 *
 * @param env string to scan for placeholders
 * @return a map of placeholder text (including <code>${</code> and
 * <code>}</code>) to the resolved value; the value is whatever the
 * environment lookup returned and may be null
 */
protected Map<String, String> generatePlaceholderKeyValueMap(String env) {
  // exclude both braces so that a match always ends at the first closing one
  String PLACEHOLDER_PATTERN = "\\$\\{[^{}]+\\}";
  Pattern placeholderPattern = Pattern.compile(PLACEHOLDER_PATTERN);
  Matcher placeholderMatcher = placeholderPattern.matcher(env);
  Map<String, String> placeholderKeyValueMap = new HashMap<String, String>();
  // loop so that all placeholders are collected, not just the first
  while (placeholderMatcher.find()) {
    String placeholderKey = placeholderMatcher.group();
    String systemKey = placeholderKey
        .substring(2, placeholderKey.length() - 1)
        .toUpperCase(Locale.ENGLISH)
        .replaceAll("\\.", "_");
    String placeholderValue = SliderUtils.getSystemEnv(systemKey);
    log.debug("Placeholder {}={}", placeholderKey, placeholderValue);
    placeholderKeyValueMap.put(placeholderKey, placeholderValue);
  }
  return placeholderKeyValueMap;
}
/**
 * Copy the python executable path from the configuration into the global
 * application options of the instance definition, unless the option is
 * already set there. Nothing happens when the configuration has no value.
 *
 * @param config configuration to read from
 * @param instanceDefinition instance definition to update
 */
private void propagatePythonExecutable(Configuration config,
    AggregateConf instanceDefinition) {
  String pythonExec = config.get(SliderXmlConfKeys.PYTHON_EXECUTABLE_PATH);
  if (pythonExec == null) {
    return;
  }
  instanceDefinition.getAppConfOperations()
      .getGlobalOptions()
      .putIfUnset(SliderXmlConfKeys.PYTHON_EXECUTABLE_PATH, pythonExec);
}
/**
 * Wait for the launched app to be accepted and, optionally, running.
 * <p>
 * The application is first monitored until it reaches the
 * <code>ACCEPTED</code> state. If it has already finished by then, the exit
 * code is derived from its report. Otherwise, when
 * <code>runWaitMillis</code> is non-zero, it is monitored until it reaches
 * <code>RUNNING</code>; a report in any other state, or no report at all,
 * is converted to an exit code.
 *
 * @param launchedApplication application
 * @param acceptWaitMillis wait limit for the accept phase; passed to
 * <code>Duration</code> unchanged
 * @param runWaitMillis wait limit for the running phase; zero means no wait
 * takes place. NOTE(review): despite the name, this value is multiplied by
 * 1000 before being passed to <code>Duration</code>, so it is treated as
 * seconds -confirm the intended unit with callers
 * @return exit code: success, or the code built from the application report
 * @throws YarnException
 * @throws IOException
 */
public int waitForAppRunning(LaunchedApplication launchedApplication,
    int acceptWaitMillis, int runWaitMillis) throws YarnException, IOException {
  assert launchedApplication != null;
  int exitCode;
  // wait for the submit state to be reached
  ApplicationReport report = launchedApplication.monitorAppToState(
      YarnApplicationState.ACCEPTED,
      new Duration(acceptWaitMillis));
  // may have failed, so check that
  if (SliderUtils.hasAppFinished(report)) {
    exitCode = buildExitCode(report);
  } else {
    // exit unless there is a wait
    if (runWaitMillis != 0) {
      // waiting for state to change; multiply as long so that large
      // wait values do not overflow int arithmetic
      Duration duration = new Duration(runWaitMillis * 1000L);
      duration.start();
      report = launchedApplication.monitorAppToState(
          YarnApplicationState.RUNNING, duration);
      if (report != null &&
          report.getYarnApplicationState() == YarnApplicationState.RUNNING) {
        exitCode = EXIT_SUCCESS;
      } else {
        exitCode = buildExitCode(report);
      }
    } else {
      exitCode = EXIT_SUCCESS;
    }
  }
  return exitCode;
}
/**
 * Propagate the namenode kerberos principal from the current configuration
 * into the global application options of the cluster specification, under
 * the site XML prefix. An existing value in the specification is kept.
 *
 * @param config config to read from
 * @param clusterSpec cluster spec
 */
private void propagatePrincipals(Configuration config,
    AggregateConf clusterSpec) {
  String dfsPrincipal =
      config.get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY);
  if (dfsPrincipal == null) {
    // nothing to propagate
    return;
  }
  String siteDfsPrincipal = OptionKeys.SITE_XML_PREFIX +
      DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
  clusterSpec.getAppConfOperations()
      .getGlobalOptions()
      .putIfUnset(siteDfsPrincipal, dfsPrincipal);
}
/**
 * Look up a key in the configuration and, if it has a value, define it on
 * the command line.
 *
 * @param cmdLine command line
 * @param conf configuration to read the value from
 * @param key key to look up and define
 * @return true if the key had a value and was defined
 */
private boolean addConfOptionToCLI(CommandLineBuilder cmdLine,
    Configuration conf,
    String key) {
  return defineIfSet(cmdLine, key, conf.get(key));
}
/**
* Look up a key in the configuration, falling back to a default, and
* define the resulting value on the command line.
* @param cmdLine command line
* @param conf configuration to read the value from
* @param key key to look up and define
* @param defVal value used when the configuration has no entry for the key
* @return the value that was defined
*/
private String addConfOptionToCLI(CommandLineBuilder cmdLine,
Configuration conf,
String key,
String defVal) {
String val = conf.get(key, defVal);
define(cmdLine, key, val);
return val;
}
/**
* Add a <code>-D key=val</code> command to the CLI.
* Both the key and the value must be non-null; the argument checks
* reject a null with the messages "null key" / "null value".
* @param cmdLine command line
* @param key key
* @param val value
*/
private void define(CommandLineBuilder cmdLine, String key, String val) {
Preconditions.checkArgument(key != null, "null key");
Preconditions.checkArgument(val != null, "null value");
cmdLine.add(Arguments.ARG_DEFINE, key + "=" + val);
}
/**
 * Add a <code>-D key=val</code> command to the CLI, but only when
 * <code>val</code> is not null. The key itself must never be null.
 *
 * @param cmdLine command line
 * @param key key
 * @param val value
 * @return true if the definition was added, false if the value was null
 */
private boolean defineIfSet(CommandLineBuilder cmdLine, String key, String val) {
  Preconditions.checkArgument(key != null, "null key");
  if (val == null) {
    return false;
  }
  define(cmdLine, key, val);
  return true;
}
/**
 * Define a configuration option on the command line, failing if the
 * configuration has no value for it.
 *
 * @param cmdLine command line
 * @param conf configuration to read the value from
 * @param key key that must be set
 * @throws BadConfigException if the key has no value in the configuration
 */
private void addMandatoryConfOptionToCLI(CommandLineBuilder cmdLine,
    Configuration conf,
    String key) throws BadConfigException {
  boolean defined = addConfOptionToCLI(cmdLine, conf, key);
  if (defined) {
    return;
  }
  throw new BadConfigException("Missing configuration option: " + key);
}
/**
* Create a path that must exist in the cluster fs.
* Delegates to the Slider filesystem helper of the same name.
* @param uri uri to create
* @return the path
* @throws FileNotFoundException if the path does not exist
* @throws SliderException declared; raised by the filesystem helper
* @throws IOException declared; raised by the filesystem helper
*/
public Path createPathThatMustExist(String uri) throws
SliderException,
IOException {
return sliderFileSystem.createPathThatMustExist(uri);
}
/**
 * Verify that there is no live instance of the named cluster.
 *
 * @param clustername cluster name
 * @param action name of the action being attempted; used as the prefix of
 * the failure message
 * @throws SliderException with exit code EXIT_APPLICATION_IN_USE
 * if at least one live instance of that name was found
 * @throws IOException
 * @throws YarnException
 */
public void verifyNoLiveClusters(String clustername, String action) throws
    IOException,
    YarnException {
  List<ApplicationReport> liveInstances = findAllLiveInstances(clustername);
  if (liveInstances.isEmpty()) {
    return;
  }
  // report the first live instance found
  throw new SliderException(EXIT_APPLICATION_IN_USE,
      action + " failed for "
          + clustername
          + ": "
          + E_CLUSTER_RUNNING + " :"
          + liveInstances.get(0));
}
/**
* Get the current user name, as reported by the registry utilities.
* @return the value of <code>RegistryUtils.currentUser()</code>
* @throws IOException declared by this method
*/
public String getUsername() throws IOException {
return RegistryUtils.currentUser();
}
/**
* Get the name of any deployed cluster.
* @return the value of the <code>deployedClusterName</code> field, which
* may not have been set yet
*/
public String getDeployedClusterName() {
return deployedClusterName;
}
/**
* Set the name of the deployed cluster.
* @param deployedClusterName new value of the field
*/
@VisibleForTesting
public void setDeployedClusterName(String deployedClusterName) {
this.deployedClusterName = deployedClusterName;
}
/**
* Ask if the client is using a mini MR cluster.
* Reads the boolean <code>YarnConfiguration.IS_MINI_YARN_CLUSTER</code>
* option from the configuration, defaulting to false.
* @return true if the option is set to true
*/
private boolean getUsingMiniMRCluster() {
return getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER,
false);
}
/**
* Get the application name used in the zookeeper root paths.
* @return the constant string "slider"
*/
private String getAppName() {
return "slider";
}
/**
* Wait for the app to start running (or go past that state).
* Shorthand for monitoring the application to the <code>RUNNING</code> state.
* @param duration time to wait
* @return the app report; null if the duration ran out
* @throws YarnException YARN or app issues
* @throws IOException IO problems
*/
@VisibleForTesting
public ApplicationReport monitorAppToRunning(Duration duration)
throws YarnException, IOException {
return monitorAppToState(YarnApplicationState.RUNNING, duration);
}
/**
 * Translate an application report into an exit code.
 * A null report is interpreted as a timeout. Finished applications map to
 * success or failure according to their final status; killed and failed
 * applications have their own codes; any other state counts as success.
 *
 * @param report application report; may be null
 * @return the exit code
 * @throws IOException
 * @throws YarnException
 */
private int buildExitCode(ApplicationReport report) throws
    IOException,
    YarnException {
  if (report == null) {
    return EXIT_TIMED_OUT;
  }
  YarnApplicationState state = report.getYarnApplicationState();
  FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
  int exitCode;
  switch (state) {
    case FINISHED:
      if (FinalApplicationStatus.SUCCEEDED != dsStatus) {
        log.info("Application finished unsuccessfully." +
            "YarnState = {}, DSFinalStatus = {} Breaking monitoring loop",
            state, dsStatus);
        exitCode = EXIT_YARN_SERVICE_FINISHED_WITH_ERROR;
      } else {
        log.info("Application has completed successfully");
        exitCode = EXIT_SUCCESS;
      }
      break;
    case KILLED:
      log.info("Application did not finish. YarnState={}, DSFinalStatus={}",
          state, dsStatus);
      exitCode = EXIT_YARN_SERVICE_KILLED;
      break;
    case FAILED:
      log.info("Application Failed. YarnState={}, DSFinalStatus={}", state,
          dsStatus);
      exitCode = EXIT_YARN_SERVICE_FAILED;
      break;
    default:
      // not in any terminal state
      exitCode = EXIT_SUCCESS;
      break;
  }
  return exitCode;
}
/**
 * Monitor the submitted application until it reaches the requested state.
 * The work is delegated to a <code>LaunchedApplication</code> built from
 * the current application ID and YARN client.
 * Prerequisite: the application was launched.
 * NOTE(review): timeout and later-state handling live in
 * <code>LaunchedApplication.monitorAppToState</code> -see there for details.
 *
 * @param desiredState desired state.
 * @param duration how long to wait
 * @return the application report, as returned by the delegate
 * @throws YarnException
 * @throws IOException
 */
@VisibleForTesting
public ApplicationReport monitorAppToState(
    YarnApplicationState desiredState,
    Duration duration)
    throws YarnException, IOException {
  return new LaunchedApplication(applicationId, yarnClient)
      .monitorAppToState(desiredState, duration);
}
/**
* Get the report of the application identified by the
* <code>applicationId</code> field.
* @return the application report
* @throws IOException declared by this method
* @throws YarnException declared by this method
*/
@Override
public ApplicationReport getApplicationReport() throws
IOException,
YarnException {
return getApplicationReport(applicationId);
}
/**
 * Force kill the application identified by the <code>applicationId</code>
 * field, if there is one.
 *
 * @param reason reason passed on to the kill request
 * @return true if a kill was issued, false if no application ID is set
 * @throws YarnException
 * @throws IOException
 */
@Override
public boolean forceKillApplication(String reason)
    throws YarnException, IOException {
  if (applicationId == null) {
    // nothing to kill
    return false;
  }
  LaunchedApplication target =
      new LaunchedApplication(applicationId, yarnClient);
  target.forceKill(reason);
  return true;
}
/**
* List Slider instances belonging to a specific user. This will include
* failed and killed instances; there may be duplicates.
* Delegates to the YARN application list client.
* @param user user: "" means all users, null means "default"
* @return a possibly empty list of Slider AMs
* @throws YarnException declared by this method
* @throws IOException declared by this method
*/
public List<ApplicationReport> listSliderInstances(String user)
throws YarnException, IOException {
return yarnAppListClient.listInstances(user);
}
/**
* A basic list action to list live instances.
* Builds default list arguments with only the <code>live</code> flag set
* and hands over to the full list action.
* @param clustername cluster name
* @return success if the listing was considered successful
* @throws IOException
* @throws YarnException
*/
public int actionList(String clustername) throws IOException, YarnException {
ActionListArgs args = new ActionListArgs();
args.live = true;
return actionList(clustername, args);
}
/**
* Implement the list action.
* <p>
* The arguments are validated first: <code>live</code> and a state are
* mutually exclusive, listing containers needs a named instance, and
* version/components filters are only accepted together with containers.
* The persistent instances from the filesystem are then matched against the
* application reports known to the RM, filtered to the requested state
* range, and the details of each selected instance are printed.
* @param clustername List out specific instance name
* @param args Action list arguments
* @return 0 if one or more entries were listed, or if no instance was named
* and no persistent instances exist; <code>EXIT_FALSE</code> if nothing was
* listed
* @throws IOException
* @throws YarnException
* @throws UnknownApplicationInstanceException if a specific instance
* was named but it was not found
*/
@Override
public int actionList(String clustername, ActionListArgs args)
throws IOException, YarnException {
if (args.help) {
return actionHelp(ACTION_LIST);
}
verifyBindingsDefined();
boolean live = args.live;
String state = args.state;
boolean listContainers = args.containers;
boolean verbose = args.verbose;
String version = args.version;
Set<String> components = args.components;
// NOTE(review): state and components are dereferenced without a null
// check -presumably ActionListArgs initializes them; confirm
if (live && !state.isEmpty()) {
throw new BadCommandArgumentsException(
Arguments.ARG_LIVE + " and " + Arguments.ARG_STATE + " are exclusive");
}
if (listContainers && isUnset(clustername)) {
throw new BadCommandArgumentsException(
"Should specify an application instance with "
+ Arguments.ARG_CONTAINERS);
}
// specifying both --version and --components with --containers is okay
if (StringUtils.isNotEmpty(version) && !listContainers) {
throw new BadCommandArgumentsException(Arguments.ARG_VERSION
+ " can be specified only with " + Arguments.ARG_CONTAINERS);
}
if (!components.isEmpty() && !listContainers) {
throw new BadCommandArgumentsException(Arguments.ARG_COMPONENTS
+ " can be specified only with " + Arguments.ARG_CONTAINERS);
}
// flag to indicate only services in a specific state are to be listed
boolean listOnlyInState = live || !state.isEmpty();
// range of YARN states to include: live => NEW..RUNNING,
// explicit state => exactly that state, otherwise NEW..KILLED
YarnApplicationState min, max;
if (live) {
min = YarnApplicationState.NEW;
max = YarnApplicationState.RUNNING;
} else if (!state.isEmpty()) {
YarnApplicationState stateVal = extractYarnApplicationState(state);
min = max = stateVal;
} else {
min = YarnApplicationState.NEW;
max = YarnApplicationState.KILLED;
}
// get the complete list of persistent instances
Map<String, Path> persistentInstances = sliderFileSystem.listPersistentInstances();
if (persistentInstances.isEmpty() && isUnset(clustername)) {
// an empty listing is a success if no cluster was named
log.debug("No application instances found");
return EXIT_SUCCESS;
}
// and those the RM knows about
List<ApplicationReport> instances = listSliderInstances(null);
SliderUtils.sortApplicationsByMostRecent(instances);
Map<String, ApplicationReport> reportMap =
SliderUtils.buildApplicationReportMap(instances, min, max);
log.debug("Persisted {} deployed {} filtered[{}-{}] & de-duped to {}",
persistentInstances.size(),
instances.size(),
min, max,
reportMap.size() );
List<ContainerInformation> containers = null;
if (isSet(clustername)) {
// only one instance is expected
// resolve the persistent value
Path persistent = persistentInstances.get(clustername);
if (persistent == null) {
throw unknownClusterException(clustername);
}
// create a new map with only that instance in it.
// this restricts the output of results to this instance
persistentInstances = new HashMap<>();
persistentInstances.put(clustername, persistent);
if (listContainers) {
containers = getContainers(clustername);
}
}
// at this point there is either the entire list or a stripped down instance
int listed = 0;
for (String name : persistentInstances.keySet()) {
ApplicationReport report = reportMap.get(name);
if (!listOnlyInState || report != null) {
// list the details if all were requested, or the filtering contained
// a report
listed++;
// containers will be non-null only when a single instance was named
// and its containers were requested
String details = SliderUtils.instanceDetailsToString(name, report,
containers, version, components, verbose);
print(details);
}
}
return listed > 0 ? EXIT_SUCCESS: EXIT_FALSE;
}
/**
* Get the containers of a named application instance.
* Binds to the cluster of that name and asks it for its containers.
* @param name name of the application instance
* @return the containers reported by the cluster
* @throws BadClusterStateException if the container lookup fails with a
* <code>NoSuchNodeException</code>
* @throws YarnException declared by this method
* @throws IOException declared by this method
*/
public List<ContainerInformation> getContainers(String name)
throws YarnException, IOException {
SliderClusterOperations clusterOps = new SliderClusterOperations(
bondToCluster(name));
try {
return clusterOps.getContainers();
} catch (NoSuchNodeException e) {
// NOTE(review): the original exception is not attached as the cause
throw new BadClusterStateException(
"Containers not found for application instance %s", name);
}
}
/**
* Enumerate slider instances for the current user, and the
* most recent app report, where available.
* Delegates to the YARN application list client with the same arguments.
* @param listOnlyInState boolean to indicate that the instances should
* only include those in a YARN state
* <code> minAppState &lt;= currentState &lt;= maxAppState </code>
*
* @param minAppState minimum application state to include in enumeration.
* @param maxAppState maximum application state to include
* @return a map of application instance name to description
* @throws IOException Any IO problem
* @throws YarnException YARN problems
*/
@Override
public Map<String, SliderInstanceDescription> enumSliderInstances(
boolean listOnlyInState,
YarnApplicationState minAppState,
YarnApplicationState maxAppState)
throws IOException, YarnException {
return yarnAppListClient.enumSliderInstances(listOnlyInState,
minAppState,
maxAppState);
}
/**
 * Convert the value of a <code>--state</code> argument into a YARN
 * application state. The match is case insensitive: the argument is
 * upper-cased with the English locale before the enum lookup.
 *
 * @param state state argument
 * @return the application state
 * @throws BadCommandArgumentsException if the argument did not match
 * any known state
 */
private YarnApplicationState extractYarnApplicationState(String state) throws
    BadCommandArgumentsException {
  try {
    return YarnApplicationState.valueOf(state.toUpperCase(Locale.ENGLISH));
  } catch (IllegalArgumentException unknown) {
    throw new BadCommandArgumentsException("Unknown state: " + state);
  }
}
/**
 * Is an application active: accepted or running.
 *
 * @param report the application report
 * @return true if the reported state is RUNNING or ACCEPTED
 */
public boolean isApplicationActive(ApplicationReport report) {
  if (report.getYarnApplicationState() == YarnApplicationState.RUNNING) {
    return true;
  }
  return report.getYarnApplicationState() == YarnApplicationState.ACCEPTED;
}
/**
 * Implement the flex action: convert the requested component counts from
 * the arguments into integers and flex the named cluster to them.
 *
 * @param name cluster name
 * @param args flex arguments carrying the component name to count map
 * @return the exit code of the flex operation
 * @throws BadCommandArgumentsException if a requested count is not a number
 * @throws YarnException
 * @throws IOException
 */
@Override
@VisibleForTesting
public int actionFlex(String name, ActionFlexArgs args) throws YarnException, IOException {
  verifyBindingsDefined();
  SliderUtils.validateClusterName(name);
  log.debug("actionFlex({})", name);
  Map<String, Integer> roleInstances = new HashMap<>();
  for (Map.Entry<String, String> requested : args.getComponentMap().entrySet()) {
    String role = requested.getKey();
    String count = requested.getValue();
    Integer parsedCount;
    try {
      parsedCount = Integer.valueOf(count);
    } catch (NumberFormatException e) {
      throw new BadCommandArgumentsException("Requested count of role %s" +
          " is not a number: \"%s\"",
          role, count);
    }
    roleInstances.put(role, parsedCount);
  }
  return flex(name, roleInstances);
}
/**
 * Probe for an application instance existing, optionally requiring it
 * to be live. The flag is wrapped in an {@link ActionExistsArgs} and the
 * work is done by {@link #actionExists(String, ActionExistsArgs)}.
 * @param name name of the application instance
 * @param checkLive true if the instance must also be live
 * @return exit code
 * @throws YarnException YARN problems
 * @throws IOException IO problems
 */
@Override
public int actionExists(String name, boolean checkLive) throws YarnException, IOException {
  ActionExistsArgs existsArgs = new ActionExistsArgs();
  existsArgs.live = checkLive;
  int exitCode = actionExists(name, existsArgs);
  return exitCode;
}
/**
 * Implement the exists action: probe for an application instance of the
 * given name existing, and optionally for it being live or in a
 * specific YARN state.
 * <p>
 * With neither <code>live</code> nor <code>state</code> set, the presence
 * of the instance directory in the filesystem is sufficient. With
 * <code>live</code> set, the instance must be in a YARN state that precedes
 * FINISHED. Otherwise the deployed instances are searched for one with
 * this name in the requested state.
 * @param name name of the application instance
 * @param args arguments supplying the <code>live</code> and
 * <code>state</code> options
 * @return EXIT_SUCCESS if the instance satisfies the probe; EXIT_FALSE if
 * it is not running or is in a different state
 * @throws UnknownApplicationInstanceException if there is no instance
 * directory for the name
 * @throws BadCommandArgumentsException if the state option is not a known
 * YARN application state
 * @throws YarnException YARN problems
 * @throws IOException IO problems
 */
public int actionExists(String name, ActionExistsArgs args) throws YarnException, IOException {
  verifyBindingsDefined();
  SliderUtils.validateClusterName(name);
  final boolean checkLive = args.live;
  log.debug("actionExists({}, {}, {})", name, checkLive, args.state);

  // initial probe: an instance without a directory is unknown
  Path instanceDir = sliderFileSystem.buildClusterDirPath(name);
  boolean dirPresent = sliderFileSystem.getFileSystem().exists(instanceDir);
  if (!dirPresent) {
    throw unknownClusterException(name);
  }
  String requestedState = args.state;
  if (!checkLive && SliderUtils.isUnset(requestedState)) {
    log.info("Application {} exists", name);
    return EXIT_SUCCESS;
  }

  // liveness or state was asked for: YARN has to know the instance
  ApplicationReport current = findInstance(name);
  if (current == null) {
    log.info("Application {} not running", name);
    return EXIT_FALSE;
  }

  boolean matched;
  if (checkLive) {
    // live means: not in any terminated state
    YarnApplicationState liveState = current.getYarnApplicationState();
    log.debug(" current app state = {}", liveState);
    matched = liveState.ordinal() < YarnApplicationState.FINISHED.ordinal();
  } else {
    // look for an instance in the single --state state
    List<ApplicationReport> deployed = yarnClient.listDeployedInstances("");
    YarnApplicationState wanted =
        extractYarnApplicationState(requestedState.toUpperCase(Locale.ENGLISH));
    ApplicationReport inWantedState =
        yarnClient.findAppInInstanceList(deployed, name, wanted);
    matched = inWantedState != null;
    if (matched) {
      // report on the instance that was found in the selected state
      current = inWantedState;
    }
  }

  SliderUtils.OnDemandReportStringifier summary =
      new SliderUtils.OnDemandReportStringifier(current);
  if (matched) {
    log.debug("Application instance is in desired state");
    log.info("Application {} is {}\n{}", name,
        current.getYarnApplicationState(), summary);
    return EXIT_SUCCESS;
  }
  // the instance is in the list of apps, but not in the wanted state
  log.info("Application {} found but is in wrong state {}", name,
      current.getYarnApplicationState());
  log.debug("State {}", summary);
  return EXIT_FALSE;
}
/**
 * Implement the kill-container action: ask the application master of
 * the named instance to kill one of its containers.
 * @param name name of the application instance
 * @param args arguments supplying the container id
 * @return EXIT_SUCCESS once the request has been made
 * @throws BadCommandArgumentsException if no container id was supplied
 * @throws BadClusterStateException if the instance has no such container
 * @throws YarnException YARN problems
 * @throws IOException IO problems
 */
@Override
public int actionKillContainer(String name,
    ActionKillContainerArgs args) throws YarnException, IOException {
  final String containerId = args.id;
  if (SliderUtils.isUnset(containerId)) {
    throw new BadCommandArgumentsException("Missing container id");
  }
  log.info("killingContainer {}:{}", name, containerId);
  SliderClusterProtocol appMaster = bondToCluster(name);
  SliderClusterOperations operations = new SliderClusterOperations(appMaster);
  try {
    operations.killContainer(containerId);
  } catch (NoSuchNodeException e) {
    throw new BadClusterStateException("Container %s not found in cluster %s",
        containerId, name);
  }
  return EXIT_SUCCESS;
}
/**
 * Implement the echo action: send a message to the application master
 * of the named instance and return its reply.
 * @param name name of the application instance
 * @param args arguments supplying the message
 * @return the string returned by the application master
 * @throws BadCommandArgumentsException if there is no message
 * @throws YarnException YARN problems
 * @throws IOException IO problems
 */
@Override
public String actionEcho(String name, ActionEchoArgs args) throws
    YarnException,
    IOException {
  final String outbound = args.message;
  if (null == outbound) {
    throw new BadCommandArgumentsException("missing message");
  }
  SliderClusterProtocol appMaster = bondToCluster(name);
  SliderClusterOperations operations = new SliderClusterOperations(appMaster);
  String reply = operations.echo(outbound);
  return reply;
}
/**
 * Get the client which this class uses to enumerate and look up
 * YARN application instances.
 * @return the YARN application list client held by this instance
 */
public YarnAppListClient getYarnAppListClient() {
  return this.yarnAppListClient;
}
/**
 * Look up an instance of an application belonging to the current user.
 * The lookup is delegated to the YARN application list client.
 * @param appname application name
 * @return the app report or null if none is found
 * @throws YarnException YARN issues
 * @throws IOException IO problems
 */
private ApplicationReport findInstance(String appname)
    throws YarnException, IOException {
  ApplicationReport located = yarnAppListClient.findInstance(appname);
  return located;
}
/**
 * Look up an application instance and wrap its report in a
 * {@link RunningApplication}.
 * @param appname application name
 * @return the wrapped application, or null if no instance was found
 * @throws YarnException YARN issues
 * @throws IOException IO problems
 */
private RunningApplication findApplication(String appname)
    throws YarnException, IOException {
  ApplicationReport located = findInstance(appname);
  if (located == null) {
    return null;
  }
  return new RunningApplication(yarnClient, located);
}
/**
 * Find every live instance of a specific application; if the cluster
 * holds more than one they are all returned. The lookup is delegated to
 * the YARN application list client.
 * @param appname application name
 * @return the list of all matching application instances
 * @throws YarnException YARN issues
 * @throws IOException IO problems
 */
private List<ApplicationReport> findAllLiveInstances(String appname)
    throws YarnException, IOException {
  List<ApplicationReport> liveInstances =
      yarnAppListClient.findAllLiveInstances(appname);
  return liveInstances;
}
/**
 * Connect to a Slider AM.
 * @param app application report providing the details on the application
 * @return an RPC proxy bound to the application master
 * @throws SliderException with the exit code EXIT_TIMED_OUT if the thread
 * is interrupted while the connection is being set up
 * @throws YarnException YARN problems
 * @throws IOException IO problems
 */
private SliderClusterProtocol connect(ApplicationReport app)
    throws YarnException, IOException {
  try {
    return RpcBinder.getProxy(getConfig(),
        yarnClient.getRmClient(),
        app,
        Constants.CONNECT_TIMEOUT,
        Constants.RPC_TIMEOUT);
  } catch (InterruptedException e) {
    // the exception is converted rather than rethrown, so restore the
    // interrupt status for code further up the stack to observe
    Thread.currentThread().interrupt();
    throw new SliderException(SliderExitCodes.EXIT_TIMED_OUT,
        e,
        "Interrupted waiting for communications with the Slider AM");
  }
}
/**
 * Implement the status action: fetch the description of a live
 * application instance and either log it as JSON or save it to a file.
 * @param clustername name of the application instance
 * @param statusArgs arguments supplying the optional output file
 * @return EXIT_SUCCESS
 * @throws YarnException YARN problems
 * @throws IOException IO problems
 */
@Override
@VisibleForTesting
public int actionStatus(String clustername, ActionStatusArgs statusArgs) throws
    YarnException,
    IOException {
  verifyBindingsDefined();
  SliderUtils.validateClusterName(clustername);
  String destination = statusArgs.getOutput();
  ClusterDescription description = getClusterDescription(clustername);
  // the JSON form is generated even when the status goes to a file
  String json = description.toJsonString();
  if (destination != null) {
    File target = new File(destination).getAbsoluteFile();
    description.save(target);
  } else {
    log.info(json);
  }
  return EXIT_SUCCESS;
}
/**
 * Implement the version action by passing this client's logger to
 * {@code SliderVersionInfo.loadAndPrintVersionInfo}.
 * @return EXIT_SUCCESS, unconditionally
 */
@Override
public int actionVersion() {
SliderVersionInfo.loadAndPrintVersionInfo(log);
return EXIT_SUCCESS;
}
/**
 * Implement the freeze (stop) action for an application instance.
 * <p>
 * An instance which is not running, or which is already in a terminated
 * YARN state, counts as successfully stopped. An instance which has not
 * yet reached the RUNNING state cannot be sent a stop request over IPC,
 * so it is force killed even if a forced stop was not requested.
 * Otherwise the application is either killed through YARN (forced stop)
 * or sent a stop request through the Slider AM.
 * @param clustername name of the application instance
 * @param freezeArgs arguments supplying the wait time (scaled by 1000
 * before it is handed to the monitor), the stop message and the force flag
 * @return EXIT_SUCCESS if the instance was already stopped, or the stop
 * was issued and, where a wait was requested, observed in time;
 * EXIT_FALSE if the stop request failed or the wait ran out
 * @throws YarnException YARN problems
 * @throws IOException IO problems
 */
@Override
public int actionFreeze(String clustername,
    ActionFreezeArgs freezeArgs) throws YarnException, IOException {
  verifyBindingsDefined();
  SliderUtils.validateClusterName(clustername);
  int waittime = freezeArgs.getWaittime();
  String text = freezeArgs.message;
  boolean forcekill = freezeArgs.force;
  log.debug("actionFreeze({}, reason={}, wait={}, force={})", clustername,
      text,
      waittime,
      forcekill);

  //is this actually a known cluster?
  sliderFileSystem.locateInstanceDefinition(clustername);
  ApplicationReport app = findInstance(clustername);
  if (app == null) {
    // exit early
    log.info("Cluster {} not running", clustername);
    // not an error to stop a stopped cluster
    return EXIT_SUCCESS;
  }
  log.debug("App to stop was found: {}:\n{}", clustername,
      new SliderUtils.OnDemandReportStringifier(app));
  if (app.getYarnApplicationState().ordinal() >=
      YarnApplicationState.FINISHED.ordinal()) {
    log.info("Cluster {} is in a terminated state {}", clustername,
        app.getYarnApplicationState());
    return EXIT_SUCCESS;
  }

  // IPC request for a managed shutdown is only possible if the app is running.
  // so we need to force kill if the app is accepted or submitted
  if (!forcekill
      && app.getYarnApplicationState().ordinal() < YarnApplicationState.RUNNING.ordinal()) {
    log.info("Cluster {} is in a pre-running state {}. Force killing it", clustername,
        app.getYarnApplicationState());
    forcekill = true;
  }

  LaunchedApplication application = new LaunchedApplication(yarnClient, app);
  applicationId = application.getApplicationId();

  if (forcekill) {
    // escalating to forced kill
    application.kill("Forced stop of " + clustername + ": " + text);
  } else {
    try {
      SliderClusterProtocol appMaster = connect(app);
      Messages.StopClusterRequestProto r =
          Messages.StopClusterRequestProto
              .newBuilder()
              .setMessage(text)
              .build();
      appMaster.stopCluster(r);
      log.debug("Cluster stop command issued");
    } catch (YarnException | IOException e) {
      log.warn("Exception while trying to terminate {}", clustername, e);
      return EXIT_FALSE;
    }
  }

  //wait for completion. We don't currently return an exception during this process
  //as the stop operation has been issued, this is just YARN.
  try {
    if (waittime > 0) {
      // scale as a long: an int multiplication would silently overflow
      // for wait times above Integer.MAX_VALUE / 1000
      ApplicationReport applicationReport =
          application.monitorAppToState(YarnApplicationState.FINISHED,
              new Duration(waittime * 1000L));
      if (applicationReport == null) {
        log.info("application did not shut down in time");
        return EXIT_FALSE;
      }
    }
  } catch (YarnException | IOException e) {
    log.warn("Exception while waiting for the application {} to shut down: {}",
        clustername, e);
  }
  return EXIT_SUCCESS;
}
/**
 * Implement the thaw (start) action: start a previously defined
 * application instance, provided no instance of that name is live.
 * @param clustername name of the application instance
 * @param thaw thaw arguments, passed on to the cluster start
 * @return the exit code of the cluster start
 * @throws YarnException YARN problems, including a live instance of the
 * same name being found
 * @throws IOException IO problems
 */
@Override
public int actionThaw(String clustername, ActionThawArgs thaw) throws YarnException, IOException {
  SliderUtils.validateClusterName(clustername);
  verifyBindingsDefined();
  // refuse to start if the instance is already running
  verifyNoLiveClusters(clustername, "Start");
  // now start it
  int exitCode = startCluster(clustername, thaw);
  return exitCode;
}
/**
 * Implement flexing: update the component instance counts in the
 * persisted definition of an application instance and, if the instance
 * is running, tell its application master about the new sizes.
 * <p>
 * A failure to acquire the lock when saving the definition is logged
 * and does not stop a running instance from being updated.
 * @param clustername name of the cluster
 * @param roleInstances map of component name to the new instance count
 * @return EXIT_SUCCESS if a running instance was told about the change,
 * EXIT_FALSE if there was no running instance to update
 * @throws YarnException YARN problems, including a definition which fails
 * validation
 * @throws IOException IO problems
 */
public int flex(String clustername, Map<String, Integer> roleInstances)
    throws YarnException, IOException {
  verifyBindingsDefined();
  SliderUtils.validateClusterName(clustername);
  Path definitionDir = sliderFileSystem.buildClusterDirPath(clustername);
  AggregateConf definition =
      loadInstanceDefinitionUnresolved(clustername, definitionDir);
  ConfTreeOperations resourceTree = definition.getResourceOperations();

  // patch the requested counts into the resource specification
  for (Map.Entry<String, Integer> request : roleInstances.entrySet()) {
    String component = request.getKey();
    int desired = request.getValue();
    resourceTree.getOrAddComponent(component)
        .put(ResourceKeys.COMPONENT_INSTANCES, Integer.toString(desired));
    log.debug("Flexed cluster specification ( {} -> {}) : \n{}",
        component,
        desired,
        resourceTree);
  }

  // validate the result against the slider AM provider and the
  // provider named in the definition
  SliderAMClientProvider amProvider = new SliderAMClientProvider(getConfig());
  String providerName = definition.getInternalOperations()
      .getGlobalOptions()
      .getMandatoryOption(InternalKeys.INTERNAL_PROVIDER_NAME);
  AbstractClientProvider appProvider = createClientProvider(providerName);
  validateInstanceDefinition(amProvider, definition, sliderFileSystem);
  validateInstanceDefinition(appProvider, definition, sliderFileSystem);

  // save the specification
  try {
    InstanceIO.saveInstanceDefinition(sliderFileSystem, definitionDir,
        definition);
  } catch (LockAcquireFailedException e) {
    // lock failure
    log.debug("Failed to lock dir {}", definitionDir, e);
    log.warn("Failed to save new resource definition to {} : {}", definitionDir, e);
  }

  // if the instance is actually running, pass the update on to it
  ApplicationReport running = findInstance(clustername);
  if (running == null) {
    log.info("No running instance to update");
    return EXIT_FALSE;
  }
  log.info("Flexing running cluster");
  SliderClusterProtocol appMaster = connect(running);
  SliderClusterOperations operations = new SliderClusterOperations(appMaster);
  operations.flex(definition.getResources());
  log.info("application instance size updated");
  return EXIT_SUCCESS;
}
/**
 * Ask a provider to validate an instance definition. A rejection is
 * logged, along with the definition itself, before the provider's
 * exception is rethrown unchanged.
 * @param provider the provider performing the validation
 * @param instanceDefinition the instance definition
 * @param fs the filesystem handed to the provider for the validation
 * @throws SliderException if invalid.
 */
protected void validateInstanceDefinition(AbstractClientProvider provider,
    AggregateConf instanceDefinition, SliderFileSystem fs) throws SliderException {
  try {
    provider.validateInstanceDefinition(instanceDefinition, fs);
  } catch (SliderException rejection) {
    // short form at info, stack trace at debug, then the definition
    String reason = rejection.getMessage();
    log.info("Error {} validating application instance definition ", reason);
    log.debug("Error validating application instance definition ", rejection);
    log.info(instanceDefinition.toString());
    throw rejection;
  }
}
/**
 * Load the persisted definition of an application instance from its
 * directory in the filesystem.
 * @param clustername name of the cluster
 * @return the description in the filesystem
 * @throws IOException any problems loading -including a missing file
 * @throws SliderException Slider problems raised during the load
 * @throws LockAcquireFailedException declared by the persister's load
 */
@VisibleForTesting
public AggregateConf loadPersistedClusterDescription(String clustername)
    throws IOException, SliderException, LockAcquireFailedException {
  Path definitionDir = sliderFileSystem.buildClusterDirPath(clustername);
  ConfPersister loader = new ConfPersister(sliderFileSystem, definitionDir);
  // the persister fills in an empty configuration
  AggregateConf loaded = new AggregateConf();
  loader.load(loaded);
  return loaded;
}
/**
 * Connect to a live application instance and retrieve its current state.
 * @param clustername the cluster name
 * @return its description
 * @throws YarnException YARN problems, including an unknown instance
 * @throws IOException IO problems
 */
@VisibleForTesting
public ClusterDescription getClusterDescription(String clustername) throws
    YarnException,
    IOException {
  SliderClusterOperations operations = createClusterOperations(clustername);
  ClusterDescription description = operations.getClusterDescription();
  return description;
}
/**
 * Connect to the deployed application instance of this client and
 * retrieve its current state.
 * @return its description
 * @throws YarnException YARN problems
 * @throws IOException IO problems
 */
@VisibleForTesting
public ClusterDescription getClusterDescription() throws
    YarnException,
    IOException {
  String deployedName = getDeployedClusterName();
  return getClusterDescription(deployedName);
}
/**
 * List the UUIDs of all nodes in a role of the deployed instance.
 * @param role role name or "" for all
 * @return an array of UUID strings
 * @throws IOException IO problems
 * @throws YarnException YARN problems
 */
@VisibleForTesting
public String[] listNodeUUIDsByRole(String role) throws
    IOException,
    YarnException {
  SliderClusterOperations operations = createClusterOperations();
  String[] uuids = operations.listNodeUUIDsByRole(role);
  return uuids;
}
/**
 * List all nodes in a role of the deployed instance. This is a double
 * round trip: one call to list the nodes in the role, another to get
 * their details.
 * @param role component/role to look for
 * @return the nodes of that role
 * @throws IOException IO problems
 * @throws YarnException YARN problems
 */
@VisibleForTesting
public List<ClusterNode> listClusterNodesInRole(String role) throws
    IOException,
    YarnException {
  SliderClusterOperations operations = createClusterOperations();
  List<ClusterNode> nodes = operations.listClusterNodesInRole(role);
  return nodes;
}
/**
 * Get the details on a list of uuids. An empty array is answered
 * locally with an empty list, without contacting the cluster.
 * @param uuids uuids to ask for
 * @return a possibly empty list of node details
 * @throws IOException IO problems
 * @throws YarnException YARN problems
 */
@VisibleForTesting
public List<ClusterNode> listClusterNodes(String[] uuids) throws
    IOException,
    YarnException {
  if (uuids.length > 0) {
    SliderClusterOperations operations = createClusterOperations();
    return operations.listClusterNodes(uuids);
  }
  // nothing was asked for: skip the round trip
  return new LinkedList<>();
}
/**
 * Retrieve the instance definition from the far end, that is, from
 * the deployed application instance this client is bonded to.
 * @return the instance definition returned by the cluster operations
 * @throws IOException IO problems
 * @throws YarnException YARN problems
 */
@VisibleForTesting
public AggregateConf getLiveInstanceDefinition() throws IOException, YarnException {
  SliderClusterOperations operations = createClusterOperations();
  AggregateConf liveDefinition = operations.getInstanceDefinition();
  return liveDefinition;
}
/**
 * Bond to a running cluster: find the application instance of the
 * given name and connect to its application master.
 * @param clustername cluster name
 * @return the AM RPC client
 * @throws SliderException if the cluster is unknown; this includes a
 * null cluster name
 * @throws YarnException YARN problems
 * @throws IOException IO problems
 */
private SliderClusterProtocol bondToCluster(String clustername) throws
    YarnException,
    IOException {
  verifyBindingsDefined();
  if (clustername == null) {
    throw unknownClusterException("(undefined)");
  }
  ApplicationReport located = findInstance(clustername);
  if (located != null) {
    return connect(located);
  }
  throw unknownClusterException(clustername);
}
/**
 * Build a cluster operations instance for a named cluster, bonding
 * to that cluster's application master first.
 * @param clustername cluster name
 * @return a bonded cluster operations instance
 * @throws YarnException YARN issues
 * @throws IOException IO problems
 */
private SliderClusterOperations createClusterOperations(String clustername) throws
    YarnException,
    IOException {
  SliderClusterProtocol appMaster = bondToCluster(clustername);
  SliderClusterOperations operations = new SliderClusterOperations(appMaster);
  return operations;
}
/**
 * Get a cluster operations instance for the deployed cluster of this
 * client. The instance is created on first use and then kept, so later
 * calls return the one created earlier.
 * @return a bonded cluster operations instance
 * @throws YarnException YARN issues
 * @throws IOException IO problems
 */
private SliderClusterOperations createClusterOperations() throws
    YarnException,
    IOException {
  if (sliderClusterOperations != null) {
    return sliderClusterOperations;
  }
  String deployedName = getDeployedClusterName();
  sliderClusterOperations = createClusterOperations(deployedName);
  return sliderClusterOperations;
}
/**
 * Block until an instance of a named role is live, or is past that
 * point in its lifecycle. The wait is delegated to the cluster
 * operations of the deployed instance.
 * @param role role to look for
 * @param timeout time to wait
 * @return the state. If still in CREATED, the cluster didn't come up
 * in the time period. If LIVE, all is well. If past LIVE, it has shut
 * down for a reason
 * @throws IOException IO
 * @throws YarnException YARN and Slider problems
 * @throws WaitTimeoutException if the wait timed out
 */
@VisibleForTesting
public int waitForRoleInstanceLive(String role, long timeout)
    throws WaitTimeoutException, IOException, YarnException {
  SliderClusterOperations operations = createClusterOperations();
  int state = operations.waitForRoleInstanceLive(role, timeout);
  return state;
}
/**
 * Build the exception to raise for an unknown cluster. The exception
 * is returned to the caller, not thrown here.
 * @param clustername cluster name
 * @return an exception with text and a relevant exit code
 */
public UnknownApplicationInstanceException unknownClusterException(String clustername) {
  UnknownApplicationInstanceException unknown =
      UnknownApplicationInstanceException.unknownInstance(clustername);
  return unknown;
}
/**
 * Describe this client by its service state and the name of its
 * deployed application instance.
 * @return a string for logging and diagnostics
 */
@Override
public String toString() {
  StringBuilder description = new StringBuilder();
  description.append("Slider Client in state ").append(getServiceState());
  description.append(" and Slider Application Instance ").append(deployedClusterName);
  return description.toString();
}
/**
 * Retrieve all YARN applications, as listed by the YARN client.
 * @return a possibly empty list
 * @throws YarnException YARN problems
 * @throws IOException IO problems
 */
@VisibleForTesting
public List<ApplicationReport> getApplications() throws YarnException, IOException {
  List<ApplicationReport> applications = yarnClient.getApplications();
  return applications;
}
/**
 * Retrieve the report of an application given its ID.
 * @param appId ID of the application
 * @return the application report
 * @throws YarnException YARN problems
 * @throws IOException IO problems
 */
@VisibleForTesting
public ApplicationReport getApplicationReport(ApplicationId appId)
    throws YarnException, IOException {
  LaunchedApplication launched = new LaunchedApplication(appId, yarnClient);
  return launched.getApplicationReport();
}
/**
 * Get the configuration used for deployment (after resolution).
 * It is non-null only after the client has launched the application.
 * @return the resolved configuration or null
 */
@VisibleForTesting
public AggregateConf getLaunchedInstanceDefinition() {
  return this.launchedInstanceDefinition;
}
/**
 * Implement the resolve action: look up registry entries under a path.
 * <p>
 * In list mode (<code>args.list</code>) the children of the path are
 * enumerated and their names printed; every child holding a service
 * record is then either printed as JSON or, if a destination directory
 * was supplied, written into it as <code>&lt;last path entry&gt;.json</code>.
 * Otherwise the single record at the path is resolved and written to the
 * output file, written into the destination directory under the last
 * path entry, or printed as JSON.
 * @param args resolve arguments; they are validated on entry
 * @return EXIT_SUCCESS
 * @throws NotFoundException if the path, or a record at the path, does
 * not exist. A missing root path "/" is listed as empty instead.
 * @throws YarnException YARN problems
 * @throws IOException IO problems
 */
@Override
public int actionResolve(ActionResolveArgs args)
    throws YarnException, IOException {
  // as this is an API entry point, validate
  // the arguments
  args.validate();
  RegistryOperations operations = getRegistryOperations();
  String path = SliderRegistryUtils.resolvePath(args.path);
  ServiceRecordMarshal serviceRecordMarshal = new ServiceRecordMarshal();
  try {
    if (args.list) {
      File destDir = args.destdir;
      if (destDir != null) {
        // best effort: if this fails, the problem surfaces when the
        // first record is written
        destDir.mkdirs();
      }

      Map<String, ServiceRecord> recordMap;
      Map<String, RegistryPathStatus> znodes;
      try {
        // use the operations instance retrieved above rather than
        // reaching past the getter to the field
        znodes = statChildren(operations, path);
        recordMap = extractServiceRecords(operations,
            path,
            znodes.values());
      } catch (PathNotFoundException e) {
        // treat the root directory as if it is always there
        if ("/".equals(path)) {
          znodes = new HashMap<>(0);
          recordMap = new HashMap<>(0);
        } else {
          throw e;
        }
      }
      log.info("Entries: {}", znodes.size());
      for (String name : znodes.keySet()) {
        println(" " + name);
      }
      println("");

      log.info("Service records: {}", recordMap.size());
      for (Entry<String, ServiceRecord> recordEntry : recordMap.entrySet()) {
        String name = recordEntry.getKey();
        ServiceRecord instance = recordEntry.getValue();
        String json = serviceRecordMarshal.toJson(instance);
        if (destDir == null) {
          println(name);
          println(json);
        } else {
          String filename = RegistryPathUtils.lastPathEntry(name) + ".json";
          File jsonFile = new File(destDir, filename);
          SliderUtils.write(jsonFile,
              serviceRecordMarshal.toBytes(instance),
              true);
        }
      }
    } else {
      // resolve single entry
      ServiceRecord instance = resolve(path);
      File outFile = args.out;
      if (args.destdir != null) {
        // a destination directory takes priority over an output file
        outFile = new File(args.destdir, RegistryPathUtils.lastPathEntry(path));
      }
      if (outFile != null) {
        SliderUtils.write(outFile, serviceRecordMarshal.toBytes(instance), true);
      } else {
        println(serviceRecordMarshal.toJson(instance));
      }
    }
  } catch (PathNotFoundException | NoRecordException e) {
    // no path, or no record at this path
    throw new NotFoundException(e, path);
  }
  return EXIT_SUCCESS;
}
/**
 * Implement the registry action. Exactly one operation is performed,
 * chosen in this order: list the service records, list the published
 * configurations, list the published exports, get a configuration,
 * get an export group.
 * @param registryArgs registry arguments; they are validated on entry
 * @return EXIT_SUCCESS if the operation completed; EXIT_USAGE if no
 * operation was selected; EXIT_NOT_FOUND if a file or path needed by
 * the operation was not found
 * @throws YarnException YARN problems
 * @throws IOException IO problems other than a missing file or path
 */
@Override
public int actionRegistry(ActionRegistryArgs registryArgs) throws
    YarnException,
    IOException {
  // as this is also a test entry point, validate
  // the arguments
  registryArgs.validate();
  try {
    if (registryArgs.list) {
      actionRegistryList(registryArgs);
    } else if (registryArgs.listConf) {
      // list the configurations
      actionRegistryListConfigsYarn(registryArgs);
    } else if (registryArgs.listExports) {
      // list the exports
      actionRegistryListExports(registryArgs);
    } else if (SliderUtils.isSet(registryArgs.getConf)) {
      // get a configuration
      PublishedConfiguration publishedConfiguration =
          actionRegistryGetConfig(registryArgs);
      outputConfig(publishedConfiguration, registryArgs);
    } else if (SliderUtils.isSet(registryArgs.getExport)) {
      // get a export group
      PublishedExports publishedExports =
          actionRegistryGetExport(registryArgs);
      outputExport(publishedExports, registryArgs);
    } else {
      // it's an unknown command
      // NOTE(review): the usage text requested here is that of the
      // diagnostics action; this looks like a copy-and-paste slip and
      // should presumably name the registry action - confirm
      log.info(CommonArgs.usage(serviceArgs, ACTION_DIAGNOSTICS));
      return EXIT_USAGE;
    }
  } catch (FileNotFoundException | PathNotFoundException e) {
    // pass the text, not the exception: with a Throwable as the only
    // argument the call binds to the (String, Throwable) overload and
    // the "{}" is logged literally instead of being substituted
    log.info("{}", e.toString());
    log.debug("{}", e, e);
    return EXIT_NOT_FOUND;
  }
  return EXIT_SUCCESS;
}
/**
 * Registry operation: list service records.
 * <p>
 * If no instance name is supplied, every record registered under the
 * current user's path for the service type is listed; otherwise the one
 * record matching the arguments is looked up. Each record is logged.
 *
 * @param registryArgs registry Arguments
 * @return the instances (for tests)
 * @throws UnknownApplicationInstanceException if no name was supplied and
 * there are no records under the service type path
 * @throws NotFoundException if no name was supplied and the service type
 * path does not exist
 * @throws YarnException YARN problems
 * @throws IOException Network or other problems
 */
@VisibleForTesting
public Collection<ServiceRecord> actionRegistryList(
    ActionRegistryArgs registryArgs)
    throws YarnException, IOException {
  String serviceType = registryArgs.serviceType;
  String name = registryArgs.name;
  RegistryOperations operations = getRegistryOperations();
  Collection<ServiceRecord> serviceRecords;
  if (StringUtils.isEmpty(name)) {
    String path =
        serviceclassPath(
            currentUser(),
            serviceType);

    try {
      Map<String, ServiceRecord> recordMap =
          listServiceRecords(operations, path);
      if (recordMap.isEmpty()) {
        throw new UnknownApplicationInstanceException(
            "No applications registered under " + path);
      }
      serviceRecords = recordMap.values();
    } catch (PathNotFoundException e) {
      // exception first, path second, matching actionResolve: in the
      // reverse order the exception is taken as an argument of the
      // message rather than as the cause
      throw new NotFoundException(e, path);
    }
  } else {
    ServiceRecord instance = lookupServiceRecord(registryArgs);
    serviceRecords = new ArrayList<>(1);
    serviceRecords.add(instance);
  }

  for (ServiceRecord serviceRecord : serviceRecords) {
    logInstance(serviceRecord, registryArgs.verbose);
  }
  return serviceRecords;
}
/**
 * Implement the diagnostics action. Exactly one diagnostic is run,
 * chosen by the first option set in this order: client, application,
 * yarn, credentials, all, level.
 * <p>
 * Failures do not propagate: any exception raised by a diagnostic is
 * logged and turned into the exit code EXIT_FALSE.
 * @param diagnosticArgs diagnostics arguments
 * @return EXIT_SUCCESS if the diagnostic completed, EXIT_USAGE if no
 * option was selected, EXIT_FALSE if the diagnostic raised an exception
 */
@Override
public int actionDiagnostic(ActionDiagnosticArgs diagnosticArgs) {
  try {
    if (diagnosticArgs.client) {
      actionDiagnosticClient(diagnosticArgs);
    } else if (diagnosticArgs.application) {
      actionDiagnosticApplication(diagnosticArgs);
    } else if (diagnosticArgs.yarn) {
      actionDiagnosticYarn(diagnosticArgs);
    } else if (diagnosticArgs.credentials) {
      actionDiagnosticCredentials();
    } else if (diagnosticArgs.all) {
      actionDiagnosticAll(diagnosticArgs);
    } else if (diagnosticArgs.level) {
      actionDiagnosticIntelligent(diagnosticArgs);
    } else {
      // it's an unknown option
      log.info(CommonArgs.usage(serviceArgs, ACTION_DIAGNOSTICS));
      return EXIT_USAGE;
    }
  } catch (Exception e) {
    // short form for the user; the stack trace is kept at debug level
    // so that the origin of the failure is not lost
    log.error(e.toString());
    log.debug("Diagnostics action failed", e);
    return EXIT_FALSE;
  }
  return EXIT_SUCCESS;
}
/**
 * Run a sequence of checks against a named application instance,
 * logging the outcome of each: the client configuration file can be
 * validated, the Slider AM answers and, if the instance is live, the
 * agent package and the application package pass HDFS file validation.
 * <p>
 * A failed configuration or agent package check is logged and ends the
 * sequence without an exception being raised.
 * @param diagnosticArgs diagnostics arguments; the name option is required
 * @throws BadCommandArgumentsException if no application name was supplied
 * @throws YarnException YARN problems, including an unknown instance
 * @throws IOException IO problems talking to the instance
 * @throws URISyntaxException declared by the validation helpers
 */
private void actionDiagnosticIntelligent(ActionDiagnosticArgs diagnosticArgs)
    throws YarnException, IOException, URISyntaxException {
  // not using member variable clustername because we want to place
  // application name after --application option and member variable
  // cluster name has to be put behind action
  String clusterName = diagnosticArgs.name;
  if (SliderUtils.isUnset(clusterName)) {
    throw new BadCommandArgumentsException("application name must be provided with --name option");
  }

  try {
    SliderUtils.validateClientConfigFile();
    log.info("Slider-client.xml is accessible");
  } catch (IOException e) {
    // we are catching exceptions here because those are indication of
    // validation result, and we need to print them here
    log.error(
        "validation of slider-client.xml fails because: " + e.toString(), e);
    return;
  }
  SliderClusterOperations clusterOperations = createClusterOperations(clusterName);
  // cluster not found exceptions will be thrown upstream
  ClusterDescription clusterDescription = clusterOperations
      .getClusterDescription();
  log.info("Slider AppMaster is accessible");

  if (clusterDescription.state == StateValues.STATE_LIVE) {
    AggregateConf instanceDefinition = clusterOperations
        .getInstanceDefinition();
    String imagePath = instanceDefinition.getInternalOperations().get(
        InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH);
    // if null, that means slider uploaded the agent tarball for the user
    // and we need to use where slider has put
    if (imagePath == null) {
      ApplicationReport appReport = findInstance(clusterName);
      Path path1 = sliderFileSystem.getTempPathForCluster(clusterName);
      Path subPath = new Path(path1, appReport.getApplicationId().toString()
          + "/agent");
      imagePath = subPath.toString();
    }
    try {
      SliderUtils.validateHDFSFile(sliderFileSystem, imagePath + "/" + AGENT_TAR);
      log.info("Slider agent package is properly installed");
    } catch (FileNotFoundException e) {
      log.error("can not find agent package: " + e.toString());
      return;
    } catch (IOException e) {
      log.error("can not open agent package: " + e.toString());
      return;
    }
    String pkgTarballPath = SliderUtils
        .getApplicationDefinitionPath(instanceDefinition
            .getAppConfOperations());
    try {
      SliderUtils.validateHDFSFile(sliderFileSystem, pkgTarballPath);
      log.info("Application package is properly installed");
    } catch (FileNotFoundException e) {
      // the text fills the placeholder and the exception goes last:
      // with the exception as the only argument the call binds to the
      // (String, Throwable) overload and "{}" is logged literally
      log.error("can not find application package: {}", e.toString(), e);
    } catch (IOException e) {
      log.error("can not open application package: {} ", e.toString(), e);
    }
  }
}
/**
 * Run the client, application, slider, YARN and credentials diagnostics
 * one after another. The first one to raise an exception ends the
 * sequence, as nothing is caught here.
 * @param diagnosticArgs diagnostics arguments, handed to every
 * sub-diagnostic which takes arguments
 * @throws IOException IO problems in any sub-diagnostic
 * @throws YarnException YARN problems in any sub-diagnostic
 */
private void actionDiagnosticAll(ActionDiagnosticArgs diagnosticArgs)
throws IOException, YarnException {
// assign application name from param to each sub diagnostic function
actionDiagnosticClient(diagnosticArgs);
actionDiagnosticApplication(diagnosticArgs);
actionDiagnosticSlider(diagnosticArgs);
actionDiagnosticYarn(diagnosticArgs);
actionDiagnosticCredentials();
}
/**
 * Credentials diagnostic: if the loaded slider client configuration
 * declares the Hadoop cluster to be secure, check the credential cache
 * file and log its description; otherwise log that the cluster is not
 * in secure mode.
 * @throws BadConfigException if the credential configuration is not valid
 * @throws IOException if the credential file cannot be read
 */
private void actionDiagnosticCredentials() throws BadConfigException,
    IOException {
  boolean secure =
      SliderUtils.isHadoopClusterSecure(SliderUtils.loadSliderClientXML());
  if (!secure) {
    log.info("the cluster is not in secure mode");
    return;
  }
  final String cacheFileDescription;
  try {
    cacheFileDescription = SliderUtils.checkCredentialCacheFile();
  } catch (BadConfigException e) {
    log.error("The credential config is not valid: " + e.toString());
    throw e;
  } catch (IOException e) {
    log.error("Unable to read the credential file: " + e.toString());
    throw e;
  }
  log.info("Credential cache file for the current user: "
      + cacheFileDescription);
}
/**
 * YARN diagnostic: log the report of every node of the YARN cluster in
 * the RUNNING state and, in verbose mode, the configuration of the YARN
 * client as indented JSON.
 * <p>
 * A failure to parse the dumped configuration as JSON is logged and
 * otherwise ignored; all other failures are logged and rethrown.
 * @param diagnosticArgs diagnostics arguments supplying the verbose flag
 * @throws IOException network problems
 * @throws YarnException YARN problems
 */
private void actionDiagnosticYarn(ActionDiagnosticArgs diagnosticArgs)
    throws IOException, YarnException {
  log.info("the node in the YARN cluster has below state: ");
  List<NodeReport> runningNodes;
  try {
    runningNodes = yarnClient.getNodeReports(NodeState.RUNNING);
  } catch (YarnException e1) {
    log.error("Exception happened when fetching node report from the YARN cluster: "
        + e1.toString());
    throw e1;
  } catch (IOException e1) {
    log.error("Network problem happened when fetching node report YARN cluster: "
        + e1.toString());
    throw e1;
  }
  for (NodeReport node : runningNodes) {
    log.info(node.toString());
  }

  if (!diagnosticArgs.verbose) {
    return;
  }
  // verbose: dump the configuration and print it as JSON
  Writer dump = new StringWriter();
  try {
    Configuration.dumpConfiguration(yarnClient.getConfig(), dump);
  } catch (IOException e1) {
    log.error("Network problem happened when retrieving YARN config from YARN: "
        + e1.toString());
    throw e1;
  }
  try {
    JSONObject parsed = new JSONObject(dump.toString());
    log.info("the configuration of the YARN cluster is: "
        + parsed.toString(2));
  } catch (JSONException e) {
    log.error("JSONException happened during parsing response from YARN: "
        + e.toString());
  }
}
/**
 * Slider diagnostic: log the HDFS path of the slider agent tarball of
 * a named application instance. The path is taken from the internal
 * options of the live instance definition; if it is not set there, it is
 * derived from the instance's temporary directory and application ID.
 * @param diagnosticArgs diagnostics arguments; the name option is required
 * @throws BadCommandArgumentsException if no application name was supplied
 * @throws YarnException YARN problems retrieving the instance definition
 * @throws IOException network problems retrieving the instance definition
 */
private void actionDiagnosticSlider(ActionDiagnosticArgs diagnosticArgs)
    throws YarnException, IOException {
  // the application name comes from the --name option of the
  // diagnostics arguments, not from the cluster name member variable
  String appName = diagnosticArgs.name;
  if (SliderUtils.isUnset(appName)) {
    throw new BadCommandArgumentsException("application name must be provided with --name option");
  }
  final AggregateConf liveDefinition;
  try {
    SliderClusterOperations operations = createClusterOperations(appName);
    liveDefinition = operations.getInstanceDefinition();
  } catch (YarnException e) {
    log.error("Exception happened when retrieving instance definition from YARN: "
        + e.toString());
    throw e;
  } catch (IOException e) {
    log.error("Network problem happened when retrieving instance definition from YARN: "
        + e.toString());
    throw e;
  }
  String agentPath = liveDefinition.getInternalOperations().get(
      InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH);
  if (agentPath == null) {
    // not set: the agent will have been uploaded by Slider, so it
    // is at Slider's own path
    ApplicationReport report = findInstance(appName);
    Path tempDir = sliderFileSystem.getTempPathForCluster(appName);
    String relative = report.getApplicationId().toString() + "/agent";
    agentPath = new Path(tempDir, relative).toString();
  }
  log.info("The path of slider agent tarball on HDFS is: " + agentPath);
}
/**
 * Application diagnostic: log the application root, the application
 * package path and the run-as user taken from the live definition of a
 * named application instance. In verbose mode the app configuration
 * and the resource configuration are logged as JSON as well.
 * @param diagnosticArgs diagnostics arguments; the name option is required
 * @throws BadCommandArgumentsException if no application name was supplied
 * @throws YarnException YARN problems retrieving the instance definition
 * @throws IOException network problems retrieving the instance definition
 */
private void actionDiagnosticApplication(ActionDiagnosticArgs diagnosticArgs)
    throws YarnException, IOException {
  // the application name comes from the --name option of the
  // diagnostics arguments, not from the cluster name member variable
  String appName = diagnosticArgs.name;
  if (SliderUtils.isUnset(appName)) {
    throw new BadCommandArgumentsException("application name must be provided with --name option");
  }
  final AggregateConf liveDefinition;
  try {
    SliderClusterOperations operations = createClusterOperations(appName);
    liveDefinition = operations.getInstanceDefinition();
  } catch (YarnException e) {
    log.error("Exception happened when retrieving instance definition from YARN: "
        + e.toString());
    throw e;
  } catch (IOException e) {
    log.error("Network problem happened when retrieving instance definition from YARN: "
        + e.toString());
    throw e;
  }

  String appRoot = liveDefinition.getAppConfOperations()
      .getGlobalOptions().get(AgentKeys.APP_ROOT);
  String packagePath = SliderUtils
      .getApplicationDefinitionPath(liveDefinition.getAppConfOperations());
  String runAs = liveDefinition.getAppConfOperations()
      .getGlobalOptions().get(AgentKeys.RUNAS_USER);

  log.info("The location of the cluster instance directory in HDFS is: "
      + appRoot);
  log.info("The name of the application package tarball on HDFS is: "
      + packagePath);
  log.info("The runas user of the application in the cluster is: "
      + runAs);

  if (!diagnosticArgs.verbose) {
    return;
  }
  log.info("App config of the application: "
      + liveDefinition.getAppConf().toJson());
  log.info("Resource config of the application: "
      + liveDefinition.getResources().toJson());
}
/**
 * Client diagnostic: print the slider command path, the version
 * information, the client configuration path, the JDK information and
 * whether the Hadoop cluster is secure (with the login and current user
 * if it is). In verbose mode the environment variables, the JVM
 * properties and the client configuration are printed too. The client
 * environment is validated at the end.
 * @param diagnosticArgs diagnostics arguments supplying the verbose flag
 * @throws SliderException Slider problems; logged, then rethrown
 * @throws IOException IO problems; logged, then rethrown
 */
private void actionDiagnosticClient(ActionDiagnosticArgs diagnosticArgs)
    throws SliderException, IOException {
  try {
    String commandPath = SliderUtils.getCurrentCommandPath();
    SliderVersionInfo.loadAndPrintVersionInfo(log);
    String configPath = SliderUtils.getClientConfigPath();
    String jdk = SliderUtils.getJDKInfo();
    println("The slider command path: %s", commandPath);
    println("The slider-client.xml used by current running command path: %s",
        configPath);
    println(jdk);

    // security info
    Configuration conf = getConfig();
    if (!SliderUtils.isHadoopClusterSecure(conf)) {
      println("Hadoop Cluster is insecure");
    } else {
      println("Hadoop Cluster is secure");
      println("Login user is %s", UserGroupInformation.getLoginUser());
      println("Current user is %s", UserGroupInformation.getCurrentUser());
    }

    if (diagnosticArgs.verbose) {
      // the environment, in sorted key order
      Map<String, String> environment = SliderUtils.getSystemEnv();
      Set<String> sortedNames =
          ConfigHelper.sortedConfigKeys(environment.entrySet());
      StringBuilder envDump = new StringBuilder("Environment variables:\n");
      for (String variable : sortedNames) {
        envDump.append(variable)
            .append("=")
            .append(environment.get(variable))
            .append("\n");
      }
      println(envDump.toString());

      // the JVM properties, sorted
      StringBuilder propertyDump = new StringBuilder("JVM Properties\n");
      Map<String, String> jvmProperties =
          SliderUtils.sortedMap(SliderUtils.toMap(System.getProperties()));
      for (Entry<String, String> property : jvmProperties.entrySet()) {
        propertyDump.append(property.getKey())
            .append("=")
            .append(property.getValue())
            .append("\n");
      }
      println(propertyDump.toString());

      // finally the client configuration
      println("Slider client configuration:\n"
          + ConfigHelper.dumpConfigToString(conf));
    }

    SliderUtils.validateSliderClientEnvironment(log);
  } catch (SliderException | IOException e) {
    log.error(e.toString());
    throw e;
  }
}
/**
 * Log a service record: its YARN ID and, when verbose, each of its
 * external endpoints as well.
 * @param instance record to log
 * @param verbose true to follow the ID with the record's external endpoints
 */
private void logInstance(ServiceRecord instance,
    boolean verbose) {
  String yarnId = instance.get(YarnRegistryAttributes.YARN_ID, "");
  if (verbose) {
    log.info("{}: ", yarnId);
    logEndpoints(instance);
    return;
  }
  log.info("{}", yarnId);
}
/**
 * Log every external endpoint of a service record at info level,
 * one endpoint per log line.
 * @param instance service record whose {@code external} list is logged
 */
private void logEndpoints(ServiceRecord instance) {
  for (Endpoint external : instance.external) {
    log.info(external.toString());
  }
}
/**
 * List the configurations published by an instance. Names are written one
 * per line to {@code registryArgs.out} when it is set, otherwise to
 * {@code System.out}; in verbose mode each name is followed by the
 * configuration's description.
 *
 * @param registryArgs registry Arguments
 * @throws YarnException YARN problems
 * @throws IOException Network or other problems
 */
public void actionRegistryListConfigsYarn(ActionRegistryArgs registryArgs)
    throws YarnException, IOException {
  ServiceRecord record = lookupServiceRecord(registryArgs);
  RegistryRetriever registryRetriever =
      new RegistryRetriever(getConfig(), record);
  boolean external = !registryArgs.internal;
  PublishedConfigSet configSet =
      registryRetriever.getConfigurations(external);
  PrintStream out = null;
  try {
    out = registryArgs.out != null
        ? new PrintStream(new FileOutputStream(registryArgs.out))
        : System.out;
    for (String name : configSet.keys()) {
      if (registryArgs.verbose) {
        PublishedConfiguration config = configSet.get(name);
        out.printf("%s: %s\n", name, config.description);
      } else {
        out.println(name);
      }
    }
  } finally {
    // only a stream opened here is closed; System.out is left alone
    if (out != null && registryArgs.out != null) {
      out.flush();
      out.close();
    }
  }
}
/**
 * List the export groups published by an instance. Names are written one
 * per line to {@code registryArgs.out} when it is set, otherwise to
 * {@code System.out}; in verbose mode each name is followed by the
 * export's description. When writing to a file, each name is also logged
 * at debug level.
 *
 * @param registryArgs registry Arguments
 * @throws YarnException YARN problems
 * @throws IOException Network or other problems
 */
public void actionRegistryListExports(ActionRegistryArgs registryArgs)
    throws YarnException, IOException {
  ServiceRecord record = lookupServiceRecord(registryArgs);
  RegistryRetriever registryRetriever =
      new RegistryRetriever(getConfig(), record);
  PublishedExportsSet exportSet =
      registryRetriever.getExports(!registryArgs.internal);
  PrintStream out = null;
  boolean toFile = false;
  try {
    if (registryArgs.out == null) {
      out = System.out;
    } else {
      out = new PrintStream(new FileOutputStream(registryArgs.out));
      toFile = true;
      log.debug("Saving output to {}", registryArgs.out);
    }
    log.debug("Number of exports: {}", exportSet.keys().size());
    for (String name : exportSet.keys()) {
      if (toFile) {
        log.debug(name);
      }
      if (registryArgs.verbose) {
        PublishedExports export = exportSet.get(name);
        out.printf("%s: %s\n", name, export.description);
      } else {
        out.println(name);
      }
    }
  } finally {
    // toFile is only true once the file stream has been opened
    if (toFile) {
      out.flush();
      out.close();
    }
  }
}
/**
 * Retrieve a single published configuration of an instance: the one named
 * by {@code registryArgs.getConf}, taken from the external configuration
 * set unless {@code registryArgs.internal} is set.
 *
 * @param registryArgs registry Arguments
 * @return the configuration returned by the registry retriever
 * @throws YarnException YARN problems
 * @throws IOException Network or other problems; the retriever is expected
 * to report a missing config as a FileNotFoundException (confirm in
 * RegistryRetriever)
 */
@VisibleForTesting
public PublishedConfiguration actionRegistryGetConfig(ActionRegistryArgs registryArgs)
    throws YarnException, IOException {
  ServiceRecord record = lookupServiceRecord(registryArgs);
  RegistryRetriever registryRetriever =
      new RegistryRetriever(getConfig(), record);
  boolean external = !registryArgs.internal;
  PublishedConfigSet configSet =
      registryRetriever.getConfigurations(external);
  return registryRetriever.retrieveConfiguration(configSet,
      registryArgs.getConf,
      external);
}
/**
 * Retrieve a single export group of an instance: the one named by
 * {@code registryArgs.getExport}, taken from the external export set
 * unless {@code registryArgs.internal} is set.
 *
 * @param registryArgs registry Arguments
 * @return the exports returned by the registry retriever
 * @throws YarnException YARN problems
 * @throws IOException Network or other problems; the retriever is expected
 * to report a missing export as a FileNotFoundException (confirm in
 * RegistryRetriever)
 */
@VisibleForTesting
public PublishedExports actionRegistryGetExport(ActionRegistryArgs registryArgs)
    throws YarnException, IOException {
  ServiceRecord record = lookupServiceRecord(registryArgs);
  RegistryRetriever registryRetriever =
      new RegistryRetriever(getConfig(), record);
  boolean external = !registryArgs.internal;
  PublishedExportsSet exportSet = registryRetriever.getExports(external);
  return registryRetriever.retrieveExports(exportSet,
      registryArgs.getExport,
      external);
}
/**
 * Write out a published configuration in the format named by
 * {@code registryArgs.format}. With no {@code registryArgs.out} the text is
 * printed; otherwise it is saved to that path, or, when the path is a
 * directory, to a file inside it named {@code <getConf>.<format>}.
 * @param published published config
 * @param registryArgs registry Arguments
 * @throws BadCommandArgumentsException if the format cannot be resolved
 * @throws IOException problems raised while saving or rendering
 */
private void outputConfig(PublishedConfiguration published,
    ActionRegistryArgs registryArgs) throws
    BadCommandArgumentsException,
    IOException {
  String format = registryArgs.format;
  ConfigFormat configFormat = ConfigFormat.resolve(format);
  if (configFormat == null) {
    throw new BadCommandArgumentsException(
        "Unknown/Unsupported format %s ", format);
  }
  PublishedConfigurationOutputter outputter =
      PublishedConfigurationOutputter.createOutputter(configFormat,
          published);

  File destination = registryArgs.out;
  if (destination == null) {
    // no destination: print instead of saving
    print(outputter.asString());
    return;
  }
  if (destination.isDirectory()) {
    // creating it under a directory
    destination = new File(destination, registryArgs.getConf + "." + format);
  }
  log.debug("Destination path: {}", destination);
  outputter.save(destination);
}
/**
 * Write out a published export group. The format is fixed to JSON. With no
 * {@code registryArgs.out} the text is printed; otherwise it is saved to
 * that path, or, when the path is a directory, to a file inside it named
 * {@code <getExport>.<format>}.
 * @param published published exports
 * @param registryArgs registry Arguments
 * @throws BadCommandArgumentsException if the format does not resolve to JSON
 * @throws IOException problems raised while saving or rendering
 */
private void outputExport(PublishedExports published,
    ActionRegistryArgs registryArgs) throws
    BadCommandArgumentsException,
    IOException {
  String format = ConfigFormat.JSON.toString();
  ConfigFormat configFormat = ConfigFormat.resolve(format);
  // a null result is also "not JSON", so one comparison covers both cases
  if (configFormat != ConfigFormat.JSON) {
    throw new BadCommandArgumentsException(
        "Unknown/Unsupported format %s . Only JSON is supported.", format);
  }
  PublishedExportsOutputter outputter =
      PublishedExportsOutputter.createOutputter(configFormat,
          published);

  File destination = registryArgs.out;
  if (destination == null) {
    // no destination: print instead of saving
    print(outputter.asString());
    return;
  }
  if (destination.isDirectory()) {
    // creating it under a directory
    destination = new File(destination, registryArgs.getExport + "." + format);
  }
  log.info("Destination path: {}", destination);
  outputter.save(destination);
}
/**
 * Look up the service record selected by the registry arguments. The
 * registry user is {@code registryArgs.user}, encoded for the registry,
 * when that is non-empty, and the current user otherwise.
 * @param registryArgs registry Arguments supplying user, service type and name
 * @return instance data
 * @throws SliderException other failures
 * @throws IOException IO problems or wrapped exceptions
 */
private ServiceRecord lookupServiceRecord(ActionRegistryArgs registryArgs) throws
    SliderException,
    IOException {
  String user = StringUtils.isNotEmpty(registryArgs.user)
      ? RegistryPathUtils.encodeForRegistry(registryArgs.user)
      : currentUser();
  return resolve(
      servicePath(user, registryArgs.serviceType, registryArgs.name));
}
/**
 * Look up a service record belonging to the current user.
 * @param serviceType service type
 * @param id instance ID
 * @return instance data
 * @throws SliderException failures from {@link #resolve(String)}, which
 * reports a missing path or record as a NotFoundException
 * @throws IOException IO problems or wrapped exceptions
 */
public ServiceRecord lookupServiceRecord(String serviceType, String id)
    throws IOException, SliderException {
  String user = currentUser();
  return resolve(servicePath(user, serviceType, id));
}
/**
 * Resolve a registry path to its service record.
 * @param path registry path to resolve
 * @return instance data
 * @throws NotFoundException no path/no service record at the end of the
 * path; the message is the path carried by the registry exception, which is
 * kept as the cause
 * @throws SliderException other failures
 * @throws IOException IO problems or wrapped exceptions
 */
public ServiceRecord resolve(String path)
    throws IOException, SliderException {
  try {
    return getRegistryOperations().resolve(path);
  } catch (PathNotFoundException | NoRecordException e) {
    // Throwable-first constructor so the registry exception is chained as
    // the cause rather than consumed as a format argument; the path goes in
    // as an argument so a '%' inside it is never read as a format specifier.
    throw new NotFoundException(e, "%s", e.getPath().toString());
  }
}
/**
 * List the slider service records registered for the current user.
 * @return the records found under the current user's slider service class
 * path, as returned by {@code listServiceRecords}
 * @throws IOException Any IO problem ... including no path in the registry
 * to slider service classes for this user
 * @throws SliderException other failures
 */
public Map<String, ServiceRecord> listRegistryInstances()
    throws IOException, SliderException {
  RegistryOperations operations = getRegistryOperations();
  String classPath = serviceclassPath(currentUser(), SliderKeys.APP_TYPE);
  return listServiceRecords(operations, classPath);
}
/**
 * List the IDs of the slider instances registered for the current user.
 * A missing registry path is treated as "no instances" rather than as an
 * error.
 * @return the instance IDs; an empty list when the registry path is absent
 * @throws IOException IO problems; any other non-YARN exception is wrapped
 * in an IOException
 * @throws YarnException YARN problems
 */
public List<String> listRegisteredSliderInstances() throws
    IOException,
    YarnException {
  try {
    RegistryOperations operations = getRegistryOperations();
    String classPath = serviceclassPath(currentUser(), SliderKeys.APP_TYPE);
    Map<String, ServiceRecord> records =
        listServiceRecords(operations, classPath);
    return new ArrayList<>(records.keySet());
  } catch (PathNotFoundException e) {
    log.debug("No registry path for slider instances for current user: {}", e, e);
    // no entries: return an empty list
    return new ArrayList<>(0);
  } catch (IOException | YarnException e) {
    // already one of the declared types: pass through untouched
    throw e;
  } catch (Exception e) {
    throw new IOException(e);
  }
}
/**
 * Return the registry operations instance, creating it through
 * {@code startRegistryOperationsService()} on the first call. The method is
 * synchronized, so the check and the assignment happen under the instance
 * lock.
 * @return the registry service
 * @throws SliderException
 * @throws IOException
 */
private synchronized RegistryOperations maybeStartYarnRegistry()
    throws SliderException, IOException {
  if (registryOperations != null) {
    return registryOperations;
  }
  registryOperations = startRegistryOperationsService();
  return registryOperations;
}
/**
 * Get the registry operations instance, delegating to
 * {@link #maybeStartYarnRegistry()} so that it is created on first use.
 * @return the registry service
 * @throws SliderException
 * @throws IOException
 */
@Override
public RegistryOperations getRegistryOperations()
    throws SliderException, IOException {
  RegistryOperations operations = maybeStartYarnRegistry();
  return operations;
}
/**
 * Append text to the client output stream; no newline is added.
 * NOTE(review): {@code clientOutputStream} is declared outside this block;
 * presumably it is bound to standard out/stderr -- confirm at its
 * declaration.
 * @param src character sequence to append
 */
@SuppressWarnings("UseOfSystemOutOrSystemErr")
private static void print(CharSequence src) {
clientOutputStream.append(src);
}
/**
 * Print a message followed by a newline character, using two successive
 * {@code print} calls.
 * @param message message
 */
private static void println(String message) {
  final String newline = "\n";
  print(message);
  print(newline);
}
/**
 * Format a message with {@link String#format(String, Object...)} and print
 * it followed by a newline character.
 * @param message format string
 * @param args arguments for string formatting
 */
private static void println(String message, Object ... args) {
  String formatted = String.format(message, args);
  print(formatted);
  print("\n");
}
/**
 * Implement the lookup action: fetch the YARN application report for the
 * application ID in {@code args.id} and emit it in serialized form, saved
 * to {@code args.outputFile} when that is set and printed as JSON
 * otherwise.
 * @param args Action arguments
 * @return EXIT_SUCCESS if the entry was found
 * @throws IOException
 * @throws YarnException including a BadCommandArgumentsException when an
 * IllegalArgumentException is raised while handling the arguments, and a
 * NotFoundException when YARN reports that the application or application
 * attempt was not found
 */
@VisibleForTesting
public int actionLookup(ActionLookupArgs args)
    throws IOException, YarnException {
  verifyBindingsDefined();
  try {
    ApplicationId appId = ConverterUtils.toApplicationId(args.id);
    SerializedApplicationReport serialized =
        new SerializedApplicationReport(yarnClient.getApplicationReport(appId));
    ApplicationReportSerDeser serializer = new ApplicationReportSerDeser();
    if (args.outputFile == null) {
      println(serializer.toJson(serialized));
    } else {
      serializer.save(serialized, args.outputFile);
    }
  } catch (IllegalArgumentException e) {
    throw new BadCommandArgumentsException(e, "%s : %s", args, e);
  } catch (ApplicationAttemptNotFoundException | ApplicationNotFoundException notFound) {
    throw new NotFoundException(notFound, notFound.toString());
  }
  return EXIT_SUCCESS;
}
/**
 * Upload the AM dependency jars to HDFS as a single tar.gz archive.
 * <p>
 * The command must be run as user {@code hdfs}. If the archive already
 * exists and overwrite was not requested, nothing is uploaded. Otherwise
 * every jar in the directory named by the system property
 * {@code SliderKeys.PROPERTY_LIB_DIR} is packed into a temporary archive,
 * which is copied to the dependency path and then deleted locally.
 *
 * @param args dependency arguments; only {@code overwrite} is read
 * @return EXIT_SUCCESS when the archive was uploaded or was already
 * present; EXIT_FALSE when the caller is not the hdfs user or the lib
 * directory property is not set
 * @throws IOException problems creating, packing or uploading the archive
 * @throws YarnException other failures
 */
@Override
public int actionDependency(ActionDependencyArgs args) throws IOException,
    YarnException {
  // check to ensure if the current user is hdfs
  String currentUser = getUsername();
  String hdfsUser = "hdfs";
  if (!hdfsUser.equalsIgnoreCase(currentUser)) {
    log.error("Please run this command as user {}", hdfsUser);
    return EXIT_FALSE;
  }

  String version = SliderUtils.getSliderVersion();
  Path dependencyLibTarGzip = sliderFileSystem.getDependencyTarGzip();

  // Check if dependency has already been uploaded, in which case log
  // appropriately and exit success (unless overwrite has been requested)
  if (sliderFileSystem.isFile(dependencyLibTarGzip) && !args.overwrite) {
    println("Dependency libs are already uploaded to %s. Use %s "
        + "if you want to re-upload", dependencyLibTarGzip.toUri(),
        Arguments.ARG_OVERWRITE);
    return EXIT_SUCCESS;
  }

  String libDir = System.getProperty(SliderKeys.PROPERTY_LIB_DIR);
  if (!SliderUtils.isSet(libDir)) {
    // say why the command failed instead of returning silently
    log.error("Cannot upload dependencies: system property {} is not set",
        SliderKeys.PROPERTY_LIB_DIR);
    return EXIT_FALSE;
  }

  File srcFolder = new File(libDir);
  File tempLibTarGzipFile = File.createTempFile(
      SliderKeys.SLIDER_DEPENDENCY_TAR_GZ_FILE_NAME + "_",
      SliderKeys.SLIDER_DEPENDENCY_TAR_GZ_FILE_EXT);
  try {
    // pack every jar in the lib dir (extension matched case-insensitively)
    FilenameFilter jarFilter = new FilenameFilter() {
      @Override
      public boolean accept(File dir, String name) {
        return name.toLowerCase().endsWith(".jar");
      }
    };
    SliderUtils.tarGzipFolder(srcFolder, tempLibTarGzipFile, jarFilter);

    log.info("Uploading dependency for AM (version {}) from {} to {}",
        version, tempLibTarGzipFile.toURI(), dependencyLibTarGzip.toUri());
    sliderFileSystem.copyLocalFileToHdfs(tempLibTarGzipFile,
        dependencyLibTarGzip, new FsPermission(
            SliderKeys.SLIDER_DEPENDENCY_DIR_PERMISSIONS));
  } finally {
    // the local archive is only a staging copy: remove it whether or not
    // the upload succeeded, so repeated runs do not fill the temp dir
    if (!tempLibTarGzipFile.delete()) {
      log.warn("Unable to delete temporary file {}", tempLibTarGzipFile);
    }
  }
  return EXIT_SUCCESS;
}
/**
 * Report usage for an action by throwing a UsageException built from the
 * usage text of {@code CommonArgs.usage}; this method never returns
 * normally.
 * @param actionName action whose usage is reported
 * @return never returns
 * @throws YarnException always, as the UsageException
 * @throws IOException declared but not raised by the visible code
 */
private int actionHelp(String actionName) throws YarnException, IOException {
  String usage = CommonArgs.usage(serviceArgs, actionName);
  throw new UsageException(usage);
}
/**
 * Report an error together with the usage for an action by throwing a
 * UsageException whose message is the error text followed by the usage
 * text; this method never returns normally.
 * @param errMsg error text placed before the usage
 * @param actionName action whose usage is reported
 * @return never returns
 * @throws YarnException always, as the UsageException
 * @throws IOException declared but not raised by the visible code
 */
private int actionHelp(String errMsg, String actionName)
    throws YarnException, IOException {
  String usage = CommonArgs.usage(serviceArgs, actionName);
  throw new UsageException("%s %s", errMsg, usage);
}
}