// blob: ce086035fe536447b8fd778149422715e3f9b06f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hama.bsp;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.*;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
public class BSPTaskLauncher {
private static final Log LOG = LogFactory.getLog(BSPTaskLauncher.class);
private final Container allocatedContainer;
private final int id;
private final ContainerManagementProtocol cm;
private final Configuration conf;
private String user;
private final Path jobFile;
private final BSPJobID jobId;
private GetContainerStatusesRequest statusRequest;
/**
 * Last-resort cleanup: asks the container manager to stop the allocated
 * container when this launcher is garbage collected.
 *
 * Exceptions thrown from a finalizer are ignored by the finalization
 * machinery, so a failing stop request is logged here before being rethrown.
 * The superclass finalizer is always invoked, even if the stop request fails.
 *
 * @throws Throwable whatever {@link #stopAndCleanup()} or the superclass
 *           finalizer throws
 */
@Override
protected void finalize() throws Throwable {
  try {
    stopAndCleanup();
  } catch (Exception e) {
    // Without this log line the failure would vanish silently.
    LOG.warn("Failed to stop container " + allocatedContainer.getId()
        + " during finalization", e);
    throw e;
  } finally {
    super.finalize();
  }
}
public BSPTaskLauncher(int id, Container container, ContainerManagementProtocol cm,
Configuration conf, Path jobFile, BSPJobID jobId)
throws YarnException {
this.id = id;
this.cm = cm;
this.conf = conf;
this.allocatedContainer = container;
this.jobFile = jobFile;
this.jobId = jobId;
// FIXME why does this contain mapreduce here?
this.user = conf.get("bsp.user.name");
if (this.user == null) {
this.user = conf.get("mapreduce.job.user.name");
}
}
/**
 * Sends a stop request for the allocated container to the container manager.
 *
 * @throws YarnException if the container manager rejects the request
 * @throws IOException if communication with the container manager fails
 */
public void stopAndCleanup() throws YarnException, IOException {
  final StopContainersRequest request = Records.newRecord(StopContainersRequest.class);

  // The request takes a list of ids even though only one container is stopped.
  final ContainerId targetId = allocatedContainer.getId();
  final List<ContainerId> idsToStop = new ArrayList<ContainerId>();
  idsToStop.add(targetId);
  LOG.info("getId : " + targetId);

  request.setContainerIds(idsToStop);
  LOG.info("StopContainer : " + request.getContainerIds());

  cm.stopContainers(request);
}
/**
 * Launches the task in the allocated container and remembers the status
 * request that later polling uses.
 *
 * @throws IOException if setting up or starting the container fails on I/O
 * @throws YarnException if the container manager reports an error
 */
public void start() throws IOException, YarnException {
  final String containerIdText = this.allocatedContainer.getId().toString();
  LOG.info("Spawned task with id: " + this.id + " for allocated container id: " + containerIdText);

  this.statusRequest = setupContainer(this.allocatedContainer, this.cm, this.user, this.id);
}
/**
 * Polls the container manager for the current status of the allocated
 * container.
 *
 * The status request used here is only assigned by {@link #start()}, so
 * {@code start()} has to be called before polling.
 *
 * @return the task id together with the container's exit status once the
 *         container has reached the {@code COMPLETE} state; {@code null} if
 *         the container manager returned no statuses, returned no status for
 *         the allocated container, or the container has not completed yet
 * @throws Exception if the status request to the container manager fails
 */
public BSPTaskStatus poll() throws Exception {
  GetContainerStatusesResponse response = cm.getContainerStatuses(statusRequest);
  List<ContainerStatus> containerStatuses = response.getContainerStatuses();
  if (containerStatuses == null || containerStatuses.isEmpty()) {
    LOG.info("container Statuses size is zero");
    return null;
  }

  ContainerStatus lastStatus = null;
  for (ContainerStatus containerStatus : containerStatuses) {
    LOG.info("Got container status for containerID=" + containerStatus
        .getContainerId() + ", state=" + containerStatus.getState()
        + ", exitStatus=" + containerStatus.getExitStatus() + ", diagnostics="
        + containerStatus.getDiagnostics());
    if (containerStatus.getContainerId().equals(allocatedContainer.getId())) {
      lastStatus = containerStatus;
      break;
    }
  }

  // The response may contain statuses but none for our container; treat that
  // like "not finished yet" instead of dereferencing null below.
  if (lastStatus == null) {
    LOG.warn("No status reported for containerID=" + allocatedContainer.getId());
    return null;
  }

  if (lastStatus.getState() != ContainerState.COMPLETE) {
    LOG.info("Not completed...");
    return null;
  }

  LOG.info(this.id + " Last report comes with exitstatus of " + lastStatus
      .getExitStatus() + " and diagnose string of " + lastStatus
      .getDiagnostics());
  return new BSPTaskStatus(id, lastStatus.getExitStatus());
}
/**
 * Starts the BSP task inside the given container and prepares the request
 * that is later used to query the container's status.
 *
 * The launch context consists of the local resources (see
 * {@link #buildLocalResources(FileSystem)}) and a single shell command (see
 * {@link #buildLaunchCommand(FileSystem, int)}).
 *
 * @param allocatedContainer the container to launch the task in
 * @param cm the container manager protocol used to start the container
 * @param user the user name; only used for logging here
 * @param id the task id, passed to the spawned process as an argument
 * @return a status request addressing exactly the given container
 * @throws IOException if file system access or the start request fails on I/O
 * @throws YarnException if the container manager reports an error
 * @throws IllegalArgumentException if a required environment variable is
 *           missing
 */
private GetContainerStatusesRequest setupContainer(
    Container allocatedContainer, ContainerManagementProtocol cm, String user, int id) throws IOException, YarnException {
  LOG.info("Setting up a container for user " + user + " with id of " + id
      + " and containerID of " + allocatedContainer.getId() + " as " + user);

  // NOTE(review): not closed here - FileSystem.get presumably hands out a
  // shared cached instance; confirm before adding a close().
  FileSystem fs = FileSystem.get(conf);

  // Now we setup a ContainerLaunchContext
  ContainerLaunchContext ctx = Records
      .newRecord(ContainerLaunchContext.class);
  ctx.setLocalResources(buildLocalResources(fs));

  List<String> commands = new ArrayList<String>();
  commands.add(buildLaunchCommand(fs, id));
  ctx.setCommands(commands);
  LOG.info("Starting command: " + commands);

  StartContainerRequest startReq = Records
      .newRecord(StartContainerRequest.class);
  startReq.setContainerLaunchContext(ctx);
  startReq.setContainerToken(allocatedContainer.getContainerToken());

  List<StartContainerRequest> startRequests = new ArrayList<StartContainerRequest>();
  startRequests.add(startReq);
  // NOTE(review): the response of startContainers is not inspected.
  cm.startContainers(StartContainersRequest.newInstance(startRequests));

  GetContainerStatusesRequest statusReq = Records
      .newRecord(GetContainerStatusesRequest.class);
  List<ContainerId> containerIds = new ArrayList<ContainerId>();
  containerIds.add(allocatedContainer.getId());
  statusReq.setContainerIds(containerIds);
  return statusReq;
}

/**
 * Collects the local resources for the container: the package named by the
 * HAMA_YARN_* environment variables plus every file found recursively below
 * the location named by the HAMA_LOCATION environment variable.
 */
private Map<String, LocalResource> buildLocalResources(FileSystem fs) throws IOException {
  Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();

  Path packageFile = new Path(requireEnv(YARNBSPConstants.HAMA_YARN_LOCATION));
  URL packageUrl = ConverterUtils.getYarnUrlFromPath(packageFile
      .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
  LOG.info("PackageURL has been composed to " + packageUrl.toString());
  try {
    LOG.info("Reverting packageURL to path: "
        + ConverterUtils.getPathFromYarnURL(packageUrl));
  } catch (URISyntaxException e) {
    LOG.fatal("If you see this error the workarround does not work", e);
  }

  LocalResource packageResource = Records.newRecord(LocalResource.class);
  packageResource.setResource(packageUrl);
  packageResource.setSize(Long.parseLong(requireEnv(YARNBSPConstants.HAMA_YARN_SIZE)));
  packageResource.setTimestamp(Long.parseLong(requireEnv(YARNBSPConstants.HAMA_YARN_TIMESTAMP)));
  packageResource.setType(LocalResourceType.FILE);
  packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
  localResources.put(YARNBSPConstants.APP_MASTER_JAR_PATH, packageResource);

  Path hamaReleaseFile = new Path(requireEnv(YARNBSPConstants.HAMA_LOCATION));
  // The qualified URL is computed for the log message only.
  URL hamaReleaseUrl = ConverterUtils.getYarnUrlFromPath(hamaReleaseFile
      .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
  LOG.info("Hama release URL has been composed to " + hamaReleaseUrl.toString());

  RemoteIterator<LocatedFileStatus> releaseFiles = fs.listFiles(hamaReleaseFile, true);
  while (releaseFiles.hasNext()) {
    LocatedFileStatus lfs = releaseFiles.next();
    LocalResource localRsrc = LocalResource.newInstance(
        ConverterUtils.getYarnUrlFromPath(lfs.getPath()),
        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
        lfs.getLen(), lfs.getModificationTime());
    // Keyed by plain file name: files with equal names in different
    // directories overwrite each other.
    localResources.put(lfs.getPath().getName(), localRsrc);
  }
  return localResources;
}

/**
 * Builds the classpath for the spawned JVM: the CLASSPATH environment
 * expansion, everything in the working directory, and the configured YARN
 * application classpath entries.
 */
private String buildClassPath() {
  /*
   * TODO Package classpath seems not to work if you're in pseudo distributed
   * mode, because the resource must not be moved, it will never be unpacked.
   * So we will check if our jar file has the file:// prefix and put it into
   * the CP directly
   */
  StringBuilder classPathEnv = new StringBuilder(
      ApplicationConstants.Environment.CLASSPATH.$()).append(File.pathSeparatorChar)
      .append("./*");
  for (String c : conf.getStrings(
      YarnConfiguration.YARN_APPLICATION_CLASSPATH,
      YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
    classPathEnv.append(File.pathSeparatorChar);
    classPathEnv.append(c.trim());
  }
  return classPathEnv.toString();
}

/**
 * Assembles the single shell command line that runs BSPRunner for the given
 * task id and redirects stdout/stderr into the container log directory.
 */
private String buildLaunchCommand(FileSystem fs, int id) {
  List<String> args = new ArrayList<String>();
  args.add("${JAVA_HOME}/bin/java");
  args.add("-cp " + buildClassPath());
  args.add(BSPRunner.class.getCanonicalName());
  args.add(jobId.getJtIdentifier());
  args.add(Integer.toString(id));
  args.add(this.jobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory())
      .toString());
  args.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/bsp.stdout");
  args.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/bsp.stderr");

  // Get final command; every argument (including the last) is followed by a
  // space, as before.
  StringBuilder command = new StringBuilder();
  for (String arg : args) {
    command.append(arg).append(" ");
  }
  return command.toString();
}

/**
 * Reads an environment variable and fails with a message naming the variable
 * when it is not set.
 */
private static String requireEnv(String name) {
  String value = System.getenv(name);
  if (value == null) {
    throw new IllegalArgumentException(
        "Required environment variable " + name + " is not set");
  }
  return value;
}
/**
 * Hash code derived solely from the task id, consistent with
 * {@link #equals(Object)}.
 */
@Override
public int hashCode() {
  // Same value as the usual "31 * 1 + id" accumulation.
  return 31 + id;
}
/**
 * Two launchers are equal when they are of exactly the same class and carry
 * the same task id; no other field takes part in the comparison.
 */
@Override
public boolean equals(Object obj) {
  if (obj == this) {
    return true;
  }
  if (obj == null || obj.getClass() != getClass()) {
    return false;
  }
  return ((BSPTaskLauncher) obj).id == this.id;
}
/**
 * Immutable pair of a task id and the exit status reported for the task's
 * container.
 */
public static class BSPTaskStatus {

  /** Id of the task this status belongs to. */
  private final int id;

  /** Exit status reported for the task's container. */
  private final int exitStatus;

  /**
   * @param id the task id
   * @param exitStatus the exit status of the task's container
   */
  public BSPTaskStatus(int id, int exitStatus) {
    this.exitStatus = exitStatus;
    this.id = id;
  }

  /** @return the exit status of the task's container */
  public int getExitStatus() {
    return this.exitStatus;
  }

  /** @return the task id */
  public int getId() {
    return this.id;
  }
}
}