| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.slider.client; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.io.Files; |
| import org.apache.commons.collections.CollectionUtils; |
| import org.apache.commons.io.IOUtils; |
| import org.apache.commons.lang.ArrayUtils; |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.LocatedFileStatus; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.PathNotFoundException; |
| import org.apache.hadoop.fs.RemoteIterator; |
| import org.apache.hadoop.fs.permission.FsAction; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.hdfs.HdfsConfiguration; |
| import org.apache.hadoop.net.NetUtils; |
| import org.apache.hadoop.registry.client.api.RegistryConstants; |
| import org.apache.hadoop.registry.client.api.RegistryOperations; |
| import org.apache.hadoop.registry.client.binding.RegistryPathUtils; |
| import org.apache.hadoop.registry.client.binding.RegistryUtils; |
| import org.apache.hadoop.registry.client.exceptions.NoRecordException; |
| import org.apache.hadoop.registry.client.types.Endpoint; |
| import org.apache.hadoop.registry.client.types.RegistryPathStatus; |
| import org.apache.hadoop.registry.client.types.ServiceRecord; |
| import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes; |
| import org.apache.hadoop.security.Credentials; |
| import org.apache.hadoop.security.KerberosDiags; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.alias.CredentialProvider; |
| import org.apache.hadoop.security.alias.CredentialProviderFactory; |
| import org.apache.hadoop.util.Shell; |
| import org.apache.hadoop.yarn.api.ApplicationConstants; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ApplicationReport; |
| import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; |
| import org.apache.hadoop.yarn.api.records.LocalResource; |
| import org.apache.hadoop.yarn.api.records.NodeReport; |
| import org.apache.hadoop.yarn.api.records.NodeState; |
| import org.apache.hadoop.yarn.api.records.YarnApplicationState; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; |
| import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.util.ConverterUtils; |
| import org.apache.slider.api.ClusterDescription; |
| import org.apache.slider.api.ClusterNode; |
| import org.apache.slider.api.SliderApplicationApi; |
| import org.apache.slider.api.SliderClusterProtocol; |
| import org.apache.slider.api.StateValues; |
| import org.apache.slider.api.proto.Messages; |
| import org.apache.slider.api.types.ContainerInformation; |
| import org.apache.slider.api.types.NodeInformationList; |
| import org.apache.slider.api.types.SliderInstanceDescription; |
| import org.apache.slider.client.ipc.SliderApplicationIpcClient; |
| import org.apache.slider.client.ipc.SliderClusterOperations; |
| import org.apache.slider.common.Constants; |
| import org.apache.slider.common.SliderExitCodes; |
| import org.apache.slider.common.SliderKeys; |
| import org.apache.slider.common.params.AbstractActionArgs; |
| import org.apache.slider.common.params.AbstractClusterBuildingActionArgs; |
| import org.apache.slider.common.params.ActionAMSuicideArgs; |
| import org.apache.slider.common.params.ActionClientArgs; |
| import org.apache.slider.common.params.ActionCreateArgs; |
| import org.apache.slider.common.params.ActionDependencyArgs; |
| import org.apache.slider.common.params.ActionDestroyArgs; |
| import org.apache.slider.common.params.ActionDiagnosticArgs; |
| import org.apache.slider.common.params.ActionEchoArgs; |
| import org.apache.slider.common.params.ActionExistsArgs; |
| import org.apache.slider.common.params.ActionFlexArgs; |
| import org.apache.slider.common.params.ActionFreezeArgs; |
| import org.apache.slider.common.params.ActionInstallKeytabArgs; |
| import org.apache.slider.common.params.ActionInstallPackageArgs; |
| import org.apache.slider.common.params.ActionKDiagArgs; |
| import org.apache.slider.common.params.ActionKeytabArgs; |
| import org.apache.slider.common.params.ActionKillContainerArgs; |
| import org.apache.slider.common.params.ActionListArgs; |
| import org.apache.slider.common.params.ActionLookupArgs; |
| import org.apache.slider.common.params.ActionNodesArgs; |
| import org.apache.slider.common.params.ActionPackageArgs; |
| import org.apache.slider.common.params.ActionRegistryArgs; |
| import org.apache.slider.common.params.ActionResolveArgs; |
| import org.apache.slider.common.params.ActionStatusArgs; |
| import org.apache.slider.common.params.ActionThawArgs; |
| import org.apache.slider.common.params.ActionTokensArgs; |
| import org.apache.slider.common.params.ActionUpgradeArgs; |
| import org.apache.slider.common.params.Arguments; |
| import org.apache.slider.common.params.ClientArgs; |
| import org.apache.slider.common.params.CommonArgs; |
| import org.apache.slider.common.params.LaunchArgsAccessor; |
| import org.apache.slider.common.tools.ConfigHelper; |
| import org.apache.slider.common.tools.Duration; |
| import org.apache.slider.common.tools.SliderFileSystem; |
| import org.apache.slider.common.tools.SliderUtils; |
| import org.apache.slider.common.tools.SliderVersionInfo; |
| import org.apache.slider.core.build.InstanceBuilder; |
| import org.apache.slider.core.build.InstanceIO; |
| import org.apache.slider.core.conf.AggregateConf; |
| import org.apache.slider.core.conf.ConfTree; |
| import org.apache.slider.core.conf.ConfTreeOperations; |
| import org.apache.slider.core.conf.MapOperations; |
| import org.apache.slider.core.conf.ResourcesInputPropertiesValidator; |
| import org.apache.slider.core.conf.TemplateInputPropertiesValidator; |
| import org.apache.slider.core.exceptions.BadClusterStateException; |
| import org.apache.slider.core.exceptions.BadCommandArgumentsException; |
| import org.apache.slider.core.exceptions.BadConfigException; |
| import org.apache.slider.core.exceptions.ErrorStrings; |
| import org.apache.slider.core.exceptions.NoSuchNodeException; |
| import org.apache.slider.core.exceptions.NotFoundException; |
| import org.apache.slider.core.exceptions.SliderException; |
| import org.apache.slider.core.exceptions.UnknownApplicationInstanceException; |
| import org.apache.slider.core.exceptions.UsageException; |
| import org.apache.slider.core.exceptions.WaitTimeoutException; |
| import org.apache.slider.core.launch.AppMasterLauncher; |
| import org.apache.slider.core.launch.ClasspathConstructor; |
| import org.apache.slider.core.launch.CredentialUtils; |
| import org.apache.slider.core.launch.JavaCommandLineBuilder; |
| import org.apache.slider.core.launch.LaunchedApplication; |
| import org.apache.slider.core.launch.RunningApplication; |
| import org.apache.slider.core.launch.SerializedApplicationReport; |
| import org.apache.slider.core.main.RunService; |
| import org.apache.slider.core.persist.AppDefinitionPersister; |
| import org.apache.slider.core.persist.ApplicationReportSerDeser; |
| import org.apache.slider.core.persist.ConfPersister; |
| import org.apache.slider.core.persist.JsonSerDeser; |
| import org.apache.slider.core.persist.LockAcquireFailedException; |
| import org.apache.slider.core.registry.SliderRegistryUtils; |
| import org.apache.slider.core.registry.YarnAppListClient; |
| import org.apache.slider.core.registry.docstore.ConfigFormat; |
| import org.apache.slider.core.registry.docstore.PublishedConfigSet; |
| import org.apache.slider.core.registry.docstore.PublishedConfiguration; |
| import org.apache.slider.core.registry.docstore.PublishedConfigurationOutputter; |
| import org.apache.slider.core.registry.docstore.PublishedExports; |
| import org.apache.slider.core.registry.docstore.PublishedExportsOutputter; |
| import org.apache.slider.core.registry.docstore.PublishedExportsSet; |
| import org.apache.slider.core.registry.retrieve.RegistryRetriever; |
| import org.apache.slider.core.zk.BlockingZKWatcher; |
| import org.apache.slider.core.zk.ZKIntegration; |
| import org.apache.slider.core.zk.ZKPathBuilder; |
| import org.apache.slider.providers.AbstractClientProvider; |
| import org.apache.slider.providers.SliderProviderFactory; |
| import org.apache.slider.providers.agent.AgentKeys; |
| import org.apache.slider.providers.slideram.SliderAMClientProvider; |
| import org.apache.slider.server.appmaster.SliderAppMaster; |
| import org.apache.slider.server.appmaster.rpc.RpcBinder; |
| import org.apache.slider.server.services.security.SecurityStore; |
| import org.apache.slider.server.services.utility.AbstractSliderLaunchedService; |
| import org.apache.zookeeper.CreateMode; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.ZooDefs; |
| import org.apache.zookeeper.data.ACL; |
| import org.codehaus.jettison.json.JSONException; |
| import org.codehaus.jettison.json.JSONObject; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.BufferedReader; |
| import java.io.File; |
| import java.io.FilenameFilter; |
| import java.io.FileNotFoundException; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.InputStreamReader; |
| import java.io.InterruptedIOException; |
| import java.io.PrintStream; |
| import java.io.PrintWriter; |
| import java.io.StringWriter; |
| import java.io.Writer; |
| import java.net.InetAddress; |
| import java.net.InetSocketAddress; |
| import java.net.URISyntaxException; |
| import java.nio.charset.Charset; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Locale; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| |
| import static org.apache.hadoop.registry.client.binding.RegistryUtils.*; |
| import static org.apache.slider.api.InternalKeys.*; |
| import static org.apache.slider.api.OptionKeys.*; |
| import static org.apache.slider.api.ResourceKeys.*; |
| import static org.apache.slider.common.Constants.HADOOP_JAAS_DEBUG; |
| import static org.apache.slider.common.params.SliderActions.*; |
| import static org.apache.slider.common.tools.SliderUtils.*; |
| |
| |
| /** |
| * Client service for Slider |
| */ |
| |
| public class SliderClient extends AbstractSliderLaunchedService implements RunService, |
| SliderExitCodes, SliderKeys, ErrorStrings, SliderClientAPI { |
/** Logger for this class. */
private static final Logger log = LoggerFactory.getLogger(SliderClient.class);
/* Error message texts, exposed as public constants. */
public static final String E_MUST_BE_A_VALID_JSON_FILE
    = "Invalid configuration. Must be a valid json file.";
public static final String E_INVALID_INSTALL_LOCATION
    = "A valid install location must be provided for the client.";
public static final String E_UNABLE_TO_READ_SUPPLIED_PACKAGE_FILE
    = "Unable to read supplied package file";
public static final String E_INVALID_APPLICATION_PACKAGE_LOCATION
    = "A valid application package location required.";
public static final String E_INVALID_INSTALL_PATH = "Install path is not a valid directory";
public static final String E_INSTALL_PATH_DOES_NOT_EXIST = "Install path does not exist";
public static final String E_INVALID_APPLICATION_TYPE_NAME
    = "A valid application type name is required (e.g. HBASE).";
public static final String E_USE_REPLACEPKG_TO_OVERWRITE = "Use --replacepkg to overwrite.";
public static final String E_PACKAGE_DOES_NOT_EXIST = "Package does not exist";
public static final String E_NO_ZOOKEEPER_QUORUM = "No Zookeeper quorum defined";
public static final String E_NO_RESOURCE_MANAGER = "No valid Resource Manager address provided";
public static final String E_PACKAGE_EXISTS = "Package exists";
/**
 * Stream used for client console output; defaults to {@code System.out}.
 * NOTE(review): this is a non-final static, so it is presumably reassigned
 * elsewhere (e.g. by tests); confirm.
 */
private static PrintStream clientOutputStream = System.out;

// value should not be changed without updating string find in slider.py
private static final String PASSWORD_PROMPT = "Enter password for";

/** Parsed command line arguments; created and parsed in {@code bindArgs()}. */
private ClientArgs serviceArgs;
/** Application ID. NOTE(review): assigned outside the visible code; confirm its lifecycle. */
public ApplicationId applicationId;

/** Name of the deployed cluster. NOTE(review): assigned outside the visible code. */
private String deployedClusterName;
/**
 * Cluster operations against the deployed cluster -will be null
 * if no binding has yet taken place
 */
private SliderClusterOperations sliderClusterOperations;

/** Slider filesystem helper; created in {@code initHadoopBinding()}. */
protected SliderFileSystem sliderFileSystem;

/**
 * Yarn client service; created, initialized and registered as a child
 * service in {@code initHadoopBinding()}.
 */
private SliderYarnClientImpl yarnClient;
/** Client used to list YARN applications; created in {@code initHadoopBinding()}. */
private YarnAppListClient yarnAppListClient;
/**
 * Instance definition. NOTE(review): used outside the visible code; presumably
 * the definition of the most recently launched instance — confirm.
 */
private AggregateConf launchedInstanceDefinition;

/**
 * The YARN registry service.
 * The suppression silences an IDE inspection that reports this field as
 * being accessed both with and without synchronization.
 */
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private RegistryOperations registryOperations;
| |
/**
 * Constructor.
 * <p>
 * The {@link HdfsConfiguration} and {@link YarnConfiguration} instances
 * created here are discarded immediately; they are constructed only for the
 * side effects of loading those classes.
 * NOTE(review): presumably this registers the HDFS and YARN default/site
 * resources with {@link Configuration}; confirm. Keep the call order as-is,
 * since it may influence resource precedence.
 */
public SliderClient() {
  super("Slider Client");
  new HdfsConfiguration();
  new YarnConfiguration();
}
| |
| /** |
| * This is called <i>Before serviceInit is called</i> |
| * @param config the initial configuration build up by the |
| * service launcher. |
| * @param args argument list list of arguments passed to the command line |
| * after any launcher-specific commands have been stripped. |
| * @return the post-binding configuration to pass to the <code>init()</code> |
| * operation. |
| * @throws Exception |
| */ |
| @Override |
| public Configuration bindArgs(Configuration config, String... args) throws Exception { |
| config = super.bindArgs(config, args); |
| serviceArgs = new ClientArgs(args); |
| serviceArgs.parse(); |
| // add the slider XML config |
| ConfigHelper.injectSliderXMLResource(); |
| // yarn-ify |
| YarnConfiguration yarnConfiguration = new YarnConfiguration(config); |
| return patchConfiguration(yarnConfiguration); |
| } |
| |
| @Override |
| protected void serviceInit(Configuration conf) throws Exception { |
| Configuration clientConf = loadSliderClientXML(); |
| ConfigHelper.mergeConfigurations(conf, clientConf, SLIDER_CLIENT_XML, true); |
| serviceArgs.applyDefinitions(conf); |
| serviceArgs.applyFileSystemBinding(conf); |
| AbstractActionArgs coreAction = serviceArgs.getCoreAction(); |
| // init security with our conf |
| if (!coreAction.disableSecureLogin() && isHadoopClusterSecure(conf)) { |
| forceLogin(); |
| initProcessSecurity(conf); |
| } |
| if (coreAction.getHadoopServicesRequired()) { |
| initHadoopBinding(); |
| } |
| super.serviceInit(conf); |
| } |
| |
| /** |
| * Launched service execution. This runs {@link #exec()} |
| * then catches some exceptions and converts them to exit codes |
| * @return an exit code |
| * @throws Throwable |
| */ |
| @Override |
| public int runService() throws Throwable { |
| try { |
| return exec(); |
| } catch (FileNotFoundException | PathNotFoundException nfe) { |
| throw new NotFoundException(nfe, nfe.toString()); |
| } |
| } |
| |
| /** |
| * Execute the command line |
| * @return an exit code |
| * @throws Throwable on a failure |
| */ |
| public int exec() throws Throwable { |
| |
| // choose the action |
| String action = serviceArgs.getAction(); |
| if (isUnset(action)) { |
| throw new SliderException(EXIT_USAGE, serviceArgs.usage()); |
| } |
| |
| int exitCode = EXIT_SUCCESS; |
| String clusterName = serviceArgs.getClusterName(); |
| // actions |
| |
| switch (action) { |
| case ACTION_AM_SUICIDE: |
| exitCode = actionAmSuicide(clusterName, |
| serviceArgs.getActionAMSuicideArgs()); |
| break; |
| |
| case ACTION_BUILD: |
| exitCode = actionBuild(clusterName, serviceArgs.getActionBuildArgs()); |
| break; |
| |
| case ACTION_CLIENT: |
| exitCode = actionClient(serviceArgs.getActionClientArgs()); |
| break; |
| |
| case ACTION_CREATE: |
| exitCode = actionCreate(clusterName, serviceArgs.getActionCreateArgs()); |
| break; |
| |
| case ACTION_DEPENDENCY: |
| exitCode = actionDependency(serviceArgs.getActionDependencyArgs()); |
| break; |
| |
| case ACTION_DESTROY: |
| exitCode = actionDestroy(clusterName, serviceArgs.getActionDestroyArgs()); |
| break; |
| |
| case ACTION_DIAGNOSTICS: |
| exitCode = actionDiagnostic(serviceArgs.getActionDiagnosticArgs()); |
| break; |
| |
| case ACTION_EXISTS: |
| exitCode = actionExists(clusterName, |
| serviceArgs.getActionExistsArgs()); |
| break; |
| |
| case ACTION_FLEX: |
| exitCode = actionFlex(clusterName, serviceArgs.getActionFlexArgs()); |
| break; |
| |
| case ACTION_FREEZE: |
| exitCode = actionFreeze(clusterName, serviceArgs.getActionFreezeArgs()); |
| break; |
| |
| case ACTION_HELP: |
| log.info(serviceArgs.usage()); |
| break; |
| |
| case ACTION_KDIAG: |
| exitCode = actionKDiag(serviceArgs.getActionKDiagArgs()); |
| break; |
| |
| case ACTION_KILL_CONTAINER: |
| exitCode = actionKillContainer(clusterName, |
| serviceArgs.getActionKillContainerArgs()); |
| break; |
| |
| case ACTION_INSTALL_KEYTAB: |
| exitCode = actionInstallKeytab(serviceArgs.getActionInstallKeytabArgs()); |
| break; |
| |
| case ACTION_INSTALL_PACKAGE: |
| exitCode = actionInstallPkg(serviceArgs.getActionInstallPackageArgs()); |
| break; |
| |
| case ACTION_KEYTAB: |
| exitCode = actionKeytab(serviceArgs.getActionKeytabArgs()); |
| break; |
| |
| case ACTION_LIST: |
| exitCode = actionList(clusterName, serviceArgs.getActionListArgs()); |
| break; |
| |
| case ACTION_LOOKUP: |
| exitCode = actionLookup(serviceArgs.getActionLookupArgs()); |
| break; |
| |
| case ACTION_NODES: |
| exitCode = actionNodes("", serviceArgs.getActionNodesArgs()); |
| break; |
| |
| case ACTION_PACKAGE: |
| exitCode = actionPackage(serviceArgs.getActionPackageArgs()); |
| break; |
| |
| case ACTION_REGISTRY: |
| exitCode = actionRegistry(serviceArgs.getActionRegistryArgs()); |
| break; |
| |
| case ACTION_RESOLVE: |
| exitCode = actionResolve(serviceArgs.getActionResolveArgs()); |
| break; |
| |
| case ACTION_STATUS: |
| exitCode = actionStatus(clusterName, serviceArgs.getActionStatusArgs()); |
| break; |
| |
| case ACTION_THAW: |
| exitCode = actionThaw(clusterName, serviceArgs.getActionThawArgs()); |
| break; |
| |
| case ACTION_TOKENS: |
| exitCode = actionTokens(serviceArgs.getActionTokenArgs()); |
| break; |
| |
| case ACTION_UPDATE: |
| exitCode = actionUpdate(clusterName, serviceArgs.getActionUpdateArgs()); |
| break; |
| |
| case ACTION_UPGRADE: |
| exitCode = actionUpgrade(clusterName, serviceArgs.getActionUpgradeArgs()); |
| break; |
| |
| case ACTION_VERSION: |
| exitCode = actionVersion(); |
| break; |
| |
| default: |
| throw new SliderException(EXIT_UNIMPLEMENTED, |
| "Unimplemented: " + action); |
| } |
| |
| return exitCode; |
| } |
| |
| /** |
| * Perform everything needed to init the hadoop binding. |
| * This assumes that the service is already in inited or started state |
| * @throws IOException |
| * @throws SliderException |
| */ |
| protected void initHadoopBinding() throws IOException, SliderException { |
| // validate the client |
| validateSliderClientEnvironment(null); |
| //create the YARN client |
| yarnClient = new SliderYarnClientImpl(); |
| yarnClient.init(getConfig()); |
| if (getServiceState() == STATE.STARTED) { |
| yarnClient.start(); |
| } |
| addService(yarnClient); |
| yarnAppListClient = |
| new YarnAppListClient(yarnClient, getUsername(), getConfig()); |
| // create the filesystem |
| sliderFileSystem = new SliderFileSystem(getConfig()); |
| } |
| |
| /** |
| * Delete the zookeeper node associated with the calling user and the cluster |
| * TODO: YARN registry operations |
| **/ |
| @VisibleForTesting |
| public boolean deleteZookeeperNode(String clusterName) throws YarnException, IOException { |
| String user = getUsername(); |
| String zkPath = ZKIntegration.mkClusterPath(user, clusterName); |
| Exception e = null; |
| try { |
| Configuration config = getConfig(); |
| ZKIntegration client = getZkClient(clusterName, user); |
| if (client != null) { |
| if (client.exists(zkPath)) { |
| log.info("Deleting zookeeper path {}", zkPath); |
| } |
| client.deleteRecursive(zkPath); |
| return true; |
| } |
| } catch (InterruptedException | BadConfigException | KeeperException ex) { |
| e = ex; |
| } |
| if (e != null) { |
| log.warn("Unable to recursively delete zk node {}", zkPath, e); |
| } |
| |
| return false; |
| } |
| |
| /** |
| * Create the zookeeper node associated with the calling user and the cluster |
| * |
| * @param clusterName slider application name |
| * @param nameOnly should the name only be created (i.e. don't create ZK node) |
| * @return the path, using the policy implemented in |
| * {@link ZKIntegration#mkClusterPath(String, String)} |
| * @throws YarnException |
| * @throws IOException |
| */ |
| @VisibleForTesting |
| public String createZookeeperNode(String clusterName, Boolean nameOnly) throws YarnException, IOException { |
| try { |
| return createZookeeperNodeInner(clusterName, nameOnly); |
| } catch (KeeperException.NodeExistsException e) { |
| return null; |
| } catch (KeeperException e) { |
| return null; |
| } catch (InterruptedException e) { |
| throw new InterruptedIOException(e.toString()); |
| } |
| } |
| |
| /** |
| * Create the zookeeper node associated with the calling user and the cluster |
| * -throwing exceptions on any failure |
| * @param clusterName cluster name |
| * @param nameOnly create the path, not the node |
| * @return the path, using the policy implemented in |
| * {@link ZKIntegration#mkClusterPath(String, String)} |
| * @throws YarnException |
| * @throws IOException |
| * @throws KeeperException |
| * @throws InterruptedException |
| */ |
| @VisibleForTesting |
| public String createZookeeperNodeInner(String clusterName, Boolean nameOnly) |
| throws YarnException, IOException, KeeperException, InterruptedException { |
| String user = getUsername(); |
| String zkPath = ZKIntegration.mkClusterPath(user, clusterName); |
| if (nameOnly) { |
| return zkPath; |
| } |
| ZKIntegration client = getZkClient(clusterName, user); |
| if (client != null) { |
| // set up the permissions. This must be done differently on a secure cluster from an insecure |
| // one |
| List<ACL> zkperms = new ArrayList<>(); |
| if (UserGroupInformation.isSecurityEnabled()) { |
| zkperms.add(new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.AUTH_IDS)); |
| zkperms.add(new ACL(ZooDefs.Perms.READ, ZooDefs.Ids.ANYONE_ID_UNSAFE)); |
| } else { |
| zkperms.add(new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.ANYONE_ID_UNSAFE)); |
| } |
| client.createPath(zkPath, "", |
| zkperms, |
| CreateMode.PERSISTENT); |
| return zkPath; |
| } else { |
| return null; |
| } |
| } |
| |
| /** |
| * Gets a zookeeper client, returns null if it cannot connect to zookeeper |
| **/ |
| protected ZKIntegration getZkClient(String clusterName, String user) throws YarnException { |
| String registryQuorum = lookupZKQuorum(); |
| ZKIntegration client = null; |
| try { |
| BlockingZKWatcher watcher = new BlockingZKWatcher(); |
| client = ZKIntegration.newInstance(registryQuorum, user, clusterName, true, false, watcher, |
| ZKIntegration.SESSION_TIMEOUT); |
| client.init(); |
| watcher.waitForZKConnection(2 * 1000); |
| } catch (InterruptedException e) { |
| client = null; |
| log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e); |
| } catch (IOException e) { |
| log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e); |
| } |
| return client; |
| } |
| |
| /** |
| * Keep this signature for backward compatibility with |
| * force=true by default. |
| */ |
| @Override |
| public int actionDestroy(String clustername) throws YarnException, |
| IOException { |
| ActionDestroyArgs destroyArgs = new ActionDestroyArgs(); |
| destroyArgs.force = true; |
| return actionDestroy(clustername, destroyArgs); |
| } |
| |
| @Override |
| public int actionDestroy(String clustername, |
| ActionDestroyArgs destroyArgs) throws YarnException, IOException { |
| // verify that a live cluster isn't there |
| validateClusterName(clustername); |
| //no=op, it is now mandatory. |
| verifyBindingsDefined(); |
| verifyNoLiveClusters(clustername, "Destroy"); |
| boolean forceDestroy = destroyArgs.force; |
| log.debug("actionDestroy({}, force={})", clustername, forceDestroy); |
| |
| // create the directory path |
| Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername); |
| // delete the directory; |
| FileSystem fs = sliderFileSystem.getFileSystem(); |
| boolean exists = fs.exists(clusterDirectory); |
| if (exists) { |
| log.debug("Application Instance {} found at {}: destroying", clustername, clusterDirectory); |
| if (!forceDestroy) { |
| // fail the command if --force is not explicitly specified |
| throw new UsageException("Destroy will permanently delete directories and registries. " |
| + "Reissue this command with the --force option if you want to proceed."); |
| } |
| if (!fs.delete(clusterDirectory, true)) { |
| log.warn("Filesystem returned false from delete() operation"); |
| } |
| |
| if(!deleteZookeeperNode(clustername)) { |
| log.warn("Unable to perform node cleanup in Zookeeper."); |
| } |
| |
| if (fs.exists(clusterDirectory)) { |
| log.warn("Failed to delete {}", clusterDirectory); |
| } |
| |
| } else { |
| log.debug("Application Instance {} already destroyed", clustername); |
| } |
| |
| // rm the registry entry —do not let this block the destroy operations |
| String registryPath = SliderRegistryUtils.registryPathForInstance( |
| clustername); |
| try { |
| getRegistryOperations().delete(registryPath, true); |
| } catch (IOException e) { |
| log.warn("Error deleting registry entry {}: {} ", registryPath, e, e); |
| } catch (SliderException e) { |
| log.warn("Error binding to registry {} ", e, e); |
| } |
| |
| List<ApplicationReport> instances = findAllLiveInstances(clustername); |
| // detect any race leading to cluster creation during the check/destroy process |
| // and report a problem. |
| if (!instances.isEmpty()) { |
| throw new SliderException(EXIT_APPLICATION_IN_USE, |
| clustername + ": " |
| + E_DESTROY_CREATE_RACE_CONDITION |
| + " :" + |
| instances.get(0)); |
| } |
| log.info("Destroyed cluster {}", clustername); |
| return EXIT_SUCCESS; |
| } |
| |
| @Override |
| public int actionAmSuicide(String clustername, |
| ActionAMSuicideArgs args) throws YarnException, IOException { |
| SliderClusterOperations clusterOperations = |
| createClusterOperations(clustername); |
| clusterOperations.amSuicide(args.message, args.exitcode, args.waittime); |
| return EXIT_SUCCESS; |
| } |
| |
| @Override |
| public AbstractClientProvider createClientProvider(String provider) |
| throws SliderException { |
| SliderProviderFactory factory = |
| SliderProviderFactory.createSliderProviderFactory(provider); |
| return factory.createClientProvider(); |
| } |
| |
| /** |
| * Create the cluster -saving the arguments to a specification file first |
| * @param clustername cluster name |
| * @return the status code |
| * @throws YarnException Yarn problems |
| * @throws IOException other problems |
| * @throws BadCommandArgumentsException bad arguments. |
| */ |
| public int actionCreate(String clustername, ActionCreateArgs createArgs) throws |
| YarnException, |
| IOException { |
| |
| actionBuild(clustername, createArgs); |
| Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername); |
| AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved( |
| clustername, clusterDirectory); |
| try { |
| checkForCredentials(getConfig(), instanceDefinition.getAppConf()); |
| } catch (IOException e) { |
| sliderFileSystem.getFileSystem().delete(clusterDirectory, true); |
| throw e; |
| } |
| return startCluster(clustername, createArgs); |
| } |
| |
| @Override |
| public int actionUpgrade(String clustername, ActionUpgradeArgs upgradeArgs) |
| throws YarnException, IOException { |
| File template = upgradeArgs.template; |
| File resources = upgradeArgs.resources; |
| List<String> containers = upgradeArgs.containers; |
| List<String> components = upgradeArgs.components; |
| |
| // For upgrade spec, let's be little more strict with validation. If either |
| // --template or --resources is specified, then both needs to be specified. |
| // Otherwise the internal app config and resources states of the app will be |
| // unwantedly modified and the change will take effect to the running app |
| // immediately. |
| require(!(template != null && resources == null), |
| "Option %s must be specified with option %s", |
| Arguments.ARG_RESOURCES, Arguments.ARG_TEMPLATE); |
| |
| require(!(resources != null && template == null), |
| "Option %s must be specified with option %s", |
| Arguments.ARG_TEMPLATE, Arguments.ARG_RESOURCES); |
| |
| // For upgrade spec, both --template and --resources should be specified |
| // and neither of --containers or --components should be used |
| if (template != null && resources != null) { |
| require(CollectionUtils.isEmpty(containers), |
| "Option %s cannot be specified with %s or %s", |
| Arguments.ARG_CONTAINERS, Arguments.ARG_TEMPLATE, |
| Arguments.ARG_RESOURCES); |
| require(CollectionUtils.isEmpty(components), |
| "Option %s cannot be specified with %s or %s", |
| Arguments.ARG_COMPONENTS, Arguments.ARG_TEMPLATE, |
| Arguments.ARG_RESOURCES); |
| |
| // not an error to try to upgrade a stopped cluster, just return success |
| // code, appropriate log messages have already been dumped |
| if (!isAppInRunningState(clustername)) { |
| return EXIT_SUCCESS; |
| } |
| |
| // Now initiate the upgrade spec flow |
| buildInstanceDefinition(clustername, upgradeArgs, true, true, true); |
| SliderClusterOperations clusterOperations = createClusterOperations(clustername); |
| clusterOperations.amSuicide("AM restarted for application upgrade", 1, 1000); |
| return EXIT_SUCCESS; |
| } |
| |
| // Since neither --template or --resources were specified, it is upgrade |
| // containers flow. Here any one or both of --containers and --components |
| // can be specified. If a container is specified with --containers option |
| // and also belongs to a component type specified with --components, it will |
| // be upgraded only once. |
| return actionUpgradeContainers(clustername, upgradeArgs); |
| } |
| |
| private int actionUpgradeContainers(String clustername, |
| ActionUpgradeArgs upgradeArgs) throws YarnException, IOException { |
| verifyBindingsDefined(); |
| validateClusterName(clustername); |
| int waittime = upgradeArgs.getWaittime(); // ignored for now |
| String text = "Upgrade containers"; |
| log.debug("actionUpgradeContainers({}, reason={}, wait={})", clustername, |
| text, waittime); |
| |
| // not an error to try to upgrade a stopped cluster, just return success |
| // code, appropriate log messages have already been dumped |
| if (!isAppInRunningState(clustername)) { |
| return EXIT_SUCCESS; |
| } |
| |
| // Create sets of containers and components to get rid of duplicates and |
| // for quick lookup during checks below |
| Set<String> containers = new HashSet<>(); |
| if (upgradeArgs.containers != null) { |
| containers.addAll(new ArrayList<>(upgradeArgs.containers)); |
| } |
| Set<String> components = new HashSet<>(); |
| if (upgradeArgs.components != null) { |
| components.addAll(new ArrayList<>(upgradeArgs.components)); |
| } |
| |
| // check validity of component names and running containers here |
| List<ContainerInformation> liveContainers = getContainers(clustername); |
| Set<String> validContainers = new HashSet<>(); |
| Set<String> validComponents = new HashSet<>(); |
| for (ContainerInformation liveContainer : liveContainers) { |
| boolean allContainersAndComponentsAccountedFor = true; |
| if (CollectionUtils.isNotEmpty(containers)) { |
| if (containers.contains(liveContainer.containerId)) { |
| containers.remove(liveContainer.containerId); |
| validContainers.add(liveContainer.containerId); |
| } |
| allContainersAndComponentsAccountedFor = false; |
| } |
| if (CollectionUtils.isNotEmpty(components)) { |
| if (components.contains(liveContainer.component)) { |
| components.remove(liveContainer.component); |
| validComponents.add(liveContainer.component); |
| } |
| allContainersAndComponentsAccountedFor = false; |
| } |
| if (allContainersAndComponentsAccountedFor) { |
| break; |
| } |
| } |
| |
| // If any item remains in containers or components then they are invalid. |
| // Log warning for them and proceed. |
| if (CollectionUtils.isNotEmpty(containers)) { |
| log.warn("Invalid set of containers provided {}", containers); |
| } |
| if (CollectionUtils.isNotEmpty(components)) { |
| log.warn("Invalid set of components provided {}", components); |
| } |
| |
| // If not a single valid container or component is specified do not proceed |
| if (CollectionUtils.isEmpty(validContainers) |
| && CollectionUtils.isEmpty(validComponents)) { |
| log.error("Not a single valid container or component specified. Nothing to do."); |
| return EXIT_NOT_FOUND; |
| } |
| |
| SliderClusterProtocol appMaster = connect(findInstance(clustername)); |
| Messages.UpgradeContainersRequestProto r = |
| Messages.UpgradeContainersRequestProto |
| .newBuilder() |
| .setMessage(text) |
| .addAllContainer(validContainers) |
| .addAllComponent(validComponents) |
| .build(); |
| appMaster.upgradeContainers(r); |
| log.info("Cluster upgrade issued for -"); |
| if (CollectionUtils.isNotEmpty(validContainers)) { |
| log.info(" Containers (total {}): {}", validContainers.size(), |
| validContainers); |
| } |
| if (CollectionUtils.isNotEmpty(validComponents)) { |
| log.info(" Components (total {}): {}", validComponents.size(), |
| validComponents); |
| } |
| |
| return EXIT_SUCCESS; |
| } |
| |
| // returns true if and only if app is in RUNNING state |
| private boolean isAppInRunningState(String clustername) throws YarnException, |
| IOException { |
| // is this actually a known cluster? |
| sliderFileSystem.locateInstanceDefinition(clustername); |
| ApplicationReport app = findInstance(clustername); |
| if (app == null) { |
| // exit early |
| log.info("Cluster {} not running", clustername); |
| return false; |
| } |
| log.debug("App to upgrade was found: {}:\n{}", clustername, |
| new OnDemandReportStringifier(app)); |
| if (app.getYarnApplicationState().ordinal() >= YarnApplicationState.FINISHED.ordinal()) { |
| log.info("Cluster {} is in a terminated state {}. Use command '{}' instead.", |
| clustername, app.getYarnApplicationState(), ACTION_UPDATE); |
| return false; |
| } |
| |
| // IPC request to upgrade containers is possible if the app is running. |
| if (app.getYarnApplicationState().ordinal() < YarnApplicationState.RUNNING |
| .ordinal()) { |
| log.info("Cluster {} is in a pre-running state {}. To upgrade it needs " |
| + "to be RUNNING.", clustername, app.getYarnApplicationState()); |
| return false; |
| } |
| |
| return true; |
| } |
| |
| protected static void checkForCredentials(Configuration conf, |
| ConfTree tree) throws IOException { |
| if (tree.credentials == null || tree.credentials.isEmpty()) { |
| log.info("No credentials requested"); |
| return; |
| } |
| |
| BufferedReader br = null; |
| try { |
| for (Entry<String, List<String>> cred : tree.credentials.entrySet()) { |
| String provider = cred.getKey(); |
| List<String> aliases = cred.getValue(); |
| if (aliases == null || aliases.isEmpty()) { |
| continue; |
| } |
| Configuration c = new Configuration(conf); |
| c.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, provider); |
| CredentialProvider credentialProvider = CredentialProviderFactory.getProviders(c).get(0); |
| Set<String> existingAliases = new HashSet<>(credentialProvider.getAliases()); |
| for (String alias : aliases) { |
| if (existingAliases.contains(alias.toLowerCase(Locale.ENGLISH))) { |
| log.info("Credentials for " + alias + " found in " + provider); |
| } else { |
| if (br == null) { |
| br = new BufferedReader(new InputStreamReader(System.in)); |
| } |
| char[] pass = readPassword(alias, br); |
| credentialProvider.createCredentialEntry(alias, pass); |
| credentialProvider.flush(); |
| Arrays.fill(pass, ' '); |
| } |
| } |
| } |
| } finally { |
| org.apache.hadoop.io.IOUtils.closeStream(br); |
| } |
| } |
| |
| private static char[] readOnePassword(String alias) throws IOException { |
| try(BufferedReader br = new BufferedReader(new InputStreamReader(System.in))) { |
| return readPassword(alias, br); |
| } |
| } |
| |
| // using a normal reader instead of a secure one, |
| // because stdin is not hooked up to the command line |
| private static char[] readPassword(String alias, BufferedReader br) |
| throws IOException { |
| char[] cred = null; |
| |
| boolean noMatch; |
| do { |
| log.info(String.format("%s %s: ", PASSWORD_PROMPT, alias)); |
| char[] newPassword1 = br.readLine().toCharArray(); |
| log.info(String.format("%s %s again: ", PASSWORD_PROMPT, alias)); |
| char[] newPassword2 = br.readLine().toCharArray(); |
| noMatch = !Arrays.equals(newPassword1, newPassword2); |
| if (noMatch) { |
| if (newPassword1 != null) Arrays.fill(newPassword1, ' '); |
| log.info(String.format("Passwords don't match. Try again.")); |
| } else { |
| cred = newPassword1; |
| } |
| if (newPassword2 != null) Arrays.fill(newPassword2, ' '); |
| } while (noMatch); |
| if (cred == null) |
| throw new IOException("Could not read credentials for " + alias + |
| " from stdin"); |
| return cred; |
| } |
| |
| @Override |
| public int actionBuild(String clustername, |
| AbstractClusterBuildingActionArgs buildInfo) throws |
| YarnException, |
| IOException { |
| |
| buildInstanceDefinition(clustername, buildInfo, false, false); |
| return EXIT_SUCCESS; |
| } |
| |
| @Override |
| public int actionKeytab(ActionKeytabArgs keytabInfo) |
| throws YarnException, IOException { |
| if (keytabInfo.install) { |
| return actionInstallKeytab(keytabInfo); |
| } else if (keytabInfo.delete) { |
| return actionDeleteKeytab(keytabInfo); |
| } else if (keytabInfo.list) { |
| return actionListKeytab(keytabInfo); |
| } else { |
| throw new BadCommandArgumentsException( |
| "Keytab option specified not found.\n" |
| + CommonArgs.usage(serviceArgs, ACTION_KEYTAB)); |
| } |
| } |
| |
| private int actionListKeytab(ActionKeytabArgs keytabInfo) throws IOException { |
| String folder = keytabInfo.folder != null ? keytabInfo.folder : StringUtils.EMPTY; |
| Path keytabPath = sliderFileSystem.buildKeytabInstallationDirPath(folder); |
| RemoteIterator<LocatedFileStatus> files = |
| sliderFileSystem.getFileSystem().listFiles(keytabPath, true); |
| log.info("Keytabs:"); |
| while (files.hasNext()) { |
| log.info("\t" + files.next().getPath().toString()); |
| } |
| |
| return EXIT_SUCCESS; |
| } |
| |
| private int actionDeleteKeytab(ActionKeytabArgs keytabInfo) |
| throws BadCommandArgumentsException, IOException { |
| if (StringUtils.isEmpty(keytabInfo.folder)) { |
| throw new BadCommandArgumentsException( |
| "A valid destination keytab sub-folder name is required (e.g. 'security').\n" |
| + CommonArgs.usage(serviceArgs, ACTION_KEYTAB)); |
| } |
| |
| if (StringUtils.isEmpty(keytabInfo.keytab)) { |
| throw new BadCommandArgumentsException("A keytab name is required."); |
| } |
| |
| Path pkgPath = sliderFileSystem.buildKeytabInstallationDirPath(keytabInfo.folder); |
| |
| Path fileInFs = new Path(pkgPath, keytabInfo.keytab ); |
| log.info("Deleting keytab {}", fileInFs); |
| FileSystem sfs = sliderFileSystem.getFileSystem(); |
| require(sfs.exists(fileInFs), "No keytab to delete found at %s", fileInFs.toUri()); |
| sfs.delete(fileInFs, false); |
| |
| return EXIT_SUCCESS; |
| } |
| |
| private int actionInstallKeytab(ActionKeytabArgs keytabInfo) |
| throws BadCommandArgumentsException, IOException { |
| Path srcFile = null; |
| require(isSet(keytabInfo.folder), |
| "A valid destination keytab sub-folder name is required (e.g. 'security').\n" |
| + CommonArgs.usage(serviceArgs, ACTION_KEYTAB)); |
| |
| requireArgumentSet(Arguments.ARG_KEYTAB, keytabInfo.keytab); |
| File keytabFile = new File(keytabInfo.keytab); |
| require(keytabFile.isFile(), |
| "Unable to access supplied keytab file at %s", keytabFile.getAbsolutePath()); |
| srcFile = new Path(keytabFile.toURI()); |
| |
| Path pkgPath = sliderFileSystem.buildKeytabInstallationDirPath(keytabInfo.folder); |
| FileSystem sfs = sliderFileSystem.getFileSystem(); |
| sfs.mkdirs(pkgPath); |
| sfs.setPermission(pkgPath, new FsPermission( |
| FsAction.ALL, FsAction.NONE, FsAction.NONE)); |
| |
| Path fileInFs = new Path(pkgPath, srcFile.getName()); |
| log.info("Installing keytab {} at {} and overwrite is {}.", |
| srcFile, fileInFs, keytabInfo.overwrite); |
| require(!(sfs.exists(fileInFs) && !keytabInfo.overwrite), |
| "Keytab exists at %s. Use --overwrite to overwrite.", fileInFs.toUri()); |
| |
| sfs.copyFromLocalFile(false, keytabInfo.overwrite, srcFile, fileInFs); |
| sfs.setPermission(fileInFs, |
| new FsPermission(FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE)); |
| |
| return EXIT_SUCCESS; |
| } |
| |
| @Override |
| public int actionInstallKeytab(ActionInstallKeytabArgs installKeytabInfo) |
| throws YarnException, IOException { |
| log.warn("The 'install-keytab' option has been deprecated. Please use 'keytab --install'."); |
| return actionKeytab(new ActionKeytabArgs(installKeytabInfo)); |
| } |
| |
| @Override |
| public int actionInstallPkg(ActionInstallPackageArgs installPkgInfo) throws |
| YarnException, |
| IOException { |
| log.warn("The " + ACTION_INSTALL_PACKAGE |
| + " option has been deprecated. Please use '" |
| + ACTION_PACKAGE + " " + ClientArgs.ARG_INSTALL + "'."); |
| if (StringUtils.isEmpty(installPkgInfo.name)) { |
| throw new BadCommandArgumentsException( |
| E_INVALID_APPLICATION_TYPE_NAME + "\n" |
| + CommonArgs.usage(serviceArgs, ACTION_INSTALL_PACKAGE)); |
| } |
| Path srcFile = extractPackagePath(installPkgInfo.packageURI); |
| |
| // Do not provide new options to install-package command as it is in |
| // deprecated mode. So version is kept null here. Use package --install. |
| Path pkgPath = sliderFileSystem.buildPackageDirPath(installPkgInfo.name, |
| null); |
| FileSystem sfs = sliderFileSystem.getFileSystem(); |
| sfs.mkdirs(pkgPath); |
| |
| Path fileInFs = new Path(pkgPath, srcFile.getName()); |
| log.info("Installing package {} at {} and overwrite is {}.", |
| srcFile, fileInFs, installPkgInfo.replacePkg); |
| require(!(sfs.exists(fileInFs) && !installPkgInfo.replacePkg), |
| "Package exists at %s. : %s", fileInFs.toUri(), E_USE_REPLACEPKG_TO_OVERWRITE); |
| sfs.copyFromLocalFile(false, installPkgInfo.replacePkg, srcFile, fileInFs); |
| return EXIT_SUCCESS; |
| } |
| |
| @Override |
| public int actionClient(ActionClientArgs clientInfo) throws |
| YarnException, |
| IOException { |
| if (clientInfo.install) { |
| return doClientInstall(clientInfo); |
| } else if (clientInfo.getCertStore) { |
| return doCertificateStoreRetrieval(clientInfo); |
| } else { |
| throw new BadCommandArgumentsException( |
| "Only install, keystore, and truststore commands are supported for the client.\n" |
| + CommonArgs.usage(serviceArgs, ACTION_CLIENT)); |
| |
| } |
| } |
| |
| private int doCertificateStoreRetrieval(ActionClientArgs clientInfo) |
| throws YarnException, IOException { |
| if (clientInfo.keystore != null && clientInfo.truststore != null) { |
| throw new BadCommandArgumentsException( |
| "Only one of either keystore or truststore can be retrieved at one time. " |
| + "Retrieval of both should be done separately\n" |
| + CommonArgs.usage(serviceArgs, ACTION_CLIENT)); |
| } |
| |
| requireArgumentSet(Arguments.ARG_NAME, clientInfo.name); |
| |
| File storeFile = null; |
| SecurityStore.StoreType type; |
| if (clientInfo.keystore != null) { |
| storeFile = clientInfo.keystore; |
| type = SecurityStore.StoreType.keystore; |
| } else { |
| storeFile = clientInfo.truststore; |
| type = SecurityStore.StoreType.truststore; |
| } |
| |
| require (!storeFile.exists(), |
| "File %s already exists. Please remove that file or select a different file name.", |
| storeFile.getAbsolutePath()); |
| String hostname = null; |
| if (type == SecurityStore.StoreType.keystore) { |
| hostname = clientInfo.hostname; |
| if (hostname == null) { |
| hostname = InetAddress.getLocalHost().getCanonicalHostName(); |
| log.info("No hostname specified via command line. Using {}", hostname); |
| } |
| } |
| |
| String password = clientInfo.password; |
| if (password == null) { |
| String provider = clientInfo.provider; |
| String alias = clientInfo.alias; |
| if (provider != null && alias != null) { |
| Configuration conf = new Configuration(getConfig()); |
| conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, provider); |
| char[] chars = conf.getPassword(alias); |
| if (chars == null) { |
| CredentialProvider credentialProvider = |
| CredentialProviderFactory.getProviders(conf).get(0); |
| chars = readOnePassword(alias); |
| credentialProvider.createCredentialEntry(alias, chars); |
| credentialProvider.flush(); |
| } |
| password = String.valueOf(chars); |
| Arrays.fill(chars, ' '); |
| } else { |
| log.info("No password and no provider/alias pair were provided, " + |
| "prompting for password"); |
| // get a password |
| password = String.valueOf(readOnePassword(type.name())); |
| } |
| } |
| |
| byte[] keystore = createClusterOperations(clientInfo.name) |
| .getClientCertificateStore(hostname, "client", password, type.name()); |
| // persist to file |
| IOUtils.write(keystore, new FileOutputStream(storeFile)); |
| |
| return EXIT_SUCCESS; |
| } |
| |
| private int doClientInstall(ActionClientArgs clientInfo) |
| throws IOException, SliderException { |
| |
| require(clientInfo.installLocation != null, |
| E_INVALID_INSTALL_LOCATION +"\n" |
| + CommonArgs.usage(serviceArgs, ACTION_CLIENT)); |
| require(clientInfo.installLocation.exists(), |
| E_INSTALL_PATH_DOES_NOT_EXIST + ": " + clientInfo.installLocation.getAbsolutePath()); |
| |
| require(clientInfo.installLocation.isDirectory(), |
| E_INVALID_INSTALL_PATH + ": " + clientInfo.installLocation.getAbsolutePath()); |
| |
| File pkgFile; |
| require(isSet(clientInfo.packageURI), E_INVALID_APPLICATION_PACKAGE_LOCATION); |
| pkgFile = new File(clientInfo.packageURI); |
| require(pkgFile.isFile(), |
| E_UNABLE_TO_READ_SUPPLIED_PACKAGE_FILE + " at %s", pkgFile.getAbsolutePath()); |
| |
| JSONObject config = null; |
| if(clientInfo.clientConfig != null) { |
| try { |
| byte[] encoded = Files.toByteArray(clientInfo.clientConfig); |
| config = new JSONObject(new String(encoded, Charset.defaultCharset())); |
| } catch (JSONException jsonEx) { |
| log.error("Unable to read supplied configuration at {}: {}", |
| clientInfo.clientConfig, jsonEx); |
| log.debug("Unable to read supplied configuration at {}: {}", |
| clientInfo.clientConfig, jsonEx, jsonEx); |
| throw new BadConfigException(E_MUST_BE_A_VALID_JSON_FILE, jsonEx); |
| } |
| } |
| |
| // Only INSTALL is supported |
| AbstractClientProvider |
| provider = createClientProvider(SliderProviderFactory.DEFAULT_CLUSTER_TYPE); |
| provider.processClientOperation(sliderFileSystem, |
| "INSTALL", |
| clientInfo.installLocation, |
| pkgFile, |
| config, |
| clientInfo.name); |
| return EXIT_SUCCESS; |
| } |
| |
| |
| @Override |
| public int actionPackage(ActionPackageArgs actionPackageInfo) |
| throws YarnException, IOException { |
| initializeOutputStream(actionPackageInfo.out); |
| int exitCode = -1; |
| if (actionPackageInfo.help) { |
| exitCode = actionHelp(ACTION_PACKAGE); |
| } |
| if (actionPackageInfo.install) { |
| exitCode = actionPackageInstall(actionPackageInfo); |
| } |
| if (actionPackageInfo.delete) { |
| exitCode = actionPackageDelete(actionPackageInfo); |
| } |
| if (actionPackageInfo.list) { |
| exitCode = actionPackageList(); |
| } |
| if (actionPackageInfo.instances) { |
| exitCode = actionPackageInstances(); |
| } |
| finalizeOutputStream(actionPackageInfo.out); |
| if (exitCode != -1) { |
| return exitCode; |
| } |
| throw new BadCommandArgumentsException( |
| "Select valid package operation option"); |
| } |
| |
| private void initializeOutputStream(String outFile) |
| throws FileNotFoundException { |
| if (outFile != null) { |
| clientOutputStream = new PrintStream(new FileOutputStream(outFile)); |
| } else { |
| clientOutputStream = System.out; |
| } |
| } |
| |
| private void finalizeOutputStream(String outFile) { |
| if (outFile != null && clientOutputStream != null) { |
| clientOutputStream.flush(); |
| clientOutputStream.close(); |
| } |
| clientOutputStream = System.out; |
| } |
| |
| private int actionPackageInstances() throws YarnException, IOException { |
| Map<String, Path> persistentInstances = sliderFileSystem |
| .listPersistentInstances(); |
| if (persistentInstances.isEmpty()) { |
| log.info("No slider cluster specification available"); |
| return EXIT_SUCCESS; |
| } |
| String pkgPathValue = sliderFileSystem |
| .buildPackageDirPath(StringUtils.EMPTY, StringUtils.EMPTY).toUri() |
| .getPath(); |
| FileSystem fs = sliderFileSystem.getFileSystem(); |
| Iterator<Map.Entry<String, Path>> instanceItr = persistentInstances |
| .entrySet().iterator(); |
| log.info("List of applications with its package name and path"); |
| println("%-25s %15s %30s %s", "Cluster Name", "Package Name", |
| "Package Version", "Application Location"); |
| while(instanceItr.hasNext()) { |
| Map.Entry<String, Path> entry = instanceItr.next(); |
| String clusterName = entry.getKey(); |
| Path clusterPath = entry.getValue(); |
| AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved( |
| clusterName, clusterPath); |
| Path appDefPath = null; |
| try { |
| appDefPath = new Path( |
| getApplicationDefinitionPath(instanceDefinition |
| .getAppConfOperations())); |
| } catch (BadConfigException e) { |
| // Invalid cluster state, so move on to next. No need to log anything |
| // as this is just listing of instances. |
| continue; |
| } |
| if (!appDefPath.isUriPathAbsolute()) { |
| appDefPath = new Path(fs.getHomeDirectory(), appDefPath); |
| } |
| String appDefPathStr = appDefPath.toUri().toString(); |
| try { |
| if (appDefPathStr.contains(pkgPathValue) && fs.isFile(appDefPath)) { |
| String packageName = appDefPath.getParent().getName(); |
| String packageVersion = StringUtils.EMPTY; |
| if (instanceDefinition.isVersioned()) { |
| packageVersion = packageName; |
| packageName = appDefPath.getParent().getParent().getName(); |
| } |
| println("%-25s %15s %30s %s", clusterName, packageName, |
| packageVersion, appDefPathStr); |
| } |
| } catch (IOException e) { |
| log.debug("{} application definition path {} is not found.", clusterName, appDefPathStr); |
| } |
| } |
| return EXIT_SUCCESS; |
| } |
| |
| private int actionPackageList() throws IOException { |
| Path pkgPath = sliderFileSystem.buildPackageDirPath(StringUtils.EMPTY, |
| StringUtils.EMPTY); |
| log.info("Package install path : {}", pkgPath); |
| FileSystem sfs = sliderFileSystem.getFileSystem(); |
| if (!sfs.isDirectory(pkgPath)) { |
| log.info("No package(s) installed"); |
| return EXIT_SUCCESS; |
| } |
| FileStatus[] fileStatus = sfs.listStatus(pkgPath); |
| boolean hasPackage = false; |
| StringBuilder sb = new StringBuilder(); |
| sb.append("List of installed packages:\n"); |
| for (FileStatus fstat : fileStatus) { |
| if (fstat.isDirectory()) { |
| sb.append("\t").append(fstat.getPath().getName()); |
| sb.append("\n"); |
| hasPackage = true; |
| } |
| } |
| if (hasPackage) { |
| println(sb.toString()); |
| } else { |
| log.info("No package(s) installed"); |
| } |
| return EXIT_SUCCESS; |
| } |
| |
| private void createSummaryMetainfoFile(Path srcFile, Path destFile, |
| boolean overwrite) throws IOException { |
| FileSystem srcFs = srcFile.getFileSystem(getConfig()); |
| try (InputStream inputStreamJson = SliderUtils |
| .getApplicationResourceInputStream(srcFs, srcFile, "metainfo.json"); |
| InputStream inputStreamXml = SliderUtils |
| .getApplicationResourceInputStream(srcFs, srcFile, "metainfo.xml");) { |
| InputStream inputStream = null; |
| Path summaryFileInFs = null; |
| if (inputStreamJson != null) { |
| inputStream = inputStreamJson; |
| summaryFileInFs = new Path(destFile.getParent(), destFile.getName() |
| + ".metainfo.json"); |
| log.info("Found JSON metainfo file in package"); |
| } else if (inputStreamXml != null) { |
| inputStream = inputStreamXml; |
| summaryFileInFs = new Path(destFile.getParent(), destFile.getName() |
| + ".metainfo.xml"); |
| log.info("Found XML metainfo file in package"); |
| } |
| if (inputStream != null) { |
| try (FSDataOutputStream dataOutputStream = sliderFileSystem |
| .getFileSystem().create(summaryFileInFs, overwrite)) { |
| log.info("Creating summary metainfo file"); |
| IOUtils.copy(inputStream, dataOutputStream); |
| } |
| } |
| } |
| } |
| |
| private int actionPackageInstall(ActionPackageArgs actionPackageArgs) |
| throws YarnException, IOException { |
| requireArgumentSet(Arguments.ARG_NAME, actionPackageArgs.name); |
| |
| Path srcFile = extractPackagePath(actionPackageArgs.packageURI); |
| |
| Path pkgPath = sliderFileSystem.buildPackageDirPath(actionPackageArgs.name, |
| actionPackageArgs.version); |
| FileSystem fs = sliderFileSystem.getFileSystem(); |
| if (!fs.exists(pkgPath)) { |
| fs.mkdirs(pkgPath); |
| } |
| |
| Path fileInFs = new Path(pkgPath, srcFile.getName()); |
| require(actionPackageArgs.replacePkg || !fs.exists(fileInFs), |
| E_PACKAGE_EXISTS +" at %s. Use --replacepkg to overwrite.", fileInFs.toUri()); |
| |
| log.info("Installing package {} to {} (overwrite set to {})", srcFile, |
| fileInFs, actionPackageArgs.replacePkg); |
| fs.copyFromLocalFile(false, actionPackageArgs.replacePkg, srcFile, fileInFs); |
| createSummaryMetainfoFile(srcFile, fileInFs, actionPackageArgs.replacePkg); |
| |
| String destPathWithHomeDir = Path |
| .getPathWithoutSchemeAndAuthority(fileInFs).toString(); |
| String destHomeDir = Path.getPathWithoutSchemeAndAuthority( |
| fs.getHomeDirectory()).toString(); |
| // a somewhat contrived approach to stripping out the home directory and any trailing |
| // separator; designed to work on windows and unix |
| String destPathWithoutHomeDir; |
| if (destPathWithHomeDir.startsWith(destHomeDir)) { |
| destPathWithoutHomeDir = destPathWithHomeDir.substring(destHomeDir.length()); |
| if (destPathWithoutHomeDir.startsWith("/") || destPathWithoutHomeDir.startsWith("\\")) { |
| destPathWithoutHomeDir = destPathWithoutHomeDir.substring(1); |
| } |
| } else { |
| destPathWithoutHomeDir = destPathWithHomeDir; |
| } |
| log.info("Set " + AgentKeys.APP_DEF + " in your app config JSON to {}", |
| destPathWithoutHomeDir); |
| |
| return EXIT_SUCCESS; |
| } |
| |
| private Path extractPackagePath(String packageURI) |
| throws BadCommandArgumentsException { |
| require(isSet(packageURI), E_INVALID_APPLICATION_PACKAGE_LOCATION); |
| File pkgFile = new File(packageURI); |
| require(pkgFile.isFile(), |
| E_UNABLE_TO_READ_SUPPLIED_PACKAGE_FILE + ": " + pkgFile.getAbsolutePath()); |
| return new Path(pkgFile.toURI()); |
| } |
| |
| private int actionPackageDelete(ActionPackageArgs actionPackageArgs) throws |
| YarnException, IOException { |
| requireArgumentSet(Arguments.ARG_NAME, actionPackageArgs.name); |
| |
| Path pkgPath = sliderFileSystem.buildPackageDirPath(actionPackageArgs.name, |
| actionPackageArgs.version); |
| FileSystem fs = sliderFileSystem.getFileSystem(); |
| require(fs.exists(pkgPath), E_PACKAGE_DOES_NOT_EXIST +": %s ", pkgPath.toUri()); |
| log.info("Deleting package {} at {}.", actionPackageArgs.name, pkgPath); |
| |
| if(fs.delete(pkgPath, true)) { |
| log.info("Deleted package {} " + actionPackageArgs.name); |
| return EXIT_SUCCESS; |
| } else { |
| log.warn("Package deletion failed."); |
| return EXIT_NOT_FOUND; |
| } |
| } |
| |
| @Override |
| public int actionUpdate(String clustername, |
| AbstractClusterBuildingActionArgs buildInfo) throws |
| YarnException, IOException { |
| buildInstanceDefinition(clustername, buildInfo, true, true); |
| return EXIT_SUCCESS; |
| } |
| |
| /** |
| * Build up the AggregateConfiguration for an application instance then |
| * persists it |
| * @param clustername name of the cluster |
| * @param buildInfo the arguments needed to build the cluster |
| * @param overwrite true if existing cluster directory can be overwritten |
| * @param liveClusterAllowed true if live cluster can be modified |
| * @throws YarnException |
| * @throws IOException |
| */ |
| |
| public void buildInstanceDefinition(String clustername, |
| AbstractClusterBuildingActionArgs buildInfo, boolean overwrite, |
| boolean liveClusterAllowed) throws YarnException, IOException { |
| buildInstanceDefinition(clustername, buildInfo, overwrite, |
| liveClusterAllowed, false); |
| } |
| |
/**
 * Build the AggregateConf for an application instance, persist it, and
 * then validate the persisted description with the AM and application
 * client providers.
 * <p>
 * Layering, in order: provider defaults, then the {@code --resources} and
 * {@code --template} files, then command-line app/component/resource
 * options, then the ZooKeeper binding, package path and token replacement.
 * Later steps merge over earlier ones, so statement order matters here.
 *
 * @param clustername name of the cluster
 * @param buildInfo the arguments needed to build the cluster; must be an
 *        ActionUpgradeArgs when {@code isUpgradeFlow} is true (it is cast)
 * @param overwrite true if an existing cluster directory can be overwritten
 * @param liveClusterAllowed true if a live cluster can be modified; when
 *        false, a live cluster of this name is rejected
 * @param isUpgradeFlow true when building for an upgrade; unless
 *        {@code force} is set, the merged resources are then checked against
 *        the currently persisted cluster definition
 * @throws BadConfigException if a resources/template file cannot be merged
 *         or no ZooKeeper quorum is available
 * @throws BadClusterStateException if the lock cannot be acquired while
 *         persisting the definition
 * @throws YarnException on other validation failures
 * @throws IOException on filesystem failures
 */
