| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.applications.distributedshell; |
| |
| import java.io.BufferedReader; |
| import java.io.DataInputStream; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.IOException; |
| import java.io.StringReader; |
| import java.io.UncheckedIOException; |
| import java.lang.reflect.UndeclaredThrowableException; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.nio.ByteBuffer; |
| import java.nio.charset.StandardCharsets; |
| import java.security.PrivilegedExceptionAction; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.Vector; |
| import java.util.Base64; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.Arrays; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import org.apache.hadoop.thirdparty.com.google.common.base.Strings; |
| import org.apache.commons.cli.CommandLine; |
| import org.apache.commons.cli.GnuParser; |
| import org.apache.commons.cli.HelpFormatter; |
| import org.apache.commons.cli.Options; |
| import org.apache.commons.cli.ParseException; |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.classification.InterfaceStability; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.DataOutputBuffer; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.net.NetUtils; |
| import org.apache.hadoop.security.Credentials; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.util.ExitUtil; |
| import org.apache.hadoop.util.Shell; |
| import org.apache.hadoop.yarn.api.ApplicationConstants; |
| import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; |
| import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; |
| import org.apache.hadoop.yarn.api.ContainerManagementProtocol; |
| import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.api.records.ContainerExitStatus; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; |
| import org.apache.hadoop.yarn.api.records.ContainerRetryContext; |
| import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; |
| import org.apache.hadoop.yarn.api.records.ContainerState; |
| import org.apache.hadoop.yarn.api.records.ContainerStatus; |
| import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; |
| import org.apache.hadoop.yarn.api.records.LocalResource; |
| import org.apache.hadoop.yarn.api.records.LocalResourceType; |
| import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; |
| import org.apache.hadoop.yarn.api.records.NodeReport; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.ResourceRequest; |
| import org.apache.hadoop.yarn.api.records.ResourceSizing; |
| import org.apache.hadoop.yarn.api.records.SchedulingRequest; |
| import org.apache.hadoop.yarn.api.records.URL; |
| import org.apache.hadoop.yarn.api.records.UpdatedContainer; |
| import org.apache.hadoop.yarn.api.records.ExecutionType; |
| import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; |
| import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; |
| import org.apache.hadoop.yarn.api.records.ContainerUpdateType; |
| import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; |
| import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; |
| import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; |
| import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; |
| import org.apache.hadoop.yarn.api.resource.PlacementConstraint; |
| import org.apache.hadoop.yarn.client.api.AMRMClient; |
| import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; |
| import org.apache.hadoop.yarn.client.api.TimelineClient; |
| import org.apache.hadoop.yarn.client.api.TimelineV2Client; |
| import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; |
| import org.apache.hadoop.yarn.client.api.async.NMClientAsync; |
| import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; |
| import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; |
| import org.apache.hadoop.yarn.util.BoundedAppender; |
| import org.apache.hadoop.yarn.util.SystemClock; |
| import org.apache.hadoop.yarn.util.TimelineServiceHelper; |
| import org.apache.hadoop.yarn.util.resource.ResourceUtils; |
| import org.apache.hadoop.yarn.util.timeline.TimelineUtils; |
| import org.apache.log4j.LogManager; |
| |
| import org.apache.hadoop.classification.VisibleForTesting; |
| import com.sun.jersey.api.client.ClientHandlerException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * An ApplicationMaster for executing shell commands on a set of launched |
| * containers using the YARN framework. |
| * |
| * <p> |
| * This class is meant to act as an example on how to write yarn-based |
| * application masters. |
| * </p> |
| * |
| * <p> |
| * The ApplicationMaster is started on a container by the |
| * <code>ResourceManager</code>'s launcher. The first thing that the |
| * <code>ApplicationMaster</code> needs to do is to connect and register itself |
| * with the <code>ResourceManager</code>. The registration sets up information |
| * within the <code>ResourceManager</code> regarding what host:port the |
| * ApplicationMaster is listening on to provide any form of functionality to a |
| * client as well as a tracking url that a client can use to keep track of |
| * status/job history if needed. However, in the distributedshell, trackingurl |
| * and appMasterHost:appMasterRpcPort are not supported. |
| * </p> |
| * |
| * <p> |
| * The <code>ApplicationMaster</code> needs to send a heartbeat to the |
| * <code>ResourceManager</code> at regular intervals to inform the |
| * <code>ResourceManager</code> that it is up and alive. The |
| * {@link ApplicationMasterProtocol#allocate} to the <code>ResourceManager</code> from the |
| * <code>ApplicationMaster</code> acts as a heartbeat. |
| * |
| * <p> |
| * For the actual handling of the job, the <code>ApplicationMaster</code> has to |
| * request the <code>ResourceManager</code> via {@link AllocateRequest} for the |
| * required no. of containers using {@link ResourceRequest} with the necessary |
| * resource specifications such as node location, computational |
| * (memory/disk/cpu) resource requirements. The <code>ResourceManager</code> |
| * responds with an {@link AllocateResponse} that informs the |
| * <code>ApplicationMaster</code> of the set of newly allocated containers, |
| * completed containers as well as current state of available resources. |
| * </p> |
| * |
| * <p> |
| * For each allocated container, the <code>ApplicationMaster</code> can then set |
| * up the necessary launch context via {@link ContainerLaunchContext} to specify |
| * the allocated container id, local resources required by the executable, the |
| * environment to be setup for the executable, commands to execute, etc. and |
| * submit a {@link StartContainerRequest} to the {@link ContainerManagementProtocol} to |
| * launch and execute the defined commands on the given allocated container. |
| * </p> |
| * |
| * <p> |
| * The <code>ApplicationMaster</code> can monitor the launched container by |
| * either querying the <code>ResourceManager</code> using |
| * {@link ApplicationMasterProtocol#allocate} to get updates on completed containers or via |
| * the {@link ContainerManagementProtocol} by querying for the status of the allocated |
| * container's {@link ContainerId}. |
| * |
| * <p> |
| * After the job has been completed, the <code>ApplicationMaster</code> has to |
| * send a {@link FinishApplicationMasterRequest} to the |
| * <code>ResourceManager</code> to inform it that the |
| * <code>ApplicationMaster</code> has been completed. |
| */ |
| @InterfaceAudience.Public |
| @InterfaceStability.Unstable |
| public class ApplicationMaster { |
| |
| private static final Logger LOG = LoggerFactory |
| .getLogger(ApplicationMaster.class); |
| |
| @VisibleForTesting |
| @Private |
| public enum DSEvent { |
| DS_APP_ATTEMPT_START, DS_APP_ATTEMPT_END, DS_CONTAINER_START, DS_CONTAINER_END |
| } |
| |
| @VisibleForTesting |
| @Private |
| public enum DSEntity { |
| DS_APP_ATTEMPT, DS_CONTAINER |
| } |
| |
| private static final String YARN_SHELL_ID = "YARN_SHELL_ID"; |
| |
| // Configuration |
| private Configuration conf; |
| |
| // Handle to communicate with the Resource Manager |
| @SuppressWarnings("rawtypes") |
| private AMRMClientAsync amRMClient; |
| |
| // In both secure and non-secure modes, this points to the job-submitter. |
| @VisibleForTesting |
| UserGroupInformation appSubmitterUgi; |
| |
| private Path homeDirectory; |
| |
| // Handle to communicate with the Node Manager |
| private NMClientAsync nmClientAsync; |
| // Listen to process the response from the Node Manager |
| private NMCallbackHandler containerListener; |
| |
| // Application Attempt Id ( combination of attemptId and fail count ) |
| @VisibleForTesting |
| protected ApplicationAttemptId appAttemptID; |
| |
| private ApplicationId appId; |
| private String appName; |
| |
| // TODO |
| // For status update for clients - yet to be implemented |
| // Hostname of the container |
| private String appMasterHostname = ""; |
| // Port on which the app master listens for status updates from clients |
| private int appMasterRpcPort = -1; |
| // Tracking url to which app master publishes info for clients to monitor |
| private String appMasterTrackingUrl = ""; |
| |
| private boolean timelineServiceV2Enabled = false; |
| |
| private boolean timelineServiceV1Enabled = false; |
| |
| // App Master configuration |
| // No. of containers to run shell command on |
| @VisibleForTesting |
| protected int numTotalContainers = 1; |
| // Memory to request for the container on which the shell command will run |
| private static final long DEFAULT_CONTAINER_MEMORY = 10; |
| private long containerMemory = DEFAULT_CONTAINER_MEMORY; |
| // VirtualCores to request for the container on which the shell command will run |
| private static final int DEFAULT_CONTAINER_VCORES = 1; |
| private int containerVirtualCores = DEFAULT_CONTAINER_VCORES; |
| // All other resources to request for the container |
| // on which the shell command will run |
| private Map<String, Long> containerResources = new HashMap<>(); |
| // Priority of the request |
| private int requestPriority; |
| // Execution type of the containers. |
| // Default GUARANTEED. |
| private ExecutionType containerType = ExecutionType.GUARANTEED; |
| // Whether to automatically promote opportunistic containers. |
| private boolean autoPromoteContainers = false; |
| // Whether to enforce execution type of the containers. |
| private boolean enforceExecType = false; |
| |
| // Resource profile for the container |
| private String containerResourceProfile = ""; |
| Map<String, Resource> resourceProfiles; |
| |
| private boolean keepContainersAcrossAttempts = false; |
| |
| // Counter for completed containers ( complete denotes successful or failed ) |
| private AtomicInteger numCompletedContainers = new AtomicInteger(); |
| // Allocated container count so that we know how many containers has the RM |
| // allocated to us |
| @VisibleForTesting |
| protected AtomicInteger numAllocatedContainers = new AtomicInteger(); |
| // Count of failed containers |
| private AtomicInteger numFailedContainers = new AtomicInteger(); |
| // Count of containers already requested from the RM |
| // Needed as once requested, we should not request for containers again. |
| // Only request for more if the original requirement changes. |
| @VisibleForTesting |
| protected AtomicInteger numRequestedContainers = new AtomicInteger(); |
| |
| protected AtomicInteger numIgnore = new AtomicInteger(); |
| |
| protected AtomicInteger totalRetries = new AtomicInteger(10); |
| |
| // Shell command to be executed |
| private String shellCommand = ""; |
| // Args to be passed to the shell command |
| private String shellArgs = ""; |
| // Env variables to be setup for the shell command |
| private Map<String, String> shellEnv = new HashMap<String, String>(); |
| |
| // Location of shell script ( obtained from info set in env ) |
| // Shell script path in fs |
| private String scriptPath = ""; |
| // Timestamp needed for creating a local resource |
| private long shellScriptPathTimestamp = 0; |
| // File length needed for local resource |
| private long shellScriptPathLen = 0; |
| |
| // Placement Specifications |
| private Map<String, PlacementSpec> placementSpecs = null; |
| |
| // Container retry options |
| private ContainerRetryPolicy containerRetryPolicy = |
| ContainerRetryPolicy.NEVER_RETRY; |
| private Set<Integer> containerRetryErrorCodes = null; |
| private int containerMaxRetries = 0; |
| private int containrRetryInterval = 0; |
| private long containerFailuresValidityInterval = -1; |
| |
| private List<String> localizableFiles = new ArrayList<>(); |
| |
| // Timeline domain ID |
| private String domainId = null; |
| |
| // Hardcoded path to shell script in launch container's local env |
| private static final String EXEC_SHELL_STRING_PATH = Client.SCRIPT_PATH |
| + ".sh"; |
| private static final String EXEC_BAT_SCRIPT_STRING_PATH = Client.SCRIPT_PATH |
| + ".bat"; |
| |
| // Hardcoded path to custom log_properties |
| private static final String log4jPath = "log4j.properties"; |
| |
| private static final String shellCommandPath = "shellCommands"; |
| private static final String shellArgsPath = "shellArgs"; |
| |
| private volatile boolean done; |
| |
| private ByteBuffer allTokens; |
| |
| // Launch threads |
| private List<Thread> launchThreads = new ArrayList<Thread>(); |
| |
| // Timeline Client |
| @VisibleForTesting |
| TimelineClient timelineClient; |
| |
| // Timeline v2 Client |
| @VisibleForTesting |
| TimelineV2Client timelineV2Client; |
| |
| static final String CONTAINER_ENTITY_GROUP_ID = "CONTAINERS"; |
| static final String APPID_TIMELINE_FILTER_NAME = "appId"; |
| static final String USER_TIMELINE_FILTER_NAME = "user"; |
| static final String DIAGNOSTICS = "Diagnostics"; |
| |
| private final String linux_bash_command = "bash"; |
| private final String windows_command = "cmd /c"; |
| |
| private int yarnShellIdCounter = 1; |
| private final AtomicLong allocIdCounter = new AtomicLong(1); |
| |
| @VisibleForTesting |
| protected final Set<ContainerId> launchedContainers = |
| Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>()); |
| |
| private BoundedAppender diagnostics = new BoundedAppender(64 * 1024); |
| |
| /** |
| * Container start times used to set id prefix while publishing entity |
| * to ATSv2. |
| */ |
| private final ConcurrentMap<ContainerId, Long> containerStartTimes = |
| new ConcurrentHashMap<ContainerId, Long>(); |
| |
| private ConcurrentMap<ContainerId, Long> getContainerStartTimes() { |
| return containerStartTimes; |
| } |
| |
| /** |
| * @param args Command line args |
| */ |
| public static void main(String[] args) { |
| boolean result = false; |
| ApplicationMaster appMaster = null; |
| try { |
| appMaster = new ApplicationMaster(); |
| LOG.info("Initializing ApplicationMaster"); |
| boolean doRun = appMaster.init(args); |
| if (!doRun) { |
| System.exit(0); |
| } |
| appMaster.run(); |
| result = appMaster.finish(); |
| } catch (Throwable t) { |
| LOG.error("Error running ApplicationMaster", t); |
| LogManager.shutdown(); |
| ExitUtil.terminate(1, t); |
| } finally { |
| if (appMaster != null) { |
| appMaster.cleanup(); |
| } |
| } |
| if (result) { |
| LOG.info("Application Master completed successfully. exiting"); |
| System.exit(0); |
| } else { |
| LOG.error("Application Master failed. exiting"); |
| System.exit(2); |
| } |
| } |
| |
| /** |
| * Dump out contents of $CWD and the environment to stdout for debugging |
| */ |
| private void dumpOutDebugInfo() { |
| |
| LOG.info("Dump debug output"); |
| Map<String, String> envs = System.getenv(); |
| for (Map.Entry<String, String> env : envs.entrySet()) { |
| LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue()); |
| System.out.println("System env: key=" + env.getKey() + ", val=" |
| + env.getValue()); |
| } |
| |
| BufferedReader buf = null; |
| try { |
| String lines = Shell.WINDOWS ? Shell.execCommand("cmd", "/c", "dir") : |
| Shell.execCommand("ls", "-al"); |
| buf = new BufferedReader(new StringReader(lines)); |
| String line = ""; |
| while ((line = buf.readLine()) != null) { |
| LOG.info("System CWD content: " + line); |
| System.out.println("System CWD content: " + line); |
| } |
| } catch (IOException e) { |
| e.printStackTrace(); |
| } finally { |
| IOUtils.cleanupWithLogger(LOG, buf); |
| } |
| } |
| |
| public ApplicationMaster() { |
| // Set up the configuration |
| conf = new YarnConfiguration(); |
| } |
| |
| /** |
| * Parse command line options |
| * |
| * @param args Command line args |
| * @return Whether init successful and run should be invoked |
| * @throws ParseException |
| * @throws IOException |
| */ |
| public boolean init(String[] args) throws ParseException, IOException { |
| Options opts = new Options(); |
| opts.addOption("appname", true, |
| "Application Name. Default value - DistributedShell"); |
| opts.addOption("app_attempt_id", true, |
| "App Attempt ID. Not to be used unless for testing purposes"); |
| opts.addOption("shell_env", true, |
| "Environment for shell script. Specified as env_key=env_val pairs"); |
| opts.addOption("container_type", true, |
| "Container execution type, GUARANTEED or OPPORTUNISTIC"); |
| opts.addOption("promote_opportunistic_after_start", false, |
| "Flag to indicate whether to automatically promote opportunistic" |
| + " containers to guaranteed."); |
| opts.addOption("enforce_execution_type", false, |
| "Flag to indicate whether to enforce execution type of containers"); |
| opts.addOption("container_memory", true, |
| "Amount of memory in MB to be requested to run the shell command"); |
| opts.addOption("container_vcores", true, |
| "Amount of virtual cores to be requested to run the shell command"); |
| opts.addOption("container_resources", true, |
| "Amount of resources to be requested to run the shell command. " + |
| "Specified as resource type=value pairs separated by commas. " + |
| "E.g. -container_resources memory-mb=512,vcores=1"); |
| opts.addOption("container_resource_profile", true, |
| "Resource profile to be requested to run the shell command"); |
| opts.addOption("num_containers", true, |
| "No. of containers on which the shell command needs to be executed"); |
| opts.addOption("priority", true, "Application Priority. Default 0"); |
| opts.addOption("container_retry_policy", true, |
| "Retry policy when container fails to run, " |
| + "0: NEVER_RETRY, 1: RETRY_ON_ALL_ERRORS, " |
| + "2: RETRY_ON_SPECIFIC_ERROR_CODES"); |
| opts.addOption("container_retry_error_codes", true, |
| "When retry policy is set to RETRY_ON_SPECIFIC_ERROR_CODES, error " |
| + "codes is specified with this option, " |
| + "e.g. --container_retry_error_codes 1,2,3"); |
| opts.addOption("container_max_retries", true, |
| "If container could retry, it specifies max retires"); |
| opts.addOption("container_retry_interval", true, |
| "Interval between each retry, unit is milliseconds"); |
| opts.addOption("container_failures_validity_interval", true, |
| "Failures which are out of the time window will not be added to" |
| + " the number of container retry attempts"); |
| opts.addOption("placement_spec", true, "Placement specification"); |
| opts.addOption("debug", false, "Dump out debug information"); |
| opts.addOption("keep_containers_across_application_attempts", false, |
| "Flag to indicate whether to keep containers across application " |
| + "attempts." |
| + " If the flag is true, running containers will not be killed when" |
| + " application attempt fails and these containers will be " |
| + "retrieved by" |
| + " the new application attempt "); |
| opts.addOption("localized_files", true, "List of localized files"); |
| opts.addOption("homedir", true, "Home Directory of Job Owner"); |
| |
| opts.addOption("help", false, "Print usage"); |
| CommandLine cliParser = new GnuParser().parse(opts, args); |
| |
| if (args.length == 0) { |
| printUsage(opts); |
| throw new IllegalArgumentException( |
| "No args specified for application master to initialize"); |
| } |
| |
| //Check whether customer log4j.properties file exists |
| if (fileExist(log4jPath)) { |
| try { |
| Log4jPropertyHelper.updateLog4jConfiguration(ApplicationMaster.class, |
| log4jPath); |
| } catch (Exception e) { |
| LOG.warn("Can not set up custom log4j properties. " + e); |
| } |
| } |
| |
| appName = cliParser.getOptionValue("appname", "DistributedShell"); |
| |
| if (cliParser.hasOption("help")) { |
| printUsage(opts); |
| return false; |
| } |
| |
| if (cliParser.hasOption("debug")) { |
| dumpOutDebugInfo(); |
| } |
| |
| homeDirectory = cliParser.hasOption("homedir") ? |
| new Path(cliParser.getOptionValue("homedir")) : |
| new Path("/user/" + System.getenv(ApplicationConstants. |
| Environment.USER.name())); |
| |
| if (cliParser.hasOption("placement_spec")) { |
| String placementSpec = cliParser.getOptionValue("placement_spec"); |
| String decodedSpec = getDecodedPlacementSpec(placementSpec); |
| LOG.info("Placement Spec received [{}]", decodedSpec); |
| |
| this.numTotalContainers = 0; |
| int globalNumOfContainers = Integer |
| .parseInt(cliParser.getOptionValue("num_containers", "0")); |
| parsePlacementSpecs(decodedSpec, globalNumOfContainers); |
| LOG.info("Total num containers requested [{}]", numTotalContainers); |
| |
| if (numTotalContainers == 0) { |
| throw new IllegalArgumentException( |
| "Cannot run distributed shell with no containers"); |
| } |
| } |
| |
| Map<String, String> envs = System.getenv(); |
| |
| if (!envs.containsKey(Environment.CONTAINER_ID.name())) { |
| if (cliParser.hasOption("app_attempt_id")) { |
| String appIdStr = cliParser.getOptionValue("app_attempt_id", ""); |
| appAttemptID = ApplicationAttemptId.fromString(appIdStr); |
| } else { |
| throw new IllegalArgumentException( |
| "Application Attempt Id not set in the environment"); |
| } |
| } else { |
| ContainerId containerId = ContainerId.fromString(envs |
| .get(Environment.CONTAINER_ID.name())); |
| appAttemptID = containerId.getApplicationAttemptId(); |
| appId = appAttemptID.getApplicationId(); |
| } |
| |
| if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) { |
| throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV |
| + " not set in the environment"); |
| } |
| if (!envs.containsKey(Environment.NM_HOST.name())) { |
| throw new RuntimeException(Environment.NM_HOST.name() |
| + " not set in the environment"); |
| } |
| if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) { |
| throw new RuntimeException(Environment.NM_HTTP_PORT |
| + " not set in the environment"); |
| } |
| if (!envs.containsKey(Environment.NM_PORT.name())) { |
| throw new RuntimeException(Environment.NM_PORT.name() |
| + " not set in the environment"); |
| } |
| |
| LOG.info("Application master for app" + ", appId=" |
| + appAttemptID.getApplicationId().getId() + ", clustertimestamp=" |
| + appAttemptID.getApplicationId().getClusterTimestamp() |
| + ", attemptId=" + appAttemptID.getAttemptId()); |
| |
| if (!fileExist(shellCommandPath) |
| && envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION).isEmpty()) { |
| throw new IllegalArgumentException( |
| "No shell command or shell script specified to be executed by application master"); |
| } |
| |
| if (fileExist(shellCommandPath)) { |
| shellCommand = readContent(shellCommandPath); |
| } |
| |
| if (fileExist(shellArgsPath)) { |
| shellArgs = readContent(shellArgsPath); |
| } |
| |
| if (cliParser.hasOption("shell_env")) { |
| String shellEnvs[] = cliParser.getOptionValues("shell_env"); |
| for (String env : shellEnvs) { |
| env = env.trim(); |
| int index = env.indexOf('='); |
| if (index == -1) { |
| shellEnv.put(env, ""); |
| continue; |
| } |
| String key = env.substring(0, index); |
| String val = ""; |
| if (index < (env.length() - 1)) { |
| val = env.substring(index + 1); |
| } |
| shellEnv.put(key, val); |
| } |
| } |
| |
| if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) { |
| scriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION); |
| |
| if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) { |
| shellScriptPathTimestamp = Long.parseLong(envs |
| .get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)); |
| } |
| if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) { |
| shellScriptPathLen = Long.parseLong(envs |
| .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)); |
| } |
| if (!scriptPath.isEmpty() |
| && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) { |
| LOG.error("Illegal values in env for shell script path" + ", path=" |
| + scriptPath + ", len=" + shellScriptPathLen + ", timestamp=" |
| + shellScriptPathTimestamp); |
| throw new IllegalArgumentException( |
| "Illegal values in env for shell script path"); |
| } |
| } |
| |
| if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN)) { |
| domainId = envs.get(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN); |
| } |
| |
| if (cliParser.hasOption("container_type")) { |
| String containerTypeStr = cliParser.getOptionValue("container_type"); |
| if (Arrays.stream(ExecutionType.values()).noneMatch( |
| executionType -> executionType.toString() |
| .equals(containerTypeStr))) { |
| throw new IllegalArgumentException("Invalid container_type: " |
| + containerTypeStr); |
| } |
| containerType = ExecutionType.valueOf(containerTypeStr); |
| } |
| if (cliParser.hasOption("promote_opportunistic_after_start")) { |
| autoPromoteContainers = true; |
| } |
| if (cliParser.hasOption("enforce_execution_type")) { |
| enforceExecType = true; |
| } |
| containerMemory = Integer.parseInt(cliParser.getOptionValue( |
| "container_memory", "-1")); |
| containerVirtualCores = Integer.parseInt(cliParser.getOptionValue( |
| "container_vcores", "-1")); |
| containerResources = new HashMap<>(); |
| if (cliParser.hasOption("container_resources")) { |
| Map<String, Long> resources = Client.parseResourcesString( |
| cliParser.getOptionValue("container_resources")); |
| for (Map.Entry<String, Long> entry : resources.entrySet()) { |
| containerResources.put(entry.getKey(), entry.getValue()); |
| } |
| } |
| containerResourceProfile = |
| cliParser.getOptionValue("container_resource_profile", ""); |
| |
| keepContainersAcrossAttempts = cliParser.hasOption( |
| "keep_containers_across_application_attempts"); |
| |
| if (this.placementSpecs == null) { |
| numTotalContainers = Integer.parseInt(cliParser.getOptionValue( |
| "num_containers", "1")); |
| } |
| if (numTotalContainers == 0) { |
| throw new IllegalArgumentException( |
| "Cannot run distributed shell with no containers"); |
| } |
| requestPriority = Integer.parseInt(cliParser |
| .getOptionValue("priority", "0")); |
| |
| containerRetryPolicy = ContainerRetryPolicy.values()[ |
| Integer.parseInt(cliParser.getOptionValue( |
| "container_retry_policy", "0"))]; |
| if (cliParser.hasOption("container_retry_error_codes")) { |
| containerRetryErrorCodes = new HashSet<>(); |
| for (String errorCode : |
| cliParser.getOptionValue("container_retry_error_codes").split(",")) { |
| containerRetryErrorCodes.add(Integer.parseInt(errorCode)); |
| } |
| } |
| containerMaxRetries = Integer.parseInt( |
| cliParser.getOptionValue("container_max_retries", "0")); |
| containrRetryInterval = Integer.parseInt(cliParser.getOptionValue( |
| "container_retry_interval", "0")); |
| containerFailuresValidityInterval = Long.parseLong( |
| cliParser.getOptionValue("container_failures_validity_interval", "-1")); |
| if (!YarnConfiguration.timelineServiceEnabled(conf)) { |
| timelineClient = null; |
| timelineV2Client = null; |
| LOG.warn("Timeline service is not enabled"); |
| } |
| |
| if (cliParser.hasOption("localized_files")) { |
| String localizedFilesArg = cliParser.getOptionValue("localized_files"); |
| if (localizedFilesArg.contains(",")) { |
| String[] files = localizedFilesArg.split(","); |
| localizableFiles = Arrays.asList(files); |
| } else { |
| localizableFiles.add(localizedFilesArg); |
| } |
| } |
| |
| return true; |
| } |
| |
| private void parsePlacementSpecs(String decodedSpec, |
| int globalNumOfContainers) { |
| Map<String, PlacementSpec> pSpecs = |
| PlacementSpec.parse(decodedSpec); |
| this.placementSpecs = new HashMap<>(); |
| for (PlacementSpec pSpec : pSpecs.values()) { |
| // Use global num of containers when the spec doesn't specify |
| // source tags. This is allowed when using node-attribute constraints. |
| if (Strings.isNullOrEmpty(pSpec.sourceTag) |
| && pSpec.getNumContainers() == 0 |
| && globalNumOfContainers > 0) { |
| pSpec.setNumContainers(globalNumOfContainers); |
| } |
| this.numTotalContainers += pSpec.getNumContainers(); |
| this.placementSpecs.put(pSpec.sourceTag, pSpec); |
| } |
| } |
| |
| private String getDecodedPlacementSpec(String placementSpecifications) { |
| Base64.Decoder decoder = Base64.getDecoder(); |
| byte[] decodedBytes = decoder.decode( |
| placementSpecifications.getBytes(StandardCharsets.UTF_8)); |
| String decodedSpec = new String(decodedBytes, StandardCharsets.UTF_8); |
| LOG.info("Decode placement spec: " + decodedSpec); |
| return decodedSpec; |
| } |
| |
| /** |
| * Helper function to print usage |
| * |
| * @param opts Parsed command line options |
| */ |
| private void printUsage(Options opts) { |
| new HelpFormatter().printHelp("ApplicationMaster", opts); |
| } |
| |
| protected void cleanup() { |
| try { |
| appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() { |
| @Override |
| public Void run() throws IOException { |
| FileSystem fs = FileSystem.get(conf); |
| Path dst = new Path(homeDirectory, |
| getRelativePath(appName, appId.toString(), "")); |
| fs.delete(dst, true); |
| return null; |
| } |
| }); |
| } catch(Exception e) { |
| LOG.warn("Failed to remove application staging directory", e); |
| } |
| } |
| |
| /** |
| * Main run function for the application master |
| * |
| * @throws YarnException |
| * @throws IOException |
| */ |
| @SuppressWarnings({ "unchecked" }) |
| public void run() throws YarnException, IOException, InterruptedException { |
| LOG.info("Starting ApplicationMaster"); |
| |
| // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class |
| // are marked as LimitedPrivate |
| Credentials credentials = |
| UserGroupInformation.getCurrentUser().getCredentials(); |
| DataOutputBuffer dob = new DataOutputBuffer(); |
| credentials.writeTokenStorageToStream(dob); |
| // Now remove the AM->RM token so that containers cannot access it. |
| Iterator<Token<?>> iter = credentials.getAllTokens().iterator(); |
| LOG.info("Executing with tokens:"); |
| while (iter.hasNext()) { |
| Token<?> token = iter.next(); |
| LOG.info(token.toString()); |
| if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { |
| iter.remove(); |
| } |
| } |
| allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); |
| |
| // Create appSubmitterUgi and add original tokens to it |
| String appSubmitterUserName = |
| System.getenv(ApplicationConstants.Environment.USER.name()); |
| appSubmitterUgi = |
| UserGroupInformation.createRemoteUser(appSubmitterUserName); |
| appSubmitterUgi.addCredentials(credentials); |
| |
| AMRMClientAsync.AbstractCallbackHandler allocListener = |
| new RMCallbackHandler(); |
| amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); |
| amRMClient.init(conf); |
| amRMClient.start(); |
| |
| containerListener = createNMCallbackHandler(); |
| nmClientAsync = new NMClientAsyncImpl(containerListener); |
| nmClientAsync.init(conf); |
| nmClientAsync.start(); |
| |
| startTimelineClient(conf); |
| if (timelineServiceV2Enabled) { |
| // need to bind timelineClient |
| amRMClient.registerTimelineV2Client(timelineV2Client); |
| publishApplicationAttemptEventOnTimelineServiceV2( |
| DSEvent.DS_APP_ATTEMPT_START); |
| } |
| |
| if (timelineServiceV1Enabled) { |
| publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), |
| DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi); |
| } |
| |
| // Setup local RPC Server to accept status requests directly from clients |
| // TODO need to setup a protocol for client to be able to communicate to |
| // the RPC server |
| // TODO use the rpc port info to register with the RM for the client to |
| // send requests to this app master |
| |
| // Register self with ResourceManager |
| // This will start heartbeating to the RM |
| appMasterHostname = NetUtils.getHostname(); |
| Map<Set<String>, PlacementConstraint> placementConstraintMap = null; |
| if (this.placementSpecs != null) { |
| placementConstraintMap = new HashMap<>(); |
| for (PlacementSpec spec : this.placementSpecs.values()) { |
| if (spec.constraint != null) { |
| Set<String> allocationTags = Strings.isNullOrEmpty(spec.sourceTag) ? |
| Collections.emptySet() : Collections.singleton(spec.sourceTag); |
| placementConstraintMap.put(allocationTags, spec.constraint); |
| } |
| } |
| } |
| |
| RegisterApplicationMasterResponse response = amRMClient |
| .registerApplicationMaster(appMasterHostname, appMasterRpcPort, |
| appMasterTrackingUrl, placementConstraintMap); |
| resourceProfiles = response.getResourceProfiles(); |
| ResourceUtils.reinitializeResources(response.getResourceTypes()); |
| // Dump out information about cluster capability as seen by the |
| // resource manager |
| long maxMem = response.getMaximumResourceCapability().getMemorySize(); |
| LOG.info("Max mem capability of resources in this cluster " + maxMem); |
| |
| int maxVCores = response.getMaximumResourceCapability().getVirtualCores(); |
| LOG.info("Max vcores capability of resources in this cluster " + maxVCores); |
| |
| // A resource ask cannot exceed the max. |
| if (containerMemory > maxMem) { |
| LOG.info("Container memory specified above max threshold of cluster." |
| + " Using max value." + ", specified=" + containerMemory + ", max=" |
| + maxMem); |
| containerMemory = maxMem; |
| } |
| |
| if (containerVirtualCores > maxVCores) { |
| LOG.info("Container virtual cores specified above max threshold of cluster." |
| + " Using max value." + ", specified=" + containerVirtualCores + ", max=" |
| + maxVCores); |
| containerVirtualCores = maxVCores; |
| } |
| |
| List<Container> previousAMRunningContainers = |
| response.getContainersFromPreviousAttempts(); |
| LOG.info(appAttemptID + " received " + previousAMRunningContainers.size() |
| + " previous attempts' running containers on AM registration."); |
| for(Container container: previousAMRunningContainers) { |
| launchedContainers.add(container.getId()); |
| } |
| numAllocatedContainers.addAndGet(previousAMRunningContainers.size()); |
| |
| |
| int numTotalContainersToRequest = |
| numTotalContainers - previousAMRunningContainers.size(); |
| // Setup ask for containers from RM |
| // Send request for containers to RM |
| // Until we get our fully allocated quota, we keep on polling RM for |
| // containers |
| // Keep looping until all the containers are launched and shell script |
| // executed on them ( regardless of success/failure). |
| if (this.placementSpecs == null) { |
| LOG.info("placementSpecs null"); |
| for (int i = 0; i < numTotalContainersToRequest; ++i) { |
| ContainerRequest containerAsk = setupContainerAskForRM(); |
| amRMClient.addContainerRequest(containerAsk); |
| } |
| } else { |
| LOG.info("placementSpecs to create req:" + placementSpecs); |
| List<SchedulingRequest> schedReqs = new ArrayList<>(); |
| for (PlacementSpec pSpec : this.placementSpecs.values()) { |
| LOG.info("placementSpec :" + pSpec + ", container:" + pSpec |
| .getNumContainers()); |
| for (int i = 0; i < pSpec.getNumContainers(); i++) { |
| SchedulingRequest sr = setupSchedulingRequest(pSpec); |
| schedReqs.add(sr); |
| } |
| } |
| amRMClient.addSchedulingRequests(schedReqs); |
| } |
| numRequestedContainers.set(numTotalContainers); |
| } |
| |
| @VisibleForTesting |
| void startTimelineClient(final Configuration conf) |
| throws YarnException, IOException, InterruptedException { |
| try { |
| appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() { |
| @Override |
| public Void run() throws Exception { |
| if (YarnConfiguration.timelineServiceEnabled(conf)) { |
| timelineServiceV1Enabled = |
| YarnConfiguration.timelineServiceV1Enabled(conf); |
| timelineServiceV2Enabled = |
| YarnConfiguration.timelineServiceV2Enabled(conf); |
| // Creating the Timeline Client |
| if (timelineServiceV1Enabled) { |
| timelineClient = TimelineClient.createTimelineClient(); |
| timelineClient.init(conf); |
| timelineClient.start(); |
| LOG.info("Timeline service V1 client is enabled"); |
| } |
| if (timelineServiceV2Enabled) { |
| timelineV2Client = TimelineV2Client.createTimelineClient( |
| appAttemptID.getApplicationId()); |
| timelineV2Client.init(conf); |
| timelineV2Client.start(); |
| LOG.info("Timeline service V2 client is enabled"); |
| } |
| } else { |
| timelineClient = null; |
| timelineV2Client = null; |
| LOG.warn("Timeline service is not enabled"); |
| } |
| return null; |
| } |
| }); |
| } catch (UndeclaredThrowableException e) { |
| throw new YarnException(e.getCause()); |
| } |
| } |
| |
| @VisibleForTesting |
| NMCallbackHandler createNMCallbackHandler() { |
| return new NMCallbackHandler(this); |
| } |
| |
| @VisibleForTesting |
| protected boolean finish() { |
| // wait for completion. |
| while (!done |
| && (numCompletedContainers.get() != numTotalContainers)) { |
| try { |
| Thread.sleep(200); |
| } catch (InterruptedException ex) {} |
| } |
| |
| if (timelineServiceV1Enabled) { |
| publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), |
| DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi); |
| } |
| |
| if (timelineServiceV2Enabled) { |
| publishApplicationAttemptEventOnTimelineServiceV2( |
| DSEvent.DS_APP_ATTEMPT_END); |
| } |
| |
| // Join all launched threads |
| // needed for when we time out |
| // and we need to release containers |
| for (Thread launchThread : launchThreads) { |
| try { |
| launchThread.join(10000); |
| } catch (InterruptedException e) { |
| LOG.info("Exception thrown in thread join: " + e.getMessage()); |
| e.printStackTrace(); |
| } |
| } |
| |
| // When the application completes, it should stop all running containers |
| LOG.info("Application completed. Stopping running containers"); |
| nmClientAsync.stop(); |
| |
| // When the application completes, it should send a finish application |
| // signal to the RM |
| LOG.info("Application completed. Signalling finished to RM"); |
| |
| FinalApplicationStatus appStatus; |
| boolean success = true; |
| String message = null; |
| if (numCompletedContainers.get() - numFailedContainers.get() |
| >= numTotalContainers) { |
| appStatus = FinalApplicationStatus.SUCCEEDED; |
| } else { |
| appStatus = FinalApplicationStatus.FAILED; |
| message = String.format("Application Failure: desired = %d, " + |
| "completed = %d, allocated = %d, failed = %d, " + |
| "diagnostics = %s", numRequestedContainers.get(), |
| numCompletedContainers.get(), numAllocatedContainers.get(), |
| numFailedContainers.get(), diagnostics); |
| success = false; |
| } |
| try { |
| amRMClient.unregisterApplicationMaster(appStatus, message, null); |
| } catch (YarnException | IOException ex) { |
| LOG.error("Failed to unregister application", ex); |
| } |
| amRMClient.stop(); |
| |
| // Stop Timeline Client |
| if(timelineServiceV1Enabled) { |
| timelineClient.stop(); |
| } |
| if (timelineServiceV2Enabled) { |
| timelineV2Client.stop(); |
| } |
| |
| return success; |
| } |
| |
| public static String getRelativePath(String appName, |
| String appId, String fileDstPath) { |
| return appName + "/" + appId + "/" + fileDstPath; |
| } |
| |
| @VisibleForTesting |
| class RMCallbackHandler extends AMRMClientAsync.AbstractCallbackHandler { |
| @SuppressWarnings("unchecked") |
| @Override |
| public void onContainersCompleted(List<ContainerStatus> completedContainers) { |
| LOG.info("Got response from RM for container ask, completedCnt=" |
| + completedContainers.size()); |
| for (ContainerStatus containerStatus : completedContainers) { |
| String message = appAttemptID + " got container status for containerID=" |
| + containerStatus.getContainerId() + ", state=" |
| + containerStatus.getState() + ", exitStatus=" |
| + containerStatus.getExitStatus() + ", diagnostics=" |
| + containerStatus.getDiagnostics(); |
| if (containerStatus.getExitStatus() != 0) { |
| LOG.error(message); |
| diagnostics.append(containerStatus.getDiagnostics()); |
| } else { |
| LOG.info(message); |
| } |
| |
| // non complete containers should not be here |
| assert (containerStatus.getState() == ContainerState.COMPLETE); |
| // ignore containers we know nothing about - probably from a previous |
| // attempt |
| if (!launchedContainers.contains(containerStatus.getContainerId())) { |
| LOG.info("Ignoring completed status of " |
| + containerStatus.getContainerId() |
| + "; unknown container(probably launched by previous attempt)"); |
| continue; |
| } |
| |
| // increment counters for completed/failed containers |
| int exitStatus = containerStatus.getExitStatus(); |
| if (0 != exitStatus) { |
| // container failed |
| if (ContainerExitStatus.ABORTED != exitStatus) { |
| // shell script failed |
| // counts as completed |
| numCompletedContainers.incrementAndGet(); |
| numFailedContainers.incrementAndGet(); |
| } else { |
| // container was killed by framework, possibly preempted |
| // we should re-try as the container was lost for some reason |
| numAllocatedContainers.decrementAndGet(); |
| numRequestedContainers.decrementAndGet(); |
| // we do not need to release the container as it would be done |
| // by the RM |
| |
| // Ignore these containers if placementspec is enabled |
| // for the time being. |
| if (placementSpecs != null) { |
| numIgnore.incrementAndGet(); |
| } |
| } |
| } else { |
| // nothing to do |
| // container completed successfully |
| numCompletedContainers.incrementAndGet(); |
| LOG.info("Container completed successfully." + ", containerId=" |
| + containerStatus.getContainerId()); |
| } |
| if (timelineServiceV2Enabled) { |
| Long containerStartTime = |
| containerStartTimes.get(containerStatus.getContainerId()); |
| if (containerStartTime == null) { |
| containerStartTime = SystemClock.getInstance().getTime(); |
| containerStartTimes.put(containerStatus.getContainerId(), |
| containerStartTime); |
| } |
| publishContainerEndEventOnTimelineServiceV2(containerStatus, |
| containerStartTime); |
| } |
| if (timelineServiceV1Enabled) { |
| publishContainerEndEvent(timelineClient, containerStatus, domainId, |
| appSubmitterUgi); |
| } |
| } |
| |
| // ask for more containers if any failed |
| int askCount = numTotalContainers - numRequestedContainers.get(); |
| numRequestedContainers.addAndGet(askCount); |
| |
| // Dont bother re-asking if we are using placementSpecs |
| if (placementSpecs == null) { |
| if (askCount > 0) { |
| for (int i = 0; i < askCount; ++i) { |
| ContainerRequest containerAsk = setupContainerAskForRM(); |
| amRMClient.addContainerRequest(containerAsk); |
| } |
| } |
| } |
| |
| if (numCompletedContainers.get() + numIgnore.get() >= |
| numTotalContainers) { |
| done = true; |
| } |
| } |
| |
| @Override |
| public void onContainersAllocated(List<Container> allocatedContainers) { |
| LOG.info("Got response from RM for container ask, allocatedCnt=" |
| + allocatedContainers.size()); |
| for (Container allocatedContainer : allocatedContainers) { |
| if (numAllocatedContainers.get() == numTotalContainers) { |
| LOG.info("The requested number of containers have been allocated." |
| + " Releasing the extra container allocation from the RM."); |
| amRMClient.releaseAssignedContainer(allocatedContainer.getId()); |
| } else { |
| numAllocatedContainers.addAndGet(1); |
| String yarnShellId = Integer.toString(yarnShellIdCounter); |
| yarnShellIdCounter++; |
| LOG.info( |
| "Launching shell command on a new container." |
| + ", containerId=" + allocatedContainer.getId() |
| + ", yarnShellId=" + yarnShellId |
| + ", containerNode=" |
| + allocatedContainer.getNodeId().getHost() |
| + ":" + allocatedContainer.getNodeId().getPort() |
| + ", containerNodeURI=" |
| + allocatedContainer.getNodeHttpAddress() |
| + ", containerResourceMemory" |
| + allocatedContainer.getResource().getMemorySize() |
| + ", containerResourceVirtualCores" |
| + allocatedContainer.getResource().getVirtualCores()); |
| |
| Thread launchThread = |
| createLaunchContainerThread(allocatedContainer, yarnShellId); |
| |
| // launch and start the container on a separate thread to keep |
| // the main thread unblocked |
| // as all containers may not be allocated at one go. |
| launchThreads.add(launchThread); |
| launchedContainers.add(allocatedContainer.getId()); |
| launchThread.start(); |
| |
| // Remove the corresponding request |
| Collection<AMRMClient.ContainerRequest> requests = |
| amRMClient.getMatchingRequests( |
| allocatedContainer.getAllocationRequestId()); |
| if (requests.iterator().hasNext()) { |
| AMRMClient.ContainerRequest request = requests.iterator().next(); |
| amRMClient.removeContainerRequest(request); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void onContainersUpdated( |
| List<UpdatedContainer> containers) { |
| for (UpdatedContainer container : containers) { |
| LOG.info("Container {} updated, updateType={}, resource={}, " |
| + "execType={}", |
| container.getContainer().getId(), |
| container.getUpdateType().toString(), |
| container.getContainer().getResource().toString(), |
| container.getContainer().getExecutionType()); |
| |
| // TODO Remove this line with finalized updateContainer API. |
| // Currently nm client needs to notify the NM to update container |
| // execution type via NMClient#updateContainerResource() or |
| // NMClientAsync#updateContainerResourceAsync() when |
| // auto-update.containers is disabled, but this API is |
| // under evolving and will need to be replaced by a proper new API. |
| nmClientAsync.updateContainerResourceAsync(container.getContainer()); |
| } |
| } |
| |
| @Override |
| public void onRequestsRejected(List<RejectedSchedulingRequest> rejReqs) { |
| List<SchedulingRequest> reqsToRetry = new ArrayList<>(); |
| for (RejectedSchedulingRequest rejReq : rejReqs) { |
| LOG.info("Scheduling Request {} has been rejected. Reason {}", |
| rejReq.getRequest(), rejReq.getReason()); |
| reqsToRetry.add(rejReq.getRequest()); |
| } |
| totalRetries.addAndGet(-1 * reqsToRetry.size()); |
| if (totalRetries.get() <= 0) { |
| LOG.info("Exiting, since retries are exhausted !!"); |
| done = true; |
| } else { |
| amRMClient.addSchedulingRequests(reqsToRetry); |
| } |
| } |
| |
| @Override public void onShutdownRequest() { |
| if (keepContainersAcrossAttempts) { |
| LOG.info("Shutdown request received. Ignoring since " |
| + "keep_containers_across_application_attempts is enabled"); |
| } else{ |
| LOG.info("Shutdown request received. Processing since " |
| + "keep_containers_across_application_attempts is disabled"); |
| done = true; |
| } |
| } |
| |
| @Override |
| public void onNodesUpdated(List<NodeReport> updatedNodes) {} |
| |
| @Override |
| public float getProgress() { |
| // set progress to deliver to RM on next heartbeat |
| float progress = (float) numCompletedContainers.get() |
| / numTotalContainers; |
| return progress; |
| } |
| |
| @Override |
| public void onError(Throwable e) { |
| LOG.error("Error in RMCallbackHandler: ", e); |
| done = true; |
| } |
| } |
| |
| @VisibleForTesting |
| class NMCallbackHandler extends NMClientAsync.AbstractCallbackHandler { |
| |
| private ConcurrentMap<ContainerId, Container> containers = |
| new ConcurrentHashMap<ContainerId, Container>(); |
| private final ApplicationMaster applicationMaster; |
| |
| public NMCallbackHandler(ApplicationMaster applicationMaster) { |
| this.applicationMaster = applicationMaster; |
| } |
| |
| public void addContainer(ContainerId containerId, Container container) { |
| containers.putIfAbsent(containerId, container); |
| } |
| |
| @Override |
| public void onContainerStopped(ContainerId containerId) { |
| LOG.debug("Succeeded to stop Container {}", containerId); |
| containers.remove(containerId); |
| } |
| |
| @Override |
| public void onContainerStatusReceived(ContainerId containerId, |
| ContainerStatus containerStatus) { |
| LOG.debug("Container Status: id={}, status={}", containerId, |
| containerStatus); |
| |
| // If promote_opportunistic_after_start is set, automatically promote |
| // opportunistic containers to guaranteed. |
| if (autoPromoteContainers) { |
| if (containerStatus.getState() == ContainerState.RUNNING) { |
| Container container = containers.get(containerId); |
| if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) { |
| // Promote container |
| LOG.info("Promoting container {} to {}", container.getId(), |
| container.getExecutionType()); |
| UpdateContainerRequest updateRequest = UpdateContainerRequest |
| .newInstance(container.getVersion(), container.getId(), |
| ContainerUpdateType.PROMOTE_EXECUTION_TYPE, null, |
| ExecutionType.GUARANTEED); |
| amRMClient.requestContainerUpdate(container, updateRequest); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void onContainerStarted(ContainerId containerId, |
| Map<String, ByteBuffer> allServiceResponse) { |
| LOG.debug("Succeeded to start Container {}", containerId); |
| Container container = containers.get(containerId); |
| if (container != null) { |
| applicationMaster.nmClientAsync.getContainerStatusAsync( |
| containerId, container.getNodeId()); |
| } |
| if (applicationMaster.timelineServiceV2Enabled) { |
| long startTime = SystemClock.getInstance().getTime(); |
| applicationMaster.getContainerStartTimes().put(containerId, startTime); |
| applicationMaster.publishContainerStartEventOnTimelineServiceV2( |
| container, startTime); |
| } |
| if (applicationMaster.timelineServiceV1Enabled) { |
| applicationMaster.publishContainerStartEvent( |
| applicationMaster.timelineClient, container, |
| applicationMaster.domainId, applicationMaster.appSubmitterUgi); |
| } |
| } |
| |
| @Override |
| public void onStartContainerError(ContainerId containerId, Throwable t) { |
| LOG.error("Failed to start Container {}", containerId, t); |
| containers.remove(containerId); |
| applicationMaster.numCompletedContainers.incrementAndGet(); |
| applicationMaster.numFailedContainers.incrementAndGet(); |
| if (timelineServiceV2Enabled) { |
| publishContainerStartFailedEventOnTimelineServiceV2(containerId, |
| t.getMessage()); |
| } |
| if (timelineServiceV1Enabled) { |
| publishContainerStartFailedEvent(containerId, t.getMessage()); |
| } |
| } |
| |
| @Override |
| public void onGetContainerStatusError( |
| ContainerId containerId, Throwable t) { |
| LOG.error("Failed to query the status of Container " + containerId); |
| } |
| |
| @Override |
| public void onStopContainerError(ContainerId containerId, Throwable t) { |
| LOG.error("Failed to stop Container " + containerId); |
| containers.remove(containerId); |
| } |
| |
| @Deprecated |
| @Override |
| public void onIncreaseContainerResourceError( |
| ContainerId containerId, Throwable t) {} |
| |
| @Deprecated |
| @Override |
| public void onContainerResourceIncreased( |
| ContainerId containerId, Resource resource) {} |
| |
| @Override |
| public void onUpdateContainerResourceError( |
| ContainerId containerId, Throwable t) { |
| } |
| |
| @Override |
| public void onContainerResourceUpdated(ContainerId containerId, |
| Resource resource) { |
| } |
| } |
| |
| /** |
| * Thread to connect to the {@link ContainerManagementProtocol} and launch the container |
| * that will execute the shell command. |
| */ |
| private class LaunchContainerRunnable implements Runnable { |
| |
| // Allocated container |
| private Container container; |
| private String shellId; |
| |
| NMCallbackHandler containerListener; |
| |
| /** |
| * @param lcontainer Allocated container |
| * @param containerListener Callback handler of the container |
| */ |
| public LaunchContainerRunnable(Container lcontainer, |
| NMCallbackHandler containerListener, String shellId) { |
| this.container = lcontainer; |
| this.containerListener = containerListener; |
| this.shellId = shellId; |
| } |
| |
| @Override |
| /** |
| * Connects to CM, sets up container launch context |
| * for shell command and eventually dispatches the container |
| * start request to the CM. |
| */ |
| public void run() { |
| LOG.info("Setting up container launch container for containerid=" |
| + container.getId() + " with shellid=" + shellId); |
| |
| // Set the local resources |
| Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); |
| |
| // The container for the eventual shell commands needs its own local |
| // resources too. |
| // In this scenario, if a shell script is specified, we need to have it |
| // copied and made available to the container. |
| if (!scriptPath.isEmpty()) { |
| Path renamedScriptPath = null; |
| if (Shell.WINDOWS) { |
| renamedScriptPath = new Path(scriptPath + ".bat"); |
| } else { |
| renamedScriptPath = new Path(scriptPath + ".sh"); |
| } |
| |
| try { |
| // rename the script file based on the underlying OS syntax. |
| renameScriptFile(renamedScriptPath); |
| } catch (Exception e) { |
| LOG.error( |
| "Not able to add suffix (.bat/.sh) to the shell script filename", |
| e); |
| // We know we cannot continue launching the container |
| // so we should release it. |
| numCompletedContainers.incrementAndGet(); |
| numFailedContainers.incrementAndGet(); |
| return; |
| } |
| |
| URL yarnUrl = null; |
| try { |
| yarnUrl = URL.fromURI(new URI(renamedScriptPath.toString())); |
| } catch (URISyntaxException e) { |
| LOG.error("Error when trying to use shell script path specified" |
| + " in env, path=" + renamedScriptPath, e); |
| // A failure scenario on bad input such as invalid shell script path |
| // We know we cannot continue launching the container |
| // so we should release it. |
| // TODO |
| numCompletedContainers.incrementAndGet(); |
| numFailedContainers.incrementAndGet(); |
| return; |
| } |
| LocalResource shellRsrc = LocalResource.newInstance(yarnUrl, |
| LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, |
| shellScriptPathLen, shellScriptPathTimestamp); |
| localResources.put(Shell.WINDOWS ? EXEC_BAT_SCRIPT_STRING_PATH : |
| EXEC_SHELL_STRING_PATH, shellRsrc); |
| shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command; |
| } |
| |
| // Set up localization for the container which runs the command |
| if (localizableFiles.size() > 0) { |
| FileSystem fs; |
| try { |
| fs = FileSystem.get(conf); |
| } catch (IOException e) { |
| numCompletedContainers.incrementAndGet(); |
| numFailedContainers.incrementAndGet(); |
| throw new UncheckedIOException("Cannot get FileSystem", e); |
| } |
| |
| localizableFiles.stream().forEach(fileName -> { |
| try { |
| String relativePath = |
| getRelativePath(appName, appId.toString(), fileName); |
| Path dst = |
| new Path(homeDirectory, relativePath); |
| FileStatus fileStatus = fs.getFileStatus(dst); |
| LocalResource localRes = LocalResource.newInstance( |
| URL.fromURI(dst.toUri()), |
| LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, |
| fileStatus.getLen(), fileStatus.getModificationTime()); |
| LOG.info("Setting up file for localization: " + dst); |
| localResources.put(fileName, localRes); |
| } catch (IOException e) { |
| numCompletedContainers.incrementAndGet(); |
| numFailedContainers.incrementAndGet(); |
| throw new UncheckedIOException( |
| "Error during localization setup", e); |
| } |
| }); |
| } |
| |
| // Set the necessary command to execute on the allocated container |
| Vector<CharSequence> vargs = new Vector<CharSequence>(5); |
| |
| // Set executable command |
| vargs.add(shellCommand); |
| // Set shell script path |
| if (!scriptPath.isEmpty()) { |
| vargs.add(Shell.WINDOWS ? EXEC_BAT_SCRIPT_STRING_PATH |
| : EXEC_SHELL_STRING_PATH); |
| } |
| |
| // Set args for the shell command if any |
| vargs.add(shellArgs); |
| // Add log redirect params |
| vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"); |
| vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"); |
| |
| // Get final commmand |
| StringBuilder command = new StringBuilder(); |
| for (CharSequence str : vargs) { |
| command.append(str).append(" "); |
| } |
| |
| List<String> commands = new ArrayList<String>(); |
| commands.add(command.toString()); |
| |
| // Set up ContainerLaunchContext, setting local resource, environment, |
| // command and token for constructor. |
| |
| // Note for tokens: Set up tokens for the container too. Today, for normal |
| // shell commands, the container in distribute-shell doesn't need any |
| // tokens. We are populating them mainly for NodeManagers to be able to |
| // download anyfiles in the distributed file-system. The tokens are |
| // otherwise also useful in cases, for e.g., when one is running a |
| // "hadoop dfs" command inside the distributed shell. |
| Map<String, String> myShellEnv = new HashMap<String, String>(shellEnv); |
| myShellEnv.put(YARN_SHELL_ID, shellId); |
| ContainerRetryContext containerRetryContext = |
| ContainerRetryContext.newInstance( |
| containerRetryPolicy, containerRetryErrorCodes, |
| containerMaxRetries, containrRetryInterval, |
| containerFailuresValidityInterval); |
| ContainerLaunchContext ctx = ContainerLaunchContext.newInstance( |
| localResources, myShellEnv, commands, null, allTokens.duplicate(), |
| null, containerRetryContext); |
| containerListener.addContainer(container.getId(), container); |
| nmClientAsync.startContainerAsync(container, ctx); |
| } |
| } |
| |
| private void renameScriptFile(final Path renamedScriptPath) |
| throws IOException, InterruptedException { |
| appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() { |
| @Override |
| public Void run() throws IOException { |
| FileSystem fs = renamedScriptPath.getFileSystem(conf); |
| fs.rename(new Path(scriptPath), renamedScriptPath); |
| return null; |
| } |
| }); |
| LOG.info("User " + appSubmitterUgi.getUserName() |
| + " added suffix(.sh/.bat) to script file as " + renamedScriptPath); |
| } |
| |
| /** |
| * Setup the request that will be sent to the RM for the container ask. |
| * |
| * @return the setup ResourceRequest to be sent to RM |
| */ |
| private ContainerRequest setupContainerAskForRM() { |
| // setup requirements for hosts |
| // using * as any host will do for the distributed shell app |
| // set the priority for the request |
| // TODO - what is the range for priority? how to decide? |
| Priority pri = Priority.newInstance(requestPriority); |
| |
| // Set up resource type requirements |
| ContainerRequest request = new ContainerRequest( |
| getTaskResourceCapability(), |
| null, null, pri, 0, true, null, |
| ExecutionTypeRequest.newInstance(containerType, enforceExecType), |
| containerResourceProfile); |
| LOG.info("Requested container ask: " + request.toString()); |
| return request; |
| } |
| |
| private SchedulingRequest setupSchedulingRequest(PlacementSpec spec) { |
| long allocId = allocIdCounter.incrementAndGet(); |
| SchedulingRequest sReq = SchedulingRequest.newInstance( |
| allocId, Priority.newInstance(requestPriority), |
| ExecutionTypeRequest.newInstance(), |
| Collections.singleton(spec.sourceTag), |
| ResourceSizing.newInstance( |
| getTaskResourceCapability()), null); |
| sReq.setPlacementConstraint(spec.constraint); |
| LOG.info("Scheduling Request made: " + sReq.toString()); |
| return sReq; |
| } |
| |
| private boolean fileExist(String filePath) { |
| return new File(filePath).exists(); |
| } |
| |
| private String readContent(String filePath) throws IOException { |
| try (DataInputStream ds = new DataInputStream( |
| new FileInputStream(filePath))) { |
| return ds.readUTF(); |
| } |
| } |
| |
| private void publishContainerStartEvent( |
| final TimelineClient timelineClient, final Container container, |
| String domainId, UserGroupInformation ugi) { |
| final TimelineEntity entity = new TimelineEntity(); |
| entity.setEntityId(container.getId().toString()); |
| entity.setEntityType(DSEntity.DS_CONTAINER.toString()); |
| entity.setDomainId(domainId); |
| entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName()); |
| entity.addPrimaryFilter(APPID_TIMELINE_FILTER_NAME, container.getId() |
| .getApplicationAttemptId().getApplicationId().toString()); |
| TimelineEvent event = new TimelineEvent(); |
| event.setTimestamp(System.currentTimeMillis()); |
| event.setEventType(DSEvent.DS_CONTAINER_START.toString()); |
| event.addEventInfo("Node", container.getNodeId().toString()); |
| event.addEventInfo("Resources", container.getResource().toString()); |
| entity.addEvent(event); |
| |
| try { |
| processTimelineResponseErrors( |
| putContainerEntity(timelineClient, |
| container.getId().getApplicationAttemptId(), |
| entity)); |
| } catch (YarnException | IOException | ClientHandlerException e) { |
| LOG.error("Container start event could not be published for " |
| + container.getId().toString(), e); |
| } |
| } |
| |
| @VisibleForTesting |
| void publishContainerEndEvent( |
| final TimelineClient timelineClient, ContainerStatus container, |
| String domainId, UserGroupInformation ugi) { |
| final TimelineEntity entity = new TimelineEntity(); |
| entity.setEntityId(container.getContainerId().toString()); |
| entity.setEntityType(DSEntity.DS_CONTAINER.toString()); |
| entity.setDomainId(domainId); |
| entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName()); |
| entity.addPrimaryFilter(APPID_TIMELINE_FILTER_NAME, |
| container.getContainerId().getApplicationAttemptId() |
| .getApplicationId().toString()); |
| TimelineEvent event = new TimelineEvent(); |
| event.setTimestamp(System.currentTimeMillis()); |
| event.setEventType(DSEvent.DS_CONTAINER_END.toString()); |
| event.addEventInfo("State", container.getState().name()); |
| event.addEventInfo("Exit Status", container.getExitStatus()); |
| event.addEventInfo(DIAGNOSTICS, container.getDiagnostics()); |
| entity.addEvent(event); |
| try { |
| processTimelineResponseErrors( |
| putContainerEntity(timelineClient, |
| container.getContainerId().getApplicationAttemptId(), |
| entity)); |
| } catch (YarnException | IOException | ClientHandlerException e) { |
| LOG.error("Container end event could not be published for " |
| + container.getContainerId().toString(), e); |
| } |
| } |
| |
| private TimelinePutResponse putContainerEntity( |
| TimelineClient timelineClient, ApplicationAttemptId currAttemptId, |
| TimelineEntity entity) |
| throws YarnException, IOException { |
| if (TimelineUtils.timelineServiceV1_5Enabled(conf)) { |
| TimelineEntityGroupId groupId = TimelineEntityGroupId.newInstance( |
| currAttemptId.getApplicationId(), |
| CONTAINER_ENTITY_GROUP_ID); |
| return timelineClient.putEntities(currAttemptId, groupId, entity); |
| } else { |
| return timelineClient.putEntities(entity); |
| } |
| } |
| |
| private void publishApplicationAttemptEvent( |
| final TimelineClient timelineClient, String appAttemptId, |
| DSEvent appEvent, String domainId, UserGroupInformation ugi) { |
| final TimelineEntity entity = new TimelineEntity(); |
| entity.setEntityId(appAttemptId); |
| entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString()); |
| entity.setDomainId(domainId); |
| entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName()); |
| TimelineEvent event = new TimelineEvent(); |
| event.setEventType(appEvent.toString()); |
| event.setTimestamp(System.currentTimeMillis()); |
| entity.addEvent(event); |
| try { |
| TimelinePutResponse response = timelineClient.putEntities(entity); |
| processTimelineResponseErrors(response); |
| } catch (YarnException | IOException | ClientHandlerException e) { |
| LOG.error("App Attempt " |
| + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end") |
| + " event could not be published for " |
| + appAttemptID, e); |
| } |
| } |
| |
| private TimelinePutResponse processTimelineResponseErrors( |
| TimelinePutResponse response) { |
| List<TimelinePutResponse.TimelinePutError> errors = response.getErrors(); |
| if (errors.size() == 0) { |
| LOG.debug("Timeline entities are successfully put"); |
| } else { |
| for (TimelinePutResponse.TimelinePutError error : errors) { |
| LOG.error( |
| "Error when publishing entity [" + error.getEntityType() + "," |
| + error.getEntityId() + "], server side error code: " |
| + error.getErrorCode()); |
| } |
| } |
| return response; |
| } |
| |
| RMCallbackHandler getRMCallbackHandler() { |
| return new RMCallbackHandler(); |
| } |
| |
| @VisibleForTesting |
| void setAmRMClient(AMRMClientAsync client) { |
| this.amRMClient = client; |
| } |
| |
| @VisibleForTesting |
| int getNumCompletedContainers() { |
| return numCompletedContainers.get(); |
| } |
| |
| @VisibleForTesting |
| boolean getDone() { |
| return done; |
| } |
| |
| @VisibleForTesting |
| Thread createLaunchContainerThread(Container allocatedContainer, |
| String shellId) { |
| LaunchContainerRunnable runnableLaunchContainer = |
| new LaunchContainerRunnable(allocatedContainer, containerListener, |
| shellId); |
| return new Thread(runnableLaunchContainer); |
| } |
| |
| private void publishContainerStartEventOnTimelineServiceV2( |
| Container container, long startTime) { |
| final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity |
| entity = |
| new org.apache.hadoop.yarn.api.records.timelineservice. |
| TimelineEntity(); |
| entity.setId(container.getId().toString()); |
| entity.setType(DSEntity.DS_CONTAINER.toString()); |
| entity.setCreatedTime(startTime); |
| entity.addInfo("user", appSubmitterUgi.getShortUserName()); |
| |
| org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = |
| new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent(); |
| event.setTimestamp(startTime); |
| event.setId(DSEvent.DS_CONTAINER_START.toString()); |
| event.addInfo("Node", container.getNodeId().toString()); |
| event.addInfo("Resources", container.getResource().toString()); |
| entity.addEvent(event); |
| entity.setIdPrefix(TimelineServiceHelper.invertLong(startTime)); |
| |
| try { |
| appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() { |
| @Override |
| public TimelinePutResponse run() throws Exception { |
| timelineV2Client.putEntitiesAsync(entity); |
| return null; |
| } |
| }); |
| } catch (Exception e) { |
| LOG.error("Container start event could not be published for " |
| + container.getId().toString(), |
| e instanceof UndeclaredThrowableException ? e.getCause() : e); |
| } |
| } |
| |
| private void publishContainerStartFailedEventOnTimelineServiceV2( |
| final ContainerId containerId, String diagnostics) { |
| final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity |
| entity = new org.apache.hadoop.yarn.api.records.timelineservice. |
| TimelineEntity(); |
| entity.setId(containerId.toString()); |
| entity.setType(DSEntity.DS_CONTAINER.toString()); |
| entity.addInfo("user", appSubmitterUgi.getShortUserName()); |
| org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = |
| new org.apache.hadoop.yarn.api.records.timelineservice |
| .TimelineEvent(); |
| event.setTimestamp(System.currentTimeMillis()); |
| event.setId(DSEvent.DS_CONTAINER_END.toString()); |
| event.addInfo(DIAGNOSTICS, diagnostics); |
| entity.addEvent(event); |
| try { |
| appSubmitterUgi.doAs((PrivilegedExceptionAction<Object>) () -> { |
| timelineV2Client.putEntitiesAsync(entity); |
| return null; |
| }); |
| } catch (Exception e) { |
| LOG.error("Container start failed event could not be published for {}", |
| containerId, |
| e instanceof UndeclaredThrowableException ? e.getCause() : e); |
| } |
| } |
| |
| private void publishContainerStartFailedEvent(final ContainerId containerId, |
| String diagnostics) { |
| final TimelineEntity entityV1 = new TimelineEntity(); |
| entityV1.setEntityId(containerId.toString()); |
| entityV1.setEntityType(DSEntity.DS_CONTAINER.toString()); |
| entityV1.setDomainId(domainId); |
| entityV1.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, appSubmitterUgi |
| .getShortUserName()); |
| entityV1.addPrimaryFilter(APPID_TIMELINE_FILTER_NAME, |
| containerId.getApplicationAttemptId().getApplicationId().toString()); |
| |
| TimelineEvent eventV1 = new TimelineEvent(); |
| eventV1.setTimestamp(System.currentTimeMillis()); |
| eventV1.setEventType(DSEvent.DS_CONTAINER_END.toString()); |
| eventV1.addEventInfo(DIAGNOSTICS, diagnostics); |
| entityV1.addEvent(eventV1); |
| try { |
| processTimelineResponseErrors(putContainerEntity(timelineClient, |
| containerId.getApplicationAttemptId(), entityV1)); |
| } catch (YarnException | IOException | ClientHandlerException e) { |
| LOG.error("Container end event could not be published for {}", |
| containerId, e); |
| } |
| } |
| |
| private void publishContainerEndEventOnTimelineServiceV2( |
| final ContainerStatus container, long containerStartTime) { |
| final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity |
| entity = |
| new org.apache.hadoop.yarn.api.records.timelineservice. |
| TimelineEntity(); |
| entity.setId(container.getContainerId().toString()); |
| entity.setType(DSEntity.DS_CONTAINER.toString()); |
| //entity.setDomainId(domainId); |
| entity.addInfo("user", appSubmitterUgi.getShortUserName()); |
| org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = |
| new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent(); |
| event.setTimestamp(System.currentTimeMillis()); |
| event.setId(DSEvent.DS_CONTAINER_END.toString()); |
| event.addInfo("State", container.getState().name()); |
| event.addInfo("Exit Status", container.getExitStatus()); |
| event.addInfo(DIAGNOSTICS, container.getDiagnostics()); |
| entity.addEvent(event); |
| entity.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime)); |
| |
| try { |
| appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() { |
| @Override |
| public TimelinePutResponse run() throws Exception { |
| timelineV2Client.putEntitiesAsync(entity); |
| return null; |
| } |
| }); |
| } catch (Exception e) { |
| LOG.error("Container end event could not be published for " |
| + container.getContainerId().toString(), |
| e instanceof UndeclaredThrowableException ? e.getCause() : e); |
| } |
| } |
| |
| private void publishApplicationAttemptEventOnTimelineServiceV2( |
| DSEvent appEvent) { |
| final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity |
| entity = |
| new org.apache.hadoop.yarn.api.records.timelineservice. |
| TimelineEntity(); |
| entity.setId(appAttemptID.toString()); |
| entity.setType(DSEntity.DS_APP_ATTEMPT.toString()); |
| long ts = System.currentTimeMillis(); |
| if (appEvent == DSEvent.DS_APP_ATTEMPT_START) { |
| entity.setCreatedTime(ts); |
| } |
| entity.addInfo("user", appSubmitterUgi.getShortUserName()); |
| org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = |
| new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent(); |
| event.setId(appEvent.toString()); |
| event.setTimestamp(ts); |
| entity.addEvent(event); |
| entity.setIdPrefix( |
| TimelineServiceHelper.invertLong(appAttemptID.getAttemptId())); |
| |
| try { |
| appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() { |
| @Override |
| public TimelinePutResponse run() throws Exception { |
| timelineV2Client.putEntitiesAsync(entity); |
| return null; |
| } |
| }); |
| } catch (Exception e) { |
| LOG.error("App Attempt " |
| + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end") |
| + " event could not be published for " |
| + appAttemptID, |
| e instanceof UndeclaredThrowableException ? e.getCause() : e); |
| } |
| } |
| |
| private Resource getTaskResourceCapability() |
| throws YarnRuntimeException { |
| if (containerMemory < -1 || containerMemory == 0) { |
| throw new YarnRuntimeException("Value of AM memory '" + containerMemory |
| + "' has to be greater than 0"); |
| } |
| if (containerVirtualCores < -1 || containerVirtualCores == 0) { |
| throw new YarnRuntimeException( |
| "Value of AM vcores '" + containerVirtualCores |
| + "' has to be greater than 0"); |
| } |
| |
| Resource resourceCapability = |
| Resource.newInstance(containerMemory, containerVirtualCores); |
| containerMemory = |
| containerMemory == -1 ? DEFAULT_CONTAINER_MEMORY : containerMemory; |
| containerVirtualCores = containerVirtualCores == -1 ? |
| DEFAULT_CONTAINER_VCORES : |
| containerVirtualCores; |
| resourceCapability.setMemorySize(containerMemory); |
| resourceCapability.setVirtualCores(containerVirtualCores); |
| for (Map.Entry<String, Long> entry : containerResources.entrySet()) { |
| resourceCapability.setResourceValue(entry.getKey(), entry.getValue()); |
| } |
| |
| return resourceCapability; |
| } |
| } |