| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hama.bsp; |
| |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.security.PrivilegedAction; |
| import java.util.*; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.net.NetUtils; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.yarn.api.ApplicationConstants; |
| import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; |
| import org.apache.hadoop.yarn.api.ContainerManagementProtocol; |
| import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; |
| import org.apache.hadoop.yarn.api.records.*; |
| import org.apache.hadoop.yarn.api.records.Token; |
| import org.apache.hadoop.yarn.client.api.NMTokenCache; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.ipc.YarnRPC; |
| import org.apache.hadoop.yarn.util.ConverterUtils; |
| import org.apache.hadoop.yarn.util.Records; |
| import org.apache.hama.bsp.BSPTaskLauncher.BSPTaskStatus; |
| |
| public class JobImpl implements Job { |
| |
| private static final Log LOG = LogFactory.getLog(JobImpl.class); |
| private static final int DEFAULT_MEMORY_MB = 256; |
| |
| private Configuration conf; |
| private BSPJobID jobId; |
| private int numBSPTasks; |
| private int priority = 0; |
| private String childOpts; |
| private int taskMemoryInMb; |
| private Path jobFile; |
| |
| private JobState state; |
| private BSPPhase phase; |
| |
| private ApplicationAttemptId appAttemptId; |
| private YarnRPC yarnRPC; |
| private ApplicationMasterProtocol resourceManager; |
| |
| private List<Container> allocatedContainers; |
| private List<ContainerId> releasedContainers = Collections.emptyList(); |
| |
| private Map<Integer, BSPTaskLauncher> launchers = new HashMap<Integer, BSPTaskLauncher>(); |
| private Deque<BSPTaskLauncher> completionQueue = new LinkedList<BSPTaskLauncher>(); |
| |
| private int lastResponseID = 0; |
| |
| private int getMemoryRequirements() { |
| String newMemoryProperty = conf.get("bsp.child.mem.in.mb"); |
| if (newMemoryProperty == null) { |
| LOG.warn("\"bsp.child.mem.in.mb\" was not set! Try parsing the child opts..."); |
| return getMemoryFromOptString(childOpts); |
| } else { |
| return Integer.valueOf(newMemoryProperty); |
| } |
| } |
| |
| public JobImpl(ApplicationAttemptId appAttemptId, |
| Configuration jobConfiguration, YarnRPC yarnRPC, ApplicationMasterProtocol amrmRPC, |
| String jobFile, BSPJobID jobId) { |
| super(); |
| this.appAttemptId = appAttemptId; |
| this.yarnRPC = yarnRPC; |
| this.resourceManager = amrmRPC; |
| this.jobFile = new Path(jobFile); |
| this.state = JobState.NEW; |
| this.jobId = jobId; |
| this.conf = jobConfiguration; |
| this.numBSPTasks = conf.getInt("bsp.peers.num", 1); |
| this.childOpts = conf.get("bsp.child.java.opts"); |
| |
| this.taskMemoryInMb = getMemoryRequirements(); |
| } |
| |
| // This really needs a testcase |
| private static int getMemoryFromOptString(String opts) { |
| if (opts == null) { |
| return DEFAULT_MEMORY_MB; |
| } |
| |
| if (!opts.contains("-Xmx")) { |
| LOG.info("No \"-Xmx\" option found in child opts, using default amount of memory!"); |
| return DEFAULT_MEMORY_MB; |
| } else { |
| // e.G: -Xmx512m |
| |
| int startIndex = opts.indexOf("-Xmx") + 4; |
| String xmxString = opts.substring(startIndex); |
| char qualifier = xmxString.charAt(xmxString.length() - 1); |
| int memory = Integer.valueOf(xmxString.substring(0, |
| xmxString.length() - 1)); |
| if (qualifier == 'm') { |
| return memory; |
| } else if (qualifier == 'g') { |
| return memory * 1024; |
| } else { |
| throw new IllegalArgumentException( |
| "Memory Limit in child opts was not set! \"bsp.child.java.opts\" String was: " |
| + opts); |
| } |
| } |
| } |
| |
| @Override |
| public JobState startJob() throws Exception { |
| |
| this.allocatedContainers = new ArrayList<Container>(numBSPTasks); |
| NMTokenCache nmTokenCache = new NMTokenCache(); |
| while (allocatedContainers.size() < numBSPTasks) { |
| AllocateRequest req = AllocateRequest.newInstance(lastResponseID, 0.0f, |
| createBSPTaskRequest(numBSPTasks - allocatedContainers.size(), taskMemoryInMb, |
| priority), releasedContainers, null); |
| |
| AllocateResponse allocateResponse = resourceManager.allocate(req); |
| for (NMToken token : allocateResponse.getNMTokens()) { |
| nmTokenCache.setToken(token.getNodeId().toString(), token.getToken()); |
| } |
| |
| LOG.info("Got response ID: " + allocateResponse.getResponseId() |
| + " with num of containers: " |
| + allocateResponse.getAllocatedContainers().size() |
| + " and following resources: " |
| + allocateResponse.getAvailableResources().getMemory() + "mb"); |
| this.lastResponseID = allocateResponse.getResponseId(); |
| |
| this.allocatedContainers.addAll(allocateResponse.getAllocatedContainers()); |
| |
| LOG.info("Waiting to allocate " + (numBSPTasks - allocatedContainers.size()) + " more containers..."); |
| |
| Thread.sleep(1000l); |
| } |
| |
| LOG.info("Got " + allocatedContainers.size() + " containers!"); |
| |
| int id = 0; |
| for (Container allocatedContainer : allocatedContainers) { |
| LOG.info("Launching task on a new container." + ", containerId=" |
| + allocatedContainer.getId() + ", containerNode=" |
| + allocatedContainer.getNodeId().getHost() + ":" |
| + allocatedContainer.getNodeId().getPort() + ", containerNodeURI=" |
| + allocatedContainer.getNodeHttpAddress() + ", containerResourceMemory" |
| + allocatedContainer.getResource().getMemory()); |
| |
| // Connect to ContainerManager on the allocated container |
| String user = conf.get("bsp.user.name"); |
| if (user == null) { |
| user = System.getenv(ApplicationConstants.Environment.USER.name()); |
| } |
| |
| ContainerManagementProtocol cm = null; |
| try { |
| cm = getContainerManagementProtocolProxy(yarnRPC, |
| nmTokenCache.getToken(allocatedContainer.getNodeId().toString()), allocatedContainer.getNodeId(), user); |
| } catch (Exception e) { |
| LOG.error("Failed to create ContainerManager..."); |
| if (cm != null) |
| yarnRPC.stopProxy(cm, conf); |
| e.printStackTrace(); |
| } |
| |
| BSPTaskLauncher runnableLaunchContainer = new BSPTaskLauncher(id, |
| allocatedContainer, cm, conf, jobFile, jobId); |
| |
| launchers.put(id, runnableLaunchContainer); |
| runnableLaunchContainer.start(); |
| completionQueue.add(runnableLaunchContainer); |
| id++; |
| } |
| |
| LOG.info("Waiting for tasks to finish..."); |
| state = JobState.RUNNING; |
| int completed = 0; |
| |
| while (completed != numBSPTasks) { |
| for (BSPTaskLauncher task : completionQueue) { |
| BSPTaskStatus returnedTask = task.poll(); |
| if(returnedTask != null && returnedTask.getExitStatus() == 0) { |
| LOG.info("Task \"" + returnedTask.getId() |
| + "\" sucessfully finished!"); |
| completed++; |
| LOG.info("Waiting for " + (numBSPTasks - completed) |
| + " tasks to finish!"); |
| } |
| |
| if(returnedTask != null && returnedTask.getExitStatus() != 0) { |
| LOG.error("Task with id \"" + returnedTask.getId() + "\" failed!"); |
| completionQueue.add(task); |
| state = JobState.FAILED; |
| return state; |
| } |
| } |
| Thread.sleep(1000L); |
| } |
| |
| state = JobState.SUCCESS; |
| return state; |
| } |
| |
| /** |
| * |
| * @param rpc |
| * @param nmToken |
| * @param nodeId |
| * @param user |
| * @return |
| */ |
| protected ContainerManagementProtocol getContainerManagementProtocolProxy( |
| final YarnRPC rpc, Token nmToken, NodeId nodeId, String user) { |
| ContainerManagementProtocol proxy; |
| UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user); |
| final InetSocketAddress addr = |
| NetUtils.createSocketAddr(nodeId.getHost(), nodeId.getPort()); |
| if (nmToken != null) { |
| ugi.addToken(ConverterUtils.convertFromYarn(nmToken, addr)); |
| } |
| |
| proxy = ugi |
| .doAs(new PrivilegedAction<ContainerManagementProtocol>() { |
| @Override |
| public ContainerManagementProtocol run() { |
| return (ContainerManagementProtocol) rpc.getProxy( |
| ContainerManagementProtocol.class, |
| addr, conf); |
| } |
| }); |
| return proxy; |
| } |
| |
| /** |
| * Makes a lookup for the taskid and stops its container and task. It also |
| * removes the task from the launcher so that we won't have to stop it twice. |
| * |
| * @param id |
| * @throws YarnException |
| */ |
| private void cleanupTask(int id) throws YarnException, IOException { |
| BSPTaskLauncher bspTaskLauncher = launchers.get(id); |
| bspTaskLauncher.stopAndCleanup(); |
| launchers.remove(id); |
| completionQueue.remove(bspTaskLauncher); |
| } |
| |
| private List<ResourceRequest> createBSPTaskRequest(int numTasks, |
| int memoryInMb, int priority) { |
| |
| List<ResourceRequest> reqList = new ArrayList<ResourceRequest>(numTasks); |
| for (int i = 0; i < numTasks; i++) { |
| // Resource Request |
| ResourceRequest rsrcRequest = Records.newRecord(ResourceRequest.class); |
| |
| // setup requirements for hosts |
| // whether a particular rack/host is needed |
| // useful for applications that are sensitive |
| // to data locality |
| rsrcRequest.setResourceName("*"); |
| |
| // set the priority for the request |
| Priority pri = Records.newRecord(Priority.class); |
| pri.setPriority(priority); |
| rsrcRequest.setPriority(pri); |
| |
| // Set up resource type requirements |
| // For now, only memory is supported so we set memory requirements |
| Resource capability = Records.newRecord(Resource.class); |
| capability.setMemory(memoryInMb); |
| rsrcRequest.setCapability(capability); |
| |
| // set no. of containers needed |
| // matching the specifications |
| rsrcRequest.setNumContainers(numBSPTasks); |
| reqList.add(rsrcRequest); |
| } |
| return reqList; |
| } |
| |
| @Override |
| public void cleanup() throws YarnException, IOException { |
| for (BSPTaskLauncher launcher : completionQueue) { |
| LOG.info("cleanup tasks !!!"); |
| launcher.stopAndCleanup(); |
| } |
| } |
| |
| @Override |
| public JobState getState() { |
| return state; |
| } |
| |
| @Override |
| public int getTotalBSPTasks() { |
| return numBSPTasks; |
| } |
| |
| @Override |
| public BSPPhase getBSPPhase() { |
| return phase; |
| } |
| |
| } |