public void buildInstanceDefinition(String clustername,
    AbstractClusterBuildingActionArgs buildInfo, boolean overwrite,
    boolean liveClusterAllowed, boolean isUpgradeFlow) throws YarnException,
    IOException {
  validateClusterName(clustername);
  verifyBindingsDefined();
  // verify that a live cluster isn't there, unless one is allowed
  if (!liveClusterAllowed) {
    verifyNoLiveClusters(clustername, "Create");
  }

  Configuration conf = getConfig();
  String registryQuorum = lookupZKQuorum();

  Path appconfdir = buildInfo.getConfdir();
  // Provider
  String providerName = buildInfo.getProvider();
  requireArgumentSet(Arguments.ARG_PROVIDER, providerName);
  log.debug("Provider is {}", providerName);
  SliderAMClientProvider sliderAM = new SliderAMClientProvider(conf);
  AbstractClientProvider provider =
      createClientProvider(providerName);
  InstanceBuilder builder =
      new InstanceBuilder(sliderFileSystem,
          getConfig(),
          clustername);

  AggregateConf instanceDefinition = new AggregateConf();
  ConfTreeOperations appConf = instanceDefinition.getAppConfOperations();
  ConfTreeOperations resources = instanceDefinition.getResourceOperations();
  ConfTreeOperations internal = instanceDefinition.getInternalOperations();
  //initial definition is set by the providers
  sliderAM.prepareInstanceConfiguration(instanceDefinition);
  provider.prepareInstanceConfiguration(instanceDefinition);

  //load in any specified on the command line
  if (buildInfo.resources != null) {
    try {
      resources.mergeFile(buildInfo.resources,
          new ResourcesInputPropertiesValidator());

    } catch (IOException e) {
      throw new BadConfigException(e,
          "incorrect argument to %s: \"%s\" : %s ",
          Arguments.ARG_RESOURCES,
          buildInfo.resources,
          e.toString());
    }
  }
  if (buildInfo.template != null) {
    try {
      appConf.mergeFile(buildInfo.template,
          new TemplateInputPropertiesValidator());
    } catch (IOException e) {
      throw new BadConfigException(e,
          "incorrect argument to %s: \"%s\" : %s ",
          Arguments.ARG_TEMPLATE,
          buildInfo.template,
          e.toString());
    }
  }

  // Upgrade only: compare the file-supplied resources with the persisted
  // cluster definition before any command-line overrides are merged in.
  if (isUpgradeFlow) {
    ActionUpgradeArgs upgradeInfo = (ActionUpgradeArgs) buildInfo;
    if (!upgradeInfo.force) {
      validateClientAndClusterResource(clustername, resources);
    }
  }

  //get the command line options
  ConfTree cmdLineAppOptions = buildInfo.buildAppOptionsConfTree();
  ConfTree cmdLineResourceOptions = buildInfo.buildResourceOptionsConfTree();

  appConf.merge(cmdLineAppOptions);

  AppDefinitionPersister appDefinitionPersister = new AppDefinitionPersister(sliderFileSystem);
  appDefinitionPersister.processSuppliedDefinitions(clustername, buildInfo, appConf);

  // put the role counts into the resources file
  Map<String, String> argsRoleMap = buildInfo.getComponentMap();
  for (Map.Entry<String, String> roleEntry : argsRoleMap.entrySet()) {
    String count = roleEntry.getValue();
    String key = roleEntry.getKey();
    log.info("{} => {}", key, count);
    resources.getOrAddComponent(key).put(COMPONENT_INSTANCES, count);
  }

  //all CLI role options
  Map<String, Map<String, String>> appOptionMap =
      buildInfo.getCompOptionMap();
  appConf.mergeComponents(appOptionMap);

  // internal picks up the "slider." and "internal." prefixed global app
  // options only
  internal.propagateGlobalKeys(appConf, "slider.");
  internal.propagateGlobalKeys(appConf, "internal.");

  // copy "component.", "role." and "yarn." prefixed values (global and
  // per-component CLI options) into the resources
  if (PROPAGATE_RESOURCE_OPTION) {
    resources.propagateGlobalKeys(appConf, "component.");
    resources.propagateGlobalKeys(appConf, "role.");
    resources.propagateGlobalKeys(appConf, "yarn.");
    resources.mergeComponentsPrefix(appOptionMap, "component.", true);
    resources.mergeComponentsPrefix(appOptionMap, "yarn.", true);
    resources.mergeComponentsPrefix(appOptionMap, "role.", true);
  }

  // resource component args
  appConf.merge(cmdLineResourceOptions);
  resources.merge(cmdLineResourceOptions);
  resources.mergeComponents(buildInfo.getResourceCompOptionMap());

  builder.init(providerName, instanceDefinition);
  builder.resolve();
  builder.propagateFilename();
  builder.propagatePrincipals();
  builder.setImageDetailsIfAvailable(buildInfo.getImage(),
      buildInfo.getAppHomeDir());
  builder.setQueue(buildInfo.queue);

  // Quorum from the command line, falling back to the registry's quorum.
  String quorum = buildInfo.getZKhosts();
  if (isUnset(quorum)) {
    quorum = registryQuorum;
  }
  if (isUnset(quorum)) {
    throw new BadConfigException(E_NO_ZOOKEEPER_QUORUM);
  }
  ZKPathBuilder zkPaths = new ZKPathBuilder(getAppName(),
      getUsername(),
      clustername,
      registryQuorum,
      quorum);
  String zookeeperRoot = buildInfo.getAppZKPath();

  // An explicit app ZK path wins; otherwise a default node is created.
  // NOTE(review): the meaning of createZookeeperNode's boolean argument is
  // not visible here — confirm against its definition.
  if (isSet(zookeeperRoot)) {
    zkPaths.setAppPath(zookeeperRoot);
  } else {
    String createDefaultZkNode = appConf.getGlobalOptions()
        .getOption(AgentKeys.CREATE_DEF_ZK_NODE, "false");
    if (createDefaultZkNode.equals("true")) {
      String defaultZKPath = createZookeeperNode(clustername, false);
      log.debug("ZK node created for application instance: {}", defaultZKPath);
      if (defaultZKPath != null) {
        zkPaths.setAppPath(defaultZKPath);
      }
    } else {
      // create AppPath if default is being used
      String defaultZKPath = createZookeeperNode(clustername, true);
      log.debug("ZK node assigned to application instance: {}", defaultZKPath);
      zkPaths.setAppPath(defaultZKPath);
    }
  }

  builder.addZKBinding(zkPaths);

  //then propagate any package URI
  if (buildInfo.packageURI != null) {
    appConf.set(AgentKeys.PACKAGE_PATH, buildInfo.packageURI);
  }

  propagatePythonExecutable(conf, instanceDefinition);

  // make any substitutions needed at this stage
  replaceTokens(appConf.getConfTree(), getUsername(), clustername);

  // TODO: Refactor the validation code and persistence code
  try {
    persistInstanceDefinition(overwrite, appconfdir, builder);
    appDefinitionPersister.persistPackages();

  } catch (LockAcquireFailedException e) {
    log.warn("Failed to get a Lock on {} : {}", builder, e, e);
    // NOTE(review): the LockAcquireFailedException is only appended to the
    // message, not attached as the cause — consider preserving it.
    throw new BadClusterStateException("Failed to save " + clustername
        + ": " + e);
  }

  // providers to validate what there is
  // TODO: Validation should be done before persistence
  // (as written, an invalid definition has already been saved to the
  // filesystem by the time these checks run)
  AggregateConf instanceDescription = builder.getInstanceDescription();
  validateInstanceDefinition(sliderAM, instanceDescription, sliderFileSystem);
  validateInstanceDefinition(provider, instanceDescription, sliderFileSystem);
}
| |
| private void validateClientAndClusterResource(String clustername, |
| ConfTreeOperations clientResources) throws BadClusterStateException, |
| SliderException, IOException { |
| log.info("Validating upgrade resource definition with current cluster " |
| + "state (components and instance count)"); |
| Map<String, Integer> clientComponentInstances = new HashMap<>(); |
| for (String componentName : clientResources.getComponentNames()) { |
| if (!SliderKeys.COMPONENT_AM.equals(componentName)) { |
| clientComponentInstances.put(componentName, clientResources |
| .getComponentOptInt(componentName, |
| COMPONENT_INSTANCES, -1)); |
| } |
| } |
| |
| AggregateConf clusterConf = null; |
| try { |
| clusterConf = loadPersistedClusterDescription(clustername); |
| } catch (LockAcquireFailedException e) { |
| log.warn("Failed to get a Lock on cluster resource : {}", e, e); |
| throw new BadClusterStateException( |
| "Failed to load client resource definition " + clustername + ": " + e, e); |
| } |
| Map<String, Integer> clusterComponentInstances = new HashMap<>(); |
| for (Map.Entry<String, Map<String, String>> component : clusterConf |
| .getResources().components.entrySet()) { |
| if (!SliderKeys.COMPONENT_AM.equals(component.getKey())) { |
| clusterComponentInstances.put( |
| component.getKey(), |
| Integer.decode(component.getValue().get( |
| COMPONENT_INSTANCES))); |
| } |
| } |
| |
| // client and cluster should be an exact match |
| Iterator<Map.Entry<String, Integer>> clientComponentInstanceIt = clientComponentInstances |
| .entrySet().iterator(); |
| while (clientComponentInstanceIt.hasNext()) { |
| Map.Entry<String, Integer> clientComponentInstanceEntry = clientComponentInstanceIt.next(); |
| if (clusterComponentInstances.containsKey(clientComponentInstanceEntry.getKey())) { |
| // compare instance count now and remove from both maps if they match |
| if (clusterComponentInstances |
| .get(clientComponentInstanceEntry.getKey()) == clientComponentInstanceEntry |
| .getValue()) { |
| clusterComponentInstances.remove(clientComponentInstanceEntry.getKey()); |
| clientComponentInstanceIt.remove(); |
| } |
| } |
| } |
| |
| if (!clientComponentInstances.isEmpty() |
| || !clusterComponentInstances.isEmpty()) { |
| log.error("Mismatch found in upgrade resource definition and cluster " |
| + "resource state"); |
| if (!clientComponentInstances.isEmpty()) { |
| log.info("The upgrade resource definitions that do not match are:"); |
| for (Map.Entry<String, Integer> clientComponentInstanceEntry : clientComponentInstances |
| .entrySet()) { |
| log.info(" Component Name: {}, Instance count: {}", |
| clientComponentInstanceEntry.getKey(), |
| clientComponentInstanceEntry.getValue()); |
| } |
| } |
| if (!clusterComponentInstances.isEmpty()) { |
| log.info("The cluster resources that do not match are:"); |
| for (Map.Entry<String, Integer> clusterComponentInstanceEntry : clusterComponentInstances |
| .entrySet()) { |
| log.info(" Component Name: {}, Instance count: {}", |
| clusterComponentInstanceEntry.getKey(), |
| clusterComponentInstanceEntry.getValue()); |
| } |
| } |
| throw new BadConfigException("Resource definition provided for " |
| + "upgrade does not match with that of the currently running " |
| + "cluster.\nIf you are aware of what you are doing, rerun the " |
| + "command with " + Arguments.ARG_FORCE + " option."); |
| } |
| } |
| |
| protected void persistInstanceDefinition(boolean overwrite, |
| Path appconfdir, |
| InstanceBuilder builder) |
| throws IOException, SliderException, LockAcquireFailedException { |
| builder.persist(appconfdir, overwrite); |
| } |
| |
| @VisibleForTesting |
| public static void replaceTokens(ConfTree conf, |
| String userName, String clusterName) throws IOException { |
| Map<String,String> newglobal = new HashMap<>(); |
| for (Entry<String,String> entry : conf.global.entrySet()) { |
| newglobal.put(entry.getKey(), replaceTokens(entry.getValue(), |
| userName, clusterName)); |
| } |
| conf.global.putAll(newglobal); |
| |
| for (String component : conf.components.keySet()) { |
| Map<String,String> newComponent = new HashMap<>(); |
| for (Entry<String,String> entry : conf.components.get(component).entrySet()) { |
| newComponent.put(entry.getKey(), replaceTokens(entry.getValue(), |
| userName, clusterName)); |
| } |
| conf.components.get(component).putAll(newComponent); |
| } |
| |
| Map<String,List<String>> newcred = new HashMap<>(); |
| for (Entry<String,List<String>> entry : conf.credentials.entrySet()) { |
| List<String> resultList = new ArrayList<>(); |
| for (String v : entry.getValue()) { |
| resultList.add(replaceTokens(v, userName, clusterName)); |
| } |
| newcred.put(replaceTokens(entry.getKey(), userName, clusterName), |
| resultList); |
| } |
| conf.credentials.clear(); |
| conf.credentials.putAll(newcred); |
| } |
| |
| private static String replaceTokens(String s, String userName, |
| String clusterName) throws IOException { |
| return s.replaceAll(Pattern.quote("${USER}"), userName) |
| .replaceAll(Pattern.quote("${USER_NAME}"), userName); |
| } |
| |
| public FsPermission getClusterDirectoryPermissions(Configuration conf) { |
| String clusterDirPermsOct = |
| conf.get(CLUSTER_DIRECTORY_PERMISSIONS, DEFAULT_CLUSTER_DIRECTORY_PERMISSIONS); |
| return new FsPermission(clusterDirPermsOct); |
| } |
| |
| /** |
| * Verify that the Resource Manager is configured (on a non-HA cluster). |
| * with a useful error message |
| * @throws BadCommandArgumentsException the exception raised on an invalid config |
| */ |
| public void verifyBindingsDefined() throws BadCommandArgumentsException { |
| InetSocketAddress rmAddr = getRmAddress(getConfig()); |
| if (!getConfig().getBoolean(YarnConfiguration.RM_HA_ENABLED, false) |
| && !isAddressDefined(rmAddr)) { |
| throw new BadCommandArgumentsException( |
| E_NO_RESOURCE_MANAGER |
| + " in the argument " |
| + Arguments.ARG_MANAGER |
| + " or the configuration property " |
| + YarnConfiguration.RM_ADDRESS |
| + " value :" + rmAddr); |
| } |
| } |
| |
| /** |
| * Load and start a cluster specification. |
| * This assumes that all validation of args and cluster state |
| * have already taken place |
| * |
| * @param clustername name of the cluster. |
| * @param launchArgs launch arguments |
| * @return the exit code |
| * @throws YarnException |
| * @throws IOException |
| */ |
| protected int startCluster(String clustername, |
| LaunchArgsAccessor launchArgs) throws |
| YarnException, |
| IOException { |
| Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername); |
| AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved( |
| clustername, |
| clusterDirectory); |
| |
| LaunchedApplication launchedApplication = |
| launchApplication(clustername, clusterDirectory, instanceDefinition, |
| serviceArgs.isDebug()); |
| |
| if (launchArgs.getOutputFile() != null) { |
| // output file has been requested. Get the app report and serialize it |
| ApplicationReport report = |
| launchedApplication.getApplicationReport(); |
| SerializedApplicationReport sar = new SerializedApplicationReport(report); |
| sar.submitTime = System.currentTimeMillis(); |
| ApplicationReportSerDeser serDeser = new ApplicationReportSerDeser(); |
| serDeser.save(sar, launchArgs.getOutputFile()); |
| } |
| int waittime = launchArgs.getWaittime(); |
| if (waittime > 0) { |
| return waitForAppRunning(launchedApplication, waittime, waittime); |
| } else { |
| // no waiting |
| return EXIT_SUCCESS; |
| } |
| } |
| |
| /** |
| * Load the instance definition. It is not resolved at this point |
| * @param name cluster name |
| * @param clusterDirectory cluster dir |
| * @return the loaded configuration |
| * @throws IOException |
| * @throws SliderException |
| * @throws UnknownApplicationInstanceException if the file is not found |
| */ |
| public AggregateConf loadInstanceDefinitionUnresolved(String name, |
| Path clusterDirectory) throws IOException, SliderException { |
| |
| try { |
| AggregateConf definition = |
| InstanceIO.loadInstanceDefinitionUnresolved(sliderFileSystem, |
| clusterDirectory); |
| definition.setName(name); |
| return definition; |
| } catch (FileNotFoundException e) { |
| throw UnknownApplicationInstanceException.unknownInstance(name, e); |
| } |
| } |
| |
| /** |
| * Load the instance definition. |
| * @param name cluster name |
| * @param resolved flag to indicate the cluster should be resolved |
| * @return the loaded configuration |
| * @throws IOException IO problems |
| * @throws SliderException slider explicit issues |
| * @throws UnknownApplicationInstanceException if the file is not found |
| */ |
| public AggregateConf loadInstanceDefinition(String name, |
| boolean resolved) throws |
| IOException, |
| SliderException { |
| |
| Path clusterDirectory = sliderFileSystem.buildClusterDirPath(name); |
| AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved( |
| name, |
| clusterDirectory); |
| if (resolved) { |
| instanceDefinition.resolve(); |
| } |
| return instanceDefinition; |
| |
| } |
| |
/**
 * Build, but do not submit, the launcher for the Slider Application Master.
 * <p>
 * In order, this:
 * <ul>
 *   <li>records the deployed cluster name and validates it;</li>
 *   <li>fails if a live instance of that name exists;</li>
 *   <li>resolves the instance definition in place;</li>
 *   <li>gathers credentials on a secure cluster;</li>
 *   <li>copies the local configuration directory (if one is named by the
 *   {@code SliderKeys.PROPERTY_CONF_DIR} system property) into the cluster
 *   directory;</li>
 *   <li>lets the Slider AM provider and the application's client provider
 *   add their resources and run preflight validation;</li>
 *   <li>builds the AM environment, classpath, command line, resource
 *   requirements, priority and queue.</li>
 * </ul>
 *
 * @param clustername name of the cluster
 * @param clusterDirectory cluster directory in the cluster filesystem
 * @param instanceDefinition instance definition; resolved in place
 * @param debugAM if true, {@code Arguments.ARG_DEBUG} is added to the AM
 * command line
 * @return the configured launcher
 * @throws YarnException YARN problems
 * @throws IOException IO problems
 * @throws BadConfigException if the named configuration directory does not
 * exist, or the RM scheduler address is invalid
 */
