| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.s4.tools.yarn; |
| |
| import java.io.IOException; |
| import java.lang.Thread.UncaughtExceptionHandler; |
| import java.net.InetSocketAddress; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Vector; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.LocatedFileStatus; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.RemoteIterator; |
| import org.apache.hadoop.net.NetUtils; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.yarn.api.AMRMProtocol; |
| import org.apache.hadoop.yarn.api.ApplicationConstants; |
| import org.apache.hadoop.yarn.api.ContainerManager; |
| import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; |
| import org.apache.hadoop.yarn.api.records.AMResponse; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; |
| import org.apache.hadoop.yarn.api.records.ContainerState; |
| import org.apache.hadoop.yarn.api.records.ContainerStatus; |
| import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; |
| import org.apache.hadoop.yarn.api.records.LocalResource; |
| import org.apache.hadoop.yarn.api.records.LocalResourceType; |
| import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.ResourceRequest; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.YarnRemoteException; |
| import org.apache.hadoop.yarn.ipc.YarnRPC; |
| import org.apache.hadoop.yarn.util.ConverterUtils; |
| import org.apache.hadoop.yarn.util.Records; |
| import org.apache.s4.deploy.HdfsFetcherModule; |
| import org.apache.s4.tools.Tools; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.base.Strings; |
| import com.google.common.collect.Lists; |
| |
/**
 * An ApplicationMaster for launching S4 nodes on a set of launched containers using the YARN framework.
 * <p>
 * The code is inspired by the distributed shell example from the "Writing YARN Applications" guide:
 * <a href="http://hadoop.apache.org/docs/r2.0.2-alpha/hadoop-yarn/hadoop-yarn-site/WritingYarnApplications.html">
 * WritingYarnApplications</a>.
 */
public class S4ApplicationMaster {

    private static Logger logger = LoggerFactory.getLogger(S4ApplicationMaster.class);

    // Hadoop configuration, used for RM/NM RPC and for HDFS access
    private Configuration conf;

    // YARN RPC to communicate with the Resource Manager or Node Manager
    private YarnRPC rpc;

    // Handle to communicate with the Resource Manager
    private AMRMProtocol resourceManager;

    // Application Attempt Id ( combination of attemptId and fail count )
    private ApplicationAttemptId appAttemptID;

    // For status update for clients - yet to be implemented
    // Hostname of the container
    private final String appMasterHostname = "";
    // Port on which the app master listens for status update requests from clients
    private final int appMasterRpcPort = 0;
    // Tracking url to which app master publishes info for clients to monitor
    private final String appMasterTrackingUrl = "";

    // Incremental counter for rpc calls to the RM (used as the AllocateRequest response id)
    private final AtomicInteger rmRequestID = new AtomicInteger();

    // Simple flag to denote whether all works is done
    private boolean appDone = false;
    // Counter for completed containers ( complete denotes successful or failed )
    private final AtomicInteger numCompletedContainers = new AtomicInteger();
    // Allocated container count so that we know how many containers has the RM
    // allocated to us
    private final AtomicInteger numAllocatedContainers = new AtomicInteger();
    // Count of failed containers
    private final AtomicInteger numFailedContainers = new AtomicInteger();
    // Count of containers already requested from the RM
    // Needed as once requested, we should not request for containers again and again.
    // Only request for more if the original requirement changes.
    private final AtomicInteger numRequestedContainers = new AtomicInteger();

    // Containers to be released; thread-safe because launch threads run concurrently with the main loop
    private final CopyOnWriteArrayList<ContainerId> releasedContainers = new CopyOnWriteArrayList<ContainerId>();

    // Launch threads, one per allocated container; joined before finishing the application
    private final List<Thread> launchThreads = new ArrayList<Thread>();

    // Memory (MB) requested per container, clamped to the cluster's min/max in run()
    private int containerMemory;

