| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.applications.distributedshell; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.UncheckedIOException; |
| import java.nio.ByteBuffer; |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.Vector; |
| import java.util.Arrays; |
| import java.util.Base64; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.apache.hadoop.thirdparty.com.google.common.base.Joiner; |
| |
| import org.apache.commons.cli.CommandLine; |
| import org.apache.commons.cli.GnuParser; |
| import org.apache.commons.cli.HelpFormatter; |
| import org.apache.commons.cli.Option; |
| import org.apache.commons.cli.Options; |
| import org.apache.commons.cli.ParseException; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceStability; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.io.DataOutputBuffer; |
| import org.apache.hadoop.security.Credentials; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.yarn.api.ApplicationClientProtocol; |
| import org.apache.hadoop.yarn.api.ApplicationConstants; |
| import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ApplicationReport; |
| import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; |
| import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; |
| import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; |
| import org.apache.hadoop.yarn.api.records.LocalResource; |
| import org.apache.hadoop.yarn.api.records.LocalResourceType; |
| import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; |
| import org.apache.hadoop.yarn.api.records.LogAggregationContext; |
| import org.apache.hadoop.yarn.api.records.NodeReport; |
| import org.apache.hadoop.yarn.api.records.NodeState; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.api.records.QueueACL; |
| import org.apache.hadoop.yarn.api.records.QueueInfo; |
| import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.ResourceInformation; |
| import org.apache.hadoop.yarn.api.records.ResourceRequest; |
| import org.apache.hadoop.yarn.api.records.ResourceTypeInfo; |
| import org.apache.hadoop.yarn.api.records.URL; |
| import org.apache.hadoop.yarn.api.records.YarnApplicationState; |
| import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; |
| import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; |
| import org.apache.hadoop.yarn.api.records.ExecutionType; |
| import org.apache.hadoop.yarn.client.api.TimelineClient; |
| import org.apache.hadoop.yarn.client.api.YarnClient; |
| import org.apache.hadoop.yarn.client.api.YarnClientApplication; |
| import org.apache.hadoop.yarn.client.util.YarnClientUtils; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; |
| import org.apache.hadoop.yarn.exceptions.YARNFeatureNotEnabledException; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.util.DockerClientConfigHandler; |
| import org.apache.hadoop.yarn.util.UnitsConversionUtil; |
| import org.apache.hadoop.yarn.util.resource.ResourceUtils; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| import org.apache.hadoop.yarn.util.timeline.TimelineUtils; |
| import org.apache.hadoop.classification.VisibleForTesting; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Client for Distributed Shell application submission to YARN. |
| * |
| * <p> The distributed shell client allows an application master to be launched that in turn would run |
| * the provided shell command on a set of containers. </p> |
| * |
| * <p>This client is meant to act as an example on how to write yarn-based applications. </p> |
| * |
| * <p> To submit an application, a client first needs to connect to the <code>ResourceManager</code> |
| * aka ApplicationsManager or ASM via the {@link ApplicationClientProtocol}. The {@link ApplicationClientProtocol} |
| * provides a way for the client to get access to cluster information and to request for a |
| * new {@link ApplicationId}. </p> |
| * |
| * <p> For the actual job submission, the client first has to create an {@link ApplicationSubmissionContext}. |
| * The {@link ApplicationSubmissionContext} defines the application details such as {@link ApplicationId} |
| * and application name, the priority assigned to the application and the queue |
| * to which this application needs to be assigned. In addition to this, the {@link ApplicationSubmissionContext} |
| * also defines the {@link ContainerLaunchContext} which describes the <code>Container</code> with which |
| * the {@link ApplicationMaster} is launched. </p> |
| * |
| * <p> The {@link ContainerLaunchContext} in this scenario defines the resources to be allocated for the |
| * {@link ApplicationMaster}'s container, the local resources (jars, configuration files) to be made available |
| * and the environment to be set for the {@link ApplicationMaster} and the commands to be executed to run the |
| * {@link ApplicationMaster}. </p> |
| * |
| * <p> Using the {@link ApplicationSubmissionContext}, the client submits the application to the |
| * <code>ResourceManager</code> and then monitors the application by requesting the <code>ResourceManager</code> |
| * for an {@link ApplicationReport} at regular time intervals. In case of the application taking too long, the client |
| * kills the application by submitting a {@link KillApplicationRequest} to the <code>ResourceManager</code>. </p> |
| * |
| */ |
| @InterfaceAudience.Public |
| @InterfaceStability.Unstable |
| public class Client { |
| |
| private static final Logger LOG = LoggerFactory |
| .getLogger(Client.class); |
| |
| private static final int DEFAULT_AM_MEMORY = 100; |
| private static final int DEFAULT_AM_VCORES = 1; |
| private static final int DEFAULT_CONTAINER_MEMORY = 10; |
| private static final int DEFAULT_CONTAINER_VCORES = 1; |
| |
| // Interval, in milliseconds, between application status checks (once per second). |
| private static final int APP_MONITOR_INTERVAL = 1000; |
| |
| // Configuration |
| private Configuration conf; |
| private YarnClient yarnClient; |
| // Application master specific info to register a new Application with RM/ASM |
| private String appName = ""; |
| private ApplicationId applicationId; |
| // App master priority |
| private int amPriority = 0; |
| // Queue for App master |
| private String amQueue = ""; |
| // Amount of memory, in MB, to request to run the App Master |
| private long amMemory = DEFAULT_AM_MEMORY; |
| // Number of virtual cores to request to run the App Master |
| private int amVCores = DEFAULT_AM_VCORES; |
| // Resources other than memory and vcores to request to run the App Master |
| private Map<String, Long> amResources = new HashMap<>(); |
| // AM resource profile |
| private String amResourceProfile = ""; |
| |
| // Application master jar file |
| private String appMasterJar = ""; |
| // Main class to invoke application master |
| private final String appMasterMainClass; |
| |
| // Shell command to be executed |
| private String shellCommand = ""; |
| // Location of shell script |
| private String shellScriptPath = ""; |
| // Args to be passed to the shell command |
| private String[] shellArgs = new String[] {}; |
| // Env variables to be setup for the shell command |
| private Map<String, String> shellEnv = new HashMap<String, String>(); |
| // Shell Command Container priority |
| private int shellCmdPriority = 0; |
| |
| // Amount of memory, in MB, to request for the container in which the shell script will be executed |
| private long containerMemory = DEFAULT_CONTAINER_MEMORY; |
| // Number of virtual cores to request for the container in which the shell script will be executed |
| private int containerVirtualCores = DEFAULT_CONTAINER_VCORES; |
| // Resources other than memory and vcores to request for the container |
| // in which the shell script will be executed |
| private Map<String, Long> containerResources = new HashMap<>(); |
| // container resource profile |
| private String containerResourceProfile = ""; |
| // No. of containers in which the shell script needs to be executed |
| private int numContainers = 1; |
| private String nodeLabelExpression = null; |
| // Container type, default GUARANTEED. |
| private ExecutionType containerType = ExecutionType.GUARANTEED; |
| // Whether to auto promote opportunistic containers |
| private boolean autoPromoteContainers = false; |
| // Whether to enforce execution type of containers |
| private boolean enforceExecType = false; |
| |
| // Placement specification |
| private String placementSpec = ""; |
| // Node Attribute specification |
| private String nodeAttributeSpec = ""; |
| // log4j.properties file |
| // if available, add to local resources and set into classpath |
| private String log4jPropFile = ""; |
| // Pattern for log files that should be aggregated in a rolling fashion |
| private String rollingFilesPattern = ""; |
| |
| // Start time for client |
| private long clientStartTime = System.currentTimeMillis(); |
| // Timeout threshold for client, in milliseconds. Kill app after time interval expires. |
| private long clientTimeout = 600000; |
| |
| // flag to indicate whether to keep containers across application attempts. |
| private boolean keepContainers = false; |
| |
| private long attemptFailuresValidityInterval = -1; |
| |
| private Vector<CharSequence> containerRetryOptions = new Vector<>(5); |
| |
| // Debug flag |
| boolean debugFlag = false; |
| |
| // Timeline domain ID |
| private String domainId = null; |
| |
| // Flag to indicate whether to create the domain of the given ID |
| private boolean toCreateDomain = false; |
| |
| // Timeline domain reader access control |
| private String viewACLs = null; |
| |
| // Timeline domain writer access control |
| private String modifyACLs = null; |
| |
| private String flowName = null; |
| private String flowVersion = null; |
| private long flowRunId = 0L; |
| |
| // Docker client configuration |
| private String dockerClientConfig = null; |
| |
| // Application tags |
| private Set<String> applicationTags = new HashSet<>(); |
| |
| private List<String> filesToLocalize = new ArrayList<>(); |
| |
| // Command line options |
| private Options opts; |
| |
| private final AtomicBoolean stopSignalReceived; |
| private final AtomicBoolean isRunning; |
| private final Object objectLock = new Object(); |
| |
| private static final String shellCommandPath = "shellCommands"; |
| private static final String shellArgsPath = "shellArgs"; |
| private static final String appMasterJarPath = "AppMaster.jar"; |
| // Hardcoded path to custom log_properties |
| private static final String log4jPath = "log4j.properties"; |
| |
| public static final String SCRIPT_PATH = "ExecScript"; |
| |
| /** |
| * @param args Command line arguments |
| */ |
| public static void main(String[] args) { |
| boolean result = false; |
| try { |
| Client client = new Client(); |
| LOG.info("Initializing Client"); |
| try { |
| boolean doRun = client.init(args); |
| if (!doRun) { |
| System.exit(0); |
| } |
| } catch (IllegalArgumentException e) { |
| System.err.println(e.getLocalizedMessage()); |
| client.printUsage(); |
| System.exit(-1); |
| } |
| result = client.run(); |
| } catch (Throwable t) { |
| LOG.error("Error running Client", t); |
| System.exit(1); |
| } |
| if (result) { |
| LOG.info("Application completed successfully"); |
| System.exit(0); |
| } |
| LOG.error("Application failed to complete successfully"); |
| System.exit(2); |
| } |
| |
/**
 * Creates a client whose application master main class is
 * {@code org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster}.
 *
 * @param conf configuration passed on to the two-argument constructor
 * @throws Exception declared for callers of this constructor
 */
public Client(Configuration conf) throws Exception {
  this("org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster", conf);
}
| |
/**
 * Creates a client that uses {@code appMasterMainClass} as the main class of
 * the application master and registers every supported command line option.
 *
 * <p>Note that {@code conf} is modified: it has
 * {@link YarnConfiguration#YARN_CLIENT_LOAD_RESOURCETYPES_FROM_SERVER} set to
 * {@code true} before the {@link YarnClient} is initialized with it.</p>
 *
 * @param appMasterMainClass main class to invoke for the application master
 * @param conf configuration used (and updated) by this client
 */
Client(String appMasterMainClass, Configuration conf) {
  this.conf = conf;
  this.conf.setBoolean(
      YarnConfiguration.YARN_CLIENT_LOAD_RESOURCETYPES_FROM_SERVER, true);
  this.appMasterMainClass = appMasterMainClass;
  yarnClient = YarnClient.createYarnClient();
  yarnClient.init(conf);

  opts = new Options();

  // Application level options.
  opts.addOption("appname", true, "Application Name. Default value - DistributedShell");
  opts.addOption("priority", true, "Application Priority. Default 0");
  opts.addOption("queue", true, "RM Queue in which this application is to be submitted");
  opts.addOption("timeout", true, "Application timeout in milliseconds");

  // Application master options.
  opts.addOption("master_memory", true,
      "Amount of memory in MB to be requested to run the application master");
  opts.addOption("master_vcores", true, "Amount of virtual cores "
      + "to be requested to run the application master");
  // The separating space after "commas." keeps the help text readable.
  opts.addOption("master_resources", true, "Amount of resources "
      + "to be requested to run the application master. "
      + "Specified as resource type=value pairs separated by commas. "
      + "E.g. -master_resources memory-mb=512,vcores=2");
  opts.addOption("jar", true, "Jar file containing the application master");
  opts.addOption("master_resource_profile", true,
      "Resource profile for the application master");

  // Shell command / script options.
  opts.addOption("shell_command", true, "Shell command to be executed by "
      + "the Application Master. Can only specify either --shell_command "
      + "or --shell_script");
  opts.addOption("shell_script", true, "Location of the shell script to be "
      + "executed. Can only specify either --shell_command or --shell_script");
  opts.addOption("shell_args", true, "Command line args for the shell script. "
      + "Multiple args can be separated by empty space.");
  opts.getOption("shell_args").setArgs(Option.UNLIMITED_VALUES);
  opts.addOption("shell_env", true,
      "Environment for shell script. Specified as env_key=env_val pairs");
  opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");

  // Container options.
  opts.addOption("container_type", true,
      "Container execution type, GUARANTEED or OPPORTUNISTIC");
  opts.addOption("container_memory", true, "Amount of memory in MB "
      + "to be requested to run the shell command");
  opts.addOption("container_vcores", true, "Amount of virtual cores "
      + "to be requested to run the shell command");
  opts.addOption("container_resources", true, "Amount of resources "
      + "to be requested to run the shell command. "
      + "Specified as resource type=value pairs separated by commas. "
      + "E.g. -container_resources memory-mb=256,vcores=1");
  opts.addOption("container_resource_profile", true,
      "Resource profile for the shell command");
  opts.addOption("num_containers", true,
      "No. of containers on which the shell command needs to be executed");
  opts.addOption("promote_opportunistic_after_start", false,
      "Flag to indicate whether to automatically promote opportunistic"
          + " containers to guaranteed.");
  opts.addOption("enforce_execution_type", false,
      "Flag to indicate whether to enforce execution type of containers");

  // Logging options.
  opts.addOption("log_properties", true, "log4j.properties file");
  opts.addOption("rolling_log_pattern", true,
      "pattern for files that should be aggregated in a rolling fashion");

  // Application attempt options.
  opts.addOption("keep_containers_across_application_attempts", false,
      "Flag to indicate whether to keep containers across application "
          + "attempts."
          + " If the flag is true, running containers will not be killed when"
          + " application attempt fails and these containers will be "
          + "retrieved by"
          + " the new application attempt ");
  opts.addOption("attempt_failures_validity_interval", true,
      "when attempt_failures_validity_interval in milliseconds is set to > 0, "
          + "the failure number will not take failures which happen out of "
          + "the validityInterval into failure count. "
          + "If failure count reaches to maxAppAttempts, "
          + "the application will be failed.");
  opts.addOption("debug", false, "Dump out debug information");

  // Timeline domain and flow options.
  opts.addOption("domain", true, "ID of the timeline domain where the "
      + "timeline entities will be put");
  opts.addOption("view_acls", true, "Users and groups that allowed to "
      + "view the timeline entities in the given domain");
  opts.addOption("modify_acls", true, "Users and groups that allowed to "
      + "modify the timeline entities in the given domain");
  opts.addOption("create", false, "Flag to indicate whether to create the "
      + "domain specified with -domain.");
  opts.addOption("flow_name", true, "Flow name which the distributed shell "
      + "app belongs to");
  opts.addOption("flow_version", true, "Flow version which the distributed "
      + "shell app belongs to");
  opts.addOption("flow_run_id", true, "Flow run ID which the distributed "
      + "shell app belongs to");
  opts.addOption("help", false, "Print usage");
  opts.addOption("node_label_expression", true,
      "Node label expression to determine the nodes"
          + " where all the containers of this application"
          + " will be allocated, \"\" means containers"
          + " can be allocated anywhere, if you don't specify the option,"
          + " default node_label_expression of queue will be used.");

  // Container retry options.
  opts.addOption("container_retry_policy", true,
      "Retry policy when container fails to run, "
          + "0: NEVER_RETRY, 1: RETRY_ON_ALL_ERRORS, "
          + "2: RETRY_ON_SPECIFIC_ERROR_CODES");
  opts.addOption("container_retry_error_codes", true,
      "When retry policy is set to RETRY_ON_SPECIFIC_ERROR_CODES, error "
          + "codes is specified with this option, "
          + "e.g. --container_retry_error_codes 1,2,3");
  opts.addOption("container_max_retries", true,
      "If container could retry, it specifies max retries");
  opts.addOption("container_retry_interval", true,
      "Interval between each retry, unit is milliseconds");
  opts.addOption("container_failures_validity_interval", true,
      "Failures which are out of the time window will not be added to"
          + " the number of container retry attempts");

  // Miscellaneous options.
  opts.addOption("docker_client_config", true,
      "The docker client configuration path. The scheme should be supplied"
          + " (i.e. file:// or hdfs://)."
          + " Only used when the Docker runtime is enabled and requested.");
  opts.addOption("placement_spec", true,
      "Placement specification. Please note, if this option is specified,"
          + " The \"num_containers\" option will be ignored. All requested"
          + " containers will be of type GUARANTEED");
  opts.addOption("application_tags", true, "Application tags.");
  opts.addOption("localize_files", true, "List of files, separated by comma"
      + " to be localized for the command");

  stopSignalReceived = new AtomicBoolean(false);
  isRunning = new AtomicBoolean(false);
}
| |
/**
 * Creates a client backed by a freshly constructed {@link YarnConfiguration}.
 *
 * @throws Exception if the delegated constructor fails
 */
public Client() throws Exception {
  this(new YarnConfiguration());
}
| |
| /** |
| * Helper function to print out usage |
| */ |
| private void printUsage() { |
| new HelpFormatter().printHelp("Client", opts); |
| } |
| |
| /** |
| * Parse command line options |
| * @param args Parsed command line options |
| * @return Whether the init was successful to run the client |
| * @throws ParseException |
| */ |
| public boolean init(String[] args) throws ParseException { |
| |
| CommandLine cliParser = new GnuParser().parse(opts, args); |
| |
| if (args.length == 0) { |
| throw new IllegalArgumentException("No args specified for client to initialize"); |
| } |
| |
| if (cliParser.hasOption("log_properties")) { |
| String log4jPath = cliParser.getOptionValue("log_properties"); |
| try { |
| Log4jPropertyHelper.updateLog4jConfiguration(Client.class, log4jPath); |
| } catch (Exception e) { |
| LOG.warn("Can not set up custom log4j properties. " + e); |
| } |
| } |
| |
| if (cliParser.hasOption("rolling_log_pattern")) { |
| rollingFilesPattern = cliParser.getOptionValue("rolling_log_pattern"); |
| } |
| |
| if (cliParser.hasOption("help")) { |
| printUsage(); |
| return false; |
| } |
| |
| if (cliParser.hasOption("debug")) { |
| debugFlag = true; |
| |
| } |
| |
| if (cliParser.hasOption("keep_containers_across_application_attempts")) { |
| LOG.info("keep_containers_across_application_attempts"); |
| keepContainers = true; |
| } |
| |
| if (cliParser.hasOption("placement_spec")) { |
| placementSpec = cliParser.getOptionValue("placement_spec"); |
| // Check if it is parsable |
| PlacementSpec.parse(this.placementSpec); |
| } |
| |
| appName = cliParser.getOptionValue("appname", "DistributedShell"); |
| amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0")); |
| amQueue = cliParser.getOptionValue("queue", "default"); |
| amMemory = |
| Integer.parseInt(cliParser.getOptionValue("master_memory", "-1")); |
| amVCores = |
| Integer.parseInt(cliParser.getOptionValue("master_vcores", "-1")); |
| if (cliParser.hasOption("master_resources")) { |
| Map<String, Long> masterResources = |
| parseResourcesString(cliParser.getOptionValue("master_resources")); |
| for (Map.Entry<String, Long> entry : masterResources.entrySet()) { |
| if (entry.getKey().equals(ResourceInformation.MEMORY_URI)) { |
| amMemory = entry.getValue(); |
| } else if (entry.getKey().equals(ResourceInformation.VCORES_URI)) { |
| amVCores = entry.getValue().intValue(); |
| } else { |
| amResources.put(entry.getKey(), entry.getValue()); |
| } |
| } |
| } |
| amResourceProfile = cliParser.getOptionValue("master_resource_profile", ""); |
| |
| if (!cliParser.hasOption("jar")) { |
| throw new IllegalArgumentException("No jar file specified for application master"); |
| } |
| |
| appMasterJar = cliParser.getOptionValue("jar"); |
| |
| if (!cliParser.hasOption("shell_command") && !cliParser.hasOption("shell_script")) { |
| throw new IllegalArgumentException( |
| "No shell command or shell script specified to be executed by application master"); |
| } else if (cliParser.hasOption("shell_command") && cliParser.hasOption("shell_script")) { |
| throw new IllegalArgumentException("Can not specify shell_command option " + |
| "and shell_script option at the same time"); |
| } else if (cliParser.hasOption("shell_command")) { |
| shellCommand = cliParser.getOptionValue("shell_command"); |
| } else { |
| shellScriptPath = cliParser.getOptionValue("shell_script"); |
| } |
| if (cliParser.hasOption("shell_args")) { |
| shellArgs = cliParser.getOptionValues("shell_args"); |
| } |
| if (cliParser.hasOption("shell_env")) { |
| String envs[] = cliParser.getOptionValues("shell_env"); |
| for (String env : envs) { |
| env = env.trim(); |
| int index = env.indexOf('='); |
| if (index == -1) { |
| shellEnv.put(env, ""); |
| continue; |
| } |
| String key = env.substring(0, index); |
| String val = ""; |
| if (index < (env.length()-1)) { |
| val = env.substring(index+1); |
| } |
| shellEnv.put(key, val); |
| } |
| } |
| shellCmdPriority = Integer.parseInt(cliParser.getOptionValue("shell_cmd_priority", "0")); |
| |
| if (cliParser.hasOption("container_type")) { |
| String containerTypeStr = cliParser.getOptionValue("container_type"); |
| if (Arrays.stream(ExecutionType.values()).noneMatch( |
| executionType -> executionType.toString() |
| .equals(containerTypeStr))) { |
| throw new IllegalArgumentException("Invalid container_type: " |
| + containerTypeStr); |
| } |
| containerType = ExecutionType.valueOf(containerTypeStr); |
| } |
| if (cliParser.hasOption("promote_opportunistic_after_start")) { |
| autoPromoteContainers = true; |
| } |
| if (cliParser.hasOption("enforce_execution_type")) { |
| enforceExecType = true; |
| } |
| containerMemory = |
| Integer.parseInt(cliParser.getOptionValue("container_memory", "-1")); |
| containerVirtualCores = |
| Integer.parseInt(cliParser.getOptionValue("container_vcores", "-1")); |
| if (cliParser.hasOption("container_resources")) { |
| Map<String, Long> resources = |
| parseResourcesString(cliParser.getOptionValue("container_resources")); |
| for (Map.Entry<String, Long> entry : resources.entrySet()) { |
| if (entry.getKey().equals(ResourceInformation.MEMORY_URI)) { |
| containerMemory = entry.getValue(); |
| } else if (entry.getKey().equals(ResourceInformation.VCORES_URI)) { |
| containerVirtualCores = entry.getValue().intValue(); |
| } else { |
| containerResources.put(entry.getKey(), entry.getValue()); |
| } |
| } |
| } |
| containerResourceProfile = |
| cliParser.getOptionValue("container_resource_profile", ""); |
| numContainers = |
| Integer.parseInt(cliParser.getOptionValue("num_containers", "1")); |
| |
| if (numContainers < 1) { |
| throw new IllegalArgumentException("Invalid no. of containers specified," |
| + " exiting. Specified numContainer=" + numContainers); |
| } |
| |
| nodeLabelExpression = cliParser.getOptionValue("node_label_expression", null); |
| |
| clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000")); |
| |
| attemptFailuresValidityInterval = |
| Long.parseLong(cliParser.getOptionValue( |
| "attempt_failures_validity_interval", "-1")); |
| |
| log4jPropFile = cliParser.getOptionValue("log_properties", ""); |
| |
| // Get timeline domain options |
| if (cliParser.hasOption("domain")) { |
| domainId = cliParser.getOptionValue("domain"); |
| toCreateDomain = cliParser.hasOption("create"); |
| if (cliParser.hasOption("view_acls")) { |
| viewACLs = cliParser.getOptionValue("view_acls"); |
| } |
| if (cliParser.hasOption("modify_acls")) { |
| modifyACLs = cliParser.getOptionValue("modify_acls"); |
| } |
| } |
| |
| // Get container retry options |
| if (cliParser.hasOption("container_retry_policy")) { |
| containerRetryOptions.add("--container_retry_policy " |
| + cliParser.getOptionValue("container_retry_policy")); |
| } |
| if (cliParser.hasOption("container_retry_error_codes")) { |
| containerRetryOptions.add("--container_retry_error_codes " |
| + cliParser.getOptionValue("container_retry_error_codes")); |
| } |
| if (cliParser.hasOption("container_max_retries")) { |
| containerRetryOptions.add("--container_max_retries " |
| + cliParser.getOptionValue("container_max_retries")); |
| } |
| if (cliParser.hasOption("container_retry_interval")) { |
| containerRetryOptions.add("--container_retry_interval " |
| + cliParser.getOptionValue("container_retry_interval")); |
| } |
| if (cliParser.hasOption("container_failures_validity_interval")) { |
| containerRetryOptions.add("--container_failures_validity_interval " |
| + cliParser.getOptionValue("container_failures_validity_interval")); |
| } |
| |
| if (cliParser.hasOption("flow_name")) { |
| flowName = cliParser.getOptionValue("flow_name"); |
| } |
| if (cliParser.hasOption("flow_version")) { |
| flowVersion = cliParser.getOptionValue("flow_version"); |
| } |
| if (cliParser.hasOption("flow_run_id")) { |
| try { |
| flowRunId = Long.parseLong(cliParser.getOptionValue("flow_run_id")); |
| } catch (NumberFormatException e) { |
| throw new IllegalArgumentException( |
| "Flow run is not a valid long value", e); |
| } |
| } |
| if (cliParser.hasOption("docker_client_config")) { |
| dockerClientConfig = cliParser.getOptionValue("docker_client_config"); |
| } |
| |
| if (cliParser.hasOption("application_tags")) { |
| String applicationTagsStr = cliParser.getOptionValue("application_tags"); |
| String[] appTags = applicationTagsStr.split(","); |
| for (String appTag : appTags) { |
| this.applicationTags.add(appTag.trim()); |
| } |
| } |
| |
| if (cliParser.hasOption("localize_files")) { |
| String filesStr = cliParser.getOptionValue("localize_files"); |
| if (filesStr.contains(",")) { |
| String[] files = filesStr.split(","); |
| filesToLocalize = Arrays.asList(files); |
| } else { |
| filesToLocalize.add(filesStr); |
| } |
| } |
| |
| return true; |
| } |
| |
| /** |
| * Main run function for the client |
| * @return true if application completed successfully |
| * @throws IOException |
| * @throws YarnException |
| */ |
| public boolean run() throws IOException, YarnException { |
| LOG.info("Running Client"); |
| isRunning.set(true); |
| yarnClient.start(); |
| // set the client start time. |
| clientStartTime = System.currentTimeMillis(); |
| |
| YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics(); |
| LOG.info("Got Cluster metric info from ASM" |
| + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers()); |
| |
| List<NodeReport> clusterNodeReports = yarnClient.getNodeReports( |
| NodeState.RUNNING); |
| LOG.info("Got Cluster node info from ASM"); |
| for (NodeReport node : clusterNodeReports) { |
| LOG.info("Got node report from ASM for" |
| + ", nodeId=" + node.getNodeId() |
| + ", nodeAddress=" + node.getHttpAddress() |
| + ", nodeRackName=" + node.getRackName() |
| + ", nodeNumContainers=" + node.getNumContainers()); |
| } |
| |
| QueueInfo queueInfo = yarnClient.getQueueInfo(this.amQueue); |
| if (queueInfo == null) { |
| throw new IllegalArgumentException(String |
| .format("Queue %s not present in scheduler configuration.", |
| this.amQueue)); |
| } |
| |
| LOG.info("Queue info" |
| + ", queueName=" + queueInfo.getQueueName() |
| + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity() |
| + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity() |
| + ", queueApplicationCount=" + queueInfo.getApplications().size() |
| + ", queueChildQueueCount=" + queueInfo.getChildQueues().size()); |
| |
| List<QueueUserACLInfo> listAclInfo = yarnClient.getQueueAclsInfo(); |
| for (QueueUserACLInfo aclInfo : listAclInfo) { |
| for (QueueACL userAcl : aclInfo.getUserAcls()) { |
| LOG.info("User ACL Info for Queue" |
| + ", queueName=" + aclInfo.getQueueName() |
| + ", userAcl=" + userAcl.name()); |
| } |
| } |
| |
| if (domainId != null && domainId.length() > 0 && toCreateDomain) { |
| prepareTimelineDomain(); |
| } |
| |
| Map<String, Resource> profiles; |
| try { |
| profiles = yarnClient.getResourceProfiles(); |
| } catch (YARNFeatureNotEnabledException re) { |
| profiles = null; |
| } |
| |
| List<String> appProfiles = new ArrayList<>(2); |
| appProfiles.add(amResourceProfile); |
| appProfiles.add(containerResourceProfile); |
| for (String appProfile : appProfiles) { |
| if (appProfile != null && !appProfile.isEmpty()) { |
| if (profiles == null) { |
| String message = "Resource profiles is not enabled"; |
| LOG.error(message); |
| throw new IOException(message); |
| } |
| if (!profiles.containsKey(appProfile)) { |
| String message = "Unknown resource profile '" + appProfile |
| + "'. Valid resource profiles are " + profiles.keySet(); |
| LOG.error(message); |
| throw new IOException(message); |
| } |
| } |
| } |
| |
| // Get a new application id |
| YarnClientApplication app = yarnClient.createApplication(); |
| GetNewApplicationResponse appResponse = app.getNewApplicationResponse(); |
| // TODO get min/max resource capabilities from RM and change memory ask if needed |
| // If we do not have min/max, we may not be able to correctly request |
| // the required resources from the RM for the app master |
| // Memory ask has to be a multiple of min and less than max. |
| // Dump out information about cluster capability as seen by the resource manager |
| long maxMem = appResponse.getMaximumResourceCapability().getMemorySize(); |
| LOG.info("Max mem capability of resources in this cluster " + maxMem); |
| |
| // A resource ask cannot exceed the max. |
| if (amMemory > maxMem) { |
| LOG.info("AM memory specified above max threshold of cluster. Using max value." |
| + ", specified=" + amMemory |
| + ", max=" + maxMem); |
| amMemory = maxMem; |
| } |
| |
| int maxVCores = appResponse.getMaximumResourceCapability().getVirtualCores(); |
| LOG.info("Max virtual cores capability of resources in this cluster " + maxVCores); |
| |
| if (amVCores > maxVCores) { |
| LOG.info("AM virtual cores specified above max threshold of cluster. " |
| + "Using max value." + ", specified=" + amVCores |
| + ", max=" + maxVCores); |
| amVCores = maxVCores; |
| } |
| |
| // set the application name |
| ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext(); |
| applicationId = appContext.getApplicationId(); |
| |
| // Set up resource type requirements |
| // For now, both memory and vcores are supported, so we set memory and |
| // vcores requirements |
| List<ResourceTypeInfo> resourceTypes = yarnClient.getResourceTypeInfo(); |
| setAMResourceCapability(appContext, profiles, resourceTypes); |
| setContainerResources(profiles, resourceTypes); |
| |
| appContext.setKeepContainersAcrossApplicationAttempts(keepContainers); |
| appContext.setApplicationName(appName); |
| |
| if (attemptFailuresValidityInterval >= 0) { |
| appContext |
| .setAttemptFailuresValidityInterval(attemptFailuresValidityInterval); |
| } |
| |
| Set<String> tags = new HashSet<String>(); |
| if (applicationTags != null) { |
| tags.addAll(applicationTags); |
| } |
| if (flowName != null) { |
| tags.add(TimelineUtils.generateFlowNameTag(flowName)); |
| } |
| if (flowVersion != null) { |
| tags.add(TimelineUtils.generateFlowVersionTag(flowVersion)); |
| } |
| if (flowRunId != 0) { |
| tags.add(TimelineUtils.generateFlowRunIdTag(flowRunId)); |
| } |
| appContext.setApplicationTags(tags); |
| |
| // set local resources for the application master |
| // local files or archives as needed |
| // In this scenario, the jar file for the application master is part of the local resources |
| Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); |
| |
| LOG.info("Copy App Master jar from local filesystem and add to local environment"); |
| // Copy the application master jar to the filesystem |
| // Create a local resource to point to the destination jar path |
| FileSystem fs = FileSystem.get(conf); |
| addToLocalResources(fs, appMasterJar, appMasterJarPath, |
| applicationId.toString(), localResources, null); |
| |
| // Set the log4j properties if needed |
| if (!log4jPropFile.isEmpty()) { |
| addToLocalResources(fs, log4jPropFile, log4jPath, |
| applicationId.toString(), localResources, null); |
| } |
| |
| // Process local files for localization |
| // Here we just upload the files, the AM |
| // will set up localization later. |
| StringBuilder localizableFiles = new StringBuilder(); |
| filesToLocalize.stream().forEach(path -> { |
| File f = new File(path); |
| |
| if (!f.exists()) { |
| throw new UncheckedIOException( |
| new IOException(path + " does not exist")); |
| } |
| |
| if (!f.canRead()) { |
| throw new UncheckedIOException( |
| new IOException(path + " cannot be read")); |
| } |
| |
| if (f.isDirectory()) { |
| throw new UncheckedIOException( |
| new IOException(path + " is a directory")); |
| } |
| |
| try { |
| String fileName = f.getName(); |
| uploadFile(fs, path, fileName, applicationId.toString()); |
| if (localizableFiles.length() == 0) { |
| localizableFiles.append(fileName); |
| } else { |
| localizableFiles.append(",").append(fileName); |
| } |
| } catch (IOException e) { |
| throw new UncheckedIOException("Cannot upload file: " + path, e); |
| } |
| }); |
| |
| // The shell script has to be made available on the final container(s) |
| // where it will be executed. |
| // To do this, we need to first copy into the filesystem that is visible |
| // to the yarn framework. |
| // We do not need to set this as a local resource for the application |
| // master as the application master does not need it. |
| String hdfsShellScriptLocation = ""; |
| long hdfsShellScriptLen = 0; |
| long hdfsShellScriptTimestamp = 0; |
| if (!shellScriptPath.isEmpty()) { |
| Path shellSrc = new Path(shellScriptPath); |
| String shellPathSuffix = |
| ApplicationMaster.getRelativePath(appName, |
| applicationId.toString(), |
| SCRIPT_PATH); |
| Path shellDst = |
| new Path(fs.getHomeDirectory(), shellPathSuffix); |
| fs.copyFromLocalFile(false, true, shellSrc, shellDst); |
| hdfsShellScriptLocation = shellDst.toUri().toString(); |
| FileStatus shellFileStatus = fs.getFileStatus(shellDst); |
| hdfsShellScriptLen = shellFileStatus.getLen(); |
| hdfsShellScriptTimestamp = shellFileStatus.getModificationTime(); |
| } |
| |
| if (!shellCommand.isEmpty()) { |
| addToLocalResources(fs, null, shellCommandPath, applicationId.toString(), |
| localResources, shellCommand); |
| } |
| |
| if (shellArgs.length > 0) { |
| addToLocalResources(fs, null, shellArgsPath, applicationId.toString(), |
| localResources, StringUtils.join(shellArgs, " ")); |
| } |
| |
| // Set the necessary security tokens as needed |
| //amContainer.setContainerTokens(containerToken); |
| |
| // Set the env variables to be setup in the env where the application master will be run |
| LOG.info("Set the environment for the application master"); |
| Map<String, String> env = new HashMap<String, String>(); |
| |
| // put location of shell script into env |
| // using the env info, the application master will create the correct local resource for the |
| // eventual containers that will be launched to execute the shell scripts |
| env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation); |
| env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp)); |
| env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen)); |
| if (domainId != null && domainId.length() > 0) { |
| env.put(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN, domainId); |
| } |
| |
| // Add AppMaster.jar location to classpath |
| // At some point we should not be required to add |
| // the hadoop specific classpaths to the env. |
| // It should be provided out of the box. |
| // For now setting all required classpaths including |
| // the classpath to "." for the application jar |
| StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$$()) |
| .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*"); |
| for (String c : conf.getStrings( |
| YarnConfiguration.YARN_APPLICATION_CLASSPATH, |
| YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) { |
| classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR) |
| .append(c.trim()); |
| } |
| classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append( |
| "./log4j.properties"); |
| |
| // add the runtime classpath needed for tests to work |
| if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) { |
| classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR) |
| .append(System.getProperty("java.class.path")); |
| } |
| |
| env.put("CLASSPATH", classPathEnv.toString()); |
| |
| // Set the necessary command to execute the application master |
| Vector<CharSequence> vargs = new Vector<CharSequence>(30); |
| |
| // Set java executable command |
| LOG.info("Setting up app master command"); |
| // Need extra quote here because JAVA_HOME might contain space on Windows, |
| // e.g. C:/Program Files/Java... |
| vargs.add("\"" + Environment.JAVA_HOME.$$() + "/bin/java\""); |
| // Set Xmx based on am memory size |
| vargs.add("-Xmx" + amMemory + "m"); |
| // Set class name |
| vargs.add(appMasterMainClass); |
| // Set params for Application Master |
| if (containerType != null) { |
| vargs.add("--container_type " + String.valueOf(containerType)); |
| } |
| if (autoPromoteContainers) { |
| vargs.add("--promote_opportunistic_after_start"); |
| } |
| if (enforceExecType) { |
| vargs.add("--enforce_execution_type"); |
| } |
| if (containerMemory > 0) { |
| vargs.add("--container_memory " + String.valueOf(containerMemory)); |
| } |
| if (containerVirtualCores > 0) { |
| vargs.add("--container_vcores " + String.valueOf(containerVirtualCores)); |
| } |
| if (!containerResources.isEmpty()) { |
| Joiner.MapJoiner joiner = Joiner.on(',').withKeyValueSeparator("="); |
| vargs.add("--container_resources " + joiner.join(containerResources)); |
| } |
| if (containerResourceProfile != null && !containerResourceProfile |
| .isEmpty()) { |
| vargs.add("--container_resource_profile " + containerResourceProfile); |
| } |
| vargs.add("--num_containers " + String.valueOf(numContainers)); |
| if (placementSpec != null && placementSpec.length() > 0) { |
| // Encode the spec to avoid passing special chars via shell arguments. |
| String encodedSpec = Base64.getEncoder() |
| .encodeToString(placementSpec.getBytes(StandardCharsets.UTF_8)); |
| LOG.info("Encode placement spec: " + encodedSpec); |
| vargs.add("--placement_spec " + encodedSpec); |
| } |
| if (null != nodeLabelExpression) { |
| appContext.setNodeLabelExpression(nodeLabelExpression); |
| } |
| vargs.add("--priority " + String.valueOf(shellCmdPriority)); |
| |
| if (keepContainers) { |
| vargs.add("--keep_containers_across_application_attempts"); |
| } |
| for (Map.Entry<String, String> entry : shellEnv.entrySet()) { |
| vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue()); |
| } |
| if (debugFlag) { |
| vargs.add("--debug"); |
| } |
| if (localizableFiles.length() > 0) { |
| vargs.add("--localized_files " + localizableFiles.toString()); |
| } |
| vargs.add("--appname " + appName); |
| |
| vargs.add("--homedir " + fs.getHomeDirectory()); |
| |
| vargs.addAll(containerRetryOptions); |
| |
| vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout"); |
| vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr"); |
| |
| // Get final command |
| StringBuilder command = new StringBuilder(); |
| for (CharSequence str : vargs) { |
| command.append(str).append(" "); |
| } |
| |
| LOG.info("Completed setting up app master command " + command.toString()); |
| List<String> commands = new ArrayList<String>(); |
| commands.add(command.toString()); |
| |
| // Set up the container launch context for the application master |
| ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance( |
| localResources, env, commands, null, null, null); |
| |
| // Service data is a binary blob that can be passed to the application |
| // Not needed in this scenario |
| // amContainer.setServiceData(serviceData); |
| |
| // Setup security tokens |
| Credentials rmCredentials = null; |
| if (UserGroupInformation.isSecurityEnabled()) { |
| // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce |
| rmCredentials = new Credentials(); |
| String tokenRenewer = YarnClientUtils.getRmPrincipal(conf); |
| if (tokenRenewer == null || tokenRenewer.length() == 0) { |
| throw new IOException( |
| "Can't get Master Kerberos principal for the RM to use as renewer"); |
| } |
| |
| // For now, only getting tokens for the default file-system. |
| final Token<?> tokens[] = |
| fs.addDelegationTokens(tokenRenewer, rmCredentials); |
| if (tokens != null) { |
| for (Token<?> token : tokens) { |
| LOG.info("Got dt for " + fs.getUri() + "; " + token); |
| } |
| } |
| } |
| |
| // Add the docker client config credentials if supplied. |
| Credentials dockerCredentials = null; |
| if (dockerClientConfig != null) { |
| dockerCredentials = |
| DockerClientConfigHandler.readCredentialsFromConfigFile( |
| new Path(dockerClientConfig), conf, applicationId.toString()); |
| } |
| |
| if (rmCredentials != null || dockerCredentials != null) { |
| DataOutputBuffer dob = new DataOutputBuffer(); |
| if (rmCredentials != null) { |
| rmCredentials.writeTokenStorageToStream(dob); |
| } |
| if (dockerCredentials != null) { |
| dockerCredentials.writeTokenStorageToStream(dob); |
| } |
| ByteBuffer tokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); |
| amContainer.setTokens(tokens); |
| } |
| |
| appContext.setAMContainerSpec(amContainer); |
| |
| // Set the priority for the application master |
| // TODO - what is the range for priority? how to decide? |
| Priority pri = Priority.newInstance(amPriority); |
| appContext.setPriority(pri); |
| |
| // Set the queue to which this application is to be submitted in the RM |
| appContext.setQueue(amQueue); |
| |
| specifyLogAggregationContext(appContext); |
| |
| // Submit the application to the applications manager |
| // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest); |
| // Ignore the response as either a valid response object is returned on success |
| // or an exception thrown to denote some form of a failure |
| LOG.info("Submitting application to ASM"); |
| |
| yarnClient.submitApplication(appContext); |
| |
| // TODO |
| // Try submitting the same request again |
| // app submission failure? |
| |
| // Monitor the application |
| return monitorApplication(applicationId); |
| |
| } |
| |
| @VisibleForTesting |
| void specifyLogAggregationContext(ApplicationSubmissionContext appContext) { |
| if (!rollingFilesPattern.isEmpty()) { |
| LogAggregationContext logAggregationContext = LogAggregationContext |
| .newInstance(null, null, rollingFilesPattern, ""); |
| appContext.setLogAggregationContext(logAggregationContext); |
| } |
| } |
| |
| /** |
| * Monitor the submitted application for completion. |
| * Kill application if time expires. |
| * @param appId Application Id of application to be monitored |
| * @return true if application completed successfully |
| * @throws YarnException |
| * @throws IOException |
| */ |
| private boolean monitorApplication(ApplicationId appId) |
| throws YarnException, IOException { |
| |
| boolean res = false; |
| boolean needForceKill = false; |
| while (isRunning.get()) { |
| // Check app status every 1 second. |
| try { |
| synchronized (objectLock) { |
| objectLock.wait(APP_MONITOR_INTERVAL); |
| } |
| needForceKill = stopSignalReceived.get(); |
| } catch (InterruptedException e) { |
| LOG.warn("Thread sleep in monitoring loop interrupted"); |
| // if the application is to be killed when client times out; |
| // then set needForceKill to true |
| break; |
| } finally { |
| if (needForceKill) { |
| break; |
| } |
| } |
| |
| // Get application report for the appId we are interested in |
| ApplicationReport report = yarnClient.getApplicationReport(appId); |
| |
| LOG.info("Got application report from ASM for" |
| + ", appId=" + appId.getId() |
| + ", clientToAMToken=" + report.getClientToAMToken() |
| + ", appDiagnostics=" + report.getDiagnostics() |
| + ", appMasterHost=" + report.getHost() |
| + ", appQueue=" + report.getQueue() |
| + ", appMasterRpcPort=" + report.getRpcPort() |
| + ", appStartTime=" + report.getStartTime() |
| + ", yarnAppState=" + report.getYarnApplicationState().toString() |
| + ", distributedFinalState=" + report.getFinalApplicationStatus().toString() |
| + ", appTrackingUrl=" + report.getTrackingUrl() |
| + ", appUser=" + report.getUser()); |
| |
| YarnApplicationState state = report.getYarnApplicationState(); |
| FinalApplicationStatus dsStatus = report.getFinalApplicationStatus(); |
| if (YarnApplicationState.FINISHED == state) { |
| if (FinalApplicationStatus.SUCCEEDED == dsStatus) { |
| LOG.info("Application has completed successfully. " |
| + "Breaking monitoring loop"); |
| res = true; |
| } else { |
| LOG.info("Application did finished unsuccessfully. " |
| + "YarnState={}, DSFinalStatus={}. Breaking monitoring loop", |
| state, dsStatus); |
| } |
| break; |
| } else if (YarnApplicationState.KILLED == state |
| || YarnApplicationState.FAILED == state) { |
| LOG.info("Application did not finish. YarnState={}, DSFinalStatus={}. " |
| + "Breaking monitoring loop", state, dsStatus); |
| break; |
| } |
| |
| // The value equal or less than 0 means no timeout |
| if (clientTimeout > 0 |
| && System.currentTimeMillis() > (clientStartTime + clientTimeout)) { |
| LOG.info("Reached client specified timeout for application. " + |
| "Killing application"); |
| needForceKill = true; |
| break; |
| } |
| } |
| |
| if (needForceKill) { |
| forceKillApplication(appId); |
| } |
| |
| isRunning.set(false); |
| |
| return res; |
| } |
| |
| /** |
| * Kill a submitted application by sending a call to the ASM |
| * @param appId Application Id to be killed. |
| * @throws YarnException |
| * @throws IOException |
| */ |
| private void forceKillApplication(ApplicationId appId) |
| throws YarnException, IOException { |
| // TODO clarify whether multiple jobs with the same app id can be submitted and be running at |
| // the same time. |
| // If yes, can we kill a particular attempt only? |
| |
| // Response can be ignored as it is non-null on success or |
| // throws an exception in case of failures |
| yarnClient.killApplication(appId); |
| } |
| |
| private void addToLocalResources(FileSystem fs, String fileSrcPath, |
| String fileDstPath, String appId, Map<String, LocalResource> localResources, |
| String resources) throws IOException { |
| String suffix = |
| ApplicationMaster.getRelativePath(appName, appId, fileDstPath); |
| Path dst = |
| new Path(fs.getHomeDirectory(), suffix); |
| if (fileSrcPath == null) { |
| try (FSDataOutputStream ostream = FileSystem.create(fs, dst, |
| new FsPermission((short) 0710))) { |
| ostream.writeUTF(resources); |
| } |
| } else { |
| fs.copyFromLocalFile(new Path(fileSrcPath), dst); |
| } |
| FileStatus scFileStatus = fs.getFileStatus(dst); |
| LocalResource scRsrc = |
| LocalResource.newInstance( |
| URL.fromURI(dst.toUri()), |
| LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, |
| scFileStatus.getLen(), scFileStatus.getModificationTime()); |
| localResources.put(fileDstPath, scRsrc); |
| } |
| |
| private void uploadFile(FileSystem fs, String fileSrcPath, |
| String fileDstPath, String appId) throws IOException { |
| String relativePath = |
| ApplicationMaster.getRelativePath(appName, appId, fileDstPath); |
| Path dst = |
| new Path(fs.getHomeDirectory(), relativePath); |
| LOG.info("Uploading file: " + fileSrcPath + " to " + dst); |
| fs.copyFromLocalFile(new Path(fileSrcPath), dst); |
| } |
| |
| @VisibleForTesting |
| ApplicationId getAppId() { |
| return applicationId; |
| } |
| |
| private void prepareTimelineDomain() { |
| TimelineClient timelineClient = null; |
| if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, |
| YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { |
| timelineClient = TimelineClient.createTimelineClient(); |
| timelineClient.init(conf); |
| timelineClient.start(); |
| } else { |
| LOG.warn("Cannot put the domain " + domainId + |
| " because the timeline service is not enabled"); |
| return; |
| } |
| try { |
| //TODO: we need to check and combine the existing timeline domain ACLs, |
| //but let's do it once we have client java library to query domains. |
| TimelineDomain domain = new TimelineDomain(); |
| domain.setId(domainId); |
| domain.setReaders( |
| viewACLs != null && viewACLs.length() > 0 ? viewACLs : " "); |
| domain.setWriters( |
| modifyACLs != null && modifyACLs.length() > 0 ? modifyACLs : " "); |
| timelineClient.putDomain(domain); |
| LOG.info("Put the timeline domain: " + |
| TimelineUtils.dumpTimelineRecordtoJSON(domain)); |
| } catch (Exception e) { |
| LOG.error("Error when putting the timeline domain", e); |
| } finally { |
| timelineClient.stop(); |
| } |
| } |
| |
| private void setAMResourceCapability(ApplicationSubmissionContext appContext, |
| Map<String, Resource> profiles, List<ResourceTypeInfo> resourceTypes) |
| throws IllegalArgumentException, IOException, YarnException { |
| if (amMemory < -1 || amMemory == 0) { |
| throw new IllegalArgumentException("Invalid memory specified for" |
| + " application master, exiting. Specified memory=" + amMemory); |
| } |
| if (amVCores < -1 || amVCores == 0) { |
| throw new IllegalArgumentException("Invalid virtual cores specified for" |
| + " application master, exiting. " + |
| "Specified virtual cores=" + amVCores); |
| } |
| Resource capability = Resource.newInstance(0, 0); |
| |
| if (!amResourceProfile.isEmpty()) { |
| if (!profiles.containsKey(amResourceProfile)) { |
| throw new IllegalArgumentException( |
| "Failed to find specified resource profile for application master=" |
| + amResourceProfile); |
| } |
| capability = Resources.clone(profiles.get(amResourceProfile)); |
| } |
| |
| if (appContext.getAMContainerResourceRequests() == null) { |
| List<ResourceRequest> amResourceRequests = new ArrayList<ResourceRequest>(); |
| amResourceRequests |
| .add(ResourceRequest.newInstance(Priority.newInstance(amPriority), |
| "*", Resources.clone(Resources.none()), 1)); |
| appContext.setAMContainerResourceRequests(amResourceRequests); |
| } |
| |
| validateResourceTypes(amResources.keySet(), resourceTypes); |
| for (Map.Entry<String, Long> entry : amResources.entrySet()) { |
| capability.setResourceValue(entry.getKey(), entry.getValue()); |
| } |
| // set amMemory because it's used to set Xmx param |
| if (amMemory == -1) { |
| amMemory = DEFAULT_AM_MEMORY; |
| LOG.warn("AM Memory not specified, use " + DEFAULT_AM_MEMORY |
| + " mb as AM memory"); |
| } |
| if (amVCores == -1) { |
| amVCores = DEFAULT_AM_VCORES; |
| LOG.warn("AM vcore not specified, use " + DEFAULT_AM_VCORES |
| + " mb as AM vcores"); |
| } |
| capability.setMemorySize(amMemory); |
| capability.setVirtualCores(amVCores); |
| appContext.getAMContainerResourceRequests().get(0).setCapability( |
| capability); |
| LOG.warn("AM Resource capability=" + capability); |
| } |
| |
| private void setContainerResources(Map<String, Resource> profiles, |
| List<ResourceTypeInfo> resourceTypes) throws IllegalArgumentException { |
| if (containerMemory < -1 || containerMemory == 0) { |
| throw new IllegalArgumentException("Container memory '" + |
| containerMemory + "' has to be greated than 0"); |
| } |
| if (containerVirtualCores < -1 || containerVirtualCores == 0) { |
| throw new IllegalArgumentException("Container vcores '" + |
| containerVirtualCores + "' has to be greated than 0"); |
| } |
| validateResourceTypes(containerResources.keySet(), resourceTypes); |
| if (profiles == null) { |
| containerMemory = containerMemory == -1 ? |
| DEFAULT_CONTAINER_MEMORY : containerMemory; |
| containerVirtualCores = containerVirtualCores == -1 ? |
| DEFAULT_CONTAINER_VCORES : containerVirtualCores; |
| } |
| } |
| |
| private void validateResourceTypes(Iterable<String> resourceNames, |
| List<ResourceTypeInfo> resourceTypes) { |
| for (String resourceName : resourceNames) { |
| if (!resourceTypes.stream().anyMatch(e -> |
| e.getName().equals(resourceName))) { |
| throw new ResourceNotFoundException("Unknown resource: " + |
| resourceName); |
| } |
| } |
| } |
| |
| static Map<String, Long> parseResourcesString(String resourcesStr) { |
| Map<String, Long> resources = new HashMap<>(); |
| |
| // Ignore the grouping "[]" |
| if (resourcesStr.startsWith("[")) { |
| resourcesStr = resourcesStr.substring(1); |
| } |
| if (resourcesStr.endsWith("]")) { |
| resourcesStr = resourcesStr.substring(0, resourcesStr.length() - 1); |
| } |
| |
| for (String resource : resourcesStr.trim().split(",")) { |
| resource = resource.trim(); |
| if (!resource.matches("^[^=]+=\\d+\\s?\\w*$")) { |
| throw new IllegalArgumentException("\"" + resource + "\" is not a " + |
| "valid resource type/amount pair. " + |
| "Please provide key=amount pairs separated by commas."); |
| } |
| String[] splits = resource.split("="); |
| String key = splits[0], value = splits[1]; |
| String units = ResourceUtils.getUnits(value); |
| String valueWithoutUnit = value.substring( |
| 0, value.length() - units.length()).trim(); |
| Long resourceValue = Long.valueOf(valueWithoutUnit); |
| if (!units.isEmpty()) { |
| resourceValue = UnitsConversionUtil.convert(units, "Mi", resourceValue); |
| } |
| if (key.equals("memory")) { |
| key = ResourceInformation.MEMORY_URI; |
| } |
| resources.put(key, resourceValue); |
| } |
| return resources; |
| } |
| |
| @VisibleForTesting |
| protected void sendStopSignal() { |
| LOG.info("Sending stop Signal to Client"); |
| stopSignalReceived.set(true); |
| synchronized (objectLock) { |
| objectLock.notifyAll(); |
| } |
| int waitCount = 0; |
| LOG.info("Waiting for Client to exit loop"); |
| while (isRunning.get()) { |
| try { |
| Thread.sleep(50); |
| } catch (InterruptedException ie) { |
| // do nothing |
| } finally { |
| if (++waitCount > 2000) { |
| break; |
| } |
| } |
| } |
| LOG.info("Stopping yarnClient within the DS Client"); |
| yarnClient.stop(); |
| LOG.info("done stopping Client"); |
| } |
| } |