protected AppMasterLauncher setupAppMasterLauncher(String clustername,
    Path clusterDirectory,
    AggregateConf instanceDefinition,
    boolean debugAM)
  throws YarnException, IOException{
  deployedClusterName = clustername;
  validateClusterName(clustername);
  // fail fast if an instance with this name is already live
  verifyNoLiveClusters(clustername, "Launch");
  Configuration config = getConfig();
  // NOTE(review): return value is ignored; presumably called for its side
  // effects on the ZK quorum settings - confirm
  lookupZKQuorum();
  boolean clusterSecure = isHadoopClusterSecure(config);
  //create the Slider AM provider -this helps set up the AM
  SliderAMClientProvider sliderAM = new SliderAMClientProvider(config);

  // resolve the definition in place and remember it as the launched one
  instanceDefinition.resolve();
  launchedInstanceDefinition = instanceDefinition;

  ConfTreeOperations internalOperations = instanceDefinition.getInternalOperations();
  MapOperations internalOptions = internalOperations.getGlobalOptions();
  ConfTreeOperations resourceOperations = instanceDefinition.getResourceOperations();
  ConfTreeOperations appOperations = instanceDefinition.getAppConfOperations();
  // both conf paths are mandatory internal options and must already exist
  Path generatedConfDirPath =
      createPathThatMustExist(internalOptions.getMandatoryOption(
          INTERNAL_GENERATED_CONF_PATH));
  Path snapshotConfPath =
      createPathThatMustExist(internalOptions.getMandatoryOption(
          INTERNAL_SNAPSHOT_CONF_PATH));


  // cluster Provider
  AbstractClientProvider provider = createClientProvider(
      internalOptions.getMandatoryOption(INTERNAL_PROVIDER_NAME));
  if (log.isDebugEnabled()) {
    log.debug(instanceDefinition.toString());
  }
  MapOperations sliderAMResourceComponent =
      resourceOperations.getOrAddComponent(SliderKeys.COMPONENT_AM);
  MapOperations resourceGlobalOptions = resourceOperations.getGlobalOptions();

  // add the tags if available
  Set<String> applicationTags = provider.getApplicationTags(sliderFileSystem,
      getApplicationDefinitionPath(appOperations));

  // credentials remain null when the cluster is not secure
  Credentials credentials = null;
  if (clusterSecure) {
    // pick up oozie credentials
    credentials = CredentialUtils.loadTokensFromEnvironment(System.getenv(),
        config);
    if (credentials == null) {
      // nothing from oozie, so build up directly
      credentials = new Credentials(
          UserGroupInformation.getCurrentUser().getCredentials());
      CredentialUtils.addRMRenewableFSDelegationTokens(config,
          sliderFileSystem.getFileSystem(),
          credentials);
      CredentialUtils.addRMDelegationToken(yarnClient, credentials);

    } else {
      log.info("Using externally supplied credentials to launch AM");
    }
  }

  AppMasterLauncher amLauncher = new AppMasterLauncher(clustername,
      SliderKeys.APP_TYPE,
      config,
      sliderFileSystem,
      yarnClient,
      clusterSecure,
      sliderAMResourceComponent,
      resourceGlobalOptions,
      applicationTags,
      credentials);

  ApplicationId appId = amLauncher.getApplicationId();
  // keep containers when the AM restarts
  amLauncher.setKeepContainersOverRestarts(true);

  int maxAppAttempts = config.getInt(KEY_AM_RESTART_LIMIT, 0);
  amLauncher.setMaxAppAttempts(maxAppAttempts);

  // presumably clears temp files left by earlier launches of this instance;
  // then create a per-application temp path (<appId>/am) with a "lib" subdir
  sliderFileSystem.purgeAppInstanceTempFiles(clustername);
  Path tempPath = sliderFileSystem.createAppInstanceTempPath(
      clustername,
      appId.toString() + "/am");
  String libdir = "lib";
  Path libPath = new Path(tempPath, libdir);
  sliderFileSystem.getFileSystem().mkdirs(libPath);
  log.debug("FS={}, tempPath={}, libdir={}", sliderFileSystem, tempPath, libPath);

  // set local resources for the application master
  // local files or archives as needed
  // In this scenario, the jar file for the application master is part of the local resources
  Map<String, LocalResource> localResources = amLauncher.getLocalResources();

  // look for a local configuration directory named by the
  // SliderKeys.PROPERTY_CONF_DIR system property
  boolean hasServerLog4jProperties = false;
  Path remoteConfPath = null;
  String relativeConfDir = null;
  String confdirProp = System.getProperty(SliderKeys.PROPERTY_CONF_DIR);
  if (isUnset(confdirProp)) {
    log.debug("No local configuration directory provided as system property");
  } else {
    File confDir = new File(confdirProp);
    if (!confDir.exists()) {
      throw new BadConfigException(E_CONFIGURATION_DIRECTORY_NOT_FOUND,
          confDir);
    }
    Path localConfDirPath = createLocalPath(confDir);
    remoteConfPath = new Path(clusterDirectory, SliderKeys.SUBMITTED_CONF_DIR);
    log.debug("Slider configuration directory is {}; remote to be {}",
        localConfDirPath, remoteConfPath);
    copyDirectory(config, localConfDirPath, remoteConfPath, null);

    // remember whether the conf dir ships a server log4j file; it switches
    // the AM's log4j configuration further down
    File log4jserver =
        new File(confDir, SliderKeys.LOG4J_SERVER_PROP_FILENAME);
    hasServerLog4jProperties = log4jserver.isFile();
  }
  // the assumption here is that minimr cluster => this is a test run
  // and the classpath can look after itself

  boolean usingMiniMRCluster = getUsingMiniMRCluster();
  if (!usingMiniMRCluster) {

    log.debug("Destination is not a MiniYARNCluster -copying full classpath");

    // insert conf dir first
    // (on a MiniYARN cluster relativeConfDir stays null when passed to
    // buildClasspath below)
    if (remoteConfPath != null) {
      relativeConfDir = SliderKeys.SUBMITTED_CONF_DIR;
      Map<String, LocalResource> submittedConfDir =
          sliderFileSystem.submitDirectory(remoteConfPath,
              relativeConfDir);
      mergeMaps(localResources, submittedConfDir);
    }
  }
  // build up the configuration
  // IMPORTANT: it is only after this call that site configurations
  // will be valid.

  propagatePrincipals(config, instanceDefinition);
  // validate security data

  /*
  // turned off until tested
  SecurityConfiguration securityConfiguration =
      new SecurityConfiguration(config,
          instanceDefinition, clustername);

  */
  Configuration clientConfExtras = new Configuration(false);
  // then build up the generated path: copy the snapshot conf into the
  // generated conf dir with the cluster directory permissions
  FsPermission clusterPerms = getClusterDirectoryPermissions(config);
  copyDirectory(config, snapshotConfPath, generatedConfDirPath,
      clusterPerms);


  // standard AM resources
  sliderAM.prepareAMAndConfigForLaunch(sliderFileSystem,
      config,
      amLauncher,
      instanceDefinition,
      snapshotConfPath,
      generatedConfDirPath,
      clientConfExtras,
      libdir,
      tempPath,
      usingMiniMRCluster);
  //add provider-specific resources
  provider.prepareAMAndConfigForLaunch(sliderFileSystem,
      config,
      amLauncher,
      instanceDefinition,
      snapshotConfPath,
      generatedConfDirPath,
      clientConfExtras,
      libdir,
      tempPath,
      usingMiniMRCluster);

  // now that the site config is fully generated, the provider gets
  // to do a quick review of them.
  log.debug("Preflight validation of cluster configuration");


  sliderAM.preflightValidateClusterConfiguration(sliderFileSystem,
      clustername,
      config,
      instanceDefinition,
      clusterDirectory,
      generatedConfDirPath,
      clusterSecure
  );

  provider.preflightValidateClusterConfiguration(sliderFileSystem,
      clustername,
      config,
      instanceDefinition,
      clusterDirectory,
      generatedConfDirPath,
      clusterSecure
  );


  // TODO: consider supporting apps that don't have an image path
  Path imagePath =
      extractImagePath(sliderFileSystem, internalOptions);
  if (sliderFileSystem.maybeAddImagePath(localResources, imagePath)) {
    log.debug("Registered image path {}", imagePath);
  }

  // build the environment
  amLauncher.putEnv(
      buildEnvMap(sliderAMResourceComponent));
  ClasspathConstructor classpath = buildClasspath(relativeConfDir,
      libdir,
      getConfig(),
      sliderFileSystem,
      usingMiniMRCluster);
  amLauncher.setClasspath(classpath);
  //add english env
  amLauncher.setEnv("LANG", "en_US.UTF-8");
  amLauncher.setEnv("LC_ALL", "en_US.UTF-8");
  amLauncher.setEnv("LANGUAGE", "en_US.UTF-8");
  amLauncher.maybeSetEnv(HADOOP_JAAS_DEBUG,
      System.getenv(HADOOP_JAAS_DEBUG));
  // user-configured extra AM environment (KEY_AM_LAUNCH_ENV)
  amLauncher.putEnv(getAmLaunchEnv(config));

  for (Map.Entry<String, String> envs : getSystemEnv().entrySet()) {
    log.debug("System env {}={}", envs.getKey(), envs.getValue());
  }
  if (log.isDebugEnabled()) {
    log.debug("AM classpath={}", classpath);
    log.debug("Environment Map:\n{}",
        stringifyMap(amLauncher.getEnv()));
    log.debug("Files in lib path\n{}", sliderFileSystem.listFSDir(libPath));
  }

  // rm address

  InetSocketAddress rmSchedulerAddress;
  try {
    rmSchedulerAddress = getRmSchedulerAddress(config);
  } catch (IllegalArgumentException e) {
    // NOTE(review): the original IllegalArgumentException is not attached
    // as the cause of this exception
    throw new BadConfigException("%s Address invalid: %s",
        YarnConfiguration.RM_SCHEDULER_ADDRESS,
        config.get(YarnConfiguration.RM_SCHEDULER_ADDRESS));
  }
  String rmAddr = NetUtils.getHostPortString(rmSchedulerAddress);

  JavaCommandLineBuilder commandLine = new JavaCommandLineBuilder();
  // insert any JVM options
  sliderAM.addJVMOptions(instanceDefinition, commandLine);
  // enable asserts
  commandLine.enableJavaAssertions();

  // if the conf dir has a log4j-server.properties, switch to that
  if (hasServerLog4jProperties) {
    commandLine.sysprop(SYSPROP_LOG4J_CONFIGURATION, LOG4J_SERVER_PROP_FILENAME);
    commandLine.sysprop(SYSPROP_LOG_DIR, ApplicationConstants.LOG_DIR_EXPANSION_VAR);
  }

  // add the AM service entry point
  commandLine.add(SliderAppMaster.SERVICE_CLASSNAME);

  // create action and the cluster name
  commandLine.add(ACTION_CREATE, clustername);

  // debug
  if (debugAM) {
    commandLine.add(Arguments.ARG_DEBUG);
  }

  // set the cluster directory path
  commandLine.add(Arguments.ARG_CLUSTER_URI, clusterDirectory.toUri());

  if (!isUnset(rmAddr)) {
    commandLine.add(Arguments.ARG_RM_ADDR, rmAddr);
  }

  if (serviceArgs.getFilesystemBinding() != null) {
    commandLine.add(Arguments.ARG_FILESYSTEM, serviceArgs.getFilesystemBinding());
  }

  // pass the registry binding
  commandLine.addConfOptionToCLI(config, RegistryConstants.KEY_REGISTRY_ZK_ROOT,
      RegistryConstants.DEFAULT_ZK_REGISTRY_ROOT);
  commandLine.addMandatoryConfOption(config, RegistryConstants.KEY_REGISTRY_ZK_QUORUM);

  if (clusterSecure) {
    // if the cluster is secure, make sure that
    // the relevant security settings go over
    commandLine.addConfOption(config, DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY);
  }

  // copy over any/all YARN RM client values, in case the server-side XML conf file
  // has the 0.0.0.0 address
  commandLine.addConfOptions(config,
      YarnConfiguration.RM_ADDRESS,
      YarnConfiguration.RM_CLUSTER_ID,
      YarnConfiguration.RM_HOSTNAME,
      YarnConfiguration.RM_PRINCIPAL);

  // add the AM stdout/stderr output file arguments
  commandLine.addOutAndErrFiles(STDOUT_AM, STDERR_AM);

  String cmdStr = commandLine.build();
  log.debug("Completed setting up app master command {}", cmdStr);

  amLauncher.addCommandLine(commandLine);

  // the Slider AM gets to configure the AM requirements, not the custom provider
  sliderAM.prepareAMResourceRequirements(sliderAMResourceComponent,
      amLauncher.getResource());


  // Set the priority for the application master
  amLauncher.setPriority(config.getInt(KEY_YARN_QUEUE_PRIORITY,
      DEFAULT_YARN_QUEUE_PRIORITY));

  // Set the queue to which this application is to be submitted in the RM
  // Queue for App master: a queue stored in the instance's internal options
  // (INTERNAL_QUEUE) overrides the KEY_YARN_QUEUE configuration value
  String amQueue = config.get(KEY_YARN_QUEUE, DEFAULT_YARN_QUEUE);
  String suppliedQueue = internalOperations.getGlobalOptions().get(INTERNAL_QUEUE);
  if(!isUnset(suppliedQueue)) {
    amQueue = suppliedQueue;
    log.info("Using queue {} for the application instance.", amQueue);
  }

  if (isSet(amQueue)) {
    amLauncher.setQueue(amQueue);
  }
  return amLauncher;
}
| |
| /** |
| * |
| * @param clustername name of the cluster |
| * @param clusterDirectory cluster dir |
| * @param instanceDefinition the instance definition |
| * @param debugAM enable debug AM options |
| * @return the launched application |
| * @throws YarnException |
| * @throws IOException |
| */ |
| public LaunchedApplication launchApplication(String clustername, |
| Path clusterDirectory, |
| AggregateConf instanceDefinition, |
| boolean debugAM) |
| throws YarnException, IOException { |
| |
| AppMasterLauncher amLauncher = setupAppMasterLauncher(clustername, |
| clusterDirectory, |
| instanceDefinition, |
| debugAM); |
| |
| applicationId = amLauncher.getApplicationId(); |
| log.info("Submitting application {}", applicationId); |
| |
| // submit the application |
| LaunchedApplication launchedApplication = amLauncher.submitApplication(); |
| return launchedApplication; |
| } |
| |
| protected Map<String, String> getAmLaunchEnv(Configuration config) { |
| String sliderAmLaunchEnv = config.get(KEY_AM_LAUNCH_ENV); |
| log.debug("{} = {}", KEY_AM_LAUNCH_ENV, sliderAmLaunchEnv); |
| // Multiple env variables can be specified with a comma (,) separator |
| String[] envs = StringUtils.isEmpty(sliderAmLaunchEnv) ? null |
| : sliderAmLaunchEnv.split(","); |
| if (ArrayUtils.isEmpty(envs)) { |
| return Collections.emptyMap(); |
| } |
| Map<String, String> amLaunchEnv = new HashMap<>(); |
| for (String env : envs) { |
| if (StringUtils.isNotEmpty(env)) { |
| // Each env name/value is separated by equals sign (=) |
| String[] tokens = env.split("="); |
| if (tokens != null && tokens.length == 2) { |
| String envKey = tokens[0]; |
| String envValue = tokens[1]; |
| for (Map.Entry<String, String> placeholder : generatePlaceholderKeyValueMap( |
| env).entrySet()) { |
| if (StringUtils.isNotEmpty(placeholder.getValue())) { |
| envValue = envValue.replaceAll( |
| Pattern.quote(placeholder.getKey()), placeholder.getValue()); |
| } |
| } |
| if (Shell.WINDOWS) { |
| envValue = "%" + envKey + "%;" + envValue; |
| } else { |
| envValue = "$" + envKey + ":" + envValue; |
| } |
| log.info("Setting AM launch env {}={}", envKey, envValue); |
| amLaunchEnv.put(envKey, envValue); |
| } |
| } |
| } |
| return amLaunchEnv; |
| } |
| |
| protected Map<String, String> generatePlaceholderKeyValueMap(String env) { |
| String PLACEHOLDER_PATTERN = "\\$\\{[^{]+\\}"; |
| Pattern placeholderPattern = Pattern.compile(PLACEHOLDER_PATTERN); |
| Matcher placeholderMatcher = placeholderPattern.matcher(env); |
| Map<String, String> placeholderKeyValueMap = new HashMap<String, String>(); |
| if (placeholderMatcher.find()) { |
| String placeholderKey = placeholderMatcher.group(); |
| String systemKey = placeholderKey |
| .substring(2, placeholderKey.length() - 1).toUpperCase(Locale.ENGLISH) |
| .replaceAll("\\.", "_"); |
| String placeholderValue = getSystemEnv(systemKey); |
| log.debug("Placeholder {}={}", placeholderKey, placeholderValue); |
| placeholderKeyValueMap.put(placeholderKey, placeholderValue); |
| } |
| return placeholderKeyValueMap; |
| } |
| |
| private void propagatePythonExecutable(Configuration config, |
| AggregateConf instanceDefinition) { |
| String pythonExec = config.get( |
| PYTHON_EXECUTABLE_PATH); |
| if (pythonExec != null) { |
| instanceDefinition.getAppConfOperations().getGlobalOptions().putIfUnset( |
| PYTHON_EXECUTABLE_PATH, |
| pythonExec); |
| } |
| } |
| |
| |
| /** |
| * Wait for the launched app to be accepted in the time |
| * and, optionally running. |
| * <p> |
| * If the application |
| * |
| * @param launchedApplication application |
| * @param acceptWaitMillis time in millis to wait for accept |
| * @param runWaitMillis time in millis to wait for the app to be running. |
| * May be null, in which case no wait takes place |
| * @return exit code: success |
| * @throws YarnException |
| * @throws IOException |
| */ |
| public int waitForAppRunning(LaunchedApplication launchedApplication, |
| int acceptWaitMillis, int runWaitMillis) throws YarnException, IOException { |
| assert launchedApplication != null; |
| int exitCode; |
| // wait for the submit state to be reached |
| ApplicationReport report = launchedApplication.monitorAppToState( |
| YarnApplicationState.ACCEPTED, |
| new Duration(acceptWaitMillis)); |
| |
| // may have failed, so check that |
| if (hasAppFinished(report)) { |
| exitCode = buildExitCode(report); |
| } else { |
| // exit unless there is a wait |
| |
| |
| if (runWaitMillis != 0) { |
| // waiting for state to change |
| Duration duration = new Duration(runWaitMillis * 1000); |
| duration.start(); |
| report = launchedApplication.monitorAppToState( |
| YarnApplicationState.RUNNING, duration); |
| if (report != null && |
| report.getYarnApplicationState() == YarnApplicationState.RUNNING) { |
| exitCode = EXIT_SUCCESS; |
| } else { |
| exitCode = buildExitCode(report); |
| } |
| } else { |
| exitCode = EXIT_SUCCESS; |
| } |
| } |
| return exitCode; |
| } |
| |
| |
| /** |
| * Propagate any critical principals from the current site config down to the HBase one. |
| * @param config config to read from |
| * @param clusterSpec cluster spec |
| */ |
| private void propagatePrincipals(Configuration config, |
| AggregateConf clusterSpec) { |
| String dfsPrincipal = config.get(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY); |
| if (dfsPrincipal != null) { |
| String siteDfsPrincipal = SITE_XML_PREFIX + DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY; |
| clusterSpec.getAppConfOperations().getGlobalOptions().putIfUnset( |
| siteDfsPrincipal, |
| dfsPrincipal); |
| } |
| } |
| |
| /** |
| * Create a path that must exist in the cluster fs |
| * @param uri uri to create |
| * @return the path |
| * @throws FileNotFoundException if the path does not exist |
| */ |
| public Path createPathThatMustExist(String uri) throws |
| SliderException, IOException { |
| return sliderFileSystem.createPathThatMustExist(uri); |
| } |
| |
| /** |
| * verify that a live cluster isn't there |
| * @param clustername cluster name |
| * @param action |
| * @throws SliderException with exit code EXIT_CLUSTER_LIVE |
| * if a cluster of that name is either live or starting up. |
| */ |
| public void verifyNoLiveClusters(String clustername, String action) throws |
| IOException, |
| YarnException { |
| List<ApplicationReport> existing = findAllLiveInstances(clustername); |
| |
| if (!existing.isEmpty()) { |
| throw new SliderException(EXIT_APPLICATION_IN_USE, |
| action +" failed for " |
| + clustername |
| + ": " |
| + E_CLUSTER_RUNNING + " :" + |
| existing.get(0)); |
| } |
| } |
| |
| public String getUsername() throws IOException { |
| return RegistryUtils.currentUser(); |
| } |
| |
| /** |
| * Get the name of any deployed cluster |
| * @return the cluster name |
| */ |
| public String getDeployedClusterName() { |
| return deployedClusterName; |
| } |
| |
| @VisibleForTesting |
| public void setDeployedClusterName(String deployedClusterName) { |
| this.deployedClusterName = deployedClusterName; |
| } |
| |
| /** |
| * ask if the client is using a mini MR cluster |
| * @return true if they are |
| */ |
| private boolean getUsingMiniMRCluster() { |
| return getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, |
| false); |
| } |
| |
| /** |
| * Get the application name used in the zookeeper root paths |
| * @return an application-specific path in ZK |
| */ |
| private String getAppName() { |
| return "slider"; |
| } |
| |
| /** |
| * Wait for the app to start running (or go past that state) |
| * @param duration time to wait |
| * @return the app report; null if the duration turned out |
| * @throws YarnException YARN or app issues |
| * @throws IOException IO problems |
| */ |
| @VisibleForTesting |
| public ApplicationReport monitorAppToRunning(Duration duration) |
| throws YarnException, IOException { |
| return monitorAppToState(YarnApplicationState.RUNNING, duration); |
| } |
| |
| /** |
| * Build an exit code for an application from its report. |
| * If the report parameter is null, its interpreted as a timeout |
| * @param report report application report |
| * @return the exit code |
| * @throws IOException |
| * @throws YarnException |
| */ |
| private int buildExitCode(ApplicationReport report) throws |
| IOException, |
| YarnException { |
| if (null == report) { |
| return EXIT_TIMED_OUT; |
| } |
| |
| YarnApplicationState state = report.getYarnApplicationState(); |
| FinalApplicationStatus dsStatus = report.getFinalApplicationStatus(); |
| switch (state) { |
| case FINISHED: |
| if (FinalApplicationStatus.SUCCEEDED == dsStatus) { |
| log.info("Application has completed successfully"); |
| return EXIT_SUCCESS; |
| } else { |
| log.info("Application finished unsuccessfully." + |
| "YarnState = {}, DSFinalStatus = {} Breaking monitoring loop", |
| state, dsStatus); |
| return EXIT_YARN_SERVICE_FINISHED_WITH_ERROR; |
| } |
| |
| case KILLED: |
| log.info("Application did not finish. YarnState={}, DSFinalStatus={}", |
| state, dsStatus); |
| return EXIT_YARN_SERVICE_KILLED; |
| |
| case FAILED: |
| log.info("Application Failed. YarnState={}, DSFinalStatus={}", state, |
| dsStatus); |
| return EXIT_YARN_SERVICE_FAILED; |
| |
| default: |
| //not in any of these states |
| return EXIT_SUCCESS; |
| } |
| } |
| |
| /** |
| * Monitor the submitted application for reaching the requested state. |
| * Will also report if the app reaches a later state (failed, killed, etc) |
| * Kill application if duration!= null & time expires. |
| * Prerequisite: the applicatin was launched. |
| * @param desiredState desired state. |
| * @param duration how long to wait -must be more than 0 |
| * @return the application report -null on a timeout |
| * @throws YarnException |
| * @throws IOException |
| */ |
| @VisibleForTesting |
| public ApplicationReport monitorAppToState( |
| YarnApplicationState desiredState, |
| Duration duration) |
| throws YarnException, IOException { |
| LaunchedApplication launchedApplication = |
| new LaunchedApplication(applicationId, yarnClient); |
| return launchedApplication.monitorAppToState(desiredState, duration); |
| } |
| |
| @Override |
| public ApplicationReport getApplicationReport() throws |
| IOException, |
| YarnException { |
| return getApplicationReport(applicationId); |
| } |
| |
| @Override |
| public boolean forceKillApplication(String reason) |
| throws YarnException, IOException { |
| if (applicationId != null) { |
| new LaunchedApplication(applicationId, yarnClient).forceKill(reason); |
| return true; |
| } |
| return false; |
| } |
| |
| /** |
| * List Slider instances belonging to a specific user. This will include |
| * failed and killed instances; there may be duplicates |
| * @param user user: "" means all users, null means "default" |
| * @return a possibly empty list of Slider AMs |
| */ |
| |
| public List<ApplicationReport> listSliderInstances(String user) |
| throws YarnException, IOException { |
| return yarnAppListClient.listInstances(user); |
| } |
| |
| /** |
| * A basic list action to list live instances |
| * @param clustername cluster name |
| * @return success if the listing was considered successful |
| * @throws IOException |
| * @throws YarnException |
| */ |
| public int actionList(String clustername) throws IOException, YarnException { |
| ActionListArgs args = new ActionListArgs(); |
| args.live = true; |
| return actionList(clustername, args); |
| } |
| |
| /** |
| * Implement the list action. |
| * @param clustername List out specific instance name |
| * @param args Action list arguments |
| * @return 0 if one or more entries were listed |
| * @throws IOException |
| * @throws YarnException |
| * @throws UnknownApplicationInstanceException if a specific instance |
| * was named but it was not found |
| */ |
| @Override |
| public int actionList(String clustername, ActionListArgs args) |
| throws IOException, YarnException { |
| Set<String> appInstances = getApplicationList(clustername, args); |
| // getApplicationList never returns null |
| return !appInstances.isEmpty() ? EXIT_SUCCESS |
| : ((appInstances.isEmpty() && isUnset(clustername)) ? EXIT_SUCCESS |
| : EXIT_FALSE); |
| } |
| |
| /** |
| * Retrieve a list of all live instances. If clustername is supplied then it |
| * returns this specific cluster, if and only if it exists and is live. |
| * |
| * @param clustername |
| * cluster name (if looking for a specific live cluster) |
| * @return the list of application names which satisfies the list criteria |
| * @throws IOException |
| * @throws YarnException |
| */ |
| public Set<String> getApplicationList(String clustername) throws IOException, |
| YarnException { |
| ActionListArgs args = new ActionListArgs(); |
| args.live = true; |
| return getApplicationList(clustername, args); |
| } |
| |
| /** |
| * Retrieve a list of application instances satisfying the query criteria. |
| * |
| * @param clustername |
| * List out specific instance name (set null for all) |
| * @param args |
| * Action list arguments |
| * @return the list of application names which satisfies the list criteria |
| * @throws IOException |
| * @throws YarnException |
| * @throws UnknownApplicationInstanceException |
| * if a specific instance was named but it was not found |
| */ |
| public Set<String> getApplicationList(String clustername, ActionListArgs args) |
| throws IOException, YarnException { |
| if (args.help) { |
| actionHelp(ACTION_LIST); |
| // the above call throws an exception so the return is not really required |
| return Collections.emptySet(); |
| } |
| verifyBindingsDefined(); |
| |
| boolean live = args.live; |
| String state = args.state; |
| boolean listContainers = args.containers; |
| boolean verbose = args.verbose; |
| String version = args.version; |
| Set<String> components = args.components; |
| |
| if (live && !state.isEmpty()) { |
| throw new BadCommandArgumentsException( |
| Arguments.ARG_LIVE + " and " + Arguments.ARG_STATE + " are exclusive"); |
| } |
| if (listContainers && isUnset(clustername)) { |
| throw new BadCommandArgumentsException( |
| "Should specify an application instance with " |
| + Arguments.ARG_CONTAINERS); |
| } |
| // specifying both --version and --components with --containers is okay |
| if (StringUtils.isNotEmpty(version) && !listContainers) { |
| throw new BadCommandArgumentsException(Arguments.ARG_VERSION |
| + " can be specified only with " + Arguments.ARG_CONTAINERS); |
| } |
| if (!components.isEmpty() && !listContainers) { |
| throw new BadCommandArgumentsException(Arguments.ARG_COMPONENTS |
| + " can be specified only with " + Arguments.ARG_CONTAINERS); |
| } |
| |
| // flag to indicate only services in a specific state are to be listed |
| boolean listOnlyInState = live || !state.isEmpty(); |
| |
| YarnApplicationState min, max; |
| if (live) { |
| min = YarnApplicationState.NEW; |
| max = YarnApplicationState.RUNNING; |
| } else if (!state.isEmpty()) { |
| YarnApplicationState stateVal = extractYarnApplicationState(state); |
| min = max = stateVal; |
| } else { |
| min = YarnApplicationState.NEW; |
| max = YarnApplicationState.KILLED; |
| } |
| // get the complete list of persistent instances |
| Map<String, Path> persistentInstances = sliderFileSystem.listPersistentInstances(); |
| |
| if (persistentInstances.isEmpty() && isUnset(clustername)) { |
| // an empty listing is a success if no cluster was named |
| log.debug("No application instances found"); |
| return Collections.emptySet(); |
| } |
| |
| // and those the RM knows about |
| List<ApplicationReport> instances = listSliderInstances(null); |
| sortApplicationsByMostRecent(instances); |
| Map<String, ApplicationReport> reportMap = |
| buildApplicationReportMap(instances, min, max); |
| log.debug("Persisted {} deployed {} filtered[{}-{}] & de-duped to {}", |
| persistentInstances.size(), |
| instances.size(), |
| min, max, |
| reportMap.size() ); |
| |
| List<ContainerInformation> containers = null; |
| if (isSet(clustername)) { |
| // only one instance is expected |
| // resolve the persistent value |
| Path persistent = persistentInstances.get(clustername); |
| if (persistent == null) { |
| throw unknownClusterException(clustername); |
| } |
| // create a new map with only that instance in it. |
| // this restricts the output of results to this instance |
| persistentInstances = new HashMap<>(); |
| persistentInstances.put(clustername, persistent); |
| if (listContainers) { |
| containers = getContainers(clustername); |
| } |
| } |
| |
| // at this point there is either the entire list or a stripped down instance |
| Set<String> listedInstances = new HashSet<String>(); |
| for (String name : persistentInstances.keySet()) { |
| ApplicationReport report = reportMap.get(name); |
| if (!listOnlyInState || report != null) { |
| // list the details if all were requested, or the filtering contained |
| // a report |
| listedInstances.add(name); |
| // containers will be non-null when only one instance is requested |
| String details = instanceDetailsToString(name, report, |
| containers, version, components, verbose); |
| print(details); |
| } |
| } |
| |
| return listedInstances; |
| } |
| |
| public List<ContainerInformation> getContainers(String name) |
| throws YarnException, IOException { |
| SliderClusterOperations clusterOps = new SliderClusterOperations( |
| bondToCluster(name)); |
| try { |
| return clusterOps.getContainers(); |
| } catch (NoSuchNodeException e) { |
| throw new BadClusterStateException( |
| "Containers not found for application instance %s", name); |
| } |
| } |
| |
| /** |
| * Enumerate slider instances for the current user, and the |
| * most recent app report, where available. |
| * @param listOnlyInState boolean to indicate that the instances should |
| * only include those in a YARN state |
| * <code> minAppState <= currentState <= maxAppState </code> |
| * |
| * @param minAppState minimum application state to include in enumeration. |
| * @param maxAppState maximum application state to include |
| * @return a map of application instance name to description |
| * @throws IOException Any IO problem |
| * @throws YarnException YARN problems |
| */ |
| @Override |
| public Map<String, SliderInstanceDescription> enumSliderInstances( |
| boolean listOnlyInState, |
| YarnApplicationState minAppState, |
| YarnApplicationState maxAppState) |
| throws IOException, YarnException { |
| return yarnAppListClient.enumSliderInstances(listOnlyInState, |
| minAppState, |
| maxAppState); |
| } |
| |
| /** |
| * Extract the state of a Yarn application --state argument |
| * @param state state argument |
| * @return the application state |
| * @throws BadCommandArgumentsException if the argument did not match |
| * any known state |
| */ |
| private YarnApplicationState extractYarnApplicationState(String state) throws |
| BadCommandArgumentsException { |
| YarnApplicationState stateVal; |
| try { |
| stateVal = YarnApplicationState.valueOf(state.toUpperCase(Locale.ENGLISH)); |
| } catch (IllegalArgumentException e) { |
| throw new BadCommandArgumentsException("Unknown state: " + state); |
| |
| } |
| return stateVal; |
| } |
| |
| /** |
| * Is an application active: accepted or running |
| * @param report the application report |
| * @return true if it is running or scheduled to run. |
| */ |
| public boolean isApplicationActive(ApplicationReport report) { |
| return report.getYarnApplicationState() == YarnApplicationState.RUNNING |
| || report.getYarnApplicationState() == YarnApplicationState.ACCEPTED; |
| } |
| |
| /** |
| * Implement the islive action: probe for a cluster of the given name existing |
| * @return exit code |
| */ |
| |
| @Override |
| @VisibleForTesting |
| public int actionFlex(String name, ActionFlexArgs args) throws YarnException, IOException { |
| validateClusterName(name); |
| Map<String, String> roleMap = args.getComponentMap(); |
| // throw usage exception if no changes proposed |
| if (roleMap.size() == 0) { |
| actionHelp(ACTION_FLEX); |
| } |
| verifyBindingsDefined(); |
| log.debug("actionFlex({})", name); |
| Map<String, String> roleInstances = new HashMap<>(); |
| for (Map.Entry<String, String> roleEntry : roleMap.entrySet()) { |
| String key = roleEntry.getKey(); |
| String val = roleEntry.getValue(); |
| roleInstances.put(key, val); |
| } |
| return flex(name, roleInstances); |
| } |
| |
| @Override |
| public int actionExists(String name, boolean checkLive) throws YarnException, IOException { |
| ActionExistsArgs args = new ActionExistsArgs(); |
| args.live = checkLive; |
| return actionExists(name, args); |
| } |
| |
| public int actionExists(String name, ActionExistsArgs args) throws YarnException, IOException { |
| verifyBindingsDefined(); |
| validateClusterName(name); |
| boolean checkLive = args.live; |
| log.debug("actionExists({}, {}, {})", name, checkLive, args.state); |
| |
| //initial probe for a cluster in the filesystem |
| Path clusterDirectory = sliderFileSystem.buildClusterDirPath(name); |
| if (!sliderFileSystem.getFileSystem().exists(clusterDirectory)) { |
| throw unknownClusterException(name); |
| } |
| String state = args.state; |
| if (!checkLive && isUnset(state)) { |
| log.info("Application {} exists", name); |
| return EXIT_SUCCESS; |
| } |
| |
| //test for liveness/state |
| boolean inDesiredState = false; |
| ApplicationReport instance; |
| instance = findInstance(name); |
| if (instance == null) { |
| log.info("Application {} not running", name); |
| return EXIT_FALSE; |
| } |
| if (checkLive) { |
| // the app exists, check that it is not in any terminated state |
| YarnApplicationState appstate = instance.getYarnApplicationState(); |
| log.debug(" current app state = {}", appstate); |
| inDesiredState = |
| appstate.ordinal() < YarnApplicationState.FINISHED.ordinal(); |
| } else { |
| // scan for instance in single --state state |
| List<ApplicationReport> userInstances = yarnClient.listDeployedInstances(""); |
| state = state.toUpperCase(Locale.ENGLISH); |
| YarnApplicationState desiredState = extractYarnApplicationState(state); |
| ApplicationReport foundInstance = |
| yarnClient.findAppInInstanceList(userInstances, name, desiredState); |
| if (foundInstance != null) { |
| // found in selected state: success |
| inDesiredState = true; |
| // mark this as the instance to report |
| instance = foundInstance; |
| } |
| } |
| |
| OnDemandReportStringifier report = |
| new OnDemandReportStringifier(instance); |
| if (!inDesiredState) { |
| //cluster in the list of apps but not running |
| log.info("Application {} found but is in wrong state {}", name, |
| instance.getYarnApplicationState()); |
| log.debug("State {}", report); |
| return EXIT_FALSE; |
| } else { |
| log.debug("Application instance is in desired state"); |
| log.info("Application {} is {}\n{}", name, |
| instance.getYarnApplicationState(), report); |
| return EXIT_SUCCESS; |
| } |
| } |
| |
| |
| @Override |
| public int actionKillContainer(String name, |
| ActionKillContainerArgs args) throws YarnException, IOException { |
| String id = args.id; |
| if (isUnset(id)) { |
| throw new BadCommandArgumentsException("Missing container id"); |
| } |
| log.info("killingContainer {}:{}", name, id); |
| SliderClusterOperations clusterOps = |
| new SliderClusterOperations(bondToCluster(name)); |
| try { |
| clusterOps.killContainer(id); |
| } catch (NoSuchNodeException e) { |
| throw new BadClusterStateException("Container %s not found in cluster %s", |
| id, name); |
| } |
| return EXIT_SUCCESS; |
| } |
| |
| @Override |
| public String actionEcho(String name, ActionEchoArgs args) throws |
| YarnException, |
| IOException { |
| String message = args.message; |
| if (message == null) { |
| throw new BadCommandArgumentsException("missing message"); |
| } |
| SliderClusterOperations clusterOps = |
| new SliderClusterOperations(bondToCluster(name)); |
| return clusterOps.echo(message); |
| } |
| |
| /** |
| * Get at the service registry operations |
| * @return registry client -valid after the service is inited. |
| */ |
| public YarnAppListClient getYarnAppListClient() { |
| return yarnAppListClient; |
| } |
| |
| /** |
| * Find an instance of an application belonging to the current user |
| * @param appname application name |
| * @return the app report or null if none is found |
| * @throws YarnException YARN issues |
| * @throws IOException IO problems |
| */ |
| private ApplicationReport findInstance(String appname) |
| throws YarnException, IOException { |
| return yarnAppListClient.findInstance(appname); |
| } |
| |
| private RunningApplication findApplication(String appname) |
| throws YarnException, IOException { |
| ApplicationReport applicationReport = findInstance(appname); |
| return applicationReport != null ? |
| new RunningApplication(yarnClient, applicationReport): null; |
| } |
| |
| /** |
| * find all live instances of a specific app -if there is >1 in the cluster, |
| * this returns them all. State should be running or less |
| * @param appname application name |
| * @return the list of all matching application instances |
| */ |
| private List<ApplicationReport> findAllLiveInstances(String appname) |
| throws YarnException, IOException { |
| |
| return yarnAppListClient.findAllLiveInstances(appname); |
| } |
| |
| /** |
| * Connect to a Slider AM |
| * @param app application report providing the details on the application |
| * @return an instance |
| * @throws YarnException |
| * @throws IOException |
| */ |
| private SliderClusterProtocol connect(ApplicationReport app) |
| throws YarnException, IOException { |
| |
| try { |
| return RpcBinder.getProxy(getConfig(), |
| yarnClient.getRmClient(), |
| app, |
| Constants.CONNECT_TIMEOUT, |
| Constants.RPC_TIMEOUT); |
| } catch (InterruptedException e) { |
| throw new SliderException(SliderExitCodes.EXIT_TIMED_OUT, |
| e, |
| "Interrupted waiting for communications with the Slider AM"); |
| } |
| } |
| |
| @Override |
| @VisibleForTesting |
| public int actionStatus(String clustername, ActionStatusArgs statusArgs) throws |
| YarnException, |
| IOException { |
| verifyBindingsDefined(); |
| validateClusterName(clustername); |
| String outfile = statusArgs.getOutput(); |
| ClusterDescription status = getClusterDescription(clustername); |
| String text = status.toJsonString(); |
| if (outfile == null) { |
| log.info(text); |
| } else { |
| status.save(new File(outfile).getAbsoluteFile()); |
| } |
| return EXIT_SUCCESS; |
| } |
| |
| @Override |
| public int actionVersion() { |
| SliderVersionInfo.loadAndPrintVersionInfo(log); |
| return EXIT_SUCCESS; |
| } |
| |
| @Override |
| public int actionFreeze(String clustername, |
| ActionFreezeArgs freezeArgs) throws YarnException, IOException { |
| verifyBindingsDefined(); |
| validateClusterName(clustername); |
| int waittime = freezeArgs.getWaittime(); |
| String text = freezeArgs.message; |
| boolean forcekill = freezeArgs.force; |
| log.debug("actionFreeze({}, reason={}, wait={}, force={})", clustername, |
| text, |
| waittime, |
| forcekill); |
| |
| //is this actually a known cluster? |
| sliderFileSystem.locateInstanceDefinition(clustername); |
| ApplicationReport app = findInstance(clustername); |
| if (app == null) { |
| // exit early |
| log.info("Cluster {} not running", clustername); |
| // not an error to stop a stopped cluster |
| return EXIT_SUCCESS; |
| } |
| log.debug("App to stop was found: {}:\n{}", clustername, |
| new OnDemandReportStringifier(app)); |
| if (app.getYarnApplicationState().ordinal() >= |
| YarnApplicationState.FINISHED.ordinal()) { |
| log.info("Cluster {} is in a terminated state {}", clustername, |
| app.getYarnApplicationState()); |
| return EXIT_SUCCESS; |
| } |
| |
| // IPC request for a managed shutdown is only possible if the app is running. |
| // so we need to force kill if the app is accepted or submitted |
| if (!forcekill |
| && app.getYarnApplicationState().ordinal() < YarnApplicationState.RUNNING.ordinal()) { |
| log.info("Cluster {} is in a pre-running state {}. Force killing it", clustername, |
| app.getYarnApplicationState()); |
| forcekill = true; |
| } |
| |
| LaunchedApplication application = new LaunchedApplication(yarnClient, app); |
| applicationId = application.getApplicationId(); |
| |
| if (forcekill) { |
| // escalating to forced kill |
| application.kill("Forced stop of " + clustername + ": " + text); |
| } else { |
| try { |
| SliderClusterProtocol appMaster = connect(app); |
| Messages.StopClusterRequestProto r = |
| Messages.StopClusterRequestProto |
| .newBuilder() |
| .setMessage(text) |
| .build(); |
| appMaster.stopCluster(r); |
| |
| log.debug("Cluster stop command issued"); |
| |
| } catch (YarnException e) { |
| log.warn("Exception while trying to terminate {}", clustername, e); |
| return EXIT_FALSE; |
| } catch (IOException e) { |
| log.warn("Exception while trying to terminate {}", clustername, e); |
| return EXIT_FALSE; |
| } |
| } |
| |
| //wait for completion. We don't currently return an exception during this process |
| //as the stop operation has been issued, this is just YARN. |
| try { |
| if (waittime > 0) { |
| ApplicationReport applicationReport = |
| application.monitorAppToState(YarnApplicationState.FINISHED, |
| new Duration(waittime * 1000)); |
| if (applicationReport == null) { |
| log.info("application did not shut down in time"); |
| return EXIT_FALSE; |
| } |
| } |
| |
| } catch (YarnException | IOException e) { |
| log.warn("Exception while waiting for the application {} to shut down: {}", |
| clustername, e); |
| } |
| |
| return EXIT_SUCCESS; |
| } |
| |
| @Override |
| public int actionThaw(String clustername, ActionThawArgs thaw) throws YarnException, IOException { |
| validateClusterName(clustername); |
| verifyBindingsDefined(); |
| // see if it is actually running and bail out; |
| verifyNoLiveClusters(clustername, "Start"); |
| |
| //start the cluster |
| return startCluster(clustername, thaw); |
| } |
| |
| /** |
| * Implement flexing |
| * @param clustername name of the cluster |
| * @param roleInstances map of new role instances |
| * @return EXIT_SUCCESS if the #of nodes in a live cluster changed |
| * @throws YarnException |
| * @throws IOException |
| */ |
| public int flex(String clustername, Map<String, String> roleInstances) |
| throws YarnException, IOException { |
| verifyBindingsDefined(); |
| validateClusterName(clustername); |
| Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername); |
| AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved( |
| clustername, |
| clusterDirectory); |
| |
| ConfTreeOperations resources = |
| instanceDefinition.getResourceOperations(); |
| for (Map.Entry<String, String> entry : roleInstances.entrySet()) { |
| String role = entry.getKey(); |
| String updateCountStr = entry.getValue(); |
| int currentCount = 0; |
| MapOperations component = resources.getOrAddComponent(role); |
| try { |
| // check if a relative count is specified |
| if (updateCountStr.startsWith("+") || updateCountStr.startsWith("-")) { |
| int updateCount = Integer.parseInt(updateCountStr); |
| // if component was specified before, get the current count |
| if (component.get(COMPONENT_INSTANCES) != null) { |
| currentCount = Integer.valueOf(component.get(COMPONENT_INSTANCES)); |
| if (currentCount + updateCount < 0) { |
| throw new BadCommandArgumentsException("The requested count " + |
| "of \"%s\" for role %s makes the total number of " + |
| "instances negative: \"%s\"", updateCount, role, |
| currentCount+updateCount); |
| } |
| else { |
| component.put(COMPONENT_INSTANCES, |
| Integer.toString(currentCount+updateCount)); |
| } |
| } |
| else { |
| if (updateCount < 0) { |
| throw new BadCommandArgumentsException("Invalid to request " + |
| "negative count of \"%s\" for role %s", updateCount, role); |
| } |
| else { |
| Map<String, String> map = new HashMap<>(); |
| resources.confTree.components.put(role, map); |
| component = new MapOperations(role, map); |
| component.put(COMPONENT_INSTANCES, Integer.toString(updateCount)); |
| } |
| } |
| } |
| else { |
| int count = Integer.parseInt(updateCountStr); |
| resources.getOrAddComponent(role).put(COMPONENT_INSTANCES, |
| Integer.toString(count)); |
| } |
| } |
| catch (NumberFormatException e) { |
| throw new BadCommandArgumentsException("Requested count of role %s" + |
| " is not a number: \"%s\"", |
| role, updateCountStr); |
| } |
| |
| log.debug("Flexed cluster specification ( {} -> {}) : \n{}", |
| role, |
| updateCountStr, |
| resources); |
| } |
| SliderAMClientProvider sliderAM = new SliderAMClientProvider(getConfig()); |
| AbstractClientProvider provider = createClientProvider( |
| instanceDefinition.getInternalOperations().getGlobalOptions().getMandatoryOption( |
| INTERNAL_PROVIDER_NAME)); |
| // slider provider to validate what there is |
| validateInstanceDefinition(sliderAM, instanceDefinition, sliderFileSystem); |
| validateInstanceDefinition(provider, instanceDefinition, sliderFileSystem); |
| |
| int exitCode = EXIT_FALSE; |
| // save the specification |
| try { |
| InstanceIO.saveInstanceDefinition(sliderFileSystem, clusterDirectory, |
| instanceDefinition); |
| } catch (LockAcquireFailedException e) { |
| // lock failure |
| log.debug("Failed to lock dir {}", clusterDirectory, e); |
| log.warn("Failed to save new resource definition to {} : {}", clusterDirectory, e); |
| } |
| |
| // now see if it is actually running and tell it about the update if it is |
| ApplicationReport instance = findInstance(clustername); |
| if (instance != null) { |
| log.info("Flexing running cluster"); |
| SliderClusterProtocol appMaster = connect(instance); |
| SliderClusterOperations clusterOps = new SliderClusterOperations(appMaster); |
| clusterOps.flex(instanceDefinition.getResources()); |
| log.info("application instance size updated"); |
| exitCode = EXIT_SUCCESS; |
| } else { |
| log.info("No running instance to update"); |
| } |
| return exitCode; |
| } |
| |
| /** |
| * Validate an instance definition against a provider. |
| * @param provider the provider performing the validation |
| * @param instanceDefinition the instance definition |
| * @throws SliderException if invalid. |
| */ |
| protected void validateInstanceDefinition(AbstractClientProvider provider, |
| AggregateConf instanceDefinition, SliderFileSystem fs) throws SliderException { |
| try { |
| provider.validateInstanceDefinition(instanceDefinition, fs); |
| } catch (SliderException e) { |
| //problem, reject it |
| log.info("Error {} validating application instance definition ", e.getMessage()); |
| log.debug("Error validating application instance definition ", e); |
| log.info(instanceDefinition.toString()); |
| throw e; |
| } |
| } |
| |
| |
| /** |
| * Load the persistent cluster description |
| * @param clustername name of the cluster |
| * @return the description in the filesystem |
| * @throws IOException any problems loading -including a missing file |
| */ |
| @VisibleForTesting |
| public AggregateConf loadPersistedClusterDescription(String clustername) |
| throws IOException, SliderException, LockAcquireFailedException { |
| Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername); |
| ConfPersister persister = new ConfPersister(sliderFileSystem, clusterDirectory); |
| AggregateConf instanceDescription = new AggregateConf(); |
| persister.load(instanceDescription); |
| return instanceDescription; |
| } |
| |
| /** |
| * Connect to a live cluster and get its current state |
| * @param clustername the cluster name |
| * @return its description |
| */ |
| @VisibleForTesting |
| public ClusterDescription getClusterDescription(String clustername) throws |
| YarnException, |
| IOException { |
| SliderClusterOperations clusterOperations = |
| createClusterOperations(clustername); |
| return clusterOperations.getClusterDescription(); |
| } |
| |
| /** |
| * Connect to the cluster and get its current state |
| * @return its description |
| */ |
| @VisibleForTesting |
| public ClusterDescription getClusterDescription() throws |
| YarnException, |
| IOException { |
| return getClusterDescription(getDeployedClusterName()); |
| } |
| |
| /** |
| * List all node UUIDs in a role |
| * @param role role name or "" for all |
| * @return an array of UUID strings |
| * @throws IOException |
| * @throws YarnException |
| */ |
| @VisibleForTesting |
| public String[] listNodeUUIDsByRole(String role) throws |
| IOException, |
| YarnException { |
| return createClusterOperations() |
| .listNodeUUIDsByRole(role); |
| } |
| |
| /** |
| * List all nodes in a role. This is a double round trip: once to list |
| * the nodes in a role, another to get their details |
| * @param role component/role to look for |
| * @return an array of ContainerNode instances |
| * @throws IOException |
| * @throws YarnException |
| */ |
| @VisibleForTesting |
| public List<ClusterNode> listClusterNodesInRole(String role) throws |
| IOException, |
| YarnException { |
| return createClusterOperations().listClusterNodesInRole(role); |
| } |
| |
| /** |
| * Get the details on a list of uuids |
| * @param uuids uuids to ask for |
| * @return a possibly empty list of node details |
| * @throws IOException |
| * @throws YarnException |
| */ |
| @VisibleForTesting |
| public List<ClusterNode> listClusterNodes(String[] uuids) throws |
| IOException, |
| YarnException { |
| |
| if (uuids.length == 0) { |
| // short cut on an empty list |
| return new LinkedList<>(); |
| } |
| return createClusterOperations().listClusterNodes(uuids); |
| } |
| |
| /** |
| * Get the instance definition from the far end |
| */ |
| @VisibleForTesting |
| public AggregateConf getLiveInstanceDefinition() throws IOException, YarnException { |
| return createClusterOperations().getInstanceDefinition(); |
| } |
| |
| /** |
| * Bond to a running cluster |
| * @param clustername cluster name |
| * @return the AM RPC client |
| * @throws SliderException if the cluster is unkown |
| */ |
| private SliderClusterProtocol bondToCluster(String clustername) throws |
| YarnException, |
| IOException { |
| verifyBindingsDefined(); |
| if (clustername == null) { |
| throw unknownClusterException("(undefined)"); |
| } |
| ApplicationReport instance = findInstance(clustername); |
| if (null == instance) { |
| throw unknownClusterException(clustername); |
| } |
| return connect(instance); |
| } |
| |
| /** |
| * Create a cluster operations instance against a given cluster |
| * @param clustername cluster name |
| * @return a bonded cluster operations instance |
| * @throws YarnException YARN issues |
| * @throws IOException IO problems |
| */ |
| private SliderClusterOperations createClusterOperations(String clustername) throws |
| YarnException, |
| IOException { |
| SliderClusterProtocol sliderAM = bondToCluster(clustername); |
| return new SliderClusterOperations(sliderAM); |
| } |
| |
| /** |
| * Create a cluster operations instance against the active cluster |
| * -returning any previous created one if held. |
| * @return a bonded cluster operations instance |
| * @throws YarnException YARN issues |
| * @throws IOException IO problems |
| */ |
| private SliderClusterOperations createClusterOperations() throws |
| YarnException, |
| IOException { |
| if (sliderClusterOperations == null) { |
| sliderClusterOperations = |
| createClusterOperations(getDeployedClusterName()); |
| } |
| return sliderClusterOperations; |
| } |
| |
| /** |
| * Wait for an instance of a named role to be live (or past it in the lifecycle) |
| * @param role role to look for |
| * @param timeout time to wait |
| * @return the state. If still in CREATED, the cluster didn't come up |
| * in the time period. If LIVE, all is well. If >LIVE, it has shut for a reason |
| * @throws IOException IO |
| * @throws SliderException Slider |
| * @throws WaitTimeoutException if the wait timed out |
| */ |
| @VisibleForTesting |
| public int waitForRoleInstanceLive(String role, long timeout) |
| throws WaitTimeoutException, IOException, YarnException { |
| return createClusterOperations().waitForRoleInstanceLive(role, timeout); |
| } |
| |
| /** |
| * Generate an exception for an unknown cluster |
| * @param clustername cluster name |
| * @return an exception with text and a relevant exit code |
| */ |
| public UnknownApplicationInstanceException unknownClusterException(String clustername) { |
| return UnknownApplicationInstanceException.unknownInstance(clustername); |
| } |
| |
| @Override |
| public String toString() { |
| return "Slider Client in state " + getServiceState() |
| + " and Slider Application Instance " + deployedClusterName; |
| } |
| |
| /** |
| * Get all YARN applications |
| * @return a possibly empty list |
| * @throws YarnException |
| * @throws IOException |
| */ |
| @VisibleForTesting |
| public List<ApplicationReport> getApplications() throws YarnException, IOException { |
| return yarnClient.getApplications(); |
| } |
| |
| @VisibleForTesting |
| public ApplicationReport getApplicationReport(ApplicationId appId) |
| throws YarnException, IOException { |
| return new LaunchedApplication(appId, yarnClient).getApplicationReport(); |
| } |
| |
| /** |
| * The configuration used for deployment (after resolution). |
| * Non-null only after the client has launched the application |
| * @return the resolved configuration or null |
| */ |
| @VisibleForTesting |
| public AggregateConf getLaunchedInstanceDefinition() { |
| return launchedInstanceDefinition; |
| } |
| |
| |
| @Override |
| public int actionResolve(ActionResolveArgs args) |
| throws YarnException, IOException { |
| // as this is an API entry point, validate |
| // the arguments |
| args.validate(); |
| RegistryOperations operations = getRegistryOperations(); |
| String path = SliderRegistryUtils.resolvePath(args.path); |
| ServiceRecordMarshal serviceRecordMarshal = new ServiceRecordMarshal(); |
| try { |
| if (args.list) { |
| File destDir = args.destdir; |
| if (destDir != null) { |
| destDir.mkdirs(); |
| } |
| |
| Map<String, ServiceRecord> recordMap; |
| Map<String, RegistryPathStatus> znodes; |
| try { |
| znodes = statChildren(registryOperations, path); |
| recordMap = extractServiceRecords(registryOperations, |
| path, |
| znodes.values()); |
| } catch (PathNotFoundException e) { |
| // treat the root directory as if if is always there |
| |
| if ("/".equals(path)) { |
| znodes = new HashMap<>(0); |
| recordMap = new HashMap<>(0); |
| } else { |
| throw e; |
| } |
| } |
| // subtract all records from the znodes map to get pure directories |
| log.info("Entries: {}", znodes.size()); |
| |
| for (String name : znodes.keySet()) { |
| println(" " + name); |
| } |
| println(""); |
| |
| log.info("Service records: {}", recordMap.size()); |
| for (Entry<String, ServiceRecord> recordEntry : recordMap.entrySet()) { |
| String name = recordEntry.getKey(); |
| ServiceRecord instance = recordEntry.getValue(); |
| String json = serviceRecordMarshal.toJson(instance); |
| if (destDir == null) { |
| println(name); |
| println(json); |
| } else { |
| String filename = RegistryPathUtils.lastPathEntry(name) + ".json"; |
| File jsonFile = new File(destDir, filename); |
| write(jsonFile, |
| serviceRecordMarshal.toBytes(instance), |
| true); |
| } |
| } |
| } else { |
| // resolve single entry |
| ServiceRecord instance = resolve(path); |
| File outFile = args.out; |
| if (args.destdir != null) { |
| outFile = new File(args.destdir, RegistryPathUtils.lastPathEntry(path)); |
| } |
| if (outFile != null) { |
| write(outFile, serviceRecordMarshal.toBytes(instance), true); |
| } else { |
| println(serviceRecordMarshal.toJson(instance)); |
| } |
| } |
| } catch (PathNotFoundException | NoRecordException e) { |
| // no record at this path |
| throw new NotFoundException(e, path); |
| } |
| return EXIT_SUCCESS; |
| } |
| |
| @Override |
| public int actionRegistry(ActionRegistryArgs registryArgs) throws |
| YarnException, |
| IOException { |
| // as this is also a test entry point, validate |
| // the arguments |
| registryArgs.validate(); |
| try { |
| if (registryArgs.list) { |
| actionRegistryList(registryArgs); |
| } else if (registryArgs.listConf) { |
| // list the configurations |
| actionRegistryListConfigsYarn(registryArgs); |
| } else if (registryArgs.listExports) { |
| // list the exports |
| actionRegistryListExports(registryArgs); |
| } else if (isSet(registryArgs.getConf)) { |
| // get a configuration |
| PublishedConfiguration publishedConfiguration = |
| actionRegistryGetConfig(registryArgs); |
| outputConfig(publishedConfiguration, registryArgs); |
| } else if (isSet(registryArgs.getExport)) { |
| // get a export group |
| PublishedExports publishedExports = |
| actionRegistryGetExport(registryArgs); |
| outputExport(publishedExports, registryArgs); |
| } else { |
| // it's an unknown command |
| log.info(CommonArgs.usage(serviceArgs, ACTION_DIAGNOSTICS)); |
| return EXIT_USAGE; |
| } |
| // JDK7 |
| } catch (FileNotFoundException e) { |
| log.info("{}", e.toString()); |
| log.debug("{}", e, e); |
| return EXIT_NOT_FOUND; |
| } catch (PathNotFoundException e) { |
| log.info("{}", e.toString()); |
| log.debug("{}", e, e); |
| return EXIT_NOT_FOUND; |
| } |
| return EXIT_SUCCESS; |
| } |
| |
| /** |
| * Registry operation |
| * |
| * @param registryArgs registry Arguments |
| * @return the instances (for tests) |
| * @throws YarnException YARN problems |
| * @throws IOException Network or other problems |
| */ |
| @VisibleForTesting |
| public Collection<ServiceRecord> actionRegistryList( |
| ActionRegistryArgs registryArgs) |
| throws YarnException, IOException { |
| String serviceType = registryArgs.serviceType; |
| String name = registryArgs.name; |
| RegistryOperations operations = getRegistryOperations(); |
| Collection<ServiceRecord> serviceRecords; |
| if (StringUtils.isEmpty(name)) { |
| String path = serviceclassPath(currentUser(), serviceType); |
| |
| try { |
| Map<String, ServiceRecord> recordMap = |
| listServiceRecords(operations, path); |
| if (recordMap.isEmpty()) { |
| throw new UnknownApplicationInstanceException( |
| "No applications registered under " + path); |
| } |
| serviceRecords = recordMap.values(); |
| } catch (PathNotFoundException e) { |
| throw new NotFoundException(path, e); |
| } |
| } else { |
| ServiceRecord instance = lookupServiceRecord(registryArgs); |
| serviceRecords = new ArrayList<>(1); |
| serviceRecords.add(instance); |
| } |
| |
| for (ServiceRecord serviceRecord : serviceRecords) { |
| logInstance(serviceRecord, registryArgs.verbose); |
| } |
| return serviceRecords; |
| } |
| |
| @Override |
| public int actionDiagnostic(ActionDiagnosticArgs diagnosticArgs) { |
| try { |
| if (diagnosticArgs.client) { |
| actionDiagnosticClient(diagnosticArgs); |
| } else if (diagnosticArgs.application) { |
| actionDiagnosticApplication(diagnosticArgs); |
| } else if (diagnosticArgs.yarn) { |
| actionDiagnosticYarn(diagnosticArgs); |
| } else if (diagnosticArgs.credentials) { |
| actionDiagnosticCredentials(); |
| } else if (diagnosticArgs.all) { |
| actionDiagnosticAll(diagnosticArgs); |
| } else if (diagnosticArgs.level) { |
| actionDiagnosticIntelligent(diagnosticArgs); |
| } else { |
| // it's an unknown option |
| log.info(CommonArgs.usage(serviceArgs, ACTION_DIAGNOSTICS)); |
| return EXIT_USAGE; |
| } |
| } catch (Exception e) { |
| log.error(e.toString()); |
| return EXIT_FALSE; |
| } |
| return EXIT_SUCCESS; |
| } |
| |
| private void actionDiagnosticIntelligent(ActionDiagnosticArgs diagnosticArgs) |
| throws YarnException, IOException, URISyntaxException { |
| // not using member variable clustername because we want to place |
| // application name after --application option and member variable |
| // cluster name has to be put behind action |
| String clusterName = diagnosticArgs.name; |
| requireArgumentSet(Arguments.ARG_NAME, clusterName); |
| |
| try { |
| validateClientConfigFile(); |
| log.info("Slider-client.xml is accessible"); |
| } catch (IOException e) { |
| // we are catching exceptions here because those are indication of |
| // validation result, and we need to print them here |
| log.error("validation of slider-client.xml fails because: " + e, e); |
| return; |
| } |
| SliderClusterOperations clusterOperations = createClusterOperations(clusterName); |
| // cluster not found exceptions will be thrown upstream |
| ClusterDescription clusterDescription = clusterOperations |
| .getClusterDescription(); |
| log.info("Slider AppMaster is accessible"); |
| |
| if (clusterDescription.state == StateValues.STATE_LIVE) { |
| AggregateConf instanceDefinition = clusterOperations |
| .getInstanceDefinition(); |
| String imagePath = instanceDefinition.getInternalOperations().get( |
| INTERNAL_APPLICATION_IMAGE_PATH); |
| // if null, that means slider uploaded the agent tarball for the user |
| // and we need to use where slider has put |
| if (imagePath == null) { |
| ApplicationReport appReport = findInstance(clusterName); |
| Path path1 = sliderFileSystem.getTempPathForCluster(clusterName); |
| |
| Path subPath = new Path(path1, appReport.getApplicationId().toString() |
| + "/agent"); |
| imagePath = subPath.toString(); |
| } |
| String pathStr = imagePath + "/" + AGENT_TAR; |
| try { |
| validateHDFSFile(sliderFileSystem, pathStr); |
| log.info("Slider agent package is properly installed"); |
| } catch (FileNotFoundException e) { |
| log.error("can not find agent package: {}", pathStr); |
| log.debug("can not find agent package: {}", pathStr, e); |
| return; |
| } catch (IOException e) { |
| log.error("can not open agent package: {}", pathStr, e); |
| return; |
| } |
| String pkgTarballPath = getApplicationDefinitionPath(instanceDefinition |
| .getAppConfOperations()); |
| try { |
| validateHDFSFile(sliderFileSystem, pkgTarballPath); |
| log.info("Application package is properly installed"); |
| } catch (FileNotFoundException e) { |
| log.error("can not find application package: {}", pkgTarballPath, e); |
| } catch (IOException e) { |
| log.error("can not open application package: {} ", pkgTarballPath, e); |
| } |
| } |
| } |
| |
| private void actionDiagnosticAll(ActionDiagnosticArgs diagnosticArgs) |
| throws IOException, YarnException { |
| // assign application name from param to each sub diagnostic function |
| actionDiagnosticClient(diagnosticArgs); |
| actionDiagnosticApplication(diagnosticArgs); |
| actionDiagnosticSlider(diagnosticArgs); |
| actionDiagnosticYarn(diagnosticArgs); |
| actionDiagnosticCredentials(); |
| } |
| |
| private void actionDiagnosticCredentials() throws BadConfigException, |
| IOException { |
| if (isHadoopClusterSecure(loadSliderClientXML())) { |
| String credentialCacheFileDescription = null; |
| try { |
| credentialCacheFileDescription = checkCredentialCacheFile(); |
| } catch (BadConfigException e) { |
| log.error("The credential config is not valid: " + e.toString()); |
| throw e; |
| } catch (IOException e) { |
| log.error("Unable to read the credential file: " + e.toString()); |
| throw e; |
| } |
| log.info("Credential cache file for the current user: " |
| + credentialCacheFileDescription); |
| } else { |
| log.info("the cluster is not in secure mode"); |
| } |
| } |
| |
| private void actionDiagnosticYarn(ActionDiagnosticArgs diagnosticArgs) |
| throws IOException, YarnException { |
| JSONObject converter = null; |
| log.info("the node in the YARN cluster has below state: "); |
| List<NodeReport> yarnClusterInfo; |
| try { |
| yarnClusterInfo = yarnClient.getNodeReports(NodeState.RUNNING); |
| } catch (YarnException e1) { |
| log.error("Exception happened when fetching node report from the YARN cluster: " |
| + e1.toString()); |
| throw e1; |
| } catch (IOException e1) { |
| log.error("Network problem happened when fetching node report YARN cluster: " |
| + e1.toString()); |
| throw e1; |
| } |
| for (NodeReport nodeReport : yarnClusterInfo) { |
| log.info(nodeReport.toString()); |
| } |
| |
| if (diagnosticArgs.verbose) { |
| Writer configWriter = new StringWriter(); |
| try { |
| Configuration.dumpConfiguration(yarnClient.getConfig(), configWriter); |
| } catch (IOException e1) { |
| log.error("Network problem happened when retrieving YARN config from YARN: " |
| + e1.toString()); |
| throw e1; |
| } |
| try { |
| converter = new JSONObject(configWriter.toString()); |
| log.info("the configuration of the YARN cluster is: " |
| + converter.toString(2)); |
| |
| } catch (JSONException e) { |
| log.error("JSONException happened during parsing response from YARN: " |
| + e.toString()); |
| } |
| } |
| } |
| |
| private void actionDiagnosticSlider(ActionDiagnosticArgs diagnosticArgs) |
| throws YarnException, IOException { |
| // not using member variable clustername because we want to place |
| // application name after --application option and member variable |
| // cluster name has to be put behind action |
| String clusterName = diagnosticArgs.name; |
| if(isUnset(clusterName)){ |
| throw new BadCommandArgumentsException("application name must be provided with --name option"); |
| } |
| AggregateConf instanceDefinition = fetchInstanceDefinition(clusterName); |
| String imagePath = instanceDefinition.getInternalOperations().get( |
| INTERNAL_APPLICATION_IMAGE_PATH); |
| // if null, it will be uploaded by Slider and thus at slider's path |
| if (imagePath == null) { |
| ApplicationReport appReport = findInstance(clusterName); |
| Path path1 = sliderFileSystem.getTempPathForCluster(clusterName); |
| Path subPath = new Path(path1, appReport.getApplicationId().toString() |
| + "/agent"); |
| imagePath = subPath.toString(); |
| } |
| log.info("The path of slider agent tarball on HDFS is: " + imagePath); |
| } |
| |
| private AggregateConf fetchInstanceDefinition(String clusterName) |
| throws YarnException, IOException { |
| SliderClusterOperations clusterOperations; |
| AggregateConf instanceDefinition = null; |
| try { |
| clusterOperations = createClusterOperations(clusterName); |
| instanceDefinition = clusterOperations.getInstanceDefinition(); |
| } catch (YarnException | IOException e) { |
| log.error("Failed to retrieve instance definition from YARN: " |
| + e.toString()); |
| throw e; |
| } |
| return instanceDefinition; |
| } |
| |
| private void actionDiagnosticApplication(ActionDiagnosticArgs diagnosticArgs) |
| throws YarnException, IOException { |
| // not using member variable clustername because we want to place |
| // application name after --application option and member variable |
| // cluster name has to be put behind action |
| String clusterName = diagnosticArgs.name; |
| requireArgumentSet(Arguments.ARG_NAME, clusterName); |
| AggregateConf instanceDefinition = fetchInstanceDefinition(clusterName); |
| String clusterDir = instanceDefinition.getAppConfOperations() |
| .getGlobalOptions().get(AgentKeys.APP_ROOT); |
| String pkgTarball = getApplicationDefinitionPath(instanceDefinition.getAppConfOperations()); |
| String runAsUser = instanceDefinition.getAppConfOperations() |
| .getGlobalOptions().get(AgentKeys.RUNAS_USER); |
| |
| log.info("The location of the cluster instance directory in HDFS is: {}", clusterDir); |
| log.info("The name of the application package tarball on HDFS is: {}",pkgTarball); |
| log.info("The runas user of the application in the cluster is: {}",runAsUser); |
| |
| if (diagnosticArgs.verbose) { |
| log.info("App config of the application:\n{}", |
| instanceDefinition.getAppConf().toJson()); |
| log.info("Resource config of the application:\n{}", |
| instanceDefinition.getResources().toJson()); |
| } |
| } |
| |
| private void actionDiagnosticClient(ActionDiagnosticArgs diagnosticArgs) |
| throws SliderException, IOException { |
| try { |
| String currentCommandPath = getCurrentCommandPath(); |
| SliderVersionInfo.loadAndPrintVersionInfo(log); |
| String clientConfigPath = getClientConfigPath(); |
| String jdkInfo = getJDKInfo(); |
| println("The slider command path: %s", currentCommandPath); |
| println("The slider-client.xml used by current running command path: %s", |
| clientConfigPath); |
| println(jdkInfo); |
| |
| // security info |
| Configuration config = getConfig(); |
| if (isHadoopClusterSecure(config)) { |
| println("Hadoop Cluster is secure"); |
| println("Login user is %s", UserGroupInformation.getLoginUser()); |
| println("Current user is %s", UserGroupInformation.getCurrentUser()); |
| |
| } else { |
| println("Hadoop Cluster is insecure"); |
| } |
| |
| // verbose? |
| if (diagnosticArgs.verbose) { |
| // do the environment |
| Map<String, String> env = getSystemEnv(); |
| Set<String> envList = ConfigHelper.sortedConfigKeys(env.entrySet()); |
| StringBuilder builder = new StringBuilder("Environment variables:\n"); |
| for (String key : envList) { |
| builder.append(key).append("=").append(env.get(key)).append("\n"); |
| } |
| println(builder.toString()); |
| |
| // Java properties |
| builder = new StringBuilder("JVM Properties\n"); |
| Map<String, String> props = |
| sortedMap(toMap(System.getProperties())); |
| for (Entry<String, String> entry : props.entrySet()) { |
| builder.append(entry.getKey()).append("=") |
| .append(entry.getValue()).append("\n"); |
| } |
| |
| println(builder.toString()); |
| |
| // then the config |
| println("Slider client configuration:\n" + ConfigHelper.dumpConfigToString(config)); |
| } |
| |
| validateSliderClientEnvironment(log); |
| } catch (SliderException | IOException e) { |
| log.error(e.toString()); |
| throw e; |
| } |
| |
| } |
| |
| /** |
| * Kerberos Diagnostics |
| * @param args CLI arguments |
| * @return exit code |
| * @throws SliderException |
| * @throws IOException |
| */ |
| @SuppressWarnings("IOResourceOpenedButNotSafelyClosed") |
| private int actionKDiag(ActionKDiagArgs args) |
| throws Exception { |
| PrintWriter out = new PrintWriter(System.err); |
| boolean closeStream = false; |
| if (args.out != null) { |
| out = new PrintWriter(new FileOutputStream(args.out)); |
| closeStream = true; |
| } |
| try { |
| KerberosDiags kdiags = new KerberosDiags(getConfig(), |
| out, |
| args.services, |
| args.keytab, |
| args.principal, |
| args.keylen, |
| args.secure); |
| kdiags.execute(); |
| } catch (KerberosDiags.KerberosDiagsFailure e) { |
| log.error(e.toString()); |
| log.debug(e.toString(), e); |
| throw e; |
| } catch (Exception e) { |
| log.error("Kerberos Diagnostics", e); |
| throw e; |
| } finally { |
| if (closeStream) { |
| out.flush(); |
| out.close(); |
| } |
| } |
| return 0; |
| } |
| |
| /** |
| * Log a service record instance |
| * @param instance record |
| * @param verbose verbose logging of all external endpoints |
| */ |
| private void logInstance(ServiceRecord instance, |
| boolean verbose) { |
| if (!verbose) { |
| log.info("{}", instance.get(YarnRegistryAttributes.YARN_ID, "")); |
| } else { |
| log.info("{}: ", instance.get(YarnRegistryAttributes.YARN_ID, "")); |
| logEndpoints(instance); |
| } |
| } |
| |
| /** |
| * Log the external endpoints of a service record |
| * @param instance service record instance |
| */ |
| private void logEndpoints(ServiceRecord instance) { |
| List<Endpoint> endpoints = instance.external; |
| for (Endpoint endpoint : endpoints) { |
| log.info(endpoint.toString()); |
| } |
| } |
| |
| /** |
| * list configs available for an instance |
| * |
| * @param registryArgs registry Arguments |
| * @throws YarnException YARN problems |
| * @throws IOException Network or other problems |
| */ |
| public void actionRegistryListConfigsYarn(ActionRegistryArgs registryArgs) |
| throws YarnException, IOException { |
| |
| ServiceRecord instance = lookupServiceRecord(registryArgs); |
| |
| RegistryRetriever retriever = new RegistryRetriever(getConfig(), instance); |
| PublishedConfigSet configurations = |
| retriever.getConfigurations(!registryArgs.internal); |
| PrintStream out = null; |
| try { |
| if (registryArgs.out != null) { |
| out = new PrintStream(new FileOutputStream(registryArgs.out)); |
| } else { |
| out = System.out; |
| } |
| for (String configName : configurations.keys()) { |
| if (!registryArgs.verbose) { |
| out.println(configName); |
| } else { |
| PublishedConfiguration published = |
| configurations.get(configName); |
| out.printf("%s: %s\n", |
| configName, |
| published.description); |
| } |
| } |
| } finally { |
| if (registryArgs.out != null && out != null) { |
| out.flush(); |
| out.close(); |
| } |
| } |
| } |
| |
| /** |
| * list exports available for an instance |
| * |
| * @param registryArgs registry Arguments |
| * @throws YarnException YARN problems |
| * @throws IOException Network or other problems |
| */ |
| public void actionRegistryListExports(ActionRegistryArgs registryArgs) |
| throws YarnException, IOException { |
| ServiceRecord instance = lookupServiceRecord(registryArgs); |
| |
| RegistryRetriever retriever = new RegistryRetriever(getConfig(), instance); |
| PublishedExportsSet exports = |
| retriever.getExports(!registryArgs.internal); |
| PrintStream out = null; |
| boolean streaming = false; |
| try { |
| if (registryArgs.out != null) { |
| out = new PrintStream(new FileOutputStream(registryArgs.out)); |
| streaming = true; |
| log.debug("Saving output to {}", registryArgs.out); |
| } else { |
| out = System.out; |
| } |
| log.debug("Number of exports: {}", exports.keys().size()); |
| for (String exportName : exports.keys()) { |
| if (streaming) { |
| log.debug(exportName); |
| } |
| if (!registryArgs.verbose) { |
| out.println(exportName); |
| } else { |
| PublishedExports published = exports.get(exportName); |
| out.printf("%s: %s\n", |
| exportName, |
| published.description); |
| } |
| } |
| } finally { |
| if (streaming) { |
| out.flush(); |
| out.close(); |
| } |
| } |
| } |
| |
| /** |
| * list configs available for an instance |
| * |
| * @param registryArgs registry Arguments |
| * @throws YarnException YARN problems |
| * @throws IOException Network or other problems |
| * @throws FileNotFoundException if the config is not found |
| */ |
| @VisibleForTesting |
| public PublishedConfiguration actionRegistryGetConfig(ActionRegistryArgs registryArgs) |
| throws YarnException, IOException { |
| ServiceRecord instance = lookupServiceRecord(registryArgs); |
| |
| RegistryRetriever retriever = new RegistryRetriever(getConfig(), instance); |
| boolean external = !registryArgs.internal; |
| PublishedConfigSet configurations = |
| retriever.getConfigurations(external); |
| |
| PublishedConfiguration published = retriever.retrieveConfiguration(configurations, |
| registryArgs.getConf, |
| external); |
| return published; |
| } |
| |
| /** |
| * get a specific export group |
| * |
| * @param registryArgs registry Arguments |
| * |
| * @throws YarnException YARN problems |
| * @throws IOException Network or other problems |
| * @throws FileNotFoundException if the config is not found |
| */ |
| @VisibleForTesting |
| public PublishedExports actionRegistryGetExport(ActionRegistryArgs registryArgs) |
| throws YarnException, IOException { |
| ServiceRecord instance = lookupServiceRecord(registryArgs); |
| |
| RegistryRetriever retriever = new RegistryRetriever(getConfig(), instance); |
| boolean external = !registryArgs.internal; |
| PublishedExportsSet exports = retriever.getExports(external); |
| |
| PublishedExports published = retriever.retrieveExports(exports, |
| registryArgs.getExport, |
| external); |
| return published; |
| } |
| |
| /** |
| * write out the config. If a destination is provided and that dir is a |
| * directory, the entry is written to it with the name provided + extension, |
| * else it is printed to standard out. |
| * @param published published config |
| * @param registryArgs registry Arguments |
| * @throws BadCommandArgumentsException |
| * @throws IOException |
| */ |
| private void outputConfig(PublishedConfiguration published, |
| ActionRegistryArgs registryArgs) throws |
| BadCommandArgumentsException, |
| IOException { |
| // decide whether or not to print |
| String entry = registryArgs.getConf; |
| String format = registryArgs.format; |
| ConfigFormat configFormat = ConfigFormat.resolve(format); |
| if (configFormat == null) { |
| throw new BadCommandArgumentsException( |
| "Unknown/Unsupported format %s ", format); |
| } |
| PublishedConfigurationOutputter outputter = |
| PublishedConfigurationOutputter.createOutputter(configFormat, |
| published); |
| boolean print = registryArgs.out == null; |
| if (!print) { |
| File outputPath = registryArgs.out; |
| if (outputPath.isDirectory()) { |
| // creating it under a directory |
| outputPath = new File(outputPath, entry + "." + format); |
| } |
| log.debug("Destination path: {}", outputPath); |
| outputter.save(outputPath); |
| } else { |
| print(outputter.asString()); |
| } |
| |
| } |
| |
| /** |
| * write out the config |
| * @param published |
| * @param registryArgs |
| * @throws BadCommandArgumentsException |
| * @throws IOException |
| */ |
| private void outputExport(PublishedExports published, |
| ActionRegistryArgs registryArgs) throws |
| BadCommandArgumentsException, |
| IOException { |
| // decide whether or not to print |
| String entry = registryArgs.getExport; |
| String format = ConfigFormat.JSON.toString(); |
| ConfigFormat configFormat = ConfigFormat.resolve(format); |
| if (configFormat == null || configFormat != ConfigFormat.JSON) { |
| throw new BadCommandArgumentsException( |
| "Unknown/Unsupported format %s . Only JSON is supported.", format); |
| } |
| |
| PublishedExportsOutputter outputter = |
| PublishedExportsOutputter.createOutputter(configFormat, |
| published); |
| boolean print = registryArgs.out == null; |
| if (!print) { |
| File destFile; |
| destFile = registryArgs.out; |
| if (destFile.isDirectory()) { |
| // creating it under a directory |
| destFile = new File(destFile, entry + "." + format); |
| } |
| log.info("Destination path: {}", destFile); |
| outputter.save(destFile); |
| } else { |
| print(outputter.asString()); |
| } |
| } |
| |
| /** |
| * Look up an instance |
| * @return instance data |
| * @throws SliderException other failures |
| * @throws IOException IO problems or wrapped exceptions |
| */ |
| private ServiceRecord lookupServiceRecord(ActionRegistryArgs registryArgs) throws |
| SliderException, |
| IOException { |
| String user; |
| if (StringUtils.isNotEmpty(registryArgs.user)) { |
| user = RegistryPathUtils.encodeForRegistry(registryArgs.user); |
| } else { |
| user = currentUser(); |
| } |
| |
| String path = servicePath(user, registryArgs.serviceType, |
| registryArgs.name); |
| return resolve(path); |
| } |
| |
| /** |
| * Look up a service record of the current user |
| * @param serviceType service type |
| * @param id instance ID |
| * @return instance data |
| * @throws UnknownApplicationInstanceException no path or service record |
| * at the end of the path |
| * @throws SliderException other failures |
| * @throws IOException IO problems or wrapped exceptions |
| */ |
| public ServiceRecord lookupServiceRecord(String serviceType, String id) |
| throws IOException, SliderException { |
| String path = servicePath(currentUser(), serviceType, id); |
| return resolve(path); |
| } |
| |
| /** |
| * |
| * Look up an instance |
| * @param path path |
| * @return instance data |
| * @throws NotFoundException no path/no service record |
| * at the end of the path |
| * @throws SliderException other failures |
| * @throws IOException IO problems or wrapped exceptions |
| */ |
| public ServiceRecord resolve(String path) |
| throws IOException, SliderException { |
| try { |
| return getRegistryOperations().resolve(path); |
| } catch (PathNotFoundException | NoRecordException e) { |
| throw new NotFoundException(e.getPath().toString(), e); |
| } |
| } |
| |
| /** |
| * List instances in the registry for the current user |
| * @return a list of slider registry instances |
| * @throws IOException Any IO problem ... including no path in the registry |
| * to slider service classes for this user |
| * @throws SliderException other failures |
| */ |
| |
| public Map<String, ServiceRecord> listRegistryInstances() |
| throws IOException, SliderException { |
| Map<String, ServiceRecord> recordMap = listServiceRecords( |
| getRegistryOperations(), |
| serviceclassPath(currentUser(), SliderKeys.APP_TYPE)); |
| return recordMap; |
| } |
| |
| /** |
| * List instances in the registry |
| * @return the instance IDs |
| * @throws IOException |
| * @throws YarnException |
| */ |
| public List<String> listRegisteredSliderInstances() throws |
| IOException, |
| YarnException { |
| try { |
| Map<String, ServiceRecord> recordMap = listServiceRecords( |
| getRegistryOperations(), |
| serviceclassPath(currentUser(), SliderKeys.APP_TYPE)); |
| return new ArrayList<>(recordMap.keySet()); |
| } catch (PathNotFoundException e) { |
| log.debug("No registry path for slider instances for current user: {}", e, e); |
| // no entries: return an empty list |
| return new ArrayList<>(0); |
| } catch (IOException | YarnException e) { |
| throw e; |
| } catch (Exception e) { |
| throw new IOException(e); |
| } |
| } |
| |
| /** |
| * Start the registry if it is not there yet |
| * @return the registry service |
| * @throws SliderException |
| * @throws IOException |
| */ |
| private synchronized RegistryOperations maybeStartYarnRegistry() |
| throws SliderException, IOException { |
| |
| if (registryOperations == null) { |
| registryOperations = startRegistryOperationsService(); |
| } |
| return registryOperations; |
| } |
| |
| @Override |
| public RegistryOperations getRegistryOperations() |
| throws SliderException, IOException { |
| return maybeStartYarnRegistry(); |
| } |
| |
| /** |
| * Output to standard out/stderr (implementation specific detail) |
| * @param src source |
| */ |
| @SuppressWarnings("UseOfSystemOutOrSystemErr") |
| private static void print(CharSequence src) { |
| clientOutputStream.append(src); |
| } |
| |
| /** |
| * Output to standard out/stderr with a newline after |
| * @param message message |
| */ |
| private static void println(String message) { |
| print(message); |
| print("\n"); |
| } |
| /** |
| * Output to standard out/stderr with a newline after, formatted |
| * @param message message |
| * @param args arguments for string formatting |
| */ |
| private static void println(String message, Object ... args) { |
| print(String.format(message, args)); |
| print("\n"); |
| } |
| |
| /** |
| * Implement the lookup action. |
| * @param args Action arguments |
| * @return 0 if the entry was found |
| * @throws IOException |
| * @throws YarnException |
| * @throws UnknownApplicationInstanceException if a specific instance |
| * was named but it was not found |
| */ |
| @VisibleForTesting |
| public int actionLookup(ActionLookupArgs args) |
| throws IOException, YarnException { |
| verifyBindingsDefined(); |
| try { |
| ApplicationId id = ConverterUtils.toApplicationId(args.id); |
| ApplicationReport report = yarnClient.getApplicationReport(id); |
| SerializedApplicationReport sar = new SerializedApplicationReport(report); |
| ApplicationReportSerDeser serDeser = new ApplicationReportSerDeser(); |
| if (args.outputFile != null) { |
| serDeser.save(sar, args.outputFile); |
| } else { |
| println(serDeser.toJson(sar)); |
| } |
| } catch (IllegalArgumentException e) { |
| throw new BadCommandArgumentsException(e, "%s : %s", args, e); |
| } catch (ApplicationAttemptNotFoundException | ApplicationNotFoundException notFound) { |
| throw new NotFoundException(notFound, notFound.toString()); |
| } |
| return EXIT_SUCCESS; |
| } |
| |
| @Override |
| public int actionDependency(ActionDependencyArgs args) throws IOException, |
| YarnException { |
| String currentUser = getUsername(); |
| log.info("Running command as user {}", currentUser); |
| |
| String version = getSliderVersion(); |
| Path dependencyLibTarGzip = sliderFileSystem.getDependencyTarGzip(); |
| |
| // Check if dependency has already been uploaded, in which case log |
| // appropriately and exit success (unless overwrite has been requested) |
| if (sliderFileSystem.isFile(dependencyLibTarGzip) && !args.overwrite) { |
| println(String.format( |
| "Dependency libs are already uploaded to %s. Use %s " |
| + "if you want to re-upload", dependencyLibTarGzip.toUri(), |
| Arguments.ARG_OVERWRITE)); |
| return EXIT_SUCCESS; |
| } |
| |
| String libDir = System.getProperty(SliderKeys.PROPERTY_LIB_DIR); |
| if (isSet(libDir)) { |
| File srcFolder = new File(libDir); |
| File tempLibTarGzipFile = File.createTempFile( |
| SliderKeys.SLIDER_DEPENDENCY_TAR_GZ_FILE_NAME + "_", |
| SliderKeys.SLIDER_DEPENDENCY_TAR_GZ_FILE_EXT); |
| // copy all jars except slider-core-<version>.jar |
| tarGzipFolder(srcFolder, tempLibTarGzipFile, createJarFilter()); |
| |
| log.info("Uploading dependency for AM (version {}) from {} to {}", |
| version, tempLibTarGzipFile.toURI(), dependencyLibTarGzip.toUri()); |
| sliderFileSystem.copyLocalFileToHdfs(tempLibTarGzipFile, |
| dependencyLibTarGzip, new FsPermission( |
| SliderKeys.SLIDER_DEPENDENCY_DIR_PERMISSIONS)); |
| return EXIT_SUCCESS; |
| } else { |
| return EXIT_FALSE; |
| } |
| } |
| |
| private int actionHelp(String actionName) throws YarnException, IOException { |
| throw new UsageException(CommonArgs.usage(serviceArgs, actionName)); |
| } |
| |
| private int actionHelp(String errMsg, String actionName) |
| throws YarnException, IOException { |
| throw new UsageException("%s %s", errMsg, CommonArgs.usage(serviceArgs, |
| actionName)); |
| } |
| |
| /** |
| * List the nodes in the cluster, possibly filtering by node state or label. |
| * |
| * @param args argument list |
| * @return a possibly empty list of nodes in the cluster |
| * @throws IOException IO problems |
| * @throws YarnException YARN problems |
| */ |
| @Override |
| public NodeInformationList listYarnClusterNodes(ActionNodesArgs args) |
| throws YarnException, IOException { |
| return yarnClient.listNodes(args.label, args.healthy); |
| } |
| |
| /** |
| * List the nodes in the cluster, possibly filtering by node state or label. |
| * |
| * @param args argument list |
| * @return a possibly empty list of nodes in the cluster |
| * @throws IOException IO problems |
| * @throws YarnException YARN problems |
| */ |
| public NodeInformationList listInstanceNodes(String instance, ActionNodesArgs args) |
| throws YarnException, IOException { |
| // TODO |
| log.info("listInstanceNodes {}", instance); |
| SliderClusterOperations clusterOps = |
| new SliderClusterOperations(bondToCluster(instance)); |
| return clusterOps.getLiveNodes(); |
| } |
| |
| /** |
| * List the nodes in the cluster, possibly filtering by node state or label. |
| * Prints them to stdout unless the args names a file instead. |
| * @param args argument list |
| * @throws IOException IO problems |
| * @throws YarnException YARN problems |
| */ |
| public int actionNodes(String instance, ActionNodesArgs args) throws YarnException, IOException { |
| |
| args.instance = instance; |
| NodeInformationList nodes; |
| if (SliderUtils.isUnset(instance)) { |
| nodes = listYarnClusterNodes(args); |
| } else { |
| nodes = listInstanceNodes(instance, args); |
| } |
| log.debug("Node listing for {} has {} nodes", args, nodes.size()); |
| JsonSerDeser<NodeInformationList> serDeser = NodeInformationList.createSerializer(); |
| if (args.outputFile != null) { |
| serDeser.save(nodes, args.outputFile); |
| } else { |
| println(serDeser.toJson(nodes)); |
| } |
| return 0; |
| } |
| |
| /** |
| * Create a new IPC client for talking to slider via what follows the REST API. |
| * Client must already be bonded to the cluster |
| * @return a new IPC client |
| */ |
| public SliderApplicationApi createIpcClient() |
| throws IOException, YarnException { |
| return new SliderApplicationIpcClient(createClusterOperations()); |
| } |
| |
| /** |
| * Save/list tokens. This is for testing oozie integration |
| * @param args commands |
| * @return status |
| */ |
| private int actionTokens(ActionTokensArgs args) |
| throws IOException, YarnException { |
| return new TokensOperation().actionTokens(args, |
| sliderFileSystem.getFileSystem(), |
| getConfig(), |
| yarnClient); |
| } |
| |
| } |
| |
| |