| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.slider.client; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.common.io.Files; |
| |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.fs.LocatedFileStatus; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.PathNotFoundException; |
| import org.apache.hadoop.fs.RemoteIterator; |
| import org.apache.hadoop.fs.permission.FsAction; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.hdfs.DFSConfigKeys; |
| import org.apache.hadoop.hdfs.HdfsConfiguration; |
| import org.apache.hadoop.net.NetUtils; |
| import org.apache.hadoop.registry.client.api.RegistryConstants; |
| import org.apache.hadoop.registry.client.binding.RegistryPathUtils; |
| import org.apache.hadoop.registry.client.types.RegistryPathStatus; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.alias.CredentialProvider; |
| import org.apache.hadoop.security.alias.CredentialProviderFactory; |
| import org.apache.hadoop.yarn.api.ApplicationConstants; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ApplicationReport; |
| import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; |
| import org.apache.hadoop.yarn.api.records.LocalResource; |
| import org.apache.hadoop.yarn.api.records.NodeReport; |
| import org.apache.hadoop.yarn.api.records.NodeState; |
| import org.apache.hadoop.yarn.api.records.YarnApplicationState; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; |
| import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.registry.client.api.RegistryOperations; |
| |
| import static org.apache.hadoop.registry.client.binding.RegistryUtils.*; |
| |
| import org.apache.hadoop.registry.client.binding.RegistryUtils; |
| import org.apache.hadoop.registry.client.exceptions.NoRecordException; |
| import org.apache.hadoop.registry.client.types.Endpoint; |
| import org.apache.hadoop.registry.client.types.ServiceRecord; |
| import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes; |
| import org.apache.hadoop.yarn.util.ConverterUtils; |
| import org.apache.slider.api.ClusterDescription; |
| import org.apache.slider.api.ClusterNode; |
| import org.apache.slider.api.InternalKeys; |
| import org.apache.slider.api.OptionKeys; |
| import org.apache.slider.api.ResourceKeys; |
| import org.apache.slider.api.SliderClusterProtocol; |
| import org.apache.slider.api.StateValues; |
| import org.apache.slider.api.proto.Messages; |
| import org.apache.slider.api.types.SliderInstanceDescription; |
| import org.apache.slider.client.ipc.SliderClusterOperations; |
| import org.apache.slider.common.Constants; |
| import org.apache.slider.common.SliderExitCodes; |
| import org.apache.slider.common.SliderKeys; |
| import org.apache.slider.common.SliderXmlConfKeys; |
| import org.apache.slider.common.params.AbstractActionArgs; |
| import org.apache.slider.common.params.AbstractClusterBuildingActionArgs; |
| import org.apache.slider.common.params.ActionClientArgs; |
| import org.apache.slider.common.params.ActionDiagnosticArgs; |
| import org.apache.slider.common.params.ActionExistsArgs; |
| import org.apache.slider.common.params.ActionInstallKeytabArgs; |
| import org.apache.slider.common.params.ActionInstallPackageArgs; |
| import org.apache.slider.common.params.ActionPackageArgs; |
| import org.apache.slider.common.params.ActionAMSuicideArgs; |
| import org.apache.slider.common.params.ActionCreateArgs; |
| import org.apache.slider.common.params.ActionEchoArgs; |
| import org.apache.slider.common.params.ActionFlexArgs; |
| import org.apache.slider.common.params.ActionFreezeArgs; |
| import org.apache.slider.common.params.ActionKeytabArgs; |
| import org.apache.slider.common.params.ActionKillContainerArgs; |
| import org.apache.slider.common.params.ActionListArgs; |
| import org.apache.slider.common.params.ActionLookupArgs; |
| import org.apache.slider.common.params.ActionRegistryArgs; |
| import org.apache.slider.common.params.ActionResolveArgs; |
| import org.apache.slider.common.params.ActionStatusArgs; |
| import org.apache.slider.common.params.ActionThawArgs; |
| import org.apache.slider.common.params.Arguments; |
| import org.apache.slider.common.params.ClientArgs; |
| import org.apache.slider.common.params.CommonArgs; |
| import org.apache.slider.common.params.LaunchArgsAccessor; |
| import org.apache.slider.common.tools.ConfigHelper; |
| import org.apache.slider.common.tools.Duration; |
| import org.apache.slider.common.tools.SliderFileSystem; |
| import org.apache.slider.common.tools.SliderUtils; |
| import org.apache.slider.common.tools.SliderVersionInfo; |
| import org.apache.slider.core.build.InstanceBuilder; |
| import org.apache.slider.core.build.InstanceIO; |
| import org.apache.slider.core.conf.AggregateConf; |
| import org.apache.slider.core.conf.ConfTree; |
| import org.apache.slider.core.conf.ConfTreeOperations; |
| import org.apache.slider.core.conf.MapOperations; |
| import org.apache.slider.core.conf.ResourcesInputPropertiesValidator; |
| import org.apache.slider.core.conf.TemplateInputPropertiesValidator; |
| import org.apache.slider.core.exceptions.BadClusterStateException; |
| import org.apache.slider.core.exceptions.BadCommandArgumentsException; |
| import org.apache.slider.core.exceptions.BadConfigException; |
| import org.apache.slider.core.exceptions.ErrorStrings; |
| import org.apache.slider.core.exceptions.NoSuchNodeException; |
| import org.apache.slider.core.exceptions.NotFoundException; |
| import org.apache.slider.core.exceptions.SliderException; |
| import org.apache.slider.core.exceptions.UnknownApplicationInstanceException; |
| import org.apache.slider.core.exceptions.WaitTimeoutException; |
| import org.apache.slider.core.launch.AppMasterLauncher; |
| import org.apache.slider.core.launch.ClasspathConstructor; |
| import org.apache.slider.core.launch.CommandLineBuilder; |
| import org.apache.slider.core.launch.JavaCommandLineBuilder; |
| import org.apache.slider.core.launch.LaunchedApplication; |
| import org.apache.slider.core.launch.RunningApplication; |
| import org.apache.slider.core.launch.SerializedApplicationReport; |
| import org.apache.slider.core.main.RunService; |
| import org.apache.slider.core.persist.ApplicationReportSerDeser; |
| import org.apache.slider.core.persist.ConfPersister; |
| import org.apache.slider.core.persist.LockAcquireFailedException; |
| import org.apache.slider.core.registry.SliderRegistryUtils; |
| import org.apache.slider.core.registry.YarnAppListClient; |
| import org.apache.slider.core.registry.docstore.ConfigFormat; |
| import org.apache.slider.core.registry.docstore.PublishedConfigSet; |
| import org.apache.slider.core.registry.docstore.PublishedConfiguration; |
| import org.apache.slider.core.registry.docstore.PublishedConfigurationOutputter; |
| import org.apache.slider.core.registry.docstore.PublishedExports; |
| import org.apache.slider.core.registry.docstore.PublishedExportsOutputter; |
| import org.apache.slider.core.registry.docstore.PublishedExportsSet; |
| import org.apache.slider.core.registry.retrieve.RegistryRetriever; |
| import org.apache.slider.core.zk.BlockingZKWatcher; |
| import org.apache.slider.core.zk.ZKIntegration; |
| import org.apache.slider.core.zk.ZKPathBuilder; |
| import org.apache.slider.providers.AbstractClientProvider; |
| import org.apache.slider.providers.SliderProviderFactory; |
| import org.apache.slider.providers.agent.AgentKeys; |
| import org.apache.slider.providers.slideram.SliderAMClientProvider; |
| import org.apache.slider.server.appmaster.SliderAppMaster; |
| import org.apache.slider.server.appmaster.rpc.RpcBinder; |
| import org.apache.slider.server.services.utility.AbstractSliderLaunchedService; |
| import org.apache.zookeeper.CreateMode; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.ZooDefs; |
| import org.codehaus.jettison.json.JSONException; |
| import org.codehaus.jettison.json.JSONObject; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.BufferedReader; |
| import java.io.File; |
| import java.io.FileNotFoundException; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.InputStreamReader; |
| import java.io.PrintStream; |
| import java.io.StringWriter; |
| import java.io.Writer; |
| import java.net.InetSocketAddress; |
| import java.net.URISyntaxException; |
| import java.nio.charset.Charset; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Locale; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.regex.Pattern; |
| |
| import static org.apache.slider.common.params.SliderActions.*; |
| |
| /** |
| * Client service for Slider |
| */ |
| |
| public class SliderClient extends AbstractSliderLaunchedService implements RunService, |
| SliderExitCodes, SliderKeys, ErrorStrings, SliderClientAPI { |
| private static final Logger log = LoggerFactory.getLogger(SliderClient.class); |
| |
| // value should not be changed without updating string find in slider.py |
| private static final String PASSWORD_PROMPT = "Enter password for"; |
| |
| private ClientArgs serviceArgs; |
| public ApplicationId applicationId; |
| |
| private String deployedClusterName; |
| /** |
| * Cluster operations against the deployed cluster -will be null |
| * if no bonding has yet taken place |
| */ |
| private SliderClusterOperations sliderClusterOperations; |
| |
| protected SliderFileSystem sliderFileSystem; |
| |
| /** |
| * Yarn client service |
| */ |
| private SliderYarnClientImpl yarnClient; |
| private YarnAppListClient yarnAppListClient; |
| private AggregateConf launchedInstanceDefinition; |
| |
| /** |
| * The YARN registry service |
| */ |
| private RegistryOperations registryOperations; |
| |
| /** |
| * Constructor |
| */ |
| public SliderClient() { |
| super("Slider Client"); |
| new HdfsConfiguration(); |
| new YarnConfiguration(); |
| } |
| |
| /** |
| * This is called <i>Before serviceInit is called</i> |
| * @param config the initial configuration build up by the |
| * service launcher. |
| * @param args argument list list of arguments passed to the command line |
| * after any launcher-specific commands have been stripped. |
| * @return the post-binding configuration to pass to the <code>init()</code> |
| * operation. |
| * @throws Exception |
| */ |
| @Override |
| public Configuration bindArgs(Configuration config, String... args) throws Exception { |
| config = super.bindArgs(config, args); |
| serviceArgs = new ClientArgs(args); |
| serviceArgs.parse(); |
| // add the slider XML config |
| ConfigHelper.injectSliderXMLResource(); |
| // yarn-ify |
| YarnConfiguration yarnConfiguration = new YarnConfiguration(config); |
| return SliderUtils.patchConfiguration(yarnConfiguration); |
| } |
| |
| @Override |
| protected void serviceInit(Configuration conf) throws Exception { |
| Configuration clientConf = SliderUtils.loadSliderClientXML(); |
| ConfigHelper.mergeConfigurations(conf, clientConf, SLIDER_CLIENT_XML, true); |
| serviceArgs.applyDefinitions(conf); |
| serviceArgs.applyFileSystemBinding(conf); |
| // init security with our conf |
| if (SliderUtils.isHadoopClusterSecure(conf)) { |
| SliderUtils.forceLogin(); |
| SliderUtils.initProcessSecurity(conf); |
| } |
| AbstractActionArgs coreAction = serviceArgs.getCoreAction(); |
| if (coreAction.getHadoopServicesRequired()) { |
| initHadoopBinding(); |
| } |
| super.serviceInit(conf); |
| } |
| |
| /** |
| * this is where the work is done. |
| * @return the exit code |
| * @throws Throwable anything that went wrong |
| */ |
| /* JDK7 |
| |
| @Override |
| public int runService() throws Throwable { |
| |
| // choose the action |
| String action = serviceArgs.getAction(); |
| int exitCode = EXIT_SUCCESS; |
| String clusterName = serviceArgs.getClusterName(); |
| // actions |
| switch (action) { |
| case ACTION_BUILD: |
| exitCode = actionBuild(clusterName, serviceArgs.getActionBuildArgs()); |
| break; |
| case ACTION_UPDATE: |
| exitCode = actionUpdate(clusterName, serviceArgs.getActionUpdateArgs()); |
| break; |
| case ACTION_CREATE: |
| exitCode = actionCreate(clusterName, serviceArgs.getActionCreateArgs()); |
| break; |
| case ACTION_FREEZE: |
| exitCode = actionFreeze(clusterName, serviceArgs.getActionFreezeArgs()); |
| break; |
| case ACTION_THAW: |
| exitCode = actionThaw(clusterName, serviceArgs.getActionThawArgs()); |
| break; |
| case ACTION_DESTROY: |
| exitCode = actionDestroy(clusterName); |
| break; |
| case ACTION_EXISTS: |
| exitCode = actionExists(clusterName, |
| serviceArgs.getActionExistsArgs().live); |
| break; |
| case ACTION_FLEX: |
| exitCode = actionFlex(clusterName, serviceArgs.getActionFlexArgs()); |
| break; |
| case ACTION_GETCONF: |
| exitCode = |
| actionGetConf(clusterName, serviceArgs.getActionGetConfArgs()); |
| break; |
| case ACTION_HELP: |
| case ACTION_USAGE: |
| log.info(serviceArgs.usage()); |
| break; |
| case ACTION_KILL_CONTAINER: |
| exitCode = actionKillContainer(clusterName, |
| serviceArgs.getActionKillContainerArgs()); |
| break; |
| case ACTION_AM_SUICIDE: |
| exitCode = actionAmSuicide(clusterName, |
| serviceArgs.getActionAMSuicideArgs()); |
| break; |
| case ACTION_LIST: |
| exitCode = actionList(clusterName, serviceArgs.getActionListArgs()); |
| break; |
| case ACTION_REGISTRY: |
| exitCode = actionRegistry( |
| serviceArgs.getActionRegistryArgs()); |
| break; |
| case ACTION_STATUS: |
| exitCode = actionStatus(clusterName, |
| serviceArgs.getActionStatusArgs()); |
| break; |
| case ACTION_VERSION: |
| exitCode = actionVersion(); |
| break; |
| default: |
| throw new SliderException(EXIT_UNIMPLEMENTED, |
| "Unimplemented: " + action); |
| } |
| |
| return exitCode; |
| } |
| |
| */ |
| |
| /** |
| * Launched service execution. This runs {@link #exec()} |
| * then catches some exceptions and converts them to exit codes |
| * @return an exit code |
| * @throws Throwable |
| */ |
| @Override |
| public int runService() throws Throwable { |
| try { |
| return exec(); |
| } catch (FileNotFoundException nfe) { |
| throw new NotFoundException(nfe, nfe.toString()); |
| } catch (PathNotFoundException nfe) { |
| throw new NotFoundException(nfe, nfe.toString()); |
| } |
| } |
| |
| /** |
| * Execute the command line |
| * @return an exit code |
| * @throws Throwable on a failure |
| */ |
| public int exec() throws Throwable { |
| |
| // choose the action |
| String action = serviceArgs.getAction(); |
| |
| int exitCode = EXIT_SUCCESS; |
| String clusterName = serviceArgs.getClusterName(); |
| // actions |
| if (ACTION_PACKAGE.equals(action)) { |
| exitCode = actionPackage(serviceArgs.getActionPackageArgs()); |
| } else if (ACTION_CLIENT.equals(action)) { |
| exitCode = actionClient(serviceArgs.getActionClientArgs()); |
| } else if (ACTION_INSTALL_PACKAGE.equals(action)) { |
| exitCode = actionInstallPkg(serviceArgs.getActionInstallPackageArgs()); |
| } else if (ACTION_INSTALL_KEYTAB.equals(action)) { |
| exitCode = actionInstallKeytab(serviceArgs.getActionInstallKeytabArgs()); |
| } else if (ACTION_KEYTAB.equals(action)) { |
| exitCode = actionKeytab(serviceArgs.getActionKeytabArgs()); |
| } else if (ACTION_BUILD.equals(action)) { |
| exitCode = actionBuild(clusterName, serviceArgs.getActionBuildArgs()); |
| } else if (ACTION_CREATE.equals(action)) { |
| exitCode = actionCreate(clusterName, serviceArgs.getActionCreateArgs()); |
| } else if (ACTION_FREEZE.equals(action)) { |
| exitCode = actionFreeze(clusterName, |
| serviceArgs.getActionFreezeArgs()); |
| } else if (ACTION_THAW.equals(action)) { |
| exitCode = actionThaw(clusterName, serviceArgs.getActionThawArgs()); |
| } else if (ACTION_DESTROY.equals(action)) { |
| exitCode = actionDestroy(clusterName); |
| } else if (ACTION_DIAGNOSTICS.equals(action)) { |
| exitCode = actionDiagnostic(serviceArgs.getActionDiagnosticArgs()); |
| } else if (ACTION_EXISTS.equals(action)) { |
| exitCode = actionExists(clusterName, |
| serviceArgs.getActionExistsArgs()); |
| } else if (ACTION_FLEX.equals(action)) { |
| exitCode = actionFlex(clusterName, serviceArgs.getActionFlexArgs()); |
| } else if (ACTION_HELP.equals(action)) { |
| log.info(serviceArgs.usage()); |
| } else if (ACTION_KILL_CONTAINER.equals(action)) { |
| exitCode = actionKillContainer(clusterName, |
| serviceArgs.getActionKillContainerArgs()); |
| } else if (ACTION_AM_SUICIDE.equals(action)) { |
| exitCode = actionAmSuicide(clusterName, |
| serviceArgs.getActionAMSuicideArgs()); |
| } else if (ACTION_LIST.equals(action)) { |
| exitCode = actionList(clusterName, serviceArgs.getActionListArgs()); |
| } else if (ACTION_LOOKUP.equals(action)) { |
| exitCode = actionLookup(serviceArgs.getActionLookupArgs()); |
| } else if (ACTION_REGISTRY.equals(action)) { |
| exitCode = actionRegistry(serviceArgs.getActionRegistryArgs()); |
| } else if (ACTION_RESOLVE.equals(action)) { |
| exitCode = actionResolve(serviceArgs.getActionResolveArgs()); |
| } else if (ACTION_STATUS.equals(action)) { |
| exitCode = actionStatus(clusterName, |
| serviceArgs.getActionStatusArgs()); |
| } else if (ACTION_UPDATE.equals(action)) { |
| exitCode = actionUpdate(clusterName, serviceArgs.getActionUpdateArgs()); |
| } else if (ACTION_VERSION.equals(action)) { |
| exitCode = actionVersion(); |
| } else if (SliderUtils.isUnset(action)) { |
| throw new SliderException(EXIT_USAGE, |
| serviceArgs.usage()); |
| } else { |
| throw new SliderException(EXIT_UNIMPLEMENTED, |
| "Unimplemented: " + action); |
| } |
| |
| return exitCode; |
| } |
| |
| /** |
| * Perform everything needed to init the hadoop binding. |
| * This assumes that the service is already in inited or started state |
| * @throws IOException |
| * @throws SliderException |
| */ |
| protected void initHadoopBinding() throws IOException, SliderException { |
| // validate the client |
| SliderUtils.validateSliderClientEnvironment(null); |
| //create the YARN client |
| yarnClient = new SliderYarnClientImpl(); |
| yarnClient.init(getConfig()); |
| if (getServiceState() == STATE.STARTED) { |
| yarnClient.start(); |
| } |
| addService(yarnClient); |
| yarnAppListClient = |
| new YarnAppListClient(yarnClient, getUsername(), getConfig()); |
| // create the filesystem |
| sliderFileSystem = new SliderFileSystem(getConfig()); |
| } |
| |
| /** |
| * Delete the zookeeper node associated with the calling user and the cluster |
| * TODO: YARN registry operations |
| **/ |
| @VisibleForTesting |
| public boolean deleteZookeeperNode(String clusterName) throws YarnException, IOException { |
| String user = getUsername(); |
| String zkPath = ZKIntegration.mkClusterPath(user, clusterName); |
| Exception e = null; |
| try { |
| Configuration config = getConfig(); |
| ZKIntegration client = getZkClient(clusterName, user); |
| if (client != null) { |
| if (client.exists(zkPath)) { |
| log.info("Deleting zookeeper path {}", zkPath); |
| } |
| client.deleteRecursive(zkPath); |
| return true; |
| } |
| } catch (InterruptedException ignored) { |
| e = ignored; |
| } catch (KeeperException ignored) { |
| e = ignored; |
| } catch (BadConfigException ignored) { |
| e = ignored; |
| } |
| if (e != null) { |
| log.debug("Unable to recursively delete zk node {}", zkPath); |
| log.debug("Reason: ", e); |
| } |
| |
| return false; |
| } |
| |
| /** |
| * Create the zookeeper node associated with the calling user and the cluster |
| */ |
| @VisibleForTesting |
| public String createZookeeperNode(String clusterName, Boolean nameOnly) throws YarnException, IOException { |
| String user = getUsername(); |
| String zkPath = ZKIntegration.mkClusterPath(user, clusterName); |
| if (nameOnly) { |
| return zkPath; |
| } |
| Configuration config = getConfig(); |
| ZKIntegration client = getZkClient(clusterName, user); |
| if (client != null) { |
| try { |
| client.createPath(zkPath, "", ZooDefs.Ids.OPEN_ACL_UNSAFE, |
| CreateMode.PERSISTENT); |
| return zkPath; |
| } catch (InterruptedException e) { |
| log.warn("Unable to create default zk node {}", zkPath, e); |
| } catch (KeeperException e) { |
| log.warn("Unable to create default zk node {}", zkPath, e); |
| } |
| } |
| |
| return null; |
| } |
| |
| /** |
| * Gets a zookeeper client, returns null if it cannot connect to zookeeper |
| **/ |
| protected ZKIntegration getZkClient(String clusterName, String user) throws YarnException { |
| String registryQuorum = lookupZKQuorum(); |
| ZKIntegration client = null; |
| try { |
| BlockingZKWatcher watcher = new BlockingZKWatcher(); |
| client = ZKIntegration.newInstance(registryQuorum, user, clusterName, true, false, watcher); |
| client.init(); |
| watcher.waitForZKConnection(2 * 1000); |
| } catch (InterruptedException e) { |
| client = null; |
| log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e); |
| } catch (IOException e) { |
| log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e); |
| } |
| return client; |
| } |
| |
| @Override |
| public int actionDestroy(String clustername) throws YarnException, |
| IOException { |
| // verify that a live cluster isn't there |
| SliderUtils.validateClusterName(clustername); |
| //no=op, it is now mandatory. |
| verifyBindingsDefined(); |
| verifyNoLiveClusters(clustername, "Destroy"); |
| |
| // create the directory path |
| Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername); |
| // delete the directory; |
| FileSystem fs = sliderFileSystem.getFileSystem(); |
| boolean exists = fs.exists(clusterDirectory); |
| if (exists) { |
| log.debug("Application Instance {} found at {}: destroying", clustername, clusterDirectory); |
| boolean deleted = |
| fs.delete(clusterDirectory, true); |
| if (!deleted) { |
| log.warn("Filesystem returned false from delete() operation"); |
| } |
| |
| if(!deleteZookeeperNode(clustername)) { |
| log.warn("Unable to perform node cleanup in Zookeeper."); |
| } |
| |
| if (fs.exists(clusterDirectory)) { |
| log.warn("Failed to delete {}", clusterDirectory); |
| } |
| |
| } else { |
| log.debug("Application Instance {} already destroyed", clustername); |
| } |
| |
| // rm the registry entry —do not let this block the destroy operations |
| String registryPath = SliderRegistryUtils.registryPathForInstance( |
| clustername); |
| try { |
| getRegistryOperations().delete(registryPath, true); |
| } catch (IOException e) { |
| log.warn("Error deleting registry entry {}: {} ", registryPath, e, e); |
| } catch (SliderException e) { |
| log.warn("Error binding to registry {} ", e, e); |
| } |
| |
| List<ApplicationReport> instances = findAllLiveInstances(clustername); |
| // detect any race leading to cluster creation during the check/destroy process |
| // and report a problem. |
| if (!instances.isEmpty()) { |
| throw new SliderException(EXIT_APPLICATION_IN_USE, |
| clustername + ": " |
| + E_DESTROY_CREATE_RACE_CONDITION |
| + " :" + |
| instances.get(0)); |
| } |
| log.info("Destroyed cluster {}", clustername); |
| return EXIT_SUCCESS; |
| } |
| |
| @Override |
| public int actionAmSuicide(String clustername, |
| ActionAMSuicideArgs args) throws YarnException, IOException { |
| SliderClusterOperations clusterOperations = |
| createClusterOperations(clustername); |
| clusterOperations.amSuicide(args.message, args.exitcode, args.waittime); |
| return EXIT_SUCCESS; |
| } |
| |
| @Override |
| public AbstractClientProvider createClientProvider(String provider) |
| throws SliderException { |
| SliderProviderFactory factory = |
| SliderProviderFactory.createSliderProviderFactory(provider); |
| return factory.createClientProvider(); |
| } |
| |
| /** |
| * Create the cluster -saving the arguments to a specification file first |
| * @param clustername cluster name |
| * @return the status code |
| * @throws YarnException Yarn problems |
| * @throws IOException other problems |
| * @throws BadCommandArgumentsException bad arguments. |
| */ |
| public int actionCreate(String clustername, ActionCreateArgs createArgs) throws |
| YarnException, |
| IOException { |
| |
| actionBuild(clustername, createArgs); |
| Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername); |
| AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved( |
| clustername, clusterDirectory); |
| try { |
| checkForCredentials(getConfig(), instanceDefinition.getAppConf()); |
| } catch (IOException e) { |
| sliderFileSystem.getFileSystem().delete(clusterDirectory, true); |
| throw e; |
| } |
| return startCluster(clustername, createArgs); |
| } |
| |
| private void checkForCredentials(Configuration conf, |
| ConfTree tree) throws IOException { |
| if (tree.credentials == null || tree.credentials.size()==0) { |
| log.info("No credentials requested"); |
| return; |
| } |
| |
| BufferedReader br = null; |
| try { |
| for (Entry<String, List<String>> cred : tree.credentials.entrySet()) { |
| String provider = cred.getKey(); |
| List<String> aliases = cred.getValue(); |
| if (aliases == null || aliases.size() == 0) { |
| continue; |
| } |
| Configuration c = new Configuration(conf); |
| c.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, provider); |
| CredentialProvider credentialProvider = |
| CredentialProviderFactory.getProviders(c).get(0); |
| Set<String> existingAliases = |
| new HashSet<String>(credentialProvider.getAliases()); |
| for (String alias : aliases) { |
| if (existingAliases.contains(alias.toLowerCase(Locale.ENGLISH))) { |
| log.info("Credentials for " + alias + " found in " + provider); |
| } else { |
| if (br == null) { |
| br = new BufferedReader(new InputStreamReader(System.in)); |
| } |
| char[] pass = readPassword(alias, br); |
| if (pass == null) |
| throw new IOException("Could not read credentials for " + alias + |
| " from stdin"); |
| credentialProvider.createCredentialEntry(alias, pass); |
| credentialProvider.flush(); |
| Arrays.fill(pass, ' '); |
| } |
| } |
| } |
| } finally { |
| if (br != null) { |
| br.close(); |
| } |
| } |
| } |
| |
| // using a normal reader instead of a secure one, |
| // because stdin is not hooked up to the command line |
| private char[] readPassword(String alias, BufferedReader br) |
| throws IOException { |
| char[] cred = null; |
| |
| boolean noMatch; |
| do { |
| log.info(String.format("%s %s: ", PASSWORD_PROMPT, alias)); |
| char[] newPassword1 = br.readLine().toCharArray(); |
| log.info(String.format("%s %s again: ", PASSWORD_PROMPT, alias)); |
| char[] newPassword2 = br.readLine().toCharArray(); |
| noMatch = !Arrays.equals(newPassword1, newPassword2); |
| if (noMatch) { |
| if (newPassword1 != null) Arrays.fill(newPassword1, ' '); |
| log.info(String.format("Passwords don't match. Try again.")); |
| } else { |
| cred = newPassword1; |
| } |
| if (newPassword2 != null) Arrays.fill(newPassword2, ' '); |
| } while (noMatch); |
| return cred; |
| } |
| |
| @Override |
| public int actionBuild(String clustername, |
| AbstractClusterBuildingActionArgs buildInfo) throws |
| YarnException, |
| IOException { |
| |
| buildInstanceDefinition(clustername, buildInfo, false, false); |
| return EXIT_SUCCESS; |
| } |
| |
| @Override |
| public int actionKeytab(ActionKeytabArgs keytabInfo) |
| throws YarnException, IOException { |
| if (keytabInfo.install) { |
| return actionInstallKeytab(keytabInfo); |
| } else if (keytabInfo.delete) { |
| return actionDeleteKeytab(keytabInfo); |
| } else if (keytabInfo.list) { |
| return actionListKeytab(keytabInfo); |
| } else { |
| throw new BadCommandArgumentsException( |
| "Keytab option specified not found.\n" |
| + CommonArgs.usage(serviceArgs, ACTION_KEYTAB)); |
| } |
| } |
| |
| private int actionListKeytab(ActionKeytabArgs keytabInfo) throws IOException { |
| String folder = keytabInfo.folder != null ? keytabInfo.folder : StringUtils.EMPTY; |
| Path keytabPath = sliderFileSystem.buildKeytabInstallationDirPath(folder); |
| RemoteIterator<LocatedFileStatus> files = |
| sliderFileSystem.getFileSystem().listFiles(keytabPath, true); |
| log.info("Keytabs:"); |
| while (files.hasNext()) { |
| log.info("\t" + files.next().getPath().toString()); |
| } |
| |
| return EXIT_SUCCESS; |
| } |
| |
| private int actionDeleteKeytab(ActionKeytabArgs keytabInfo) |
| throws BadCommandArgumentsException, IOException { |
| if (StringUtils.isEmpty(keytabInfo.folder)) { |
| throw new BadCommandArgumentsException( |
| "A valid destination keytab sub-folder name is required (e.g. 'security').\n" |
| + CommonArgs.usage(serviceArgs, ACTION_KEYTAB)); |
| } |
| |
| if (StringUtils.isEmpty(keytabInfo.keytab)) { |
| throw new BadCommandArgumentsException("A keytab name is required."); |
| } |
| |
| Path pkgPath = sliderFileSystem.buildKeytabInstallationDirPath(keytabInfo.folder); |
| |
| Path fileInFs = new Path(pkgPath, keytabInfo.keytab ); |
| log.info("Deleting keytab {}", fileInFs); |
| if (!sliderFileSystem.getFileSystem().exists(fileInFs)) { |
| throw new BadCommandArgumentsException("No keytab to delete found at " + |
| fileInFs.toUri().toString()); |
| } |
| |
| sliderFileSystem.getFileSystem().delete(fileInFs, false); |
| |
| return EXIT_SUCCESS; |
| } |
| |
| private int actionInstallKeytab(ActionKeytabArgs keytabInfo) |
| throws BadCommandArgumentsException, IOException { |
| Path srcFile = null; |
| if (StringUtils.isEmpty(keytabInfo.folder)) { |
| throw new BadCommandArgumentsException( |
| "A valid destination keytab sub-folder name is required (e.g. 'security').\n" |
| + CommonArgs.usage(serviceArgs, ACTION_KEYTAB)); |
| } |
| |
| if (StringUtils.isEmpty(keytabInfo.keytab)) { |
| throw new BadCommandArgumentsException("A valid local keytab location is required."); |
| } else { |
| File keytabFile = new File(keytabInfo.keytab); |
| if (!keytabFile.exists() || keytabFile.isDirectory()) { |
| throw new BadCommandArgumentsException("Unable to access supplied keytab file at " + |
| keytabFile.getAbsolutePath()); |
| } else { |
| srcFile = new Path(keytabFile.toURI()); |
| } |
| } |
| |
| Path pkgPath = sliderFileSystem.buildKeytabInstallationDirPath(keytabInfo.folder); |
| sliderFileSystem.getFileSystem().mkdirs(pkgPath); |
| sliderFileSystem.getFileSystem().setPermission(pkgPath, new FsPermission( |
| FsAction.ALL, FsAction.NONE, FsAction.NONE)); |
| |
| Path fileInFs = new Path(pkgPath, srcFile.getName()); |
| log.info("Installing keytab {} at {} and overwrite is {}.", srcFile, fileInFs, keytabInfo.overwrite); |
| if (sliderFileSystem.getFileSystem().exists(fileInFs) && !keytabInfo.overwrite) { |
| throw new BadCommandArgumentsException("Keytab exists at " + |
| fileInFs.toUri().toString() + |
| ". Use --overwrite to overwrite."); |
| } |
| |
| sliderFileSystem.getFileSystem().copyFromLocalFile(false, keytabInfo.overwrite, srcFile, fileInFs); |
| sliderFileSystem.getFileSystem().setPermission(fileInFs, new FsPermission( |
| FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE)); |
| |
| return EXIT_SUCCESS; |
| } |
| |
| @Override |
| public int actionInstallKeytab(ActionInstallKeytabArgs installKeytabInfo) |
| throws YarnException, IOException { |
| log.warn("The 'install-keytab' option has been deprecated. Please use 'keytab --install'."); |
| return actionKeytab(new ActionKeytabArgs(installKeytabInfo)); |
| } |
| |
| @Override |
| public int actionInstallPkg(ActionInstallPackageArgs installPkgInfo) throws |
| YarnException, |
| IOException { |
| |
| Path srcFile = null; |
| if (StringUtils.isEmpty(installPkgInfo.name)) { |
| throw new BadCommandArgumentsException( |
| "A valid application type name is required (e.g. HBASE).\n" |
| + CommonArgs.usage(serviceArgs, ACTION_INSTALL_PACKAGE)); |
| } |
| |
| if (StringUtils.isEmpty(installPkgInfo.packageURI)) { |
| throw new BadCommandArgumentsException("A valid application package location required."); |
| } else { |
| File pkgFile = new File(installPkgInfo.packageURI); |
| if (!pkgFile.exists() || pkgFile.isDirectory()) { |
| throw new BadCommandArgumentsException("Unable to access supplied pkg file at " + |
| pkgFile.getAbsolutePath()); |
| } else { |
| srcFile = new Path(pkgFile.toURI()); |
| } |
| } |
| |
| Path pkgPath = sliderFileSystem.buildPackageDirPath(installPkgInfo.name); |
| sliderFileSystem.getFileSystem().mkdirs(pkgPath); |
| |
| Path fileInFs = new Path(pkgPath, srcFile.getName()); |
| log.info("Installing package {} at {} and overwrite is {}.", srcFile, fileInFs, installPkgInfo.replacePkg); |
| if (sliderFileSystem.getFileSystem().exists(fileInFs) && !installPkgInfo.replacePkg) { |
| throw new BadCommandArgumentsException("Pkg exists at " + |
| fileInFs.toUri().toString() + |
| ". Use --replacepkg to overwrite."); |
| } |
| |
| sliderFileSystem.getFileSystem().copyFromLocalFile(false, installPkgInfo.replacePkg, srcFile, fileInFs); |
| return EXIT_SUCCESS; |
| } |
| |
| @Override |
| public int actionClient(ActionClientArgs clientInfo) throws |
| SliderException, |
| IOException { |
| |
| if(!clientInfo.install) { |
| throw new BadCommandArgumentsException( |
| "Only install command is supported for the client.\n" |
| + CommonArgs.usage(serviceArgs, ACTION_CLIENT)); |
| } |
| |
| if (clientInfo.installLocation == null) { |
| throw new BadCommandArgumentsException( |
| "A valid install location must be provided for the client.\n" |
| + CommonArgs.usage(serviceArgs, ACTION_CLIENT)); |
| } else { |
| if (!clientInfo.installLocation.exists()) { |
| throw new BadCommandArgumentsException("Install path does not exist at " + |
| clientInfo.installLocation.getAbsolutePath()); |
| } |
| if (!clientInfo.installLocation.isDirectory()) { |
| throw new BadCommandArgumentsException("Install path is not a valid directory " + |
| clientInfo.installLocation.getAbsolutePath()); |
| } |
| } |
| |
| File pkgFile; |
| if (StringUtils.isEmpty(clientInfo.packageURI)) { |
| throw new BadCommandArgumentsException("A valid application package location required."); |
| } else { |
| pkgFile = new File(clientInfo.packageURI); |
| if (!pkgFile.exists() || pkgFile.isDirectory()) { |
| throw new BadCommandArgumentsException("Unable to access supplied pkg file at " + |
| pkgFile.getAbsolutePath()); |
| } |
| } |
| |
| JSONObject config = null; |
| if(clientInfo.clientConfig != null) { |
| try { |
| byte[] encoded = Files.toByteArray(clientInfo.clientConfig); |
| config = new JSONObject(new String(encoded, Charset.defaultCharset())); |
| }catch(JSONException jsonEx) { |
| log.error("Unable to read supplied config", jsonEx); |
| throw new SliderException("Invalid configuration. Must be a valid json file.", jsonEx); |
| } |
| } |
| |
| // Only INSTALL is supported |
| AbstractClientProvider provider = createClientProvider(SliderProviderFactory.DEFAULT_CLUSTER_TYPE); |
| provider.processClientOperation(sliderFileSystem, |
| "INSTALL", |
| clientInfo.installLocation, |
| pkgFile, |
| config); |
| return EXIT_SUCCESS; |
| } |
| |
| |
| @Override |
| public int actionPackage(ActionPackageArgs actionPackageInfo) |
| throws YarnException, IOException { |
| if (actionPackageInfo.install) { |
| return actionPackageInstall(actionPackageInfo); |
| } |
| if (actionPackageInfo.delete) { |
| return actionPackageDelete(actionPackageInfo); |
| } |
| if (actionPackageInfo.list) { |
| return actionPackageList(); |
| } |
| if (actionPackageInfo.instances) { |
| return actionPackageInstances(); |
| } |
| throw new BadCommandArgumentsException( |
| "Select valid package operation option"); |
| } |
| |
| private int actionPackageInstances() throws YarnException, IOException { |
| Map<String, Path> persistentInstances = sliderFileSystem |
| .listPersistentInstances(); |
| if(persistentInstances.isEmpty()) { |
| log.info("No slider cluster specification available"); |
| return EXIT_SUCCESS; |
| } |
| String pkgPathValue = sliderFileSystem |
| .buildPackageDirPath(StringUtils.EMPTY).toUri().getPath(); |
| FileSystem fs = sliderFileSystem.getFileSystem(); |
| Iterator<Map.Entry<String, Path>> instanceItr = persistentInstances |
| .entrySet().iterator(); |
| log.info("List of application with its package name and path"); |
| while(instanceItr.hasNext()) { |
| Map.Entry<String, Path> entry = instanceItr.next(); |
| String clusterName = entry.getKey(); |
| Path clusterPath = entry.getValue(); |
| AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved( |
| clusterName, clusterPath); |
| Path appDefPath = new Path(instanceDefinition.getAppConfOperations() |
| .getGlobalOptions().getMandatoryOption(AgentKeys.APP_DEF)); |
| try { |
| if (appDefPath.toString().contains(pkgPathValue) |
| && fs.isFile(appDefPath)) { |
| String packageName = appDefPath.getParent().getName(); |
| println("\t" + clusterName + "\t" + packageName + "\t" |
| + appDefPath.toString()); |
| } |
| } catch(IOException e) { |
| if(log.isDebugEnabled()) { |
| log.debug(clusterName + " application definition path " |
| + appDefPath.toString() + " is not found."); |
| } |
| } |
| } |
| return EXIT_SUCCESS; |
| } |
| |
| private int actionPackageList() throws IOException { |
| Path pkgPath = sliderFileSystem.buildPackageDirPath(StringUtils.EMPTY); |
| log.info("Package install path : " + pkgPath); |
| if (!sliderFileSystem.getFileSystem().isDirectory(pkgPath)) { |
| log.info("No package(s) installed"); |
| return EXIT_SUCCESS; |
| } |
| FileStatus[] fileStatus = sliderFileSystem.getFileSystem().listStatus( |
| pkgPath); |
| boolean hasPackage = false; |
| StringBuilder sb = new StringBuilder(); |
| sb.append("List of installed packages:\n"); |
| for (FileStatus fstat : fileStatus) { |
| if (fstat.isDirectory()) { |
| sb.append("\t" + fstat.getPath().getName()); |
| sb.append("\n"); |
| hasPackage = true; |
| } |
| } |
| if (hasPackage) { |
| println(sb.toString()); |
| } else { |
| log.info("No package(s) installed"); |
| } |
| return EXIT_SUCCESS; |
| } |
| |
| private int actionPackageInstall(ActionPackageArgs actionPackageArgs) throws |
| YarnException, |
| IOException { |
| |
| Path srcFile = null; |
| if (StringUtils.isEmpty(actionPackageArgs.name)) { |
| throw new BadCommandArgumentsException( |
| "A valid application type name is required (e.g. HBASE).\n" |
| + CommonArgs.usage(serviceArgs, ACTION_PACKAGE)); |
| } |
| |
| if (StringUtils.isEmpty(actionPackageArgs.packageURI)) { |
| throw new BadCommandArgumentsException( |
| "A valid application package location required."); |
| } else { |
| File pkgFile = new File(actionPackageArgs.packageURI); |
| if (!pkgFile.exists() || pkgFile.isDirectory()) { |
| throw new BadCommandArgumentsException( |
| "Unable to access supplied pkg file at " |
| + pkgFile.getAbsolutePath()); |
| } else { |
| srcFile = new Path(pkgFile.toURI()); |
| } |
| } |
| |
| Path pkgPath = sliderFileSystem.buildPackageDirPath(actionPackageArgs.name); |
| sliderFileSystem.getFileSystem().mkdirs(pkgPath); |
| |
| Path fileInFs = new Path(pkgPath, srcFile.getName()); |
| log.info("Installing package {} at {} and overwrite is {}.", srcFile, |
| fileInFs, actionPackageArgs.replacePkg); |
| if (sliderFileSystem.getFileSystem().exists(fileInFs) |
| && !actionPackageArgs.replacePkg) { |
| throw new BadCommandArgumentsException("Pkg exists at " + |
| fileInFs.toUri().toString() + |
| ". Use --replacepkg to overwrite."); |
| } |
| |
| sliderFileSystem.getFileSystem().copyFromLocalFile(false, |
| actionPackageArgs.replacePkg, srcFile, fileInFs); |
| return EXIT_SUCCESS; |
| } |
| |
| private int actionPackageDelete(ActionPackageArgs actionPackageArgs) throws |
| YarnException, IOException { |
| if (StringUtils.isEmpty(actionPackageArgs.name)) { |
| throw new BadCommandArgumentsException( |
| "A valid application type name is required (e.g. HBASE).\n" |
| + CommonArgs.usage(serviceArgs, ACTION_PACKAGE)); |
| } |
| |
| Path pkgPath = sliderFileSystem.buildPackageDirPath(actionPackageArgs.name); |
| if (!sliderFileSystem.getFileSystem().exists(pkgPath)) { |
| throw new BadCommandArgumentsException("Package does not exists at " |
| + pkgPath.toUri().toString()); |
| } |
| log.info("Deleting package {} at {}.", actionPackageArgs.name, pkgPath); |
| |
| if(sliderFileSystem.getFileSystem().delete(pkgPath, true)) { |
| log.info("Deleted package {} " + actionPackageArgs.name); |
| return EXIT_SUCCESS; |
| } else { |
| log.warn("Package deletion failed."); |
| return EXIT_NOT_FOUND; |
| } |
| } |
| |
| @Override |
| public int actionUpdate(String clustername, |
| AbstractClusterBuildingActionArgs buildInfo) throws |
| YarnException, IOException { |
| buildInstanceDefinition(clustername, buildInfo, true, true); |
| return EXIT_SUCCESS; |
| } |
| |
| /** |
| * Build up the AggregateConfiguration for an application instance then |
| * persists it |
| * @param clustername name of the cluster |
| * @param buildInfo the arguments needed to build the cluster |
| * @param overwrite true if existing cluster directory can be overwritten |
| * @param liveClusterAllowed true if live cluster can be modified |
| * @throws YarnException |
| * @throws IOException |
| */ |
| |
| public void buildInstanceDefinition(String clustername, |
| AbstractClusterBuildingActionArgs buildInfo, boolean overwrite, boolean liveClusterAllowed) |
| throws YarnException, IOException { |
| // verify that a live cluster isn't there |
| SliderUtils.validateClusterName(clustername); |
| verifyBindingsDefined(); |
| if (!liveClusterAllowed) { |
| verifyNoLiveClusters(clustername, "Create"); |
| } |
| |
| Configuration conf = getConfig(); |
| String registryQuorum = lookupZKQuorum(); |
| |
| Path appconfdir = buildInfo.getConfdir(); |
| // Provider |
| String providerName = buildInfo.getProvider(); |
| requireArgumentSet(Arguments.ARG_PROVIDER, providerName); |
| log.debug("Provider is {}", providerName); |
| SliderAMClientProvider sliderAM = new SliderAMClientProvider(conf); |
| AbstractClientProvider provider = |
| createClientProvider(providerName); |
| InstanceBuilder builder = |
| new InstanceBuilder(sliderFileSystem, |
| getConfig(), |
| clustername); |
| |
| AggregateConf instanceDefinition = new AggregateConf(); |
| ConfTreeOperations appConf = instanceDefinition.getAppConfOperations(); |
| ConfTreeOperations resources = instanceDefinition.getResourceOperations(); |
| ConfTreeOperations internal = instanceDefinition.getInternalOperations(); |
| //initial definition is set by the providers |
| sliderAM.prepareInstanceConfiguration(instanceDefinition); |
| provider.prepareInstanceConfiguration(instanceDefinition); |
| |
| //load in any specified on the command line |
| if (buildInfo.resources != null) { |
| try { |
| resources.mergeFile(buildInfo.resources, |
| new ResourcesInputPropertiesValidator()); |
| |
| } catch (IOException e) { |
| throw new BadConfigException(e, |
| "incorrect argument to %s: \"%s\" : %s ", |
| Arguments.ARG_RESOURCES, |
| buildInfo.resources, |
| e.toString()); |
| } |
| } |
| if (buildInfo.template != null) { |
| try { |
| appConf.mergeFile(buildInfo.template, |
| new TemplateInputPropertiesValidator()); |
| } catch (IOException e) { |
| throw new BadConfigException(e, |
| "incorrect argument to %s: \"%s\" : %s ", |
| Arguments.ARG_TEMPLATE, |
| buildInfo.template, |
| e.toString()); |
| } |
| } |
| |
| List<AppDefinition> appDefinitions = new ArrayList<>(); |
| // if metainfo is provided add to the app instance |
| if(buildInfo.appMetaInfo != null) { |
| if(!buildInfo.appMetaInfo.canRead() || !buildInfo.appMetaInfo.isFile()) { |
| throw new BadConfigException("--metainfo file cannot be read."); |
| } |
| |
| if(buildInfo.appDef != null) { |
| throw new BadConfigException("both --metainfo and --appdef may not be specified."); |
| } |
| |
| if(SliderUtils.isSet(appConf.getGlobalOptions().get(AgentKeys.APP_DEF))) { |
| throw new BadConfigException("application.def must not be set if --metainfo is provided."); |
| } |
| |
| File tempDir = Files.createTempDir(); |
| File pkgSrcDir = new File(tempDir, "default"); |
| pkgSrcDir.mkdirs(); |
| Files.copy(buildInfo.appMetaInfo, new File(pkgSrcDir, "metainfo.json")); |
| |
| Path appDirPath = sliderFileSystem.buildAppDefDirPath(clustername); |
| log.info("Using default app def path {}", appDirPath.toString()); |
| AppDefinition ad = new AppDefinition(); |
| ad.fileInFs = appDirPath; |
| ad.appDef = pkgSrcDir; |
| ad.pkgName = SliderKeys.DEFAULT_APP_PKG; |
| appDefinitions.add(ad); |
| // processDefinitionPackageOrFolder( |
| // appDirPath, |
| // pkgSrcDir, |
| // SliderKeys.DEFAULT_APP_PKG); |
| Path appDefPath = new Path(appDirPath, SliderKeys.DEFAULT_APP_PKG); |
| appConf.getGlobalOptions().set(AgentKeys.APP_DEF, appDefPath); |
| log.info("Setting app package to {}.", appDefPath); |
| } |
| |
| if (buildInfo.appDef != null) { |
| if(SliderUtils.isSet(appConf.getGlobalOptions().get(AgentKeys.APP_DEF))) { |
| throw new BadConfigException("application.def must not be set if --appdef is provided."); |
| } |
| |
| Path appDirPath = sliderFileSystem.buildAppDefDirPath(clustername); |
| AppDefinition ad = new AppDefinition(); |
| ad.fileInFs = appDirPath; |
| ad.appDef = buildInfo.appDef; |
| ad.pkgName = SliderKeys.DEFAULT_APP_PKG; |
| appDefinitions.add(ad); |
| // processDefinitionPackageOrFolder( |
| // appDirPath, |
| // buildInfo.appDef, |
| // SliderKeys.DEFAULT_APP_PKG); |
| Path appDefPath = new Path(appDirPath, SliderKeys.DEFAULT_APP_PKG); |
| appConf.getGlobalOptions().set(AgentKeys.APP_DEF, appDefPath); |
| log.info("Setting app package to {}.", appDefPath); } |
| |
| if (buildInfo.addonDelegate.getAddonMap().size() > 0) { |
| List<String> addons = new ArrayList<String>(); |
| Map<String, String> addonMap = buildInfo.addonDelegate.getAddonMap(); |
| for (String key : addonMap.keySet()) { |
| Path addonPath = sliderFileSystem.buildAddonDirPath(clustername, key); |
| String addonPkgName = "addon_" + key + ".zip"; |
| AppDefinition ad = new AppDefinition(); |
| ad.fileInFs = addonPath; |
| ad.appDef = buildInfo.appDef; |
| ad.pkgName = addonPkgName; |
| appDefinitions.add(ad); |
| // processDefinitionPackageOrFolder(addonPath, |
| // buildInfo.appDef, |
| // addonPkgName); |
| String addOnKey = AgentKeys.ADDON_PREFIX + key; |
| Path addonPkgPath = new Path(addonPath, addonPkgName); |
| log.info("Setting addon package {} to {}.",addOnKey, addonPkgPath); |
| appConf.getGlobalOptions().set(addOnKey, addonPkgPath); |
| } |
| |
| String existingList = appConf.getGlobalOptions().get(AgentKeys.ADDONS); |
| if(SliderUtils.isUnset(existingList)) { |
| existingList = ""; |
| } |
| appConf.getGlobalOptions().set(AgentKeys.ADDONS, existingList + StringUtils.join(addons, ",")); |
| } |
| |
| |
| //get the command line options |
| ConfTree cmdLineAppOptions = buildInfo.buildAppOptionsConfTree(); |
| ConfTree cmdLineResourceOptions = buildInfo.buildResourceOptionsConfTree(); |
| |
| appConf.merge(cmdLineAppOptions); |
| |
| // put the role counts into the resources file |
| Map<String, String> argsRoleMap = buildInfo.getComponentMap(); |
| for (Map.Entry<String, String> roleEntry : argsRoleMap.entrySet()) { |
| String count = roleEntry.getValue(); |
| String key = roleEntry.getKey(); |
| log.debug("{} => {}", key, count); |
| resources.getOrAddComponent(key) |
| .put(ResourceKeys.COMPONENT_INSTANCES, count); |
| } |
| |
| //all CLI role options |
| Map<String, Map<String, String>> appOptionMap = |
| buildInfo.getCompOptionMap(); |
| appConf.mergeComponents(appOptionMap); |
| |
| //internal picks up core. values only |
| internal.propagateGlobalKeys(appConf, "slider."); |
| internal.propagateGlobalKeys(appConf, "internal."); |
| |
| //copy over role. and yarn. values ONLY to the resources |
| if (PROPAGATE_RESOURCE_OPTION) { |
| resources.propagateGlobalKeys(appConf, "component."); |
| resources.propagateGlobalKeys(appConf, "role."); |
| resources.propagateGlobalKeys(appConf, "yarn."); |
| resources.mergeComponentsPrefix(appOptionMap, "component.", true); |
| resources.mergeComponentsPrefix(appOptionMap, "yarn.", true); |
| resources.mergeComponentsPrefix(appOptionMap, "role.", true); |
| } |
| |
| // resource component args |
| appConf.merge(cmdLineResourceOptions); |
| resources.merge(cmdLineResourceOptions); |
| resources.mergeComponents(buildInfo.getResourceCompOptionMap()); |
| |
| builder.init(providerName, instanceDefinition); |
| builder.propagateFilename(); |
| builder.propagatePrincipals(); |
| builder.setImageDetailsIfAvailable(buildInfo.getImage(), |
| buildInfo.getAppHomeDir()); |
| builder.setQueue(buildInfo.queue); |
| |
| String quorum = buildInfo.getZKhosts(); |
| if (SliderUtils.isUnset(quorum)) { |
| quorum = registryQuorum; |
| } |
| if (isUnset(quorum)) { |
| throw new BadConfigException("No Zookeeper quorum defined"); |
| } |
| ZKPathBuilder zkPaths = new ZKPathBuilder(getAppName(), |
| getUsername(), |
| clustername, |
| registryQuorum, |
| quorum); |
| String zookeeperRoot = buildInfo.getAppZKPath(); |
| |
| if (isSet(zookeeperRoot)) { |
| zkPaths.setAppPath(zookeeperRoot); |
| } else { |
| String createDefaultZkNode = appConf.getGlobalOptions().getOption(AgentKeys.CREATE_DEF_ZK_NODE, "false"); |
| if (createDefaultZkNode.equals("true")) { |
| String defaultZKPath = createZookeeperNode(clustername, false); |
| log.debug("ZK node created for application instance: {}", defaultZKPath); |
| if (defaultZKPath != null) { |
| zkPaths.setAppPath(defaultZKPath); |
| } |
| } else { |
| // create AppPath if default is being used |
| String defaultZKPath = createZookeeperNode(clustername, true); |
| log.debug("ZK node assigned to application instance: {}", defaultZKPath); |
| zkPaths.setAppPath(defaultZKPath); |
| } |
| } |
| |
| builder.addZKBinding(zkPaths); |
| |
| //then propagate any package URI |
| if (buildInfo.packageURI != null) { |
| appConf.set(AgentKeys.PACKAGE_PATH, buildInfo.packageURI); |
| } |
| |
| propagatePythonExecutable(conf, instanceDefinition); |
| |
| // make any substitutions needed at this stage |
| replaceTokens(appConf.getConfTree(), getUsername(), clustername); |
| |
| // TODO: Refactor the validation code and persistence code |
| try { |
| persistInstanceDefinition(overwrite, appconfdir, builder); |
| for(AppDefinition ad : appDefinitions) { |
| processDefinitionPackageOrFolder(ad.fileInFs, |
| ad.appDef, |
| ad.pkgName); |
| } |
| } catch (LockAcquireFailedException e) { |
| log.warn("Failed to get a Lock on {} : {}", builder, e); |
| throw new BadClusterStateException("Failed to save " + clustername |
| + ": " + e); |
| } |
| |
| // providers to validate what there is |
| AggregateConf instanceDescription = builder.getInstanceDescription(); |
| validateInstanceDefinition(sliderAM, instanceDescription, sliderFileSystem); |
| validateInstanceDefinition(provider, instanceDescription, sliderFileSystem); |
| } |
| |
| /** |
| * Process the application package or folder by copying it to the cluster path |
| * @param fileInFs fileInFs folder |
| * @param appDef definition package or folder |
| * @param pkgName package name when its a folder |
| * @throws BadConfigException |
| * @throws IOException |
| * @throws BadClusterStateException |
| */ |
| private void processDefinitionPackageOrFolder(Path fileInFs, |
| File appDef, |
| String pkgName) |
| throws BadConfigException, IOException, BadClusterStateException { |
| if (!appDef.canRead()) { |
| throw new BadConfigException("Pkg/Folder cannot be accessed - " + appDef.getAbsolutePath()); |
| } |
| |
| File src = appDef; |
| String targetName = appDef.getName(); |
| |
| if (appDef.isDirectory()) { |
| log.info("Processing app package/folder {} for {}", appDef.getAbsolutePath(), pkgName); |
| File tmpDir = Files.createTempDir(); |
| File zipFile = new File(tmpDir.getCanonicalPath(), File.separator + pkgName); |
| SliderUtils.zipFolder(appDef, zipFile); |
| |
| src = zipFile; |
| targetName = pkgName; |
| } |
| |
| sliderFileSystem.getFileSystem().copyFromLocalFile( |
| false, |
| false, |
| new Path(src.toURI()), |
| new Path(fileInFs, targetName)); |
| } |
| |
| protected void persistInstanceDefinition(boolean overwrite, |
| Path appconfdir, |
| InstanceBuilder builder) |
| throws IOException, SliderException, LockAcquireFailedException { |
| builder.persist(appconfdir, overwrite); |
| } |
| |
| @VisibleForTesting |
| public static void replaceTokens(ConfTree conf, |
| String userName, String clusterName) throws IOException { |
| Map<String,String> newglobal = new HashMap<String,String>(); |
| for (Entry<String,String> entry : conf.global.entrySet()) { |
| newglobal.put(entry.getKey(), replaceTokens(entry.getValue(), |
| userName, clusterName)); |
| } |
| conf.global.putAll(newglobal); |
| |
| Map<String,List<String>> newcred = new HashMap<String,List<String>>(); |
| for (Entry<String,List<String>> entry : conf.credentials.entrySet()) { |
| List<String> resultList = new ArrayList<String>(); |
| for (String v : entry.getValue()) { |
| resultList.add(replaceTokens(v, userName, clusterName)); |
| } |
| newcred.put(replaceTokens(entry.getKey(), userName, clusterName), |
| resultList); |
| } |
| conf.credentials.clear(); |
| conf.credentials.putAll(newcred); |
| } |
| |
| private static String replaceTokens(String s, String userName, |
| String clusterName) throws IOException { |
| return s.replaceAll(Pattern.quote("${USER}"), userName) |
| .replaceAll(Pattern.quote("${USER_NAME}"), userName) |
| .replaceAll(Pattern.quote("${CLUSTER_NAME}"), clusterName); |
| } |
| |
| public FsPermission getClusterDirectoryPermissions(Configuration conf) { |
| String clusterDirPermsOct = |
| conf.get(CLUSTER_DIRECTORY_PERMISSIONS, |
| DEFAULT_CLUSTER_DIRECTORY_PERMISSIONS); |
| return new FsPermission(clusterDirPermsOct); |
| } |
| |
| /** |
| * Verify that the Resource Manager is configured (on a non-HA cluster). |
| * with a useful error message |
| * @throws BadCommandArgumentsException the exception raised on an invalid config |
| */ |
| public void verifyBindingsDefined() throws BadCommandArgumentsException { |
| InetSocketAddress rmAddr = SliderUtils.getRmAddress(getConfig()); |
| if (!getConfig().getBoolean(YarnConfiguration.RM_HA_ENABLED, false) |
| && !SliderUtils.isAddressDefined(rmAddr)) { |
| throw new BadCommandArgumentsException( |
| "No valid Resource Manager address provided in the argument " |
| + Arguments.ARG_MANAGER |
| + " or the configuration property " |
| + YarnConfiguration.RM_ADDRESS |
| + " value :" + rmAddr); |
| } |
| } |
| |
| /** |
| * Load and start a cluster specification. |
| * This assumes that all validation of args and cluster state |
| * have already taken place |
| * |
| * @param clustername name of the cluster. |
| * @param launchArgs launch arguments |
| * @return the exit code |
| * @throws YarnException |
| * @throws IOException |
| */ |
| private int startCluster(String clustername, |
| LaunchArgsAccessor launchArgs) throws |
| YarnException, |
| IOException { |
| Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername); |
| AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved( |
| clustername, |
| clusterDirectory); |
| |
| LaunchedApplication launchedApplication = |
| launchApplication(clustername, clusterDirectory, instanceDefinition, |
| serviceArgs.isDebug()); |
| applicationId = launchedApplication.getApplicationId(); |
| |
| if (launchArgs.getOutputFile() != null) { |
| // output file has been requested. Get the app report and serialize it |
| ApplicationReport report = |
| launchedApplication.getApplicationReport(); |
| SerializedApplicationReport sar = new SerializedApplicationReport(report); |
| sar.submitTime = System.currentTimeMillis(); |
| ApplicationReportSerDeser serDeser = new ApplicationReportSerDeser(); |
| serDeser.save(sar, launchArgs.getOutputFile()); |
| } |
| int waittime = launchArgs.getWaittime(); |
| if (waittime > 0) { |
| return waitForAppRunning(launchedApplication, waittime, waittime); |
| } else { |
| // no waiting |
| return EXIT_SUCCESS; |
| } |
| } |
| |
| /** |
| * Load the instance definition. It is not resolved at this point |
| * @param name cluster name |
| * @param clusterDirectory cluster dir |
| * @return the loaded configuration |
| * @throws IOException |
| * @throws SliderException |
| * @throws UnknownApplicationInstanceException if the file is not found |
| */ |
| public AggregateConf loadInstanceDefinitionUnresolved(String name, |
| Path clusterDirectory) throws |
| IOException, |
| SliderException { |
| |
| try { |
| AggregateConf definition = |
| InstanceIO.loadInstanceDefinitionUnresolved(sliderFileSystem, |
| clusterDirectory); |
| definition.setName(name); |
| return definition; |
| } catch (FileNotFoundException e) { |
| throw UnknownApplicationInstanceException.unknownInstance(name, e); |
| } |
| } |
| /** |
| * Load the instance definition. |
| * @param name cluster name |
| * @param resolved flag to indicate the cluster should be resolved |
| * @return the loaded configuration |
| * @throws IOException IO problems |
| * @throws SliderException slider explicit issues |
| * @throws UnknownApplicationInstanceException if the file is not found |
| */ |
| public AggregateConf loadInstanceDefinition(String name, |
| boolean resolved) throws |
| IOException, |
| SliderException { |
| |
| Path clusterDirectory = sliderFileSystem.buildClusterDirPath(name); |
| AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved( |
| name, |
| clusterDirectory); |
| if (resolved) { |
| instanceDefinition.resolve(); |
| } |
| return instanceDefinition; |
| |
| } |
| |
| /** |
| * |
| * @param clustername name of the cluster |
| * @param clusterDirectory cluster dir |
| * @param instanceDefinition the instance definition |
| * @param debugAM enable debug AM options |
| * @return the launched application |
| * @throws YarnException |
| * @throws IOException |
| */ |
| public LaunchedApplication launchApplication(String clustername, |
| Path clusterDirectory, |
| AggregateConf instanceDefinition, |
| boolean debugAM) |
| throws YarnException, IOException { |
| |
| |
| deployedClusterName = clustername; |
| SliderUtils.validateClusterName(clustername); |
| verifyNoLiveClusters(clustername, "Launch"); |
| Configuration config = getConfig(); |
| lookupZKQuorum(); |
| boolean clusterSecure = SliderUtils.isHadoopClusterSecure(config); |
| //create the Slider AM provider -this helps set up the AM |
| SliderAMClientProvider sliderAM = new SliderAMClientProvider(config); |
| |
| instanceDefinition.resolve(); |
| launchedInstanceDefinition = instanceDefinition; |
| |
| ConfTreeOperations internalOperations = |
| instanceDefinition.getInternalOperations(); |
| MapOperations internalOptions = internalOperations.getGlobalOptions(); |
| ConfTreeOperations resourceOperations = |
| instanceDefinition.getResourceOperations(); |
| ConfTreeOperations appOperations = |
| instanceDefinition.getAppConfOperations(); |
| Path generatedConfDirPath = |
| createPathThatMustExist(internalOptions.getMandatoryOption( |
| InternalKeys.INTERNAL_GENERATED_CONF_PATH)); |
| Path snapshotConfPath = |
| createPathThatMustExist(internalOptions.getMandatoryOption( |
| InternalKeys.INTERNAL_SNAPSHOT_CONF_PATH)); |
| |
| |
| // cluster Provider |
| AbstractClientProvider provider = createClientProvider( |
| internalOptions.getMandatoryOption( |
| InternalKeys.INTERNAL_PROVIDER_NAME)); |
| // make sure the conf dir is valid; |
| |
| if (log.isDebugEnabled()) { |
| log.debug(instanceDefinition.toString()); |
| } |
| MapOperations sliderAMResourceComponent = |
| resourceOperations.getOrAddComponent(SliderKeys.COMPONENT_AM); |
| MapOperations resourceGlobalOptions = resourceOperations.getGlobalOptions(); |
| |
| // add the tags if available |
| Set<String> applicationTags = provider.getApplicationTags(sliderFileSystem, |
| appOperations.getGlobalOptions().get(AgentKeys.APP_DEF)); |
| AppMasterLauncher amLauncher = new AppMasterLauncher(clustername, |
| SliderKeys.APP_TYPE, |
| config, |
| sliderFileSystem, |
| yarnClient, |
| clusterSecure, |
| sliderAMResourceComponent, |
| resourceGlobalOptions, |
| applicationTags); |
| |
| ApplicationId appId = amLauncher.getApplicationId(); |
| // set the application name; |
| amLauncher.setKeepContainersOverRestarts(true); |
| |
| int maxAppAttempts = config.getInt(KEY_AM_RESTART_LIMIT, 0); |
| amLauncher.setMaxAppAttempts(maxAppAttempts); |
| |
| sliderFileSystem.purgeAppInstanceTempFiles(clustername); |
| Path tempPath = sliderFileSystem.createAppInstanceTempPath( |
| clustername, |
| appId.toString() + "/am"); |
| String libdir = "lib"; |
| Path libPath = new Path(tempPath, libdir); |
| sliderFileSystem.getFileSystem().mkdirs(libPath); |
| log.debug("FS={}, tempPath={}, libdir={}", |
| sliderFileSystem, tempPath, libPath); |
| |
| // set local resources for the application master |
| // local files or archives as needed |
| // In this scenario, the jar file for the application master is part of the local resources |
| Map<String, LocalResource> localResources = amLauncher.getLocalResources(); |
| |
| // look for the configuration directory named on the command line |
| boolean hasServerLog4jProperties = false; |
| Path remoteConfPath = null; |
| String relativeConfDir = null; |
| String confdirProp = |
| System.getProperty(SliderKeys.PROPERTY_CONF_DIR); |
| if (confdirProp == null || confdirProp.isEmpty()) { |
| log.debug("No local configuration directory provided as system property"); |
| } else { |
| File confDir = new File(confdirProp); |
| if (!confDir.exists()) { |
| throw new BadConfigException(E_CONFIGURATION_DIRECTORY_NOT_FOUND, |
| confDir); |
| } |
| Path localConfDirPath = SliderUtils.createLocalPath(confDir); |
| remoteConfPath = new Path(clusterDirectory, SliderKeys.SUBMITTED_CONF_DIR); |
| log.debug("Slider configuration directory is {}; remote to be {}", |
| localConfDirPath, remoteConfPath); |
| SliderUtils.copyDirectory(config, localConfDirPath, remoteConfPath, null); |
| |
| File log4jserver = |
| new File(confDir, SliderKeys.LOG4J_SERVER_PROP_FILENAME); |
| hasServerLog4jProperties = log4jserver.isFile(); |
| } |
| // the assumption here is that minimr cluster => this is a test run |
| // and the classpath can look after itself |
| |
| boolean usingMiniMRCluster = getUsingMiniMRCluster(); |
| if (!usingMiniMRCluster) { |
| |
| log.debug("Destination is not a MiniYARNCluster -copying full classpath"); |
| |
| // insert conf dir first |
| if (remoteConfPath != null) { |
| relativeConfDir = SliderKeys.SUBMITTED_CONF_DIR; |
| Map<String, LocalResource> submittedConfDir = |
| sliderFileSystem.submitDirectory(remoteConfPath, |
| relativeConfDir); |
| SliderUtils.mergeMaps(localResources, submittedConfDir); |
| } |
| } |
| // build up the configuration |
| // IMPORTANT: it is only after this call that site configurations |
| // will be valid. |
| |
| propagatePrincipals(config, instanceDefinition); |
| // validate security data |
| |
| /* |
| // turned off until tested |
| SecurityConfiguration securityConfiguration = |
| new SecurityConfiguration(config, |
| instanceDefinition, clustername); |
| |
| */ |
| Configuration clientConfExtras = new Configuration(false); |
| // then build up the generated path. |
| FsPermission clusterPerms = getClusterDirectoryPermissions(config); |
| SliderUtils.copyDirectory(config, snapshotConfPath, generatedConfDirPath, |
| clusterPerms); |
| |
| |
| // standard AM resources |
| sliderAM.prepareAMAndConfigForLaunch(sliderFileSystem, |
| config, |
| amLauncher, |
| instanceDefinition, |
| snapshotConfPath, |
| generatedConfDirPath, |
| clientConfExtras, |
| libdir, |
| tempPath, |
| usingMiniMRCluster); |
| //add provider-specific resources |
| provider.prepareAMAndConfigForLaunch(sliderFileSystem, |
| config, |
| amLauncher, |
| instanceDefinition, |
| snapshotConfPath, |
| generatedConfDirPath, |
| clientConfExtras, |
| libdir, |
| tempPath, |
| usingMiniMRCluster); |
| |
| // now that the site config is fully generated, the provider gets |
| // to do a quick review of them. |
| log.debug("Preflight validation of cluster configuration"); |
| |
| |
| sliderAM.preflightValidateClusterConfiguration(sliderFileSystem, |
| clustername, |
| config, |
| instanceDefinition, |
| clusterDirectory, |
| generatedConfDirPath, |
| clusterSecure |
| ); |
| |
| provider.preflightValidateClusterConfiguration(sliderFileSystem, |
| clustername, |
| config, |
| instanceDefinition, |
| clusterDirectory, |
| generatedConfDirPath, |
| clusterSecure |
| ); |
| |
| |
| // TODO: consider supporting apps that don't have an image path |
| Path imagePath = |
| SliderUtils.extractImagePath(sliderFileSystem, internalOptions); |
| if (sliderFileSystem.maybeAddImagePath(localResources, imagePath)) { |
| log.debug("Registered image path {}", imagePath); |
| } |
| |
| // build the environment |
| amLauncher.putEnv( |
| SliderUtils.buildEnvMap(sliderAMResourceComponent)); |
| ClasspathConstructor classpath = SliderUtils.buildClasspath(relativeConfDir, |
| libdir, |
| getConfig(), |
| usingMiniMRCluster); |
| amLauncher.setClasspath(classpath); |
| //add english env |
| amLauncher.setEnv("LANG", "en_US.UTF-8"); |
| amLauncher.setEnv("LC_ALL", "en_US.UTF-8"); |
| amLauncher.setEnv("LANGUAGE", "en_US.UTF-8"); |
| if (log.isDebugEnabled()) { |
| log.debug("AM classpath={}", classpath); |
| log.debug("Environment Map:\n{}", |
| SliderUtils.stringifyMap(amLauncher.getEnv())); |
| log.debug("Files in lib path\n{}", sliderFileSystem.listFSDir(libPath)); |
| } |
| |
| // rm address |
| |
| InetSocketAddress rmSchedulerAddress; |
| try { |
| rmSchedulerAddress = SliderUtils.getRmSchedulerAddress(config); |
| } catch (IllegalArgumentException e) { |
| throw new BadConfigException("%s Address invalid: %s", |
| YarnConfiguration.RM_SCHEDULER_ADDRESS, |
| config.get( |
| YarnConfiguration.RM_SCHEDULER_ADDRESS) |
| ); |
| |
| } |
| String rmAddr = NetUtils.getHostPortString(rmSchedulerAddress); |
| |
| JavaCommandLineBuilder commandLine = new JavaCommandLineBuilder(); |
| // insert any JVM options); |
| sliderAM.addJVMOptions(instanceDefinition, commandLine); |
| // enable asserts |
| commandLine.enableJavaAssertions(); |
| |
| // if the conf dir has a log4j-server.properties, switch to that |
| if (hasServerLog4jProperties) { |
| commandLine.sysprop(SYSPROP_LOG4J_CONFIGURATION, LOG4J_SERVER_PROP_FILENAME); |
| commandLine.sysprop(SYSPROP_LOG_DIR, ApplicationConstants.LOG_DIR_EXPANSION_VAR); |
| } |
| |
| // add the AM sevice entry point |
| commandLine.add(SliderAppMaster.SERVICE_CLASSNAME); |
| |
| // create action and the cluster name |
| commandLine.add(ACTION_CREATE, clustername); |
| |
| // debug |
| if (debugAM) { |
| commandLine.add(Arguments.ARG_DEBUG); |
| } |
| |
| // set the cluster directory path |
| commandLine.add(Arguments.ARG_CLUSTER_URI, clusterDirectory.toUri()); |
| |
| if (!isUnset(rmAddr)) { |
| commandLine.add(Arguments.ARG_RM_ADDR, rmAddr); |
| } |
| |
| if (serviceArgs.getFilesystemBinding() != null) { |
| commandLine.add(Arguments.ARG_FILESYSTEM, serviceArgs.getFilesystemBinding()); |
| } |
| |
| /** |
| * pass the registry binding |
| */ |
| addConfOptionToCLI(commandLine, config, |
| RegistryConstants.KEY_REGISTRY_ZK_ROOT, |
| RegistryConstants.DEFAULT_ZK_REGISTRY_ROOT); |
| addMandatoryConfOptionToCLI(commandLine, config, |
| RegistryConstants.KEY_REGISTRY_ZK_QUORUM); |
| |
| if (clusterSecure) { |
| // if the cluster is secure, make sure that |
| // the relevant security settings go over |
| /* |
| addConfOptionToCLI(commandLine, config, KEY_SECURITY); |
| */ |
| addConfOptionToCLI(commandLine, |
| config, |
| DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY); |
| } |
| // write out the path output |
| commandLine.addOutAndErrFiles(STDOUT_AM, STDERR_AM); |
| |
| String cmdStr = commandLine.build(); |
| log.debug("Completed setting up app master command {}", cmdStr); |
| |
| amLauncher.addCommandLine(commandLine); |
| |
| // the Slider AM gets to configure the AM requirements, not the custom provider |
| sliderAM.prepareAMResourceRequirements(sliderAMResourceComponent, |
| amLauncher.getResource()); |
| |
| |
| // Set the priority for the application master |
| |
| int amPriority = config.getInt(KEY_YARN_QUEUE_PRIORITY, |
| DEFAULT_YARN_QUEUE_PRIORITY); |
| |
| |
| amLauncher.setPriority(amPriority); |
| |
| // Set the queue to which this application is to be submitted in the RM |
| // Queue for App master |
| String amQueue = config.get(KEY_YARN_QUEUE, DEFAULT_YARN_QUEUE); |
| String suppliedQueue = internalOperations.getGlobalOptions().get(InternalKeys.INTERNAL_QUEUE); |
| if(!SliderUtils.isUnset(suppliedQueue)) { |
| amQueue = suppliedQueue; |
| log.info("Using queue {} for the application instance.", amQueue); |
| } |
| |
| if (amQueue != null) { |
| amLauncher.setQueue(amQueue); |
| } |
| |
| // submit the application |
| LaunchedApplication launchedApplication = amLauncher.submitApplication(); |
| return launchedApplication; |
| } |
| |
| private void propagatePythonExecutable(Configuration config, |
| AggregateConf instanceDefinition) { |
| String pythonExec = config.get( |
| SliderXmlConfKeys.PYTHON_EXECUTABLE_PATH); |
| if (pythonExec != null) { |
| instanceDefinition.getAppConfOperations().getGlobalOptions().putIfUnset( |
| SliderXmlConfKeys.PYTHON_EXECUTABLE_PATH, |
| pythonExec); |
| } |
| } |
| |
| |
| /** |
| * Wait for the launched app to be accepted in the time |
| * and, optionally running. |
| * <p> |
| * If the application |
| * |
| * @param launchedApplication application |
| * @param acceptWaitMillis time in millis to wait for accept |
| * @param runWaitMillis time in millis to wait for the app to be running. |
| * May be null, in which case no wait takes place |
| * @return exit code: success |
| * @throws YarnException |
| * @throws IOException |
| */ |
| public int waitForAppRunning(LaunchedApplication launchedApplication, |
| int acceptWaitMillis, int runWaitMillis) throws YarnException, IOException { |
| assert launchedApplication != null; |
| int exitCode; |
| // wait for the submit state to be reached |
| ApplicationReport report = launchedApplication.monitorAppToState( |
| YarnApplicationState.ACCEPTED, |
| new Duration(acceptWaitMillis)); |
| |
| // may have failed, so check that |
| if (SliderUtils.hasAppFinished(report)) { |
| exitCode = buildExitCode(report); |
| } else { |
| // exit unless there is a wait |
| |
| |
| if (runWaitMillis != 0) { |
| // waiting for state to change |
| Duration duration = new Duration(runWaitMillis * 1000); |
| duration.start(); |
| report = launchedApplication.monitorAppToState( |
| YarnApplicationState.RUNNING, duration); |
| if (report != null && |
| report.getYarnApplicationState() == YarnApplicationState.RUNNING) { |
| exitCode = EXIT_SUCCESS; |
| } else { |
| exitCode = buildExitCode(report); |
| } |
| } else { |
| exitCode = EXIT_SUCCESS; |
| } |
| } |
| return exitCode; |
| } |
| |
| |
| /** |
| * Propagate any critical principals from the current site config down to the HBase one. |
| * @param config config to read from |
| * @param clusterSpec cluster spec |
| */ |
| private void propagatePrincipals(Configuration config, |
| AggregateConf clusterSpec) { |
| String dfsPrincipal = config.get( |
| DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY); |
| if (dfsPrincipal != null) { |
| String siteDfsPrincipal = OptionKeys.SITE_XML_PREFIX + |
| DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY; |
| clusterSpec.getAppConfOperations().getGlobalOptions().putIfUnset( |
| siteDfsPrincipal, |
| dfsPrincipal); |
| } |
| } |
| |
| |
| private boolean addConfOptionToCLI(CommandLineBuilder cmdLine, |
| Configuration conf, |
| String key) { |
| String val = conf.get(key); |
| return defineIfSet(cmdLine, key, val); |
| } |
| |
| private String addConfOptionToCLI(CommandLineBuilder cmdLine, |
| Configuration conf, |
| String key, |
| String defVal) { |
| String val = conf.get(key, defVal); |
| define(cmdLine, key, val); |
| return val; |
| } |
| |
| /** |
| * Add a <code>-D key=val</code> command to the CLI |
| * @param cmdLine command line |
| * @param key key |
| * @param val value |
| */ |
| private void define(CommandLineBuilder cmdLine, String key, String val) { |
| Preconditions.checkArgument(key != null, "null key"); |
| Preconditions.checkArgument(val != null, "null value"); |
| cmdLine.add(Arguments.ARG_DEFINE, key + "=" + val); |
| } |
| |
| /** |
| * Add a <code>-D key=val</code> command to the CLI if <code>val</code> |
| * is not null |
| * @param cmdLine command line |
| * @param key key |
| * @param val value |
| */ |
| private boolean defineIfSet(CommandLineBuilder cmdLine, String key, String val) { |
| Preconditions.checkArgument(key != null, "null key"); |
| if (val != null) { |
| define(cmdLine, key, val); |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| private void addMandatoryConfOptionToCLI(CommandLineBuilder cmdLine, |
| Configuration conf, |
| String key) throws BadConfigException { |
| if (!addConfOptionToCLI(cmdLine, conf, key)) { |
| throw new BadConfigException("Missing configuration option: " + key); |
| } |
| } |
| |
| /** |
| * Create a path that must exist in the cluster fs |
| * @param uri uri to create |
| * @return the path |
| * @throws FileNotFoundException if the path does not exist |
| */ |
| public Path createPathThatMustExist(String uri) throws |
| SliderException, |
| IOException { |
| return sliderFileSystem.createPathThatMustExist(uri); |
| } |
| |
| /** |
| * verify that a live cluster isn't there |
| * @param clustername cluster name |
| * @param action |
| * @throws SliderException with exit code EXIT_CLUSTER_LIVE |
| * if a cluster of that name is either live or starting up. |
| */ |
| public void verifyNoLiveClusters(String clustername, String action) throws |
| IOException, |
| YarnException { |
| List<ApplicationReport> existing = findAllLiveInstances(clustername); |
| |
| if (!existing.isEmpty()) { |
| throw new SliderException(EXIT_APPLICATION_IN_USE, |
| action +" failed for " |
| + clustername |
| + ": " |
| + E_CLUSTER_RUNNING + " :" + |
| existing.get(0)); |
| } |
| } |
| |
| public String getUsername() throws IOException { |
| return RegistryUtils.currentUser(); |
| } |
| |
| /** |
| * Get the name of any deployed cluster |
| * @return the cluster name |
| */ |
| public String getDeployedClusterName() { |
| return deployedClusterName; |
| } |
| |
| @VisibleForTesting |
| public void setDeployedClusterName(String deployedClusterName) { |
| this.deployedClusterName = deployedClusterName; |
| } |
| |
| /** |
| * ask if the client is using a mini MR cluster |
| * @return true if they are |
| */ |
| private boolean getUsingMiniMRCluster() { |
| return getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, |
| false); |
| } |
| |
| /** |
| * Get the application name used in the zookeeper root paths |
| * @return an application-specific path in ZK |
| */ |
| private String getAppName() { |
| return "slider"; |
| } |
| |
| /** |
| * Wait for the app to start running (or go past that state) |
| * @param duration time to wait |
| * @return the app report; null if the duration turned out |
| * @throws YarnException YARN or app issues |
| * @throws IOException IO problems |
| */ |
| @VisibleForTesting |
| public ApplicationReport monitorAppToRunning(Duration duration) |
| throws YarnException, IOException { |
| return monitorAppToState(YarnApplicationState.RUNNING, duration); |
| } |
| |
| /** |
| * Build an exit code for an application from its report. |
| * If the report parameter is null, its interpreted as a timeout |
| * @param report report application report |
| * @return the exit code |
| * @throws IOException |
| * @throws YarnException |
| */ |
| private int buildExitCode(ApplicationReport report) throws |
| IOException, |
| YarnException { |
| if (null == report) { |
| return EXIT_TIMED_OUT; |
| } |
| |
| YarnApplicationState state = report.getYarnApplicationState(); |
| FinalApplicationStatus dsStatus = report.getFinalApplicationStatus(); |
| switch (state) { |
| case FINISHED: |
| if (FinalApplicationStatus.SUCCEEDED == dsStatus) { |
| log.info("Application has completed successfully"); |
| return EXIT_SUCCESS; |
| } else { |
| log.info("Application finished unsuccessfully." + |
| "YarnState = {}, DSFinalStatus = {} Breaking monitoring loop", |
| state, dsStatus); |
| return EXIT_YARN_SERVICE_FINISHED_WITH_ERROR; |
| } |
| |
| case KILLED: |
| log.info("Application did not finish. YarnState={}, DSFinalStatus={}", |
| state, dsStatus); |
| return EXIT_YARN_SERVICE_KILLED; |
| |
| case FAILED: |
| log.info("Application Failed. YarnState={}, DSFinalStatus={}", state, |
| dsStatus); |
| return EXIT_YARN_SERVICE_FAILED; |
| |
| default: |
| //not in any of these states |
| return EXIT_SUCCESS; |
| } |
| } |
| |
| /** |
| * Monitor the submitted application for reaching the requested state. |
| * Will also report if the app reaches a later state (failed, killed, etc) |
| * Kill application if duration!= null & time expires. |
| * Prerequisite: the applicatin was launched. |
| * @param desiredState desired state. |
| * @param duration how long to wait -must be more than 0 |
| * @return the application report -null on a timeout |
| * @throws YarnException |
| * @throws IOException |
| */ |
| @VisibleForTesting |
| public ApplicationReport monitorAppToState( |
| YarnApplicationState desiredState, |
| Duration duration) |
| throws YarnException, IOException { |
| LaunchedApplication launchedApplication = |
| new LaunchedApplication(applicationId, yarnClient); |
| return launchedApplication.monitorAppToState(desiredState, duration); |
| } |
| |
| @Override |
| public ApplicationReport getApplicationReport() throws |
| IOException, |
| YarnException { |
| return getApplicationReport(applicationId); |
| } |
| |
| @Override |
| public boolean forceKillApplication(String reason) |
| throws YarnException, IOException { |
| if (applicationId != null) { |
| new LaunchedApplication(applicationId, yarnClient).forceKill(reason); |
| return true; |
| } |
| return false; |
| } |
| |
| /** |
| * List Slider instances belonging to a specific user. This will include |
| * failed and killed instances; there may be duplicates |
| * @param user user: "" means all users, null means "default" |
| * @return a possibly empty list of Slider AMs |
| */ |
| |
| public List<ApplicationReport> listSliderInstances(String user) |
| throws YarnException, IOException { |
| return yarnAppListClient.listInstances(user); |
| } |
| |
| /** |
| * A basic list action to list live instances |
| * @param clustername cluster name |
| * @return success if the listing was considered successful |
| * @throws IOException |
| * @throws YarnException |
| */ |
| public int actionList(String clustername) throws IOException, YarnException { |
| ActionListArgs args = new ActionListArgs(); |
| args.live = true; |
| return actionList(clustername, args); |
| } |
| |
| /** |
| * Implement the list action. |
| * @param clustername List out specific instance name |
| * @param args Action list arguments |
| * @return 0 if one or more entries were listed |
| * @throws IOException |
| * @throws YarnException |
| * @throws UnknownApplicationInstanceException if a specific instance |
| * was named but it was not found |
| */ |
| @Override |
| public int actionList(String clustername, ActionListArgs args) |
| throws IOException, YarnException { |
| verifyBindingsDefined(); |
| |
| boolean live = args.live; |
| String state = args.state; |
| boolean verbose = args.verbose; |
| |
| if (live && !state.isEmpty()) { |
| throw new BadCommandArgumentsException( |
| Arguments.ARG_LIVE + " and " + Arguments.ARG_STATE + " are exclusive"); |
| } |
| // flag to indicate only services in a specific state are to be listed |
| boolean listOnlyInState = live || !state.isEmpty(); |
| |
| YarnApplicationState min, max; |
| if (live) { |
| min = YarnApplicationState.NEW; |
| max = YarnApplicationState.RUNNING; |
| } else if (!state.isEmpty()) { |
| YarnApplicationState stateVal = extractYarnApplicationState(state); |
| min = max = stateVal; |
| } else { |
| min = YarnApplicationState.NEW; |
| max = YarnApplicationState.KILLED; |
| } |
| // get the complete list of persistent instances |
| Map<String, Path> persistentInstances = sliderFileSystem.listPersistentInstances(); |
| |
| if (persistentInstances.isEmpty() && isUnset(clustername)) { |
| // an empty listing is a success if no cluster was named |
| log.debug("No application instances found"); |
| return EXIT_SUCCESS; |
| } |
| |
| // and those the RM knows about |
| List<ApplicationReport> instances = listSliderInstances(null); |
| SliderUtils.sortApplicationsByMostRecent(instances); |
| Map<String, ApplicationReport> reportMap = |
| SliderUtils.buildApplicationReportMap(instances, min, max); |
| log.debug("Persisted {} deployed {} filtered[{}-{}] & de-duped to {}", |
| persistentInstances.size(), |
| instances.size(), |
| min, max, |
| reportMap.size() ); |
| |
| if (isSet(clustername)) { |
| // only one instance is expected |
| // resolve the persistent value |
| Path persistent = persistentInstances.get(clustername); |
| if (persistent == null) { |
| throw unknownClusterException(clustername); |
| } |
| // create a new map with only that instance in it. |
| // this restricts the output of results to this instance |
| persistentInstances = new HashMap<String, Path>(); |
| persistentInstances.put(clustername, persistent); |
| } |
| |
| // at this point there is either the entire list or a stripped down instance |
| int listed = 0; |
| |
| for (String name : persistentInstances.keySet()) { |
| ApplicationReport report = reportMap.get(name); |
| if (!listOnlyInState || report != null) { |
| // list the details if all were requested, or the filtering contained |
| // a report |
| listed++; |
| String details = SliderUtils.instanceDetailsToString(name, |
| report, |
| verbose); |
| print(details); |
| } |
| } |
| |
| return listed > 0 ? EXIT_SUCCESS: EXIT_FALSE; |
| } |
| |
| /** |
| * Enumerate slider instances for the current user, and the |
| * most recent app report, where available. |
| * @param listOnlyInState boolean to indicate that the instances should |
| * only include those in a YARN state |
| * <code> minAppState <= currentState <= maxAppState </code> |
| * |
| * @param minAppState minimum application state to include in enumeration. |
| * @param maxAppState maximum application state to include |
| * @return a map of application instance name to description |
| * @throws IOException Any IO problem |
| * @throws YarnException YARN problems |
| */ |
| @Override |
| public Map<String, SliderInstanceDescription> enumSliderInstances( |
| boolean listOnlyInState, |
| YarnApplicationState minAppState, |
| YarnApplicationState maxAppState) |
| throws IOException, YarnException { |
| return yarnAppListClient.enumSliderInstances(listOnlyInState, |
| minAppState, |
| maxAppState); |
| } |
| |
| /** |
| * Extract the state of a Yarn application --state argument |
| * @param state state argument |
| * @return the application state |
| * @throws BadCommandArgumentsException if the argument did not match |
| * any known state |
| */ |
| private YarnApplicationState extractYarnApplicationState(String state) throws |
| BadCommandArgumentsException { |
| YarnApplicationState stateVal; |
| try { |
| stateVal = YarnApplicationState.valueOf(state.toUpperCase(Locale.ENGLISH)); |
| } catch (IllegalArgumentException e) { |
| throw new BadCommandArgumentsException("Unknown state: " + state); |
| |
| } |
| return stateVal; |
| } |
| |
| /** |
| * Is an application active: accepted or running |
| * @param report the application report |
| * @return true if it is running or scheduled to run. |
| */ |
| public boolean isApplicationActive(ApplicationReport report) { |
| return report.getYarnApplicationState() == YarnApplicationState.RUNNING |
| || report.getYarnApplicationState() == YarnApplicationState.ACCEPTED; |
| } |
| |
| /** |
| * Implement the islive action: probe for a cluster of the given name existing |
| * @return exit code |
| */ |
| |
| @Override |
| @VisibleForTesting |
| public int actionFlex(String name, ActionFlexArgs args) throws YarnException, IOException { |
| verifyBindingsDefined(); |
| SliderUtils.validateClusterName(name); |
| log.debug("actionFlex({})", name); |
| Map<String, Integer> roleInstances = new HashMap<String, Integer>(); |
| Map<String, String> roleMap = args.getComponentMap(); |
| for (Map.Entry<String, String> roleEntry : roleMap.entrySet()) { |
| String key = roleEntry.getKey(); |
| String val = roleEntry.getValue(); |
| try { |
| roleInstances.put(key, Integer.valueOf(val)); |
| } catch (NumberFormatException e) { |
| throw new BadCommandArgumentsException("Requested count of role %s" + |
| " is not a number: \"%s\"", |
| key, val); |
| } |
| } |
| return flex(name, roleInstances); |
| } |
| |
| @Override |
| public int actionExists(String name, boolean checkLive) throws YarnException, IOException { |
| ActionExistsArgs args = new ActionExistsArgs(); |
| args.live = checkLive; |
| return actionExists(name, args); |
| } |
| |
| public int actionExists(String name, ActionExistsArgs args) throws YarnException, IOException { |
| verifyBindingsDefined(); |
| SliderUtils.validateClusterName(name); |
| boolean checkLive = args.live; |
| log.debug("actionExists({}, {}, {})", name, checkLive, args.state); |
| |
| //initial probe for a cluster in the filesystem |
| Path clusterDirectory = sliderFileSystem.buildClusterDirPath(name); |
| if (!sliderFileSystem.getFileSystem().exists(clusterDirectory)) { |
| throw unknownClusterException(name); |
| } |
| String state = args.state; |
| if (!checkLive && SliderUtils.isUnset(state)) { |
| log.info("Application {} exists", name); |
| return EXIT_SUCCESS; |
| } |
| |
| //test for liveness/state |
| boolean inDesiredState = false; |
| ApplicationReport instance; |
| instance = findInstance(name); |
| if (instance == null) { |
| log.info("Application {} not running", name); |
| return EXIT_FALSE; |
| } |
| if (checkLive) { |
| // the app exists, check that it is not in any terminated state |
| YarnApplicationState appstate = instance.getYarnApplicationState(); |
| log.debug(" current app state = {}", appstate); |
| inDesiredState = |
| appstate.ordinal() < YarnApplicationState.FINISHED.ordinal(); |
| } else { |
| // scan for instance in single --state state |
| List<ApplicationReport> userInstances = yarnClient.listDeployedInstances(""); |
| state = state.toUpperCase(Locale.ENGLISH); |
| YarnApplicationState desiredState = extractYarnApplicationState(state); |
| ApplicationReport foundInstance = |
| yarnClient.findAppInInstanceList(userInstances, name, desiredState); |
| if (foundInstance != null) { |
| // found in selected state: success |
| inDesiredState = true; |
| // mark this as the instance to report |
| instance = foundInstance; |
| } |
| } |
| |
| SliderUtils.OnDemandReportStringifier report = |
| new SliderUtils.OnDemandReportStringifier(instance); |
| if (!inDesiredState) { |
| //cluster in the list of apps but not running |
| log.info("Application {} found but is in wrong state {}", name, |
| instance.getYarnApplicationState()); |
| log.debug("State {}", report); |
| return EXIT_FALSE; |
| } else { |
| log.debug("Application instance is in desired state"); |
| log.info("Application {} is {}\n{}", name, |
| instance.getYarnApplicationState(), report); |
| return EXIT_SUCCESS; |
| } |
| } |
| |
| |
| @Override |
| public int actionKillContainer(String name, |
| ActionKillContainerArgs args) throws YarnException, IOException { |
| String id = args.id; |
| if (SliderUtils.isUnset(id)) { |
| throw new BadCommandArgumentsException("Missing container id"); |
| } |
| log.info("killingContainer {}:{}", name, id); |
| SliderClusterOperations clusterOps = |
| new SliderClusterOperations(bondToCluster(name)); |
| try { |
| clusterOps.killContainer(id); |
| } catch (NoSuchNodeException e) { |
| throw new BadClusterStateException("Container %s not found in cluster %s", |
| id, name); |
| } |
| return EXIT_SUCCESS; |
| } |
| |
| @Override |
| public String actionEcho(String name, ActionEchoArgs args) throws |
| YarnException, |
| IOException { |
| String message = args.message; |
| if (message == null) { |
| throw new BadCommandArgumentsException("missing message"); |
| } |
| SliderClusterOperations clusterOps = |
| new SliderClusterOperations(bondToCluster(name)); |
| return clusterOps.echo(message); |
| } |
| |
| /** |
| * Get at the service registry operations |
| * @return registry client -valid after the service is inited. |
| */ |
| public YarnAppListClient getYarnAppListClient() { |
| return yarnAppListClient; |
| } |
| |
| /** |
| * Find an instance of an application belonging to the current user |
| * @param appname application name |
| * @return the app report or null if none is found |
| * @throws YarnException YARN issues |
| * @throws IOException IO problems |
| */ |
| private ApplicationReport findInstance(String appname) |
| throws YarnException, IOException { |
| return yarnAppListClient.findInstance(appname); |
| } |
| |
| private RunningApplication findApplication(String appname) |
| throws YarnException, IOException { |
| ApplicationReport applicationReport = findInstance(appname); |
| return applicationReport != null ? |
| new RunningApplication(yarnClient, applicationReport): null; |
| } |
| |
| /** |
| * find all live instances of a specific app -if there is >1 in the cluster, |
| * this returns them all. State should be running or less |
| * @param appname application name |
| * @return the list of all matching application instances |
| */ |
| private List<ApplicationReport> findAllLiveInstances(String appname) |
| throws YarnException, IOException { |
| |
| return yarnAppListClient.findAllLiveInstances(appname); |
| } |
| |
| /** |
| * Connect to a Slider AM |
| * @param app application report providing the details on the application |
| * @return an instance |
| * @throws YarnException |
| * @throws IOException |
| */ |
| private SliderClusterProtocol connect(ApplicationReport app) |
| throws YarnException, IOException { |
| |
| try { |
| return RpcBinder.getProxy(getConfig(), |
| yarnClient.getRmClient(), |
| app, |
| Constants.CONNECT_TIMEOUT, |
| Constants.RPC_TIMEOUT); |
| } catch (InterruptedException e) { |
| throw new SliderException(SliderExitCodes.EXIT_TIMED_OUT, |
| e, |
| "Interrupted waiting for communications with the Slider AM"); |
| } |
| } |
| |
| @Override |
| @VisibleForTesting |
| public int actionStatus(String clustername, ActionStatusArgs statusArgs) throws |
| YarnException, |
| IOException { |
| verifyBindingsDefined(); |
| SliderUtils.validateClusterName(clustername); |
| String outfile = statusArgs.getOutput(); |
| ClusterDescription status = getClusterDescription(clustername); |
| String text = status.toJsonString(); |
| if (outfile == null) { |
| log.info(text); |
| } else { |
| status.save(new File(outfile).getAbsoluteFile()); |
| } |
| return EXIT_SUCCESS; |
| } |
| |
| @Override |
| public int actionVersion() { |
| SliderVersionInfo.loadAndPrintVersionInfo(log); |
| return EXIT_SUCCESS; |
| } |
| |
| @Override |
| public int actionFreeze(String clustername, |
| ActionFreezeArgs freezeArgs) throws YarnException, IOException { |
| verifyBindingsDefined(); |
| SliderUtils.validateClusterName(clustername); |
| int waittime = freezeArgs.getWaittime(); |
| String text = freezeArgs.message; |
| boolean forcekill = freezeArgs.force; |
| log.debug("actionFreeze({}, reason={}, wait={}, force={})", clustername, |
| text, |
| waittime, |
| forcekill); |
| |
| //is this actually a known cluster? |
| sliderFileSystem.locateInstanceDefinition(clustername); |
| ApplicationReport app = findInstance(clustername); |
| if (app == null) { |
| // exit early |
| log.info("Cluster {} not running", clustername); |
| // not an error to stop a stopped cluster |
| return EXIT_SUCCESS; |
| } |
| log.debug("App to stop was found: {}:\n{}", clustername, |
| new SliderUtils.OnDemandReportStringifier(app)); |
| if (app.getYarnApplicationState().ordinal() >= |
| YarnApplicationState.FINISHED.ordinal()) { |
| log.info("Cluster {} is a terminated state {}", clustername, |
| app.getYarnApplicationState()); |
| return EXIT_SUCCESS; |
| } |
| |
| // IPC request for a managed shutdown is only possible if the app is running. |
| // so we need to force kill if the app is accepted or submitted |
| if (!forcekill |
| && app.getYarnApplicationState().ordinal() < YarnApplicationState.RUNNING.ordinal()) { |
| log.info("Cluster {} is in a pre-running state {}. Force killing it", clustername, |
| app.getYarnApplicationState()); |
| forcekill = true; |
| } |
| |
| LaunchedApplication application = new LaunchedApplication(yarnClient, app); |
| applicationId = application.getApplicationId(); |
| |
| if (forcekill) { |
| // escalating to forced kill |
| application.kill("Forced stop of " + clustername + ": " + text); |
| } else { |
| try { |
| SliderClusterProtocol appMaster = connect(app); |
| Messages.StopClusterRequestProto r = |
| Messages.StopClusterRequestProto |
| .newBuilder() |
| .setMessage(text) |
| .build(); |
| appMaster.stopCluster(r); |
| |
| log.debug("Cluster stop command issued"); |
| |
| } catch (YarnException e) { |
| log.warn("Exception while trying to terminate {}", clustername, e); |
| return EXIT_FALSE; |
| } catch (IOException e) { |
| log.warn("Exception while trying to terminate {}", clustername, e); |
| return EXIT_FALSE; |
| } |
| } |
| |
| //wait for completion. We don't currently return an exception during this process |
| //as the stop operation has been issued, this is just YARN. |
| try { |
| if (waittime > 0) { |
| ApplicationReport applicationReport = |
| application.monitorAppToState(YarnApplicationState.FINISHED, |
| new Duration(waittime * 1000)); |
| if (applicationReport == null) { |
| log.info("application did not shut down in time"); |
| return EXIT_FALSE; |
| } |
| } |
| |
| // JDK7 } catch (YarnException | IOException e) { |
| } catch (YarnException e) { |
| log.warn("Exception while waiting for the application {} to shut down: {}", |
| clustername, e); |
| } catch ( IOException e) { |
| log.warn("Exception while waiting for the application {} to shut down: {}", |
| clustername, e); |
| } |
| |
| return EXIT_SUCCESS; |
| } |
| |
| @Override |
| public int actionThaw(String clustername, ActionThawArgs thaw) throws YarnException, IOException { |
| SliderUtils.validateClusterName(clustername); |
| verifyBindingsDefined(); |
| // see if it is actually running and bail out; |
| verifyNoLiveClusters(clustername, "Start"); |
| |
| //start the cluster |
| return startCluster(clustername, thaw); |
| } |
| |
| /** |
| * Implement flexing |
| * @param clustername name of the cluster |
| * @param roleInstances map of new role instances |
| * @return EXIT_SUCCESS if the #of nodes in a live cluster changed |
| * @throws YarnException |
| * @throws IOException |
| */ |
| public int flex(String clustername, Map<String, Integer> roleInstances) |
| throws YarnException, IOException { |
| verifyBindingsDefined(); |
| SliderUtils.validateClusterName(clustername); |
| Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername); |
| AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved( |
| clustername, |
| clusterDirectory); |
| |
| ConfTreeOperations resources = |
| instanceDefinition.getResourceOperations(); |
| for (Map.Entry<String, Integer> entry : roleInstances.entrySet()) { |
| String role = entry.getKey(); |
| int count = entry.getValue(); |
| resources.getOrAddComponent(role).put(ResourceKeys.COMPONENT_INSTANCES, |
| Integer.toString(count)); |
| |
| log.debug("Flexed cluster specification ( {} -> {}) : \n{}", |
| role, |
| count, |
| resources); |
| } |
| SliderAMClientProvider sliderAM = new SliderAMClientProvider(getConfig()); |
| AbstractClientProvider provider = createClientProvider( |
| instanceDefinition.getInternalOperations().getGlobalOptions().getMandatoryOption( |
| InternalKeys.INTERNAL_PROVIDER_NAME)); |
| // slider provider to validate what there is |
| validateInstanceDefinition(sliderAM, instanceDefinition, sliderFileSystem); |
| validateInstanceDefinition(provider, instanceDefinition, sliderFileSystem); |
| |
| int exitCode = EXIT_FALSE; |
| // save the specification |
| try { |
| InstanceIO.saveInstanceDefinition(sliderFileSystem, clusterDirectory, |
| instanceDefinition); |
| } catch (LockAcquireFailedException e) { |
| // lock failure |
| log.debug("Failed to lock dir {}", clusterDirectory, e); |
| log.warn("Failed to save new resource definition to {} : {}", clusterDirectory, e); |
| } |
| |
| // now see if it is actually running and tell it about the update if it is |
| ApplicationReport instance = findInstance(clustername); |
| if (instance != null) { |
| log.info("Flexing running cluster"); |
| SliderClusterProtocol appMaster = connect(instance); |
| SliderClusterOperations clusterOps = new SliderClusterOperations(appMaster); |
| clusterOps.flex(instanceDefinition.getResources()); |
| log.info("application instance size updated"); |
| exitCode = EXIT_SUCCESS; |
| } else { |
| log.info("No running instance to update"); |
| } |
| return exitCode; |
| } |
| |
| /** |
| * Validate an instance definition against a provider. |
| * @param provider the provider performing the validation |
| * @param instanceDefinition the instance definition |
| * @throws SliderException if invalid. |
| */ |
| protected void validateInstanceDefinition(AbstractClientProvider provider, |
| AggregateConf instanceDefinition, SliderFileSystem fs) throws SliderException { |
| try { |
| provider.validateInstanceDefinition(instanceDefinition, fs); |
| } catch (SliderException e) { |
| //problem, reject it |
| log.info("Error {} validating application instance definition ", e.getMessage()); |
| log.debug("Error validating application instance definition ", e); |
| log.info(instanceDefinition.toString()); |
| throw e; |
| } |
| } |
| |
| |
| /** |
| * Load the persistent cluster description |
| * @param clustername name of the cluster |
| * @return the description in the filesystem |
| * @throws IOException any problems loading -including a missing file |
| */ |
| @VisibleForTesting |
| public AggregateConf loadPersistedClusterDescription(String clustername) |
| throws IOException, SliderException, LockAcquireFailedException { |
| Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername); |
| ConfPersister persister = new ConfPersister(sliderFileSystem, clusterDirectory); |
| AggregateConf instanceDescription = new AggregateConf(); |
| persister.load(instanceDescription); |
| return instanceDescription; |
| } |
| |
| /** |
| * Connect to a live cluster and get its current state |
| * @param clustername the cluster name |
| * @return its description |
| */ |
| @VisibleForTesting |
| public ClusterDescription getClusterDescription(String clustername) throws |
| YarnException, |
| IOException { |
| SliderClusterOperations clusterOperations = |
| createClusterOperations(clustername); |
| return clusterOperations.getClusterDescription(); |
| } |
| |
| /** |
| * Connect to the cluster and get its current state |
| * @return its description |
| */ |
| @VisibleForTesting |
| public ClusterDescription getClusterDescription() throws |
| YarnException, |
| IOException { |
| return getClusterDescription(getDeployedClusterName()); |
| } |
| |
| /** |
| * List all node UUIDs in a role |
| * @param role role name or "" for all |
| * @return an array of UUID strings |
| * @throws IOException |
| * @throws YarnException |
| */ |
| @VisibleForTesting |
| public String[] listNodeUUIDsByRole(String role) throws |
| IOException, |
| YarnException { |
| return createClusterOperations() |
| .listNodeUUIDsByRole(role); |
| } |
| |
| /** |
| * List all nodes in a role. This is a double round trip: once to list |
| * the nodes in a role, another to get their details |
| * @param role component/role to look for |
| * @return an array of ContainerNode instances |
| * @throws IOException |
| * @throws YarnException |
| */ |
| @VisibleForTesting |
| public List<ClusterNode> listClusterNodesInRole(String role) throws |
| IOException, |
| YarnException { |
| return createClusterOperations().listClusterNodesInRole(role); |
| } |
| |
| /** |
| * Get the details on a list of uuids |
| * @param uuids uuids to ask for |
| * @return a possibly empty list of node details |
| * @throws IOException |
| * @throws YarnException |
| */ |
| @VisibleForTesting |
| public List<ClusterNode> listClusterNodes(String[] uuids) throws |
| IOException, |
| YarnException { |
| |
| if (uuids.length == 0) { |
| // short cut on an empty list |
| return new LinkedList<ClusterNode>(); |
| } |
| return createClusterOperations().listClusterNodes(uuids); |
| } |
| |
| /** |
| * Get the instance definition from the far end |
| */ |
| @VisibleForTesting |
| public AggregateConf getLiveInstanceDefinition() throws IOException, YarnException { |
| return createClusterOperations().getInstanceDefinition(); |
| } |
| |
| /** |
| * Bond to a running cluster |
| * @param clustername cluster name |
| * @return the AM RPC client |
| * @throws SliderException if the cluster is unkown |
| */ |
| private SliderClusterProtocol bondToCluster(String clustername) throws |
| YarnException, |
| IOException { |
| verifyBindingsDefined(); |
| if (clustername == null) { |
| throw unknownClusterException("(undefined)"); |
| } |
| ApplicationReport instance = findInstance(clustername); |
| if (null == instance) { |
| throw unknownClusterException(clustername); |
| } |
| return connect(instance); |
| } |
| |
| /** |
| * Create a cluster operations instance against a given cluster |
| * @param clustername cluster name |
| * @return a bonded cluster operations instance |
| * @throws YarnException YARN issues |
| * @throws IOException IO problems |
| */ |
| private SliderClusterOperations createClusterOperations(String clustername) throws |
| YarnException, |
| IOException { |
| SliderClusterProtocol sliderAM = bondToCluster(clustername); |
| return new SliderClusterOperations(sliderAM); |
| } |
| |
| /** |
| * Create a cluster operations instance against the active cluster |
| * -returning any previous created one if held. |
| * @return a bonded cluster operations instance |
| * @throws YarnException YARN issues |
| * @throws IOException IO problems |
| */ |
| private SliderClusterOperations createClusterOperations() throws |
| YarnException, |
| IOException { |
| if (sliderClusterOperations == null) { |
| sliderClusterOperations = |
| createClusterOperations(getDeployedClusterName()); |
| } |
| return sliderClusterOperations; |
| } |
| |
| /** |
| * Wait for an instance of a named role to be live (or past it in the lifecycle) |
| * @param role role to look for |
| * @param timeout time to wait |
| * @return the state. If still in CREATED, the cluster didn't come up |
| * in the time period. If LIVE, all is well. If >LIVE, it has shut for a reason |
| * @throws IOException IO |
| * @throws SliderException Slider |
| * @throws WaitTimeoutException if the wait timed out |
| */ |
| @VisibleForTesting |
| public int waitForRoleInstanceLive(String role, long timeout) |
| throws WaitTimeoutException, IOException, YarnException { |
| return createClusterOperations().waitForRoleInstanceLive(role, timeout); |
| } |
| |
| /** |
| * Generate an exception for an unknown cluster |
| * @param clustername cluster name |
| * @return an exception with text and a relevant exit code |
| */ |
| public UnknownApplicationInstanceException unknownClusterException(String clustername) { |
| return UnknownApplicationInstanceException.unknownInstance(clustername); |
| } |
| |
| @Override |
| public String toString() { |
| return "Slider Client in state " + getServiceState() |
| + " and Slider Application Instance " + deployedClusterName; |
| } |
| |
| /** |
| * Get all YARN applications |
| * @return a possibly empty list |
| * @throws YarnException |
| * @throws IOException |
| */ |
| @VisibleForTesting |
| public List<ApplicationReport> getApplications() throws YarnException, IOException { |
| return yarnClient.getApplications(); |
| } |
| |
| @VisibleForTesting |
| public ApplicationReport getApplicationReport(ApplicationId appId) |
| throws YarnException, IOException { |
| return new LaunchedApplication(appId, yarnClient).getApplicationReport(); |
| } |
| |
| /** |
| * The configuration used for deployment (after resolution). |
| * Non-null only after the client has launched the application |
| * @return the resolved configuration or null |
| */ |
| @VisibleForTesting |
| public AggregateConf getLaunchedInstanceDefinition() { |
| return launchedInstanceDefinition; |
| } |
| |
| |
| @Override |
| public int actionResolve(ActionResolveArgs args) |
| throws YarnException, IOException { |
| // as this is an API entry point, validate |
| // the arguments |
| args.validate(); |
| RegistryOperations operations = getRegistryOperations(); |
| String path = SliderRegistryUtils.resolvePath(args.path); |
| ServiceRecordMarshal serviceRecordMarshal = new ServiceRecordMarshal(); |
| try { |
| if (args.list) { |
| File destDir = args.destdir; |
| if (destDir != null) { |
| destDir.mkdirs(); |
| } |
| |
| Map<String, ServiceRecord> recordMap; |
| Map<String, RegistryPathStatus> znodes; |
| try { |
| znodes = statChildren(registryOperations, path); |
| recordMap = extractServiceRecords(registryOperations, |
| path, |
| znodes.values()); |
| } catch (PathNotFoundException e) { |
| // treat the root directory as if if is always there |
| |
| if ("/".equals(path)) { |
| znodes = new HashMap<String, RegistryPathStatus>(0); |
| recordMap = new HashMap<String, ServiceRecord>(0); |
| } else { |
| throw e; |
| } |
| } |
| // subtract all records from the znodes map to get pure directories |
| log.info("Entries: {}", znodes.size()); |
| |
| for (String name : znodes.keySet()) { |
| println(" " + name); |
| } |
| println(""); |
| |
| log.info("Service records: {}", recordMap.size()); |
| for (Entry<String, ServiceRecord> recordEntry : recordMap.entrySet()) { |
| String name = recordEntry.getKey(); |
| ServiceRecord instance = recordEntry.getValue(); |
| String json = serviceRecordMarshal.toJson(instance); |
| if (destDir == null) { |
| println(name); |
| println(json); |
| } else { |
| String filename = RegistryPathUtils.lastPathEntry(name) + ".json"; |
| File jsonFile = new File(destDir, filename); |
| SliderUtils.write(jsonFile, |
| serviceRecordMarshal.toBytes(instance), |
| true); |
| } |
| } |
| } else { |
| // resolve single entry |
| ServiceRecord instance = resolve(path); |
| File outFile = args.out; |
| if (args.destdir != null) { |
| outFile = new File(args.destdir, RegistryPathUtils.lastPathEntry(path)); |
| } |
| if (outFile != null) { |
| SliderUtils.write(outFile, serviceRecordMarshal.toBytes(instance), true); |
| } else { |
| println(serviceRecordMarshal.toJson(instance)); |
| } |
| } |
| // TODO JDK7 |
| } catch (PathNotFoundException e) { |
| // no record at this path |
| throw new NotFoundException(e, path); |
| } catch (NoRecordException e) { |
| throw new NotFoundException(e, path); |
| } |
| return EXIT_SUCCESS; |
| } |
| |
| @Override |
| public int actionRegistry(ActionRegistryArgs registryArgs) throws |
| YarnException, |
| IOException { |
| // as this is also a test entry point, validate |
| // the arguments |
| registryArgs.validate(); |
| try { |
| if (registryArgs.list) { |
| actionRegistryList(registryArgs); |
| } else if (registryArgs.listConf) { |
| // list the configurations |
| actionRegistryListConfigsYarn(registryArgs); |
| } else if (registryArgs.listExports) { |
| // list the exports |
| actionRegistryListExports(registryArgs); |
| } else if (SliderUtils.isSet(registryArgs.getConf)) { |
| // get a configuration |
| PublishedConfiguration publishedConfiguration = |
| actionRegistryGetConfig(registryArgs); |
| outputConfig(publishedConfiguration, registryArgs); |
| } else if (SliderUtils.isSet(registryArgs.getExport)) { |
| // get a export group |
| PublishedExports publishedExports = |
| actionRegistryGetExport(registryArgs); |
| outputExport(publishedExports, registryArgs); |
| } else { |
| // it's an unknown command |
| log.info(CommonArgs.usage(serviceArgs, ACTION_DIAGNOSTICS)); |
| return EXIT_USAGE; |
| } |
| // JDK7 |
| } catch (FileNotFoundException e) { |
| log.info("{}", e); |
| log.debug("{}", e, e); |
| return EXIT_NOT_FOUND; |
| } catch (PathNotFoundException e) { |
| log.info("{}", e); |
| log.debug("{}", e, e); |
| return EXIT_NOT_FOUND; |
| } |
| return EXIT_SUCCESS; |
| } |
| |
| /** |
| * Registry operation |
| * |
| * @param registryArgs registry Arguments |
| * @return the instances (for tests) |
| * @throws YarnException YARN problems |
| * @throws IOException Network or other problems |
| */ |
| @VisibleForTesting |
| public Collection<ServiceRecord> actionRegistryList( |
| ActionRegistryArgs registryArgs) |
| throws YarnException, IOException { |
| String serviceType = registryArgs.serviceType; |
| String name = registryArgs.name; |
| RegistryOperations operations = getRegistryOperations(); |
| Collection<ServiceRecord> serviceRecords; |
| if (StringUtils.isEmpty(name)) { |
| String path = |
| serviceclassPath( |
| currentUser(), |
| serviceType); |
| |
| try { |
| Map<String, ServiceRecord> recordMap = |
| listServiceRecords(operations, path); |
| if (recordMap.isEmpty()) { |
| throw new UnknownApplicationInstanceException( |
| "No applications registered under " + path); |
| } |
| serviceRecords = recordMap.values(); |
| } catch (PathNotFoundException e) { |
| throw new NotFoundException(path, e); |
| } |
| } else { |
| ServiceRecord instance = lookupServiceRecord(registryArgs); |
| serviceRecords = new ArrayList<ServiceRecord>(1); |
| serviceRecords.add(instance); |
| } |
| |
| for (ServiceRecord serviceRecord : serviceRecords) { |
| logInstance(serviceRecord, registryArgs.verbose); |
| } |
| return serviceRecords; |
| } |
| |
| @Override |
| public int actionDiagnostic(ActionDiagnosticArgs diagnosticArgs) { |
| try { |
| if (diagnosticArgs.client) { |
| actionDiagnosticClient(diagnosticArgs); |
| } else if (diagnosticArgs.application) { |
| actionDiagnosticApplication(diagnosticArgs); |
| } else if (diagnosticArgs.yarn) { |
| actionDiagnosticYarn(diagnosticArgs); |
| } else if (diagnosticArgs.credentials) { |
| actionDiagnosticCredentials(); |
| } else if (diagnosticArgs.all) { |
| actionDiagnosticAll(diagnosticArgs); |
| } else if (diagnosticArgs.level) { |
| actionDiagnosticIntelligent(diagnosticArgs); |
| } else { |
| // it's an unknown option |
| log.info(CommonArgs.usage(serviceArgs, ACTION_DIAGNOSTICS)); |
| return EXIT_USAGE; |
| } |
| } catch (Exception e) { |
| log.error(e.toString()); |
| return EXIT_FALSE; |
| } |
| return EXIT_SUCCESS; |
| } |
| |
| private void actionDiagnosticIntelligent(ActionDiagnosticArgs diagnosticArgs) |
| throws YarnException, IOException, URISyntaxException { |
| // not using member variable clustername because we want to place |
| // application name after --application option and member variable |
| // cluster name has to be put behind action |
| String clusterName = diagnosticArgs.name; |
| if(SliderUtils.isUnset(clusterName)){ |
| throw new BadCommandArgumentsException("application name must be provided with --name option"); |
| } |
| |
| try { |
| SliderUtils.validateClientConfigFile(); |
| log.info("Slider-client.xml is accessible"); |
| } catch (IOException e) { |
| // we are catching exceptions here because those are indication of |
| // validation result, and we need to print them here |
| log.error( |
| "validation of slider-client.xml fails because: " + e.toString(), e); |
| return; |
| } |
| SliderClusterOperations clusterOperations = createClusterOperations(clusterName); |
| // cluster not found exceptions will be thrown upstream |
| ClusterDescription clusterDescription = clusterOperations |
| .getClusterDescription(); |
| log.info("Slider AppMaster is accessible"); |
| |
| if (clusterDescription.state == StateValues.STATE_LIVE) { |
| AggregateConf instanceDefinition = clusterOperations |
| .getInstanceDefinition(); |
| String imagePath = instanceDefinition.getInternalOperations().get( |
| InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH); |
| // if null, that means slider uploaded the agent tarball for the user |
| // and we need to use where slider has put |
| if (imagePath == null) { |
| ApplicationReport appReport = findInstance(clusterName); |
| Path path1 = sliderFileSystem.getTempPathForCluster(clusterName); |
| |
| Path subPath = new Path(path1, appReport.getApplicationId().toString() |
| + "/agent"); |
| imagePath = subPath.toString(); |
| } |
| try { |
| SliderUtils.validateHDFSFile(sliderFileSystem, imagePath + "/" + AGENT_TAR); |
| log.info("Slider agent package is properly installed"); |
| } catch (FileNotFoundException e) { |
| log.error("can not find agent package: " + e.toString()); |
| return; |
| } catch (IOException e) { |
| log.error("can not open agent package: " + e.toString()); |
| return; |
| } |
| String pkgTarballPath = instanceDefinition.getAppConfOperations() |
| .getGlobalOptions().getMandatoryOption(AgentKeys.APP_DEF); |
| try { |
| SliderUtils.validateHDFSFile(sliderFileSystem, pkgTarballPath); |
| log.info("Application package is properly installed"); |
| } catch (FileNotFoundException e) { |
| log.error("can not find application package: {}", e); |
| return; |
| } catch (IOException e) { |
| log.error("can not open application package: {} ", e); |
| return; |
| } |
| } |
| } |
| |
| private void actionDiagnosticAll(ActionDiagnosticArgs diagnosticArgs) |
| throws IOException, YarnException { |
| // assign application name from param to each sub diagnostic function |
| actionDiagnosticClient(diagnosticArgs); |
| actionDiagnosticApplication(diagnosticArgs); |
| actionDiagnosticSlider(diagnosticArgs); |
| actionDiagnosticYarn(diagnosticArgs); |
| actionDiagnosticCredentials(); |
| } |
| |
| private void actionDiagnosticCredentials() throws BadConfigException, |
| IOException { |
| if (SliderUtils.isHadoopClusterSecure(SliderUtils |
| .loadSliderClientXML())) { |
| String credentialCacheFileDescription = null; |
| try { |
| credentialCacheFileDescription = SliderUtils.checkCredentialCacheFile(); |
| } catch (BadConfigException e) { |
| log.error("The credential config is not valid: " + e.toString()); |
| throw e; |
| } catch (IOException e) { |
| log.error("Unable to read the credential file: " + e.toString()); |
| throw e; |
| } |
| log.info("Credential cache file for the current user: " |
| + credentialCacheFileDescription); |
| } else { |
| log.info("the cluster is not in secure mode"); |
| } |
| } |
| |
| private void actionDiagnosticYarn(ActionDiagnosticArgs diagnosticArgs) |
| throws IOException, YarnException { |
| JSONObject converter = null; |
| log.info("the node in the YARN cluster has below state: "); |
| List<NodeReport> yarnClusterInfo; |
| try { |
| yarnClusterInfo = yarnClient.getNodeReports(NodeState.RUNNING); |
| } catch (YarnException e1) { |
| log.error("Exception happened when fetching node report from the YARN cluster: " |
| + e1.toString()); |
| throw e1; |
| } catch (IOException e1) { |
| log.error("Network problem happened when fetching node report YARN cluster: " |
| + e1.toString()); |
| throw e1; |
| } |
| for (NodeReport nodeReport : yarnClusterInfo) { |
| log.info(nodeReport.toString()); |
| } |
| |
| if (diagnosticArgs.verbose) { |
| Writer configWriter = new StringWriter(); |
| try { |
| Configuration.dumpConfiguration(yarnClient.getConfig(), configWriter); |
| } catch (IOException e1) { |
| log.error("Network problem happened when retrieving YARN config from YARN: " |
| + e1.toString()); |
| throw e1; |
| } |
| try { |
| converter = new JSONObject(configWriter.toString()); |
| log.info("the configuration of the YARN cluster is: " |
| + converter.toString(2)); |
| |
| } catch (JSONException e) { |
| log.error("JSONException happened during parsing response from YARN: " |
| + e.toString()); |
| } |
| } |
| } |
| |
| private void actionDiagnosticSlider(ActionDiagnosticArgs diagnosticArgs) |
| throws YarnException, IOException { |
| // not using member variable clustername because we want to place |
| // application name after --application option and member variable |
| // cluster name has to be put behind action |
| String clusterName = diagnosticArgs.name; |
| if(SliderUtils.isUnset(clusterName)){ |
| throw new BadCommandArgumentsException("application name must be provided with --name option"); |
| } |
| SliderClusterOperations clusterOperations; |
| AggregateConf instanceDefinition = null; |
| try { |
| clusterOperations = createClusterOperations(clusterName); |
| instanceDefinition = clusterOperations.getInstanceDefinition(); |
| } catch (YarnException e) { |
| log.error("Exception happened when retrieving instance definition from YARN: " |
| + e.toString()); |
| throw e; |
| } catch (IOException e) { |
| log.error("Network problem happened when retrieving instance definition from YARN: " |
| + e.toString()); |
| throw e; |
| } |
| String imagePath = instanceDefinition.getInternalOperations().get( |
| InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH); |
| // if null, it will be uploaded by Slider and thus at slider's path |
| if (imagePath == null) { |
| ApplicationReport appReport = findInstance(clusterName); |
| Path path1 = sliderFileSystem.getTempPathForCluster(clusterName); |
| Path subPath = new Path(path1, appReport.getApplicationId().toString() |
| + "/agent"); |
| imagePath = subPath.toString(); |
| } |
| log.info("The path of slider agent tarball on HDFS is: " + imagePath); |
| } |
| |
| private void actionDiagnosticApplication(ActionDiagnosticArgs diagnosticArgs) |
| throws YarnException, IOException { |
| // not using member variable clustername because we want to place |
| // application name after --application option and member variable |
| // cluster name has to be put behind action |
| String clusterName = diagnosticArgs.name; |
| if(SliderUtils.isUnset(clusterName)){ |
| throw new BadCommandArgumentsException("application name must be provided with --name option"); |
| } |
| SliderClusterOperations clusterOperations; |
| AggregateConf instanceDefinition = null; |
| try { |
| clusterOperations = createClusterOperations(clusterName); |
| instanceDefinition = clusterOperations.getInstanceDefinition(); |
| } catch (YarnException e) { |
| log.error("Exception happened when retrieving instance definition from YARN: " |
| + e.toString()); |
| throw e; |
| } catch (IOException e) { |
| log.error("Network problem happened when retrieving instance definition from YARN: " |
| + e.toString()); |
| throw e; |
| } |
| String clusterDir = instanceDefinition.getAppConfOperations() |
| .getGlobalOptions().get(AgentKeys.APP_ROOT); |
| String pkgTarball = instanceDefinition.getAppConfOperations() |
| .getGlobalOptions().get(AgentKeys.APP_DEF); |
| String runAsUser = instanceDefinition.getAppConfOperations() |
| .getGlobalOptions().get(AgentKeys.RUNAS_USER); |
| |
| log.info("The location of the cluster instance directory in HDFS is: " |
| + clusterDir); |
| log.info("The name of the application package tarball on HDFS is: " |
| + pkgTarball); |
| log.info("The runas user of the application in the cluster is: " |
| + runAsUser); |
| |
| if (diagnosticArgs.verbose) { |
| log.info("App config of the application: " |
| + instanceDefinition.getAppConf().toJson()); |
| log.info("Resource config of the application: " |
| + instanceDefinition.getResources().toJson()); |
| } |
| } |
| |
| private void actionDiagnosticClient(ActionDiagnosticArgs diagnosticArgs) |
| throws SliderException, IOException { |
| try { |
| String currentCommandPath = SliderUtils.getCurrentCommandPath(); |
| SliderVersionInfo.loadAndPrintVersionInfo(log); |
| String clientConfigPath = SliderUtils.getClientConfigPath(); |
| String jdkInfo = SliderUtils.getJDKInfo(); |
| println("The slider command path: %s", currentCommandPath); |
| println("The slider-client.xml used by current running command path: %s", |
| clientConfigPath); |
| println(jdkInfo); |
| |
| // security info |
| Configuration config = getConfig(); |
| if (SliderUtils.isHadoopClusterSecure(config)) { |
| println("Hadoop Cluster is secure"); |
| println("Login user is %s", UserGroupInformation.getLoginUser()); |
| println("Current user is %s", UserGroupInformation.getCurrentUser()); |
| |
| } else { |
| println("Hadoop Cluster is insecure"); |
| } |
| |
| // verbose? |
| if (diagnosticArgs.verbose) { |
| // do the environment |
| Map<String, String> env = System.getenv(); |
| Set<String> envList = ConfigHelper.sortedConfigKeys(env.entrySet()); |
| StringBuilder builder = new StringBuilder("Environment variables:\n"); |
| for (String key : envList) { |
| builder.append(key).append("=").append(env.get(key)).append("\n"); |
| } |
| println(builder.toString()); |
| |
| // Java properties |
| builder = new StringBuilder("JVM Properties\n"); |
| Map<String, String> props = |
| SliderUtils.sortedMap(SliderUtils.toMap(System.getProperties())); |
| for (Entry<String, String> entry : props.entrySet()) { |
| builder.append(entry.getKey()).append("=") |
| .append(entry.getValue()).append("\n"); |
| } |
| |
| println(builder.toString()); |
| |
| // then the config |
| println("Slider client configuration:\n" |
| + ConfigHelper.dumpConfigToString(config)); |
| |
| } |
| |
| SliderUtils.validateSliderClientEnvironment(log); |
| } catch (SliderException e) { |
| log.error(e.toString()); |
| throw e; |
| } catch (IOException e) { |
| log.error(e.toString()); |
| throw e; |
| } |
| |
| } |
| |
| /** |
| * Log a service record instance |
| * @param instance record |
| * @param verbose verbose logging of all external endpoints |
| */ |
| private void logInstance(ServiceRecord instance, |
| boolean verbose) { |
| if (!verbose) { |
| log.info("{}", instance.get(YarnRegistryAttributes.YARN_ID, "")); |
| } else { |
| log.info("{}: ", instance.get(YarnRegistryAttributes.YARN_ID, "")); |
| logEndpoints(instance); |
| } |
| } |
| |
| /** |
| * Log the external endpoints of a service record |
| * @param instance service record instance |
| */ |
| private void logEndpoints(ServiceRecord instance) { |
| List<Endpoint> endpoints = instance.external; |
| for (Endpoint endpoint : endpoints) { |
| log.info(endpoint.toString()); |
| } |
| } |
| |
| /** |
| * list configs available for an instance |
| * |
| * @param registryArgs registry Arguments |
| * @throws YarnException YARN problems |
| * @throws IOException Network or other problems |
| */ |
| public void actionRegistryListConfigsYarn(ActionRegistryArgs registryArgs) |
| throws YarnException, IOException { |
| |
| ServiceRecord instance = lookupServiceRecord(registryArgs); |
| |
| RegistryRetriever retriever = new RegistryRetriever(instance); |
| PublishedConfigSet configurations = |
| retriever.getConfigurations(!registryArgs.internal); |
| PrintStream out = null; |
| try { |
| if (registryArgs.out != null) { |
| out = new PrintStream(new FileOutputStream(registryArgs.out)); |
| } else { |
| out = System.out; |
| } |
| for (String configName : configurations.keys()) { |
| if (!registryArgs.verbose) { |
| out.println(configName); |
| } else { |
| PublishedConfiguration published = |
| configurations.get(configName); |
| out.printf("%s: %s\n", |
| configName, |
| published.description); |
| } |
| } |
| } finally { |
| if (registryArgs.out != null && out != null) { |
| out.flush(); |
| out.close(); |
| } |
| } |
| } |
| |
| /** |
| * list exports available for an instance |
| * |
| * @param registryArgs registry Arguments |
| * @throws YarnException YARN problems |
| * @throws IOException Network or other problems |
| */ |
| public void actionRegistryListExports(ActionRegistryArgs registryArgs) |
| throws YarnException, IOException { |
| ServiceRecord instance = lookupServiceRecord(registryArgs); |
| |
| RegistryRetriever retriever = new RegistryRetriever(instance); |
| PublishedExportsSet exports = |
| retriever.getExports(!registryArgs.internal); |
| PrintStream out = null; |
| boolean streaming = false; |
| try { |
| if (registryArgs.out != null) { |
| out = new PrintStream(new FileOutputStream(registryArgs.out)); |
| streaming = true; |
| log.debug("Saving output to {}", registryArgs.out); |
| } else { |
| out = System.out; |
| } |
| log.debug("Number of exports: {}", exports.keys().size()); |
| for (String exportName : exports.keys()) { |
| if (streaming) { |
| log.debug(exportName); |
| } |
| if (!registryArgs.verbose) { |
| out.println(exportName); |
| } else { |
| PublishedExports published = exports.get(exportName); |
| out.printf("%s: %s\n", |
| exportName, |
| published.description); |
| } |
| } |
| } finally { |
| if (streaming) { |
| out.flush(); |
| out.close(); |
| } |
| } |
| } |
| |
| /** |
| * list configs available for an instance |
| * |
| * @param registryArgs registry Arguments |
| * @throws YarnException YARN problems |
| * @throws IOException Network or other problems |
| * @throws FileNotFoundException if the config is not found |
| */ |
| @VisibleForTesting |
| public PublishedConfiguration actionRegistryGetConfig(ActionRegistryArgs registryArgs) |
| throws YarnException, IOException { |
| ServiceRecord instance = lookupServiceRecord(registryArgs); |
| |
| RegistryRetriever retriever = new RegistryRetriever(instance); |
| boolean external = !registryArgs.internal; |
| PublishedConfigSet configurations = |
| retriever.getConfigurations(external); |
| |
| PublishedConfiguration published = retriever.retrieveConfiguration(configurations, |
| registryArgs.getConf, |
| external); |
| return published; |
| } |
| |
| /** |
| * get a specific export group |
| * |
| * @param registryArgs registry Arguments |
| * |
| * @throws YarnException YARN problems |
| * @throws IOException Network or other problems |
| * @throws FileNotFoundException if the config is not found |
| */ |
| @VisibleForTesting |
| public PublishedExports actionRegistryGetExport(ActionRegistryArgs registryArgs) |
| throws YarnException, IOException { |
| ServiceRecord instance = lookupServiceRecord(registryArgs); |
| |
| RegistryRetriever retriever = new RegistryRetriever(instance); |
| boolean external = !registryArgs.internal; |
| PublishedExportsSet exports = |
| retriever.getExports(external); |
| |
| PublishedExports published = retriever.retrieveExports(exports, |
| registryArgs.getExport, |
| external); |
| return published; |
| } |
| |
| /** |
| * write out the config. If a destination is provided and that dir is a |
| * directory, the entry is written to it with the name provided + extension, |
| * else it is printed to standard out. |
| * @param published published config |
| * @param registryArgs registry Arguments |
| * @throws BadCommandArgumentsException |
| * @throws IOException |
| */ |
| private void outputConfig(PublishedConfiguration published, |
| ActionRegistryArgs registryArgs) throws |
| BadCommandArgumentsException, |
| IOException { |
| // decide whether or not to print |
| String entry = registryArgs.getConf; |
| String format = registryArgs.format; |
| ConfigFormat configFormat = ConfigFormat.resolve(format); |
| if (configFormat == null) { |
| throw new BadCommandArgumentsException( |
| "Unknown/Unsupported format %s ", format); |
| } |
| PublishedConfigurationOutputter outputter = |
| PublishedConfigurationOutputter.createOutputter(configFormat, |
| published); |
| boolean print = registryArgs.out == null; |
| if (!print) { |
| File outputPath = registryArgs.out; |
| if (outputPath.isDirectory()) { |
| // creating it under a directory |
| outputPath = new File(outputPath, entry + "." + format); |
| } |
| log.debug("Destination path: {}", outputPath); |
| outputter.save(outputPath); |
| } else { |
| print(outputter.asString()); |
| } |
| |
| } |
| |
| /** |
| * write out the config |
| * @param published |
| * @param registryArgs |
| * @throws BadCommandArgumentsException |
| * @throws IOException |
| */ |
| private void outputExport(PublishedExports published, |
| ActionRegistryArgs registryArgs) throws |
| BadCommandArgumentsException, |
| IOException { |
| // decide whether or not to print |
| String entry = registryArgs.getExport; |
| String format = ConfigFormat.JSON.toString(); |
| ConfigFormat configFormat = ConfigFormat.resolve(format); |
| if (configFormat == null || configFormat != ConfigFormat.JSON) { |
| throw new BadCommandArgumentsException( |
| "Unknown/Unsupported format %s . Only JSON is supported.", format); |
| } |
| |
| PublishedExportsOutputter outputter = |
| PublishedExportsOutputter.createOutputter(configFormat, |
| published); |
| boolean print = registryArgs.out == null; |
| if (!print) { |
| File destFile; |
| destFile = registryArgs.out; |
| if (destFile.isDirectory()) { |
| // creating it under a directory |
| destFile = new File(destFile, entry + "." + format); |
| } |
| log.info("Destination path: {}", destFile); |
| outputter.save(destFile); |
| } else { |
| print(outputter.asString()); |
| } |
| } |
| |
| /** |
| * Look up an instance |
| * @return instance data |
| * @throws SliderException other failures |
| * @throws IOException IO problems or wrapped exceptions |
| */ |
| private ServiceRecord lookupServiceRecord(ActionRegistryArgs registryArgs) throws |
| SliderException, |
| IOException { |
| String user; |
| if (StringUtils.isNotEmpty(registryArgs.user)) { |
| user = RegistryPathUtils.encodeForRegistry(registryArgs.user); |
| } else { |
| user = currentUser(); |
| } |
| |
| String path = servicePath(user, registryArgs.serviceType, |
| registryArgs.name); |
| return resolve(path); |
| } |
| |
| /** |
| * Look up a service record of the current user |
| * @param serviceType service type |
| * @param id instance ID |
| * @return instance data |
| * @throws UnknownApplicationInstanceException no path or service record |
| * at the end of the path |
| * @throws SliderException other failures |
| * @throws IOException IO problems or wrapped exceptions |
| */ |
| public ServiceRecord lookupServiceRecord(String serviceType, String id) |
| throws IOException, SliderException { |
| String path = servicePath(currentUser(), serviceType, id); |
| return resolve(path); |
| } |
| |
| /** |
| * |
| * Look up an instance |
| * @param path path |
| * @return instance data |
| * @throws NotFoundException no path/no service record |
| * at the end of the path |
| * @throws SliderException other failures |
| * @throws IOException IO problems or wrapped exceptions |
| */ |
| public ServiceRecord resolve(String path) |
| throws IOException, SliderException { |
| try { |
| return getRegistryOperations().resolve(path); |
| } catch (PathNotFoundException e) { |
| throw new NotFoundException(e.getPath().toString(), e); |
| } catch (NoRecordException e) { |
| throw new NotFoundException(e.getPath().toString(), e); |
| } |
| } |
| |
| /** |
| * List instances in the registry for the current user |
| * @return a list of slider registry instances |
| * @throws IOException Any IO problem ... including no path in the registry |
| * to slider service classes for this user |
| * @throws SliderException other failures |
| */ |
| |
| public Map<String, ServiceRecord> listRegistryInstances() |
| throws IOException, SliderException { |
| Map<String, ServiceRecord> recordMap = listServiceRecords( |
| getRegistryOperations(), |
| serviceclassPath(currentUser(), SliderKeys.APP_TYPE)); |
| return recordMap; |
| } |
| |
| /** |
| * List instances in the registry |
| * @return the instance IDs |
| * @throws IOException |
| * @throws YarnException |
| */ |
| public List<String> listRegisteredSliderInstances() throws |
| IOException, |
| YarnException { |
| try { |
| Map<String, ServiceRecord> recordMap = listServiceRecords( |
| getRegistryOperations(), |
| serviceclassPath(currentUser(), SliderKeys.APP_TYPE)); |
| return new ArrayList<String>(recordMap.keySet()); |
| /// JDK7 } catch (YarnException | IOException e) { |
| } catch (PathNotFoundException e) { |
| log.debug("No registry path for slider instances for current user: {}", e, e); |
| // no entries: return an empty list |
| return new ArrayList<String>(0); |
| } catch (IOException e) { |
| throw e; |
| } catch (YarnException e) { |
| throw e; |
| } catch (Exception e) { |
| throw new IOException(e); |
| } |
| } |
| |
| /** |
| * Start the registry if it is not there yet |
| * @return the registry service |
| * @throws SliderException |
| * @throws IOException |
| */ |
| private synchronized RegistryOperations maybeStartYarnRegistry() |
| throws SliderException, IOException { |
| |
| if (registryOperations == null) { |
| registryOperations = startRegistryOperationsService(); |
| } |
| return registryOperations; |
| } |
| |
| @Override |
| public RegistryOperations getRegistryOperations() |
| throws SliderException, IOException { |
| return maybeStartYarnRegistry(); |
| } |
| |
| /** |
| * Output to standard out/stderr (implementation specific detail) |
| * @param src source |
| */ |
| @SuppressWarnings("UseOfSystemOutOrSystemErr") |
| private static void print(CharSequence src) { |
| System.out.append(src); |
| } |
| |
| /** |
| * Output to standard out/stderr with a newline after |
| * @param message message |
| */ |
| private static void println(String message) { |
| print(message); |
| print("\n"); |
| } |
| /** |
| * Output to standard out/stderr with a newline after, formatted |
| * @param message message |
| * @param args arguments for string formatting |
| */ |
| private static void println(String message, Object ... args) { |
| print(String.format(message, args)); |
| print("\n"); |
| } |
| |
| /** |
| * Implement the lookup action. |
| * @param args Action arguments |
| * @return 0 if the entry was found |
| * @throws IOException |
| * @throws YarnException |
| * @throws UnknownApplicationInstanceException if a specific instance |
| * was named but it was not found |
| */ |
| @VisibleForTesting |
| public int actionLookup(ActionLookupArgs args) |
| throws IOException, YarnException { |
| verifyBindingsDefined(); |
| try { |
| ApplicationId id = ConverterUtils.toApplicationId(args.id); |
| ApplicationReport report = yarnClient.getApplicationReport(id); |
| SerializedApplicationReport sar = new SerializedApplicationReport(report); |
| ApplicationReportSerDeser serDeser = new ApplicationReportSerDeser(); |
| if (args.outputFile != null) { |
| serDeser.save(sar, args.outputFile); |
| } else { |
| println(serDeser.toJson(sar)); |
| } |
| } catch (IllegalArgumentException e) { |
| throw new BadCommandArgumentsException(e, "%s : %s", args, e); |
| } catch (ApplicationAttemptNotFoundException notFound) { |
| throw new NotFoundException(notFound, notFound.toString()); |
| } catch (ApplicationNotFoundException notFound) { |
| throw new NotFoundException(notFound, notFound.toString()); |
| } |
| return EXIT_SUCCESS; |
| } |
| |
| // TODO: Refactor this |
| class AppDefinition { |
| Path fileInFs; |
| File appDef; |
| String pkgName; |
| } |
| } |
| |
| |