    private static AppMasterYarnArgs yarnArgs;
| |
| /** |
| * @param args |
| * Command line args |
| */ |
| public static void main(String[] args) { |
| boolean result = false; |
| try { |
| yarnArgs = new AppMasterYarnArgs(); |
| logger.info("S4YarnApplicationMaster args = " + Arrays.toString(args)); |
| |
| Tools.parseArgs(yarnArgs, args); |
| |
| S4ApplicationMaster appMaster = new S4ApplicationMaster(); |
| logger.info("Initializing ApplicationMaster with args " + Arrays.toString(args)); |
| appMaster.init(); |
| result = appMaster.run(); |
| } catch (Throwable t) { |
| t.printStackTrace(); |
| logger.error("Error running ApplicationMaster", t); |
| System.exit(1); |
| } |
| if (result) { |
| logger.info("Application Master completed successfully. exiting"); |
| System.exit(0); |
| } else { |
| logger.info("Application Master failed. exiting"); |
| System.exit(2); |
| } |
| } |
| |
| public S4ApplicationMaster() throws Exception { |
| |
| Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() { |
| |
| @Override |
| public void uncaughtException(Thread t, Throwable e) { |
| logger.error("Uncaught exception in thread {}", t.getName(), e); |
| |
| } |
| }); |
| } |
| |
| /** |
| */ |
| public void init() throws IOException { |
| |
| containerMemory = Utils.extractMemoryParam(yarnArgs.containerMemory, yarnArgs.extraS4NodeJVMParams); |
| |
| Map<String, String> envs = System.getenv(); |
| |
| appAttemptID = Records.newRecord(ApplicationAttemptId.class); |
| if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) { |
| if (!Strings.isNullOrEmpty(yarnArgs.appAttemptId)) { |
| appAttemptID = ConverterUtils.toApplicationAttemptId(yarnArgs.appAttemptId); |
| } else { |
| throw new IllegalArgumentException("Application Attempt Id not set in the environment"); |
| } |
| } else { |
| ContainerId containerId = ConverterUtils.toContainerId(envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)); |
| appAttemptID = containerId.getApplicationAttemptId(); |
| } |
| |
| logger.info("Application master for app" + ", appId=" + appAttemptID.getApplicationId().getId() |
| + ", clustertimestamp=" + appAttemptID.getApplicationId().getClusterTimestamp() + ", attemptId=" |
| + appAttemptID.getAttemptId()); |
| |
| if (Strings.isNullOrEmpty(yarnArgs.cluster)) { |
| throw new IllegalArgumentException("No cluster ID specified to be provisioned by application master"); |
| } |
| |
| conf = new YarnConfiguration(); |
| if (yarnArgs.test) { |
| // testMode = true; |
| conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000"); |
| } |
| rpc = YarnRPC.create(conf); |
| } |
| |
| /** |
| * Main run function for the application master |
| * |
| * @throws YarnRemoteException |
| */ |
| public boolean run() throws YarnRemoteException { |
| logger.info("Starting ApplicationMaster"); |
| |
| // Connect to ResourceManager |
| resourceManager = connectToRM(); |
| |
| // Setup local RPC Server to accept status requests directly from clients |
| // TODO need to setup a protocol for client to be able to communicate to the RPC server |
| // TODO use the rpc port info to register with the RM for the client to send requests to this app master |
| |
| // Register self with ResourceManager |
| RegisterApplicationMasterResponse response = registerToRM(); |
| // Dump out information about cluster capability as seen by the resource manager |
| int minMem = response.getMinimumResourceCapability().getMemory(); |
| int maxMem = response.getMaximumResourceCapability().getMemory(); |
| logger.info("Min mem capability of resources in this cluster " + minMem); |
| logger.info("Max mem capability of resources in this cluster " + maxMem); |
| |
| // A resource ask has to be atleast the minimum of the capability of the cluster, the value has to be |
| // a multiple of the min value and cannot exceed the max. |
| // If it is not an exact multiple of min, the RM will allocate to the nearest multiple of min |
| if (containerMemory < minMem) { |
| logger.info("Container memory for S4 node specified below min threshold of YARN cluster. Using min value." |
| + ", specified=" + containerMemory + ", min=" + minMem); |
| containerMemory = minMem; |
| } else if (containerMemory > maxMem) { |
| logger.info("Container memory for S4 node specified above max threshold of YARN cluster. Using max value." |
| + ", specified=" + containerMemory + ", max=" + maxMem); |
| containerMemory = maxMem; |
| } |
| |
| int loopCounter = -1; |
| |
| while (numCompletedContainers.get() < yarnArgs.numContainers && !appDone) { |
| loopCounter++; |
| |
| // log current state |
| logger.info("Current application state: loop=" + loopCounter + ", appDone=" + appDone + ", total=" |
| + yarnArgs.numContainers + ", requested=" + numRequestedContainers + ", completed=" |
| + numCompletedContainers + ", failed=" + numFailedContainers + ", currentAllocated=" |
| + numAllocatedContainers); |
| |
| // Sleep before each loop when asking RM for containers |
| // to avoid flooding RM with spurious requests when it |
| // need not have any available containers |
| // Sleeping for 1000 ms. |
| try { |
| Thread.sleep(1000); |
| } catch (InterruptedException e) { |
| logger.info("Sleep interrupted " + e.getMessage()); |
| } |
| |
| // No. of containers to request |
| // For the first loop, askCount will be equal to total containers needed |
| // From that point on, askCount will always be 0 as current implementation |
| // does not change its ask on container failures. |
| int askCount = yarnArgs.numContainers - numRequestedContainers.get(); |
| numRequestedContainers.addAndGet(askCount); |
| |
| // Setup request to be sent to RM to allocate containers |
| List<ResourceRequest> resourceReq = new ArrayList<ResourceRequest>(); |
| if (askCount > 0) { |
| ResourceRequest containerAsk = setupContainerAskForRM(askCount); |
| resourceReq.add(containerAsk); |
| } |
| |
| // Send the request to RM |
| logger.info("Asking RM for containers" + ", askCount=" + askCount); |
| AMResponse amResp = sendContainerAskToRM(resourceReq); |
| |
| // Retrieve list of allocated containers from the response |
| List<Container> allocatedContainers = amResp.getAllocatedContainers(); |
| logger.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size()); |
| numAllocatedContainers.addAndGet(allocatedContainers.size()); |
| for (Container allocatedContainer : allocatedContainers) { |
| logger.info("Launching S4 node on a new container." + ", containerId=" + allocatedContainer.getId() |
| + ", containerNode=" + allocatedContainer.getNodeId().getHost() + ":" |
| + allocatedContainer.getNodeId().getPort() + ", containerNodeURI=" |
| + allocatedContainer.getNodeHttpAddress() + ", containerState" + allocatedContainer.getState() |
| + ", containerResourceMemory" + allocatedContainer.getResource().getMemory()); |
| // + ", containerToken" + allocatedContainer.getContainerToken().getIdentifier().toString()); |
| |
| LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(allocatedContainer); |
| Thread launchThread = new Thread(runnableLaunchContainer); |
| |
| // launch and start the container on a separate thread to keep the main thread unblocked |
| // as all containers may not be allocated at one go. |
| launchThreads.add(launchThread); |
| launchThread.start(); |
| } |
| |
| // Check what the current available resources in the cluster are |
| // TODO should we do anything if the available resources are not enough? |
| Resource availableResources = amResp.getAvailableResources(); |
| logger.info("Current available resources in the cluster " + availableResources); |
| |
| // Check the completed containers |
| List<ContainerStatus> completedContainers = amResp.getCompletedContainersStatuses(); |
| logger.info("Got response from RM for container ask, completedCnt=" + completedContainers.size()); |
| for (ContainerStatus containerStatus : completedContainers) { |
| logger.info("Got container status for containerID= " + containerStatus.getContainerId() + ", state=" |
| + containerStatus.getState() + ", exitStatus=" + containerStatus.getExitStatus() |
| + ", diagnostics=" + containerStatus.getDiagnostics()); |
| |
| // non complete containers should not be here |
| assert (containerStatus.getState() == ContainerState.COMPLETE); |
| |
| // increment counters for completed/failed containers |
| int exitStatus = containerStatus.getExitStatus(); |
| if (0 != exitStatus) { |
| // container failed |
| if (-100 != exitStatus) { |
| // shell script failed |
| // counts as completed |
| numCompletedContainers.incrementAndGet(); |
| numFailedContainers.incrementAndGet(); |
| } else { |
| // something else bad happened |
| // app job did not complete for some reason |
| // we should re-try as the container was lost for some reason |
| numAllocatedContainers.decrementAndGet(); |
| numRequestedContainers.decrementAndGet(); |
| // we do not need to release the container as it would be done |
| // by the RM/CM. |
| } |
| } else { |
| // nothing to do |
| // container completed successfully |
| numCompletedContainers.incrementAndGet(); |
| logger.info("Container completed successfully." + ", containerId=" |
| + containerStatus.getContainerId()); |
| } |
| |
| } |
| if (numCompletedContainers.get() == yarnArgs.numContainers) { |
| appDone = true; |
| } |
| |
| logger.info("Current application state: loop=" + loopCounter + ", appDone=" + appDone + ", total=" |
| + yarnArgs.numContainers + ", requested=" + numRequestedContainers + ", completed=" |
| + numCompletedContainers + ", failed=" + numFailedContainers + ", currentAllocated=" |
| + numAllocatedContainers); |
| |
| } |
| |
| // Join all launched threads |
| // needed for when we time out |
| // and we need to release containers |
| for (Thread launchThread : launchThreads) { |
| try { |
| launchThread.join(0); |
| } catch (InterruptedException e) { |
| logger.info("Exception thrown in thread join: " + e.getMessage()); |
| e.printStackTrace(); |
| } |
| } |
| |
| // When the application completes, it should send a finish application signal |
| // to the RM |
| logger.info("Application completed. Signalling finish to RM"); |
| |
| FinishApplicationMasterRequest finishReq = Records.newRecord(FinishApplicationMasterRequest.class); |
| finishReq.setAppAttemptId(appAttemptID); |
| boolean isSuccess = true; |
| if (numFailedContainers.get() == 0) { |
| finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED); |
| } else { |
| finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED); |
| String diagnostics = "Diagnostics." + ", total=" + yarnArgs.numContainers + ", completed=" |
| + numCompletedContainers.get() + ", allocated=" + numAllocatedContainers.get() + ", failed=" |
| + numFailedContainers.get(); |
| finishReq.setDiagnostics(diagnostics); |
| isSuccess = false; |
| } |
| resourceManager.finishApplicationMaster(finishReq); |
| return isSuccess; |
| } |
| |
| /** |
| * Thread to connect to the {@link ContainerManager} and launch the container that will execute the shell command. |
| */ |
| private class LaunchContainerRunnable implements Runnable { |
| |
| // Allocated container |
| Container container; |
| // Handle to communicate with ContainerManager |
| ContainerManager cm; |
| |
| /** |
| * @param lcontainer |
| * Allocated container |
| */ |
| public LaunchContainerRunnable(Container lcontainer) { |
| this.container = lcontainer; |
| } |
| |
| /** |
| * Helper function to connect to CM |
| */ |
| private void connectToCM() { |
| logger.debug("Connecting to ContainerManager for containerid=" + container.getId()); |
| String cmIpPortStr = container.getNodeId().getHost() + ":" + container.getNodeId().getPort(); |
| InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr); |
| logger.info("Connecting to ContainerManager at " + cmIpPortStr); |
| this.cm = ((ContainerManager) rpc.getProxy(ContainerManager.class, cmAddress, conf)); |
| } |
| |
| @Override |
| /** |
| * Connects to CM, sets up container launch context |
| * for shell command and eventually dispatches the container |
| * start request to the CM. |
| */ |
| public void run() { |
| |
| // Connect to ContainerManager |
| connectToCM(); |
| |
| logger.info("Setting up container launch container for containerid=" + container.getId()); |
| ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class); |
| |
| ctx.setContainerId(container.getId()); |
| ctx.setResource(container.getResource()); |
| |
| try { |
| if (!Strings.isNullOrEmpty(yarnArgs.user)) { |
| ctx.setUser(yarnArgs.user); |
| } else { |
| logger.info("Using default user name {}", UserGroupInformation.getCurrentUser().getShortUserName()); |
| ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName()); |
| } |
| } catch (IOException e) { |
| logger.info("Getting current user info failed when trying to launch the container" + e.getMessage()); |
| } |
| |
| // Set the local resources |
| Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); |
| |
| try { |
| FileSystem fs = FileSystem.get(conf); |
| |
| RemoteIterator<LocatedFileStatus> files = fs.listFiles(new Path(fs.getHomeDirectory(), "/app-" |
| + appAttemptID.getApplicationId().getId()), false); |
| while (files.hasNext()) { |
| LocatedFileStatus file = files.next(); |
| LocalResource localResource = Records.newRecord(LocalResource.class); |
| |
| localResource.setType(LocalResourceType.FILE); |
| localResource.setVisibility(LocalResourceVisibility.APPLICATION); |
| localResource.setResource(ConverterUtils.getYarnUrlFromPath(file.getPath())); |
| localResource.setTimestamp(file.getModificationTime()); |
| localResource.setSize(file.getLen()); |
| localResources.put(file.getPath().getName(), localResource); |
| } |
| ctx.setLocalResources(localResources); |
| |
| } catch (IOException e1) { |
| // TODO Auto-generated catch block |
| e1.printStackTrace(); |
| } |
| |
| StringBuilder classPathEnv = new StringBuilder("${CLASSPATH}:./*"); |
| |
| for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, |
| YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) { |
| classPathEnv.append(':'); |
| classPathEnv.append(c.trim()); |
| } |
| |
| // classPathEnv.append(System.getProperty("java.class.path")); |
| Map<String, String> env = new HashMap<String, String>(); |
| |
| env.put("CLASSPATH", classPathEnv.toString()); |
| ctx.setEnvironment(env); |
| |
| // Set the necessary command to execute on the allocated container |
| Vector<CharSequence> vargs = new Vector<CharSequence>(5); |
| |
| vargs.add("java"); |
| |
| vargs.add("-Xmx" + containerMemory + "m"); |
| |
| S4YarnClient.addListElementsToCommandLineBuffer(vargs, null, " ", yarnArgs.extraS4NodeJVMParams); |
| |
| // TODO add memory parameter |
| // vargs.add("-Xdebug"); |
| // vargs.add("-Xrunjdwp:transport=dt_socket,address=8889,server=y"); |
| vargs.add("org.apache.s4.core.Main"); |
| vargs.add("-zk=" + yarnArgs.zkString); |
| vargs.add("-c=" + yarnArgs.cluster); |
| |
| List<String> extraModulesClasses = Lists.newArrayList(yarnArgs.extraModulesClasses); |
| // add module for fetchings from hdfs |
| extraModulesClasses.add(HdfsFetcherModule.class.getName()); |
| S4YarnClient.addListElementsToCommandLineBuffer(vargs, CommonS4YarnArgs.EXTRA_MODULES_CLASSES, ",", |
| extraModulesClasses); |
| |
| // add reference to the configuration |
| List<String> namedStringParams = Lists.newArrayList(yarnArgs.extraNamedParameters); |
| if (yarnArgs.test) { |
| namedStringParams.add(FileSystem.FS_DEFAULT_NAME_KEY + "=" + conf.get(FileSystem.FS_DEFAULT_NAME_KEY)); |
| } |
| S4YarnClient.addListElementsToCommandLineBuffer(vargs, CommonS4YarnArgs.NAMED_STRING_PARAMETERS, ",", |
| namedStringParams); |
| |
| // TODO |
| // We should redirect the output to hdfs instead of local logs |
| // so as to be able to look at the final output after the containers |
| // have been released. |
| // Could use a path suffixed with /AppId/AppAttempId/ContainerId/std[out|err] |
| vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/s4-node-stdout"); |
| vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/s4-node-stderr"); |
| |
| // Get final commmand |
| StringBuilder command = new StringBuilder(); |
| for (CharSequence str : vargs) { |
| command.append(str).append(" "); |
| } |
| |
| List<String> commands = new ArrayList<String>(); |
| commands.add(command.toString()); |
| ctx.setCommands(commands); |
| |
| StartContainerRequest startReq = Records.newRecord(StartContainerRequest.class); |
| startReq.setContainerLaunchContext(ctx); |
| try { |
| cm.startContainer(startReq); |
| } catch (YarnRemoteException e) { |
| logger.info("Start container failed for :" + ", containerId=" + container.getId()); |
| e.printStackTrace(); |
| } |
| |
| } |
| } |
| |
| /** |
| * Connect to the Resource Manager |
| * |
| * @return Handle to communicate with the RM |
| */ |
| private AMRMProtocol connectToRM() { |
| YarnConfiguration yarnConf = new YarnConfiguration(conf); |
| InetSocketAddress rmAddress = yarnConf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); |
| logger.info("Connecting to ResourceManager at " + rmAddress); |
| return ((AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf)); |
| } |
| |
| /** |
| * Register the Application Master to the Resource Manager |
| * |
| * @return the registration response from the RM |
| * @throws YarnRemoteException |
| */ |
| private RegisterApplicationMasterResponse registerToRM() throws YarnRemoteException { |
| RegisterApplicationMasterRequest appMasterRequest = Records.newRecord(RegisterApplicationMasterRequest.class); |
| |
| // set the required info into the registration request: |
| // application attempt id, |
| // host on which the app master is running |
| // rpc port on which the app master accepts requests from the client |
| // tracking url for the app master |
| appMasterRequest.setApplicationAttemptId(appAttemptID); |
| appMasterRequest.setHost(appMasterHostname); |
| appMasterRequest.setRpcPort(appMasterRpcPort); |
| appMasterRequest.setTrackingUrl(appMasterTrackingUrl); |
| |
| return resourceManager.registerApplicationMaster(appMasterRequest); |
| } |
| |
| /** |
| * Setup the request that will be sent to the RM for the container ask. |
| * |
| * @param numContainers |
| * Containers to ask for from RM |
| * @return the setup ResourceRequest to be sent to RM |
| */ |
| private ResourceRequest setupContainerAskForRM(int numContainers) { |
| ResourceRequest request = Records.newRecord(ResourceRequest.class); |
| |
| // setup requirements for hosts |
| // whether a particular rack/host is needed |
| // Refer to apis under org.apache.hadoop.net for more |
| // details on how to get figure out rack/host mapping. |
| // using * as any host will do for the distributed shell app |
| request.setHostName("*"); |
| |
| // set no. of containers needed |
| request.setNumContainers(numContainers); |
| |
| // set the priority for the request |
| Priority pri = Records.newRecord(Priority.class); |
| // TODO - what is the range for priority? how to decide? |
| pri.setPriority(yarnArgs.priority); |
| request.setPriority(pri); |
| |
| // Set up resource type requirements |
| // For now, only memory is supported so we set memory requirements |
| Resource capability = Records.newRecord(Resource.class); |
| capability.setMemory(containerMemory); |
| request.setCapability(capability); |
| |
| return request; |
| } |
| |
| /** |
| * Ask RM to allocate given no. of containers to this Application Master |
| * |
| * @param requestedContainers |
| * Containers to ask for from RM |
| * @return Response from RM to AM with allocated containers |
| * @throws YarnRemoteException |
| */ |
| private AMResponse sendContainerAskToRM(List<ResourceRequest> requestedContainers) throws YarnRemoteException { |
| AllocateRequest req = Records.newRecord(AllocateRequest.class); |
| req.setResponseId(rmRequestID.incrementAndGet()); |
| req.setApplicationAttemptId(appAttemptID); |
| req.addAllAsks(requestedContainers); |
| req.addAllReleases(releasedContainers); |
| req.setProgress((float) numCompletedContainers.get() / yarnArgs.numContainers); |
| |
| logger.info("Sending request to RM for containers" + ", requestedSet=" + requestedContainers.size() |
| + ", releasedSet=" + releasedContainers.size() + ", progress=" + req.getProgress()); |
| |
| for (ResourceRequest rsrcReq : requestedContainers) { |
| logger.info("Requested container ask: " + rsrcReq.toString()); |
| } |
| for (ContainerId id : releasedContainers) { |
| logger.info("Released container, id=" + id.getId()); |
| } |
| |
| AllocateResponse resp = resourceManager.allocate(req); |
| return resp.getAMResponse(); |
| } |
| } |