| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hama.bsp; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.*; |
| |
| import org.apache.commons.io.FileUtils; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.DataOutputBuffer; |
| import org.apache.hadoop.security.Credentials; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.yarn.api.ApplicationConstants; |
| import org.apache.hadoop.yarn.api.records.*; |
| import org.apache.hadoop.yarn.client.api.YarnClient; |
| import org.apache.hadoop.yarn.client.api.YarnClientApplication; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.util.ConverterUtils; |
| import org.apache.hadoop.yarn.util.Records; |
| import org.apache.hama.HamaConfiguration; |
| |
| public class YARNBSPJobClient extends BSPJobClient { |
| |
| private static final Log LOG = LogFactory.getLog(YARNBSPJobClient.class); |
| |
| private ApplicationId id; |
| private ApplicationReport report; |
| |
| // Configuration |
| private YarnClient yarnClient; |
| private YarnConfiguration yarnConf; |
| |
| // Start time for client |
| private final long clientStartTime = System.currentTimeMillis(); |
| // Timeout threshold for client. Kill app after time interval expires. |
| private long clientTimeout = 60000; |
| |
| class NetworkedJob implements RunningJob { |
| @Override |
| public BSPJobID getID() { |
| return null; |
| } |
| |
| @Override |
| public String getJobName() { |
| return null; |
| } |
| |
| @Override |
| public long progress() throws IOException { |
| return 0; |
| } |
| |
| @Override |
| public boolean isComplete() throws IOException { |
| return false; |
| } |
| |
| @Override |
| public boolean isSuccessful() throws IOException { |
| return false; |
| } |
| |
| @Override |
| public void waitForCompletion() throws IOException { |
| |
| } |
| |
| @Override |
| public int getJobState() throws IOException { |
| return 0; |
| } |
| |
| @Override |
| public void killJob() throws IOException { |
| |
| } |
| |
| @Override |
| public void killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException { |
| |
| } |
| |
| @Override |
| public long getSuperstepCount() throws IOException { |
| return 0; |
| } |
| |
| @Override |
| public JobStatus getStatus() { |
| return null; |
| } |
| |
| @Override |
| public TaskCompletionEvent[] getTaskCompletionEvents(int eventCounter) { |
| return new TaskCompletionEvent[0]; |
| } |
| |
| @Override |
| public String getJobFile() { |
| return null; |
| } |
| |
| @Override |
| public Counters getCounters() { |
| // TODO Auto-generated method stub |
| return null; |
| } |
| } |
| |
| public YARNBSPJobClient(HamaConfiguration conf) { |
| setConf(conf); |
| yarnConf = new YarnConfiguration(conf); |
| yarnClient = YarnClient.createYarnClient(); |
| yarnClient.init(yarnConf); |
| } |
| |
| @Override |
| protected RunningJob launchJob(BSPJobID jobId, BSPJob normalJob, |
| Path submitJobFile, FileSystem pFs) throws IOException { |
| YARNBSPJob job = (YARNBSPJob) normalJob; |
| |
| LOG.info("Submitting job..."); |
| if (getConf().get("bsp.child.mem.in.mb") == null) { |
| LOG.warn("BSP Child memory has not been set, YARN will guess your needs or use default values."); |
| } |
| |
| FileSystem fs = pFs; |
| if (fs == null) { |
| fs = FileSystem.get(getConf()); |
| } |
| |
| if (getConf().get("bsp.user.name") == null) { |
| String s = getUnixUserName(); |
| getConf().set("bsp.user.name", s); |
| LOG.debug("Retrieved username: " + s); |
| } |
| |
| yarnClient.start(); |
| try { |
| YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics(); |
| LOG.info("Got Cluster metric info from ASM" |
| + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers()); |
| |
| List<NodeReport> clusterNodeReports = yarnClient.getNodeReports( |
| NodeState.RUNNING); |
| LOG.info("Got Cluster node info from ASM"); |
| for (NodeReport node : clusterNodeReports) { |
| LOG.info("Got node report from ASM for" |
| + ", nodeId=" + node.getNodeId() |
| + ", nodeAddress" + node.getHttpAddress() |
| + ", nodeRackName" + node.getRackName() |
| + ", nodeNumContainers" + node.getNumContainers()); |
| } |
| |
| QueueInfo queueInfo = yarnClient.getQueueInfo("default"); |
| LOG.info("Queue info" |
| + ", queueName=" + queueInfo.getQueueName() |
| + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity() |
| + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity() |
| + ", queueApplicationCount=" + queueInfo.getApplications().size() |
| + ", queueChildQueueCount=" + queueInfo.getChildQueues().size()); |
| |
| List<QueueUserACLInfo> listAclInfo = yarnClient.getQueueAclsInfo(); |
| for (QueueUserACLInfo aclInfo : listAclInfo) { |
| for (QueueACL userAcl : aclInfo.getUserAcls()) { |
| LOG.info("User ACL Info for Queue" |
| + ", queueName=" + aclInfo.getQueueName() |
| + ", userAcl=" + userAcl.name()); |
| } |
| } |
| |
| // Get a new application id |
| YarnClientApplication app = yarnClient.createApplication(); |
| |
| |
| // Create a new ApplicationSubmissionContext |
| //ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class); |
| ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext(); |
| |
| id = appContext.getApplicationId(); |
| |
| // set the application name |
| appContext.setApplicationName(job.getJobName()); |
| |
| // Create a new container launch context for the AM's container |
| ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class); |
| |
| // Define the local resources required |
| Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); |
| // Lets assume the jar we need for our ApplicationMaster is available in |
| // HDFS at a certain known path to us and we want to make it available to |
| // the ApplicationMaster in the launched container |
| if (job.getJar() == null) { |
| throw new IllegalArgumentException("Jar must be set in order to run the application!"); |
| } |
| |
| Path jarPath = new Path(job.getJar()); |
| jarPath = fs.makeQualified(jarPath); |
| getConf().set("bsp.jar", jarPath.makeQualified(fs.getUri(), jarPath).toString()); |
| |
| FileStatus jarStatus = fs.getFileStatus(jarPath); |
| LocalResource amJarRsrc = Records.newRecord(LocalResource.class); |
| amJarRsrc.setType(LocalResourceType.FILE); |
| amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION); |
| amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath)); |
| amJarRsrc.setTimestamp(jarStatus.getModificationTime()); |
| amJarRsrc.setSize(jarStatus.getLen()); |
| |
| // this creates a symlink in the working directory |
| localResources.put(YARNBSPConstants.APP_MASTER_JAR_PATH, amJarRsrc); |
| |
| // add hama related jar files to localresources for container |
| List<File> hamaJars; |
| if (System.getProperty("hama.home.dir") != null) |
| hamaJars = localJarfromPath(System.getProperty("hama.home.dir")); |
| else |
| hamaJars = localJarfromPath(getConf().get("hama.home.dir")); |
| String hamaPath = getSystemDir() + "/hama"; |
| for (File fileEntry : hamaJars) { |
| addToLocalResources(fs, fileEntry.getCanonicalPath(), |
| hamaPath, fileEntry.getName(), localResources); |
| } |
| |
| // Set the local resources into the launch context |
| amContainer.setLocalResources(localResources); |
| |
| // Set up the environment needed for the launch context |
| Map<String, String> env = new HashMap<String, String>(); |
| // Assuming our classes or jars are available as local resources in the |
| // working directory from which the command will be run, we need to append |
| // "." to the path. |
| // By default, all the hadoop specific classpaths will already be available |
| // in $CLASSPATH, so we should be careful not to overwrite it. |
| StringBuilder classPathEnv = new StringBuilder( |
| ApplicationConstants.Environment.CLASSPATH.$()).append(File.pathSeparatorChar) |
| .append("./*"); |
| for (String c : yarnConf.getStrings( |
| YarnConfiguration.YARN_APPLICATION_CLASSPATH, |
| YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) { |
| classPathEnv.append(File.pathSeparatorChar); |
| classPathEnv.append(c.trim()); |
| } |
| |
| env.put(YARNBSPConstants.HAMA_YARN_LOCATION, jarPath.toUri().toString()); |
| env.put(YARNBSPConstants.HAMA_YARN_SIZE, Long.toString(jarStatus.getLen())); |
| env.put(YARNBSPConstants.HAMA_YARN_TIMESTAMP, Long.toString(jarStatus.getModificationTime())); |
| |
| env.put(YARNBSPConstants.HAMA_LOCATION, hamaPath); |
| env.put("CLASSPATH", classPathEnv.toString()); |
| amContainer.setEnvironment(env); |
| |
| // Set the necessary command to execute on the allocated container |
| Vector<CharSequence> vargs = new Vector<CharSequence>(5); |
| vargs.add("${JAVA_HOME}/bin/java"); |
| vargs.add("-cp " + classPathEnv + ""); |
| vargs.add(ApplicationMaster.class.getCanonicalName()); |
| vargs.add(submitJobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString()); |
| |
| vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/hama-appmaster.stdout"); |
| vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/hama-appmaster.stderr"); |
| |
| // Get final commmand |
| StringBuilder command = new StringBuilder(); |
| for (CharSequence str : vargs) { |
| command.append(str).append(" "); |
| } |
| |
| List<String> commands = new ArrayList<String>(); |
| commands.add(command.toString()); |
| amContainer.setCommands(commands); |
| |
| LOG.debug("Start command: " + command); |
| |
| Resource capability = Records.newRecord(Resource.class); |
| // we have at least 3 threads, which comsumes 1mb each, for each bsptask and |
| // a base usage of 100mb |
| capability.setMemory(3 * job.getNumBspTask() + getConf().getInt("hama.appmaster.memory.mb", 100)); |
| LOG.info("Set memory for the application master to " + capability.getMemory() + "mb!"); |
| |
| // Set the container launch content into the ApplicationSubmissionContext |
| appContext.setResource(capability); |
| |
| // Setup security tokens |
| if (UserGroupInformation.isSecurityEnabled()) { |
| // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce |
| Credentials credentials = new Credentials(); |
| String tokenRenewer = yarnConf.get(YarnConfiguration.RM_PRINCIPAL); |
| if (tokenRenewer == null || tokenRenewer.length() == 0) { |
| throw new IOException( |
| "Can't get Master Kerberos principal for the RM to use as renewer"); |
| } |
| |
| // For now, only getting tokens for the default file-system. |
| final Token<?> tokens[] = |
| fs.addDelegationTokens(tokenRenewer, credentials); |
| if (tokens != null) { |
| for (Token<?> token : tokens) { |
| LOG.info("Got dt for " + fs.getUri() + "; " + token); |
| } |
| } |
| DataOutputBuffer dob = new DataOutputBuffer(); |
| credentials.writeTokenStorageToStream(dob); |
| ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); |
| amContainer.setTokens(fsTokens); |
| } |
| |
| appContext.setAMContainerSpec(amContainer); |
| |
| // Create the request to send to the ApplicationsManager |
| ApplicationId appId = appContext.getApplicationId(); |
| yarnClient.submitApplication(appContext); |
| |
| return monitorApplication(appId) ? new NetworkedJob() : null; |
| } catch (YarnException e) { |
| e.printStackTrace(); |
| return null; |
| } |
| } |
| |
| @Override |
| public Path getSystemDir() { |
| return new Path(getConf().get("bsp.local.dir", "/tmp/hama-yarn/")); |
| } |
| |
| ApplicationId getId() { |
| return id; |
| } |
| |
| public ApplicationReport getReport() { |
| return report; |
| } |
| |
| private boolean monitorApplication(ApplicationId appId) |
| throws IOException, YarnException { |
| while (true) { |
| try { |
| Thread.sleep(1000); |
| } catch (InterruptedException e) { |
| LOG.debug("Thread sleep in monitoring loop interrupted"); |
| } |
| |
| // Get application report for the appId we are interested in |
| report = yarnClient.getApplicationReport(appId); |
| |
| LOG.info("Got application report from ASM for" + ", appId=" |
| + appId.getId() + ", clientToAMToken=" |
| + report.getClientToAMToken() + ", appDiagnostics=" |
| + report.getDiagnostics() + ", appMasterHost=" |
| + report.getHost() + ", appQueue=" + report.getQueue() |
| + ", appMasterRpcPort=" + report.getRpcPort() |
| + ", appStartTime=" + report.getStartTime() |
| + ", yarnAppState=" |
| + report.getYarnApplicationState().toString() |
| + ", distributedFinalState=" |
| + report.getFinalApplicationStatus().toString() |
| + ", appTrackingUrl=" + report.getTrackingUrl() |
| + ", appUser=" + report.getUser()); |
| |
| YarnApplicationState state = report.getYarnApplicationState(); |
| FinalApplicationStatus dsStatus = report |
| .getFinalApplicationStatus(); |
| if (YarnApplicationState.FINISHED == state) { |
| if (FinalApplicationStatus.SUCCEEDED == dsStatus) { |
| LOG.info("Application has completed successfully. Breaking monitoring loop"); |
| return true; |
| } else { |
| LOG.info("Application did finished unsuccessfully." |
| + " YarnState=" + state.toString() |
| + ", DSFinalStatus=" + dsStatus.toString() |
| + ". Breaking monitoring loop"); |
| return false; |
| } |
| } else if (YarnApplicationState.KILLED == state |
| || YarnApplicationState.FAILED == state) { |
| LOG.info("Application did not finish." + " YarnState=" |
| + state.toString() + ", DSFinalStatus=" |
| + dsStatus.toString() + ". Breaking monitoring loop"); |
| return false; |
| } |
| |
| if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) { |
| LOG.info("Reached client specified timeout for application. Killing application"); |
| forceKillApplication(appId); |
| return false; |
| } |
| } |
| } |
| |
| /** |
| * Kill a submitted application by sending a call to the ASM |
| * @param appId Application Id to be killed. |
| * @throws YarnException |
| * @throws IOException |
| */ |
| private void forceKillApplication(ApplicationId appId) |
| throws YarnException, IOException { |
| // TODO clarify whether multiple jobs with the same app id can be submitted and be running at |
| // the same time. |
| // If yes, can we kill a particular attempt only? |
| |
| // Response can be ignored as it is non-null on success or |
| // throws an exception in case of failures |
| yarnClient.killApplication(appId); |
| } |
| |
| private List<File> localJarfromPath(String path) throws IOException { |
| File hamaHome = new File(path); |
| String[] extensions = new String[]{"jar"}; |
| List<File> files = (List<File>)FileUtils.listFiles(hamaHome, extensions, true); |
| |
| return files; |
| } |
| |
| private void addToLocalResources(FileSystem fs, String fileSrcPath, |
| String fileDstPath, String fileName, Map<String, LocalResource> localResources) |
| throws IOException { |
| Path dstPath = new Path(fileDstPath, fileName); |
| dstPath = fs.makeQualified(dstPath); |
| fs.copyFromLocalFile(false, true, new Path(fileSrcPath), dstPath); |
| FileStatus fileStatus = fs.getFileStatus(dstPath); |
| LocalResource localRsrc = |
| LocalResource.newInstance( |
| ConverterUtils.getYarnUrlFromURI(dstPath.toUri()), |
| LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, |
| fileStatus.getLen(), fileStatus.getModificationTime()); |
| localResources.put(fileName, localRsrc); |
| } |
| } |