HAMA-939: Refactoring which was implement using out-of-date status response
git-svn-id: https://svn.apache.org/repos/asf/hama/trunk@1683197 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index e914bb1..b689b5d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -26,6 +26,7 @@
IMPROVEMENTS
+ HAMA-939: Refactoring which was implement using out-of-date status response (Minho Kim via edwardyoon)
HAMA-955: Support UnsafeByteArrayInputStream and UnSafeByteArrayOutputStream (Minho Kim via edwardyoon)
HAMA-944: Add JSON format option to fastgen command (Minho Kim via edwardyoon)
HAMA-919: Manage messages per Vertex (edwardyoon)
diff --git a/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java b/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java
new file mode 100644
index 0000000..62663fd
--- /dev/null
+++ b/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java
@@ -0,0 +1,1011 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.sync.SyncServer;
+import org.apache.hama.bsp.sync.SyncServiceFactory;
+import org.apache.hama.ipc.BSPPeerProtocol;
+import org.apache.hama.ipc.RPC;
+import org.apache.hama.ipc.Server;
+import org.apache.hama.util.BSPNetUtils;
+import org.apache.log4j.LogManager;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ApplicationMaster implements BSPClient, BSPPeerProtocol {
+ private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
+
+ // Configuration
+ private Configuration localConf;
+ private Configuration jobConf;
+
+ private String jobFile;
+ private String applicationName;
+ // RPC host and port where the AM receives client-side requests
+ private String hostname;
+ private int clientPort;
+ private FileSystem fs;
+ private static int id = 0;
+
+ private volatile long superstep;
+ private Counters globalCounter = new Counters();
+ private BSPJobClient.RawSplit[] splits;
+
+ private BSPJobID jobId;
+
+ // SyncServer for Zookeeper
+ private SyncServer syncServer;
+
+ // Zookeeper thread pool
+ private static final ExecutorService threadPool = Executors
+ .newFixedThreadPool(1);
+
+ // Port of the RPC server through which the AM talks to the tasks
+ private int taskServerPort;
+
+ private Server clientServer;
+ private Server taskServer;
+
+ // Handle to communicate with the Resource Manager
+ @SuppressWarnings("rawtypes")
+ private AMRMClientAsync amRMClient;
+
+ // In both secure and non-secure modes, this points to the job-submitter.
+ @VisibleForTesting
+ UserGroupInformation appSubmitterUgi;
+
+ // Handle to communicate with the Node Manager
+ private NMClientAsync nmClientAsync;
+ // Listen to process the response from the Node Manager
+ private NMCallbackHandler containerListener;
+
+ // Application Attempt Id ( combination of attemptId and fail count )
+ @VisibleForTesting
+ protected ApplicationAttemptId appAttemptID;
+
+
+ // TODO
+ // For status update for clients - yet to be implemented
+ // Hostname of the container
+ private String appMasterHostname = "";
+ // Port on which the app master listens for status updates from clients
+ private int appMasterRpcPort = -1;
+ // Tracking url to which app master publishes info for clients to monitor
+ private String appMasterTrackingUrl = "";
+
+ // App Master configuration
+ // No. of containers to run shell command on
+ @VisibleForTesting
+ protected int numTotalContainers;
+ // Memory to request for the container on which the shell command will run
+ private int containerMemory;
+ // VirtualCores to request for the container on which the shell command will run
+ private int containerVirtualCores = 1;
+
+ // Priority of the request
+ private int requestPriority = 0;
+
+ // Counter for completed containers ( complete denotes successful or failed )
+ private AtomicInteger numCompletedContainers = new AtomicInteger();
+ // Allocated container count so that we know how many containers has the RM
+ // allocated to us
+ @VisibleForTesting
+ protected AtomicInteger numAllocatedContainers = new AtomicInteger();
+ // Count of failed containers
+ private AtomicInteger numFailedContainers = new AtomicInteger();
+ // Count of containers already requested from the RM
+ // Needed as once requested, we should not request for containers again.
+ // Only request for more if the original requirement changes.
+ @VisibleForTesting
+ protected AtomicInteger numRequestedContainers = new AtomicInteger();
+
+ private volatile boolean done;
+ private ByteBuffer allTokens;
+
+ // Launch threads
+ private List<Thread> launchThreads = new ArrayList<Thread>();
+
+ @VisibleForTesting
+ protected final Set<ContainerId> launchedContainers =
+ Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
+
+ /**
+  * Creates an application master whose local configuration is a fresh
+  * {@link YarnConfiguration}. All other state is set up by {@link #init}.
+  */
+ public ApplicationMaster() {
+   localConf = new YarnConfiguration();
+ }
+
+ public static void main(String[] args) throws IOException {
+ boolean result = false;
+ ApplicationMaster appMaster = new ApplicationMaster();
+
+ try {
+ LOG.info("Initializing ApplicationMaster");
+ boolean doRun = appMaster.init(args);
+ if (!doRun) {
+ System.exit(0);
+ }
+ appMaster.run();
+ result = appMaster.finish();
+ } catch (Throwable t) {
+ LOG.fatal("Error running ApplicationMaster", t);
+ LogManager.shutdown();
+ ExitUtil.terminate(1, t);
+ } finally {
+ appMaster.close();
+ }
+
+ if (result) {
+ LOG.info("Application Master completed successfully. exiting");
+ System.exit(0);
+ } else {
+ LOG.info("Application Master failed. exiting");
+ System.exit(2);
+ }
+ }
+
+ /**
+  * Prepares the application master: loads the submitted job configuration,
+  * starts the sync server and the RPC servers, writes the adjusted
+  * configuration back to the job file and reads the input splits, if a split
+  * file is configured.
+  *
+  * @param args exactly one element: the path of the submitted job file
+  * @return always {@code true}; failures are reported by throwing
+  * @throws IllegalArgumentException if {@code args} is {@code null} or does
+  *           not hold exactly one element
+  * @throws Exception if loading the configuration or starting a server fails
+  */
+ public boolean init(String[] args) throws Exception {
+   if (args == null || args.length != 1) {
+     throw new IllegalArgumentException(
+         "Expected exactly one argument (the job file path) but got "
+             + (args == null ? "null" : String.valueOf(args.length)));
+   }
+   this.jobFile = args[0];
+   this.jobConf = getSubmitConfiguration(jobFile);
+   // NOTE(review): this adds localConf to itself as a resource; possibly
+   // jobConf was intended - confirm before changing.
+   localConf.addResource(localConf);
+   fs = FileSystem.get(jobConf);
+
+   this.applicationName = jobConf.get("bsp.job.name",
+       "<no bsp job name defined>");
+   if (applicationName.isEmpty()) {
+     this.applicationName = "<no bsp job name defined>";
+   }
+
+   appAttemptID = getApplicationAttemptId();
+   this.jobId = new BSPJobID(appAttemptID.toString(), 0);
+   this.appMasterHostname = BSPNetUtils.getCanonicalHostname();
+   // NOTE(review): the tracking URL is hard-coded to localhost - confirm
+   // whether it should be derived from the cluster configuration.
+   this.appMasterTrackingUrl = "http://localhost:8088";
+   this.numTotalContainers = this.jobConf.getInt("bsp.peers.num", 1);
+   this.containerMemory = getMemoryRequirements(jobConf);
+
+   this.hostname = BSPNetUtils.getCanonicalHostname();
+   this.clientPort = BSPNetUtils.getFreePort(12000);
+
+   // Set configuration for starting the SyncServer, which runs Zookeeper
+   this.jobConf.set(Constants.ZOOKEEPER_QUORUM, appMasterHostname);
+
+   // start our synchronization service
+   startSyncServer();
+
+   // start RPC servers
+   startRPCServers();
+
+   /*
+    * Make sure that this executes after the RPC servers have been started,
+    * because starting them readjusts the configuration.
+    */
+   rewriteSubmitConfiguration(jobFile, jobConf);
+
+   String jobSplit = jobConf.get("bsp.job.split.file");
+   splits = null;
+   if (jobSplit != null) {
+     DataInputStream splitFile = fs.open(new Path(jobSplit));
+     try {
+       splits = BSPJobClient.readSplitFile(splitFile);
+     } finally {
+       splitFile.close();
+     }
+   }
+
+   return true;
+ }
+
+ /**
+  * Main run function for the application master: collects the security
+  * tokens that are handed to the containers, starts the ResourceManager and
+  * NodeManager clients, registers with the ResourceManager and requests the
+  * containers needed for the job.
+  *
+  * @throws org.apache.hadoop.yarn.exceptions.YarnException
+  * @throws IOException
+  * @throws InterruptedException
+  */
+ @SuppressWarnings({ "unchecked" })
+ public void run() throws YarnException, IOException, InterruptedException {
+   LOG.info("Starting ApplicationMaster");
+
+   // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class
+   // are marked as LimitedPrivate
+   Credentials credentials =
+       UserGroupInformation.getCurrentUser().getCredentials();
+   // Remove the AM->RM token BEFORE serializing the credentials, so that the
+   // token set handed to the containers (allTokens) does not include it.
+   Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
+   LOG.info("Executing with tokens:");
+   while (iter.hasNext()) {
+     Token<?> token = iter.next();
+     LOG.info(token);
+     if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+       iter.remove();
+     }
+   }
+   DataOutputBuffer dob = new DataOutputBuffer();
+   credentials.writeTokenStorageToStream(dob);
+   allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+
+   // Create appSubmitterUgi and add the remaining tokens to it
+   String appSubmitterUserName =
+       System.getenv(ApplicationConstants.Environment.USER.name());
+   appSubmitterUgi =
+       UserGroupInformation.createRemoteUser(appSubmitterUserName);
+   appSubmitterUgi.addCredentials(credentials);
+
+   AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
+   amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
+   amRMClient.init(localConf);
+   amRMClient.start();
+
+   containerListener = createNMCallbackHandler();
+   nmClientAsync = new NMClientAsyncImpl(containerListener);
+   nmClientAsync.init(localConf);
+   nmClientAsync.start();
+
+   // Setup local RPC Server to accept status requests directly from clients
+   // TODO need to setup a protocol for client to be able to communicate to
+   // the RPC server
+   // TODO use the rpc port info to register with the RM for the client to
+   // send requests to this app master
+
+   // Register self with ResourceManager
+   // This will start heartbeating to the RM
+   appMasterHostname = NetUtils.getHostname();
+   RegisterApplicationMasterResponse response = amRMClient
+       .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
+           appMasterTrackingUrl);
+   // Dump out information about cluster capability as seen by the
+   // resource manager
+   int maxMem = response.getMaximumResourceCapability().getMemory();
+   LOG.info("Max mem capability of resources in this cluster " + maxMem);
+
+   int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
+   LOG.info("Max vcores capability of resources in this cluster " + maxVCores);
+
+   // A resource ask cannot exceed the max.
+   if (containerMemory > maxMem) {
+     LOG.info("Container memory specified above max threshold of cluster."
+         + " Using max value." + ", specified=" + containerMemory + ", max="
+         + maxMem);
+     containerMemory = maxMem;
+   }
+
+   if (containerVirtualCores > maxVCores) {
+     LOG.info("Container virtual cores specified above max threshold of cluster."
+         + " Using max value." + ", specified=" + containerVirtualCores + ", max="
+         + maxVCores);
+     containerVirtualCores = maxVCores;
+   }
+
+   // Containers that survived a previous attempt count as already allocated
+   List<Container> previousAMRunningContainers =
+       response.getContainersFromPreviousAttempts();
+   LOG.info(appAttemptID + " received " + previousAMRunningContainers.size()
+       + " previous attempts' running containers on AM registration.");
+   for (Container container : previousAMRunningContainers) {
+     launchedContainers.add(container.getId());
+   }
+   numAllocatedContainers.addAndGet(previousAMRunningContainers.size());
+
+   // Only ask the RM for the containers that are still missing; allocations
+   // are delivered asynchronously to the RMCallbackHandler.
+   int numTotalContainersToRequest =
+       numTotalContainers - previousAMRunningContainers.size();
+   for (int i = 0; i < numTotalContainersToRequest; ++i) {
+     AMRMClient.ContainerRequest containerAsk = setupContainerAskForRM();
+     amRMClient.addContainerRequest(containerAsk);
+   }
+   numRequestedContainers.set(numTotalContainers);
+ }
+
+ /**
+  * Builds the NodeManager callback handler bound to this application master.
+  *
+  * @return a new {@link NMCallbackHandler} that reports back to this instance
+  */
+ @VisibleForTesting
+ NMCallbackHandler createNMCallbackHandler() {
+   NMCallbackHandler handler = new NMCallbackHandler(this);
+   return handler;
+ }
+
+ /**
+  * Waits until the job is done or all containers have completed, joins the
+  * launch threads, stops the NodeManager client and unregisters from the
+  * ResourceManager with the final application status.
+  *
+  * <p>An interrupt received while waiting does not abort the shutdown
+  * sequence; it is remembered and the thread's interrupt status is restored
+  * before this method returns.
+  *
+  * @return {@code true} if no container failed and all containers completed
+  */
+ @VisibleForTesting
+ protected boolean finish() {
+   boolean interrupted = false;
+   try {
+     // wait for completion.
+     while (!done
+         && (numCompletedContainers.get() != numTotalContainers)) {
+       try {
+         Thread.sleep(200);
+       } catch (InterruptedException ex) {
+         // keep waiting, but do not lose the interrupt
+         interrupted = true;
+       }
+     }
+
+     // Join all launched threads
+     // needed for when we time out
+     // and we need to release containers
+     for (Thread launchThread : launchThreads) {
+       try {
+         launchThread.join(10000);
+       } catch (InterruptedException e) {
+         interrupted = true;
+         LOG.info("Exception thrown in thread join: " + e.getMessage(), e);
+       }
+     }
+
+     // When the application completes, it should stop all running containers
+     LOG.info("Application completed. Stopping running containers");
+     nmClientAsync.stop();
+
+     // When the application completes, it should send a finish application
+     // signal to the RM
+     LOG.info("Application completed. Signalling finish to RM");
+
+     FinalApplicationStatus appStatus;
+     String appMessage = null;
+     boolean success = true;
+     if (numFailedContainers.get() == 0 &&
+         numCompletedContainers.get() == numTotalContainers) {
+       appStatus = FinalApplicationStatus.SUCCEEDED;
+     } else {
+       appStatus = FinalApplicationStatus.FAILED;
+       appMessage = "Diagnostics." + ", total=" + numTotalContainers
+           + ", completed=" + numCompletedContainers.get() + ", allocated="
+           + numAllocatedContainers.get() + ", failed="
+           + numFailedContainers.get();
+       LOG.info(appMessage);
+       success = false;
+     }
+     try {
+       amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
+     } catch (YarnException ex) {
+       LOG.error("Failed to unregister application", ex);
+     } catch (IOException e) {
+       LOG.error("Failed to unregister application", e);
+     }
+
+     amRMClient.stop();
+
+     return success;
+   } finally {
+     if (interrupted) {
+       // restore the interrupt status swallowed while waiting
+       Thread.currentThread().interrupt();
+     }
+   }
+ }
+
+ private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void onContainersCompleted(List<ContainerStatus> completedContainers) {
+ LOG.info("Got response from RM for container ask, completedCnt="
+ + completedContainers.size());
+ for (ContainerStatus containerStatus : completedContainers) {
+ LOG.info(appAttemptID + " got container status for containerID="
+ + containerStatus.getContainerId() + ", state="
+ + containerStatus.getState() + ", exitStatus="
+ + containerStatus.getExitStatus() + ", diagnostics="
+ + containerStatus.getDiagnostics());
+
+ // non complete containers should not be here
+ assert (containerStatus.getState() == ContainerState.COMPLETE);
+ // ignore containers we know nothing about - probably from a previous
+ // attempt
+ if (!launchedContainers.contains(containerStatus.getContainerId())) {
+ LOG.info("Ignoring completed status of "
+ + containerStatus.getContainerId()
+ + "; unknown container(probably launched by previous attempt)");
+ continue;
+ }
+
+ // increment counters for completed/failed containers
+ int exitStatus = containerStatus.getExitStatus();
+ if (0 != exitStatus) {
+ // container failed
+ if (ContainerExitStatus.ABORTED != exitStatus) {
+ // shell script failed
+ // counts as completed
+ numCompletedContainers.incrementAndGet();
+ numFailedContainers.incrementAndGet();
+ } else {
+ // container was killed by framework, possibly preempted
+ // we should re-try as the container was lost for some reason
+ numAllocatedContainers.decrementAndGet();
+ numRequestedContainers.decrementAndGet();
+ // we do not need to release the container as it would be done
+ // by the RM
+ }
+ } else {
+ // nothing to do
+ // container completed successfully
+ numCompletedContainers.incrementAndGet();
+ LOG.info("Container completed successfully." + ", containerId="
+ + containerStatus.getContainerId());
+ }
+ }
+
+ // ask for more containers if any failed
+ int askCount = numTotalContainers - numRequestedContainers.get();
+ numRequestedContainers.addAndGet(askCount);
+
+ if (askCount > 0) {
+ for (int i = 0; i < askCount; ++i) {
+ AMRMClient.ContainerRequest containerAsk = setupContainerAskForRM();
+ amRMClient.addContainerRequest(containerAsk);
+ }
+ }
+
+ if (numCompletedContainers.get() == numTotalContainers) {
+ done = true;
+ }
+ }
+
+ @Override
+ public void onContainersAllocated(List<Container> allocatedContainers) {
+ LOG.info("Got response from RM for container ask, allocatedCnt="
+ + allocatedContainers.size());
+ numAllocatedContainers.addAndGet(allocatedContainers.size());
+ for (Container allocatedContainer : allocatedContainers) {
+ LOG.info("Launching shell command on a new container."
+ + ", containerId=" + allocatedContainer.getId()
+ + ", containerNode=" + allocatedContainer.getNodeId().getHost()
+ + ":" + allocatedContainer.getNodeId().getPort()
+ + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
+ + ", containerResourceMemory"
+ + allocatedContainer.getResource().getMemory()
+ + ", containerResourceVirtualCores"
+ + allocatedContainer.getResource().getVirtualCores());
+ // + ", containerToken"
+ // +allocatedContainer.getContainerToken().getIdentifier().toString());
+
+ Thread launchThread = createLaunchContainerThread(allocatedContainer);
+
+ // launch and start the container on a separate thread to keep
+ // the main thread unblocked
+ // as all containers may not be allocated at one go.
+ launchThreads.add(launchThread);
+ launchedContainers.add(allocatedContainer.getId());
+ launchThread.start();
+ id++;
+ }
+ }
+
+ @Override
+ public void onShutdownRequest() {
+ done = true;
+ }
+
+ @Override
+ public void onNodesUpdated(List<NodeReport> list) {
+
+ }
+
+ @Override
+ public float getProgress() {
+ // set progress to deliver to RM on next heartbeat
+ float progress = (float) numCompletedContainers.get()
+ / numTotalContainers;
+ return progress;
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ done = true;
+ amRMClient.stop();
+ }
+ }
+
+ @VisibleForTesting
+ static class NMCallbackHandler
+ implements NMClientAsync.CallbackHandler {
+
+ private ConcurrentMap<ContainerId, Container> containers =
+ new ConcurrentHashMap<ContainerId, Container>();
+ private final ApplicationMaster applicationMaster;
+
+ public NMCallbackHandler(ApplicationMaster applicationMaster) {
+ this.applicationMaster = applicationMaster;
+ }
+
+ public void addContainer(ContainerId containerId, Container container) {
+ containers.putIfAbsent(containerId, container);
+ }
+
+ @Override
+ public void onContainerStopped(ContainerId containerId) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Succeeded to stop Container " + containerId);
+ }
+ containers.remove(containerId);
+ }
+
+ @Override
+ public void onContainerStatusReceived(ContainerId containerId,
+ ContainerStatus containerStatus) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Container Status: id=" + containerId + ", status=" +
+ containerStatus);
+ }
+ }
+
+ @Override
+ public void onContainerStarted(ContainerId containerId,
+ Map<String, ByteBuffer> allServiceResponse) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Succeeded to start Container " + containerId);
+ }
+ Container container = containers.get(containerId);
+ if (container != null) {
+ applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
+ }
+ }
+
+ @Override
+ public void onStartContainerError(ContainerId containerId, Throwable t) {
+ LOG.error("Failed to start Container " + containerId);
+ containers.remove(containerId);
+ applicationMaster.numCompletedContainers.incrementAndGet();
+ applicationMaster.numFailedContainers.incrementAndGet();
+ }
+
+ @Override
+ public void onGetContainerStatusError(
+ ContainerId containerId, Throwable t) {
+ LOG.error("Failed to query the status of Container " + containerId);
+ }
+
+ @Override
+ public void onStopContainerError(ContainerId containerId, Throwable t) {
+ LOG.error("Failed to stop Container " + containerId);
+ containers.remove(containerId);
+ }
+ }
+
+ /**
+  * Runnable that builds the launch context (local resources, classpath and
+  * command line) for one allocated container and asks the NodeManager, via
+  * the {@link ContainerManagementProtocol} client, to start a
+  * {@link BSPRunner} in it.
+  */
+ private class LaunchContainerRunnable implements Runnable {
+
+   // Allocated container
+   final Container container;
+
+   final NMCallbackHandler containerListener;
+
+   final Configuration conf;
+
+   // Task id for this container. It is captured when the runnable is created
+   // because the shared counter keeps changing while launch threads run.
+   private final int taskId;
+
+   /**
+    * @param lcontainer Allocated container
+    * @param containerListener Callback handler of the container
+    * @param conf configuration used to resolve the file system and the
+    *          YARN application classpath
+    */
+   public LaunchContainerRunnable(
+       Container lcontainer, NMCallbackHandler containerListener, Configuration conf) {
+     this.container = lcontainer;
+     this.containerListener = containerListener;
+     this.conf = conf;
+     this.taskId = id;
+   }
+
+   /**
+    * Sets up the container launch context and dispatches the container
+    * start request. If the file system or the package URL cannot be
+    * resolved, the container is counted as completed and failed and no start
+    * request is sent.
+    */
+   @Override
+   public void run() {
+     LOG.info("Setting up container launch container for containerid="
+         + container.getId());
+     // Now we setup a ContainerLaunchContext
+     ContainerLaunchContext ctx = Records
+         .newRecord(ContainerLaunchContext.class);
+
+     // Without a file system none of the resources can be resolved, so give
+     // up on this container instead of failing later with a null reference.
+     FileSystem jobFs;
+     try {
+       jobFs = FileSystem.get(conf);
+     } catch (IOException e) {
+       LOG.fatal("Could not access the file system to launch container "
+           + container.getId(), e);
+       numCompletedContainers.incrementAndGet();
+       numFailedContainers.incrementAndGet();
+       return;
+     }
+
+     // Set the local resources
+     Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+     LocalResource packageResource = Records.newRecord(LocalResource.class);
+     Path packageFile = new Path(System.getenv(YARNBSPConstants.HAMA_YARN_LOCATION));
+     URL packageUrl = null;
+     try {
+       packageUrl = ConverterUtils.getYarnUrlFromPath(packageFile
+           .makeQualified(jobFs.getUri(), jobFs.getWorkingDirectory()));
+       LOG.info("PackageURL has been composed to " + packageUrl.toString());
+       LOG.info("Reverting packageURL to path: "
+           + ConverterUtils.getPathFromYarnURL(packageUrl));
+     } catch (URISyntaxException e) {
+       LOG.fatal("If you see this error the workarround does not work", e);
+       numCompletedContainers.incrementAndGet();
+       numFailedContainers.incrementAndGet();
+       return;
+     }
+
+     packageResource.setResource(packageUrl);
+     packageResource.setSize(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_YARN_SIZE)));
+     packageResource.setTimestamp(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_YARN_TIMESTAMP)));
+     packageResource.setType(LocalResourceType.FILE);
+     packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
+
+     localResources.put(YARNBSPConstants.APP_MASTER_JAR_PATH, packageResource);
+
+     Path hamaReleaseFile = new Path(System.getenv(YARNBSPConstants.HAMA_LOCATION));
+     URL hamaReleaseUrl = ConverterUtils.getYarnUrlFromPath(hamaReleaseFile
+         .makeQualified(jobFs.getUri(), jobFs.getWorkingDirectory()));
+     LOG.info("Hama release URL has been composed to " + hamaReleaseUrl
+         .toString());
+
+     // Every file below the Hama release directory becomes a local resource
+     try {
+       RemoteIterator<LocatedFileStatus> fileStatusListIterator =
+           jobFs.listFiles(hamaReleaseFile, true);
+
+       while (fileStatusListIterator.hasNext()) {
+         LocatedFileStatus lfs = fileStatusListIterator.next();
+         LocalResource localRsrc = LocalResource.newInstance(
+             ConverterUtils.getYarnUrlFromPath(lfs.getPath()),
+             LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
+             lfs.getLen(), lfs.getModificationTime());
+         localResources.put(lfs.getPath().getName(), localRsrc);
+       }
+     } catch (IOException e) {
+       // best effort: the launch continues with the resources found so far
+       LOG.fatal("The error has occured to RemoteIterator " + e, e);
+     }
+
+     ctx.setLocalResources(localResources);
+
+     /*
+      * TODO Package classpath seems not to work if you're in pseudo distributed
+      * mode, because the resource must not be moved, it will never be unpacked.
+      * So we will check if our jar file has the file:// prefix and put it into
+      * the CP directly
+      */
+
+     StringBuilder classPathEnv = new StringBuilder(
+         ApplicationConstants.Environment.CLASSPATH.$()).append(File.pathSeparatorChar)
+         .append("./*");
+     for (String c : conf.getStrings(
+         YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+         YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+       classPathEnv.append(File.pathSeparatorChar);
+       classPathEnv.append(c.trim());
+     }
+
+     // BSPRunner arguments: job tracker identifier, task id, job file
+     List<CharSequence> vargs = new ArrayList<CharSequence>();
+     vargs.add("${JAVA_HOME}/bin/java");
+     vargs.add("-cp " + classPathEnv + "");
+     vargs.add(BSPRunner.class.getCanonicalName());
+
+     vargs.add(jobId.getJtIdentifier());
+     vargs.add(Integer.toString(taskId));
+     vargs.add(
+         new Path(jobFile).makeQualified(jobFs.getUri(), jobFs.getWorkingDirectory())
+             .toString());
+
+     vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/hama-worker.stdout");
+     vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/hama-worker.stderr");
+
+     // Get final command
+     StringBuilder command = new StringBuilder();
+     for (CharSequence str : vargs) {
+       command.append(str).append(" ");
+     }
+
+     List<String> commands = new ArrayList<String>();
+     commands.add(command.toString());
+
+     ctx.setCommands(commands);
+     ctx.setTokens(allTokens.duplicate());
+     LOG.info("Starting commands: " + commands);
+
+     // register the container before the start request so the callback
+     // handler knows it when the NodeManager answers
+     containerListener.addContainer(container.getId(), container);
+     nmClientAsync.startContainerAsync(container, ctx);
+   }
+ }
+
+ /**
+  * Builds the container request sent to the RM: the configured memory and
+  * virtual cores at the configured priority, with {@code null} passed for
+  * both the node and the rack list.
+  *
+  * @return the container request to be added to the RM client
+  */
+ private AMRMClient.ContainerRequest setupContainerAskForRM() {
+   // For now, memory and CPU are the supported resource types
+   Resource capability = Resource.newInstance(containerMemory,
+       containerVirtualCores);
+   // TODO - what is the range for priority? how to decide?
+   Priority pri = Priority.newInstance(requestPriority);
+
+   AMRMClient.ContainerRequest request =
+       new AMRMClient.ContainerRequest(capability, null, null, pri);
+   LOG.info("Requested container ask: " + request.toString());
+   return request;
+ }
+
+ /**
+  * Loads the submitted job configuration from the given path.
+  *
+  * @param path location of the job file
+  * @return a new {@link HamaConfiguration} with the job file added as a
+  *         resource
+  * @throws IOException if the file system or the file cannot be opened
+  */
+ private static Configuration getSubmitConfiguration(String path)
+     throws IOException {
+   Path jobSubmitPath = new Path(path);
+   Configuration submitted = new HamaConfiguration();
+
+   FileSystem sourceFs = FileSystem.get(URI.create(path), submitted);
+   // NOTE(review): the stream is not closed here; presumably the
+   // configuration consumes and closes it when it loads - confirm.
+   submitted.addResource(sourceFs.open(jobSubmitPath));
+
+   return submitted;
+ }
+
+ /**
+  * Derives the application attempt ID from the container ID found in the
+  * process environment.
+  *
+  * @return the ApplicationAttemptId of the container this process runs in
+  * @throws IllegalArgumentException if the container ID variable is not set
+  */
+ private static ApplicationAttemptId getApplicationAttemptId()
+     throws IOException {
+   String containerIdKey = ApplicationConstants.Environment.CONTAINER_ID.name();
+   Map<String, String> environment = System.getenv();
+   if (!environment.containsKey(containerIdKey)) {
+     throw new IllegalArgumentException(
+         "ApplicationAttemptId not set in the environment");
+   }
+
+   LOG.info("app attempt id!!!");
+   return ConverterUtils.toContainerId(environment.get(containerIdKey))
+       .getApplicationAttemptId();
+ }
+
+ /**
+  * Creates and initializes the sync server from the job configuration and
+  * submits it to the thread pool, where it is started asynchronously. This
+  * method returns without waiting for the server to come up.
+  *
+  * @throws Exception if the sync server cannot be created or initialized
+  */
+ private void startSyncServer() throws Exception {
+   syncServer = SyncServiceFactory.getSyncServer(jobConf);
+   syncServer.init(jobConf);
+
+   threadPool.submit(new ZKServerThread(syncServer));
+ }
+
+ /**
+ * This method is to run Zookeeper in order to coordinates between BSPMaster and Groomservers
+ * using Runnable interface in java.
+ */
+ private static class ZKServerThread implements Runnable {
+ SyncServer server;
+
+ ZKServerThread(SyncServer s) {
+ server = s;
+ }
+
+ @Override
+ public void run() {
+ try {
+ server.start();
+ } catch (Exception e) {
+ LOG.error("Error running SyncServer.", e);
+ }
+ }
+ }
+
+ /**
+  * Starts the two RPC servers: the one for clients and the one for tasks.
+  * The task server address is stored in the job configuration as
+  * "hama.umbilical.address", so this must run BEFORE the submit
+  * configuration gets rewritten.
+  *
+  * @throws IOException if a server cannot be created or started
+  */
+ private void startRPCServers() throws IOException {
+   // RPC server which talks to the client
+   clientServer = RPC.getServer(BSPClient.class, hostname, clientPort, jobConf);
+   clientServer.start();
+
+   // RPC server which talks to the tasks
+   taskServerPort = BSPNetUtils.getFreePort(10000);
+   taskServer = RPC.getServer(this, hostname, taskServerPort, jobConf);
+   taskServer.start();
+
+   // let the tasks know where we are
+   String umbilicalAddress = hostname + ":" + taskServerPort;
+   jobConf.set("hama.umbilical.address", umbilicalAddress);
+ }
+
+ /**
+  * Writes the given configuration as XML to the given path, replacing the
+  * file there, so that changes made after submission are reflected.
+  *
+  * @param path location of the job file to overwrite
+  * @param conf configuration to serialize
+  * @throws IOException if the file cannot be created or written
+  */
+ private static void rewriteSubmitConfiguration(String path, Configuration conf)
+     throws IOException {
+   Path jobSubmitPath = new Path(path);
+   FileSystem fs = FileSystem.get(conf);
+   FSDataOutputStream out = fs.create(jobSubmitPath);
+   try {
+     conf.writeXml(out);
+   } finally {
+     // close even when writing fails, so the stream is not leaked
+     out.close();
+   }
+
+   LOG.info("Written new configuration back to " + path);
+ }
+
+ /**
+  * Determines the container memory in MB. The "bsp.child.mem.in.mb" property
+  * wins when set; otherwise the value is derived from "bsp.child.java.opts".
+  *
+  * @param conf configuration to read the properties from
+  * @return the memory of the container in MB
+  */
+ private int getMemoryRequirements(Configuration conf) {
+   String configuredMb = conf.get("bsp.child.mem.in.mb");
+   if (configuredMb != null) {
+     return Integer.valueOf(configuredMb);
+   }
+
+   LOG.warn("\"bsp.child.mem.in.mb\" was not set! Try parsing the child opts...");
+   return getMemoryFromOptString(conf.get("bsp.child.java.opts"));
+ }
+
+ /**
+  * Extracts the heap size in MB from the first "-Xmx" option of a JVM option
+  * string, e.g. {@code "-Xmx512m"} gives 512 and {@code "-Xmx2g -Dfoo=bar"}
+  * gives 2048. The value ends at the next whitespace, so further options may
+  * follow. The suffixes m/M and g/G are supported.
+  *
+  * @param opts JVM option string, may be {@code null}
+  * @return the heap size in MB, or 256 when {@code opts} is {@code null} or
+  *         contains no "-Xmx" option
+  * @throws IllegalArgumentException if the "-Xmx" value is empty, is not a
+  *           number, or has an unsupported suffix
+  */
+ // This really needs a testcase
+ private static int getMemoryFromOptString(String opts) {
+   final int DEFAULT_MEMORY_MB = 256;
+
+   if (opts == null) {
+     return DEFAULT_MEMORY_MB;
+   }
+
+   int flagIndex = opts.indexOf("-Xmx");
+   if (flagIndex < 0) {
+     LOG.info(
+         "No \"-Xmx\" option found in child opts, using default amount of memory!");
+     return DEFAULT_MEMORY_MB;
+   }
+
+   // The value runs up to the next whitespace, not to the end of the string
+   int startIndex = flagIndex + 4;
+   int endIndex = startIndex;
+   while (endIndex < opts.length()
+       && !Character.isWhitespace(opts.charAt(endIndex))) {
+     endIndex++;
+   }
+   String xmxString = opts.substring(startIndex, endIndex);
+   String invalidMessage =
+       "Memory Limit in child opts was not set! \"bsp.child.java.opts\" String was: "
+           + opts;
+
+   // at least one digit plus the unit suffix is required
+   if (xmxString.length() < 2) {
+     throw new IllegalArgumentException(invalidMessage);
+   }
+
+   char qualifier = Character.toLowerCase(
+       xmxString.charAt(xmxString.length() - 1));
+   int memory;
+   try {
+     memory = Integer.parseInt(xmxString.substring(0, xmxString.length() - 1));
+   } catch (NumberFormatException e) {
+     throw new IllegalArgumentException(invalidMessage, e);
+   }
+
+   if (qualifier == 'm') {
+     return memory;
+   }
+   if (qualifier == 'g') {
+     return memory * 1024;
+   }
+   throw new IllegalArgumentException(invalidMessage);
+ }
+
+ /**
+  * Wraps a new launch runnable for the given container in a thread. The
+  * thread is returned unstarted.
+  *
+  * @param allocatedContainer container to launch
+  * @return a new, not yet started thread
+  */
+ @VisibleForTesting
+ Thread createLaunchContainerThread(Container allocatedContainer) {
+   return new Thread(new LaunchContainerRunnable(allocatedContainer,
+       containerListener, jobConf));
+ }
+
+ /**
+ * Returns the current value of the superstep field, which
+ * {@link #statusUpdate} raises whenever a task reports a higher count,
+ * wrapped in a new {@link LongWritable}.
+ */
+ @Override
+ public LongWritable getCurrentSuperStep() {
+ return new LongWritable(superstep);
+ }
+
+ /**
+  * Builds the task for the given attempt. When input splits were loaded,
+  * the split at index {@code taskid.id} is assigned; otherwise the task gets
+  * the null input split class name and an empty {@link BytesWritable}.
+  *
+  * @param taskid attempt whose id selects the split
+  * @return a new {@link BSPTask} for this job
+  */
+ @Override
+ public Task getTask(TaskAttemptID taskid) throws IOException {
+   if (splits == null) {
+     return new BSPTask(jobId, jobFile, taskid, taskid.id,
+         NullInputFormat.NullInputSplit.class.getName(), new BytesWritable());
+   }
+
+   BSPJobClient.RawSplit split = splits[taskid.id];
+   return new BSPTask(jobId, jobFile, taskid, taskid.id,
+       split.getClassName(), split.getBytes());
+ }
+
+ /**
+ * Not implemented by this application master: always returns false.
+ */
+ @Override
+ public boolean ping(TaskAttemptID taskid) throws IOException {
+ return false;
+ }
+
+ /**
+ * Not implemented by this application master: the call is ignored.
+ */
+ @Override
+ public void done(TaskAttemptID taskid) throws IOException {
+
+ }
+
+ /**
+ * Not implemented by this application master: the message is ignored.
+ */
+ @Override
+ public void fsError(TaskAttemptID taskId, String message) throws IOException {
+
+ }
+
+ /**
+ * Not implemented by this application master: the message is ignored.
+ */
+ @Override
+ public void fatalError(TaskAttemptID taskId, String message)
+ throws IOException {
+
+ }
+
+ @Override
+ public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
+ throws IOException, InterruptedException {
+ if (taskStatus.getSuperstepCount() > superstep) {
+ superstep = taskStatus.getSuperstepCount();
+ LOG.info("Now in superstep " + superstep);
+ }
+
+ Counters counters = taskStatus.getCounters();
+ globalCounter.incrAllCounters(counters);
+
+ return true;
+ }
+
+ /**
+ * Not implemented by this application master: always returns 0.
+ */
+ @Override
+ public int getAssignedPortNum(TaskAttemptID taskid) {
+ return 0;
+ }
+
+ /**
+  * Stops the client and task RPC servers. Servers that were never started,
+  * for example because initialization failed early, are skipped so that
+  * closing never fails with a null reference.
+  *
+  * @throws IOException declared by the interface
+  */
+ @Override
+ public void close() throws IOException {
+   if (this.clientServer != null) {
+     this.clientServer.stop();
+   }
+   if (this.taskServer != null) {
+     this.taskServer.stop();
+   }
+ }
+
+ /**
+ * Returns {@code BSPClient.versionID} regardless of the requested protocol
+ * name and the client's version.
+ */
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ return BSPClient.versionID;
+ }
+}
diff --git a/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java b/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
deleted file mode 100644
index b032fe2..0000000
--- a/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
+++ /dev/null
@@ -1,483 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.bsp;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.security.PrivilegedAction;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.hama.Constants;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.Job.JobState;
-import org.apache.hama.bsp.sync.SyncServer;
-import org.apache.hama.bsp.sync.SyncServiceFactory;
-import org.apache.hama.ipc.BSPPeerProtocol;
-import org.apache.hama.ipc.RPC;
-import org.apache.hama.ipc.Server;
-import org.apache.hama.util.BSPNetUtils;
-
-
-/**
- * BSPApplicationMaster is an application master for Apache Hamas BSP Engine.
- */
-public class BSPApplicationMaster implements BSPClient, BSPPeerProtocol {
-
- private static final Log LOG = LogFactory.getLog(BSPApplicationMaster.class);
- private static final ExecutorService threadPool = Executors
- .newFixedThreadPool(1);
-
- private Configuration localConf;
- private Configuration jobConf;
- private String jobFile;
-
- private Clock clock;
- private YarnRPC yarnRPC;
-
- private ApplicationMasterProtocol amrmRPC;
-
- private ApplicationAttemptId appAttemptId;
- private String applicationName;
- private long startTime;
-
- private JobImpl job;
- private BSPJobID jobId;
-
- // RPC info where the AM receive client side requests
- private String hostname;
- private int clientPort;
- private int taskServerPort;
-
- private Server clientServer;
- private Server taskServer;
-
- private volatile long superstep;
- //private SyncServerRunner syncServer;
- private SyncServer syncServer;
-
- private Counters globalCounter = new Counters();
-
- private FileSystem fs;
- private BSPJobClient.RawSplit[] splits;
-
- private BSPApplicationMaster(String[] args) throws Exception {
- if (args.length != 1) {
- throw new IllegalArgumentException();
- }
-
- this.jobFile = args[0];
-
- this.jobConf = getSubmitConfiguration(jobFile);
-
- this.localConf = new YarnConfiguration();
- localConf.addResource(localConf);
- fs = FileSystem.get(jobConf);
-
- this.applicationName = jobConf.get("bsp.job.name",
- "<no bsp job name defined>");
- if (applicationName.isEmpty()) {
- this.applicationName = "<no bsp job name defined>";
- }
-
- this.appAttemptId = getApplicationAttemptId();
-
- this.yarnRPC = YarnRPC.create(localConf);
- this.clock = new SystemClock();
- this.startTime = clock.getTime();
-
- this.jobId = new BSPJobID(appAttemptId.toString(), 0);
-
- this.hostname = BSPNetUtils.getCanonicalHostname();
- this.clientPort = BSPNetUtils.getFreePort(12000);
-
- // Set configuration for starting SyncServer which run Zookeeper
- this.jobConf.set(Constants.ZOOKEEPER_QUORUM, hostname);
-
- // start our synchronization service
- startSyncServer();
-
- startRPCServers();
-
- /*
- * Make sure that this executes after the start the RPC servers, because we
- * are readjusting the configuration.
- */
-
- rewriteSubmitConfiguration(jobFile, jobConf);
-
- String jobSplit = jobConf.get("bsp.job.split.file");
- splits = null;
- if (jobSplit != null) {
- DataInputStream splitFile = fs.open(new Path(jobSplit));
- try {
- splits = BSPJobClient.readSplitFile(splitFile);
- } finally {
- splitFile.close();
- }
- }
-
- this.amrmRPC = getYarnRPCConnection(localConf);
- registerApplicationMaster(amrmRPC, hostname, clientPort,
- "http://localhost:8080");
- }
-
- /**
- * This method starts the needed RPC servers: client server and the task
- * server. This method manipulates the configuration and therefore needs to be
- * executed BEFORE the submitconfiguration gets rewritten.
- *
- * @throws IOException
- */
- private void startRPCServers() throws IOException {
- // start the RPC server which talks to the client
- this.clientServer = RPC.getServer(BSPClient.class, hostname, clientPort, jobConf);
- this.clientServer.start();
-
- // start the RPC server which talks to the tasks
- this.taskServerPort = BSPNetUtils.getFreePort(10000);
- this.taskServer = RPC.getServer(this, hostname, taskServerPort, jobConf);
- this.taskServer.start();
-
- // readjusting the configuration to let the tasks know where we are.
- this.jobConf.set("hama.umbilical.address", hostname + ":" + taskServerPort);
- }
-
- /**
- * This method starts the sync server on a specific port and waits for it to
- * come up. Be aware that this method adds the "bsp.sync.server.address" that
- * is needed for a task to connect to the service.
- *
- * @throws IOException
- */
- private void startSyncServer() throws Exception {
- syncServer = SyncServiceFactory.getSyncServer(jobConf);
- syncServer.init(jobConf);
-
- ZKServerThread serverThread = new ZKServerThread(syncServer);
- threadPool.submit(serverThread);
- }
-
- private static class ZKServerThread implements Runnable {
- SyncServer server;
-
- ZKServerThread(SyncServer s) {
- server = s;
- }
-
- @Override
- public void run() {
- try {
- server.start();
- } catch (Exception e) {
- LOG.error("Error running SyncServer.", e);
- }
- }
- }
-
- /**
- * Connects to the Resource Manager.
- *
- * @param yarnConf
- * @return a new RPC connection to the Resource Manager.
- */
- private ApplicationMasterProtocol getYarnRPCConnection(Configuration yarnConf) throws IOException {
- // Connect to the Scheduler of the ResourceManager.
- UserGroupInformation currentUser = UserGroupInformation.createRemoteUser(appAttemptId.toString());
- Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
-
- final InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
- YarnConfiguration.RM_SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
-
- Token<? extends TokenIdentifier> amRMToken = setupAndReturnAMRMToken(rmAddress, credentials.getAllTokens());
- currentUser.addToken(amRMToken);
-
- final Configuration conf = yarnConf;
-
- ApplicationMasterProtocol client = currentUser
- .doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
- @Override
- public ApplicationMasterProtocol run() {
- return (ApplicationMasterProtocol) yarnRPC.getProxy(ApplicationMasterProtocol.class, rmAddress, conf);
- }
- });
- LOG.info("Connecting to ResourceManager at " + rmAddress);
- return client;
- }
-
- private Token<? extends TokenIdentifier> setupAndReturnAMRMToken(
- InetSocketAddress rmBindAddress,
- Collection<Token<? extends TokenIdentifier>> allTokens) {
- for (Token<? extends TokenIdentifier> token : allTokens) {
- if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
- SecurityUtil.setTokenService(token, rmBindAddress);
- return token;
- }
- }
- return null;
- }
-
-
- /**
- * Registers this application master with the Resource Manager and retrieves a
- * response which is used to launch additional containers.
- */
- private static RegisterApplicationMasterResponse registerApplicationMaster(
- ApplicationMasterProtocol resourceManager, String appMasterHostName, int appMasterRpcPort,
- String appMasterTrackingUrl) throws YarnException, IOException {
-
- RegisterApplicationMasterRequest appMasterRequest = Records
- .newRecord(RegisterApplicationMasterRequest.class);
- appMasterRequest.setHost(appMasterHostName);
- appMasterRequest.setRpcPort(appMasterRpcPort);
- // TODO tracking URL
- appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
- RegisterApplicationMasterResponse response = resourceManager
- .registerApplicationMaster(appMasterRequest);
- LOG.info("ApplicationMaster has maximum resource capability of: "
- + response.getMaximumResourceCapability().getMemory());
- return response;
- }
-
- /**
- * Gets the application attempt ID from the environment. This should be set by
- * YARN when the container has been launched.
- *
- * @return a new ApplicationAttemptId which is unique and identifies this
- * task.
- */
- private static ApplicationAttemptId getApplicationAttemptId()
- throws IOException {
- Map<String, String> envs = System.getenv();
- if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
- throw new IllegalArgumentException(
- "ApplicationAttemptId not set in the environment");
- }
-
- return ConverterUtils.toContainerId(
- envs.get(Environment.CONTAINER_ID.name()))
- .getApplicationAttemptId();
- }
-
- private void start() throws IOException, YarnException /*throws Exception*/ {
- JobState finalState = null;
- try {
- job = new JobImpl(appAttemptId, jobConf, yarnRPC, amrmRPC, jobFile, jobId);
- finalState = job.startJob();
- } catch (Exception e) {
- LOG.error("error was occured. cleaning up");
- e.printStackTrace();
- } finally {
- if (finalState != null) {
- LOG.info("Job \"" + applicationName + "\"'s state after completion: "
- + finalState.toString());
- LOG.info("Job took " + ((clock.getTime() - startTime) / 1000L)
- + "s to finish!");
- }
- LOG.info("job is cleaning up");
- job.cleanup();
- }
- }
-
- private void cleanup() throws YarnException, IOException {
- //syncServer.stop();
- syncServer.stopServer();
-
- if (threadPool != null && !threadPool.isShutdown()) {
- threadPool.shutdownNow();
- }
-
- clientServer.stop();
- taskServer.stop();
- FinishApplicationMasterRequest finishReq = Records
- .newRecord(FinishApplicationMasterRequest.class);
- switch (job.getState()) {
- case SUCCESS:
- finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
- break;
- case KILLED:
- finishReq.setFinalApplicationStatus(FinalApplicationStatus.KILLED);
- break;
- case FAILED:
- finishReq.setFinalApplicationStatus(FinalApplicationStatus.FAILED);
- break;
- default:
- finishReq.setFinalApplicationStatus(FinalApplicationStatus.FAILED);
- }
- this.amrmRPC.finishApplicationMaster(finishReq);
- }
-
- public static void main(String[] args) throws YarnException, IOException {
- // we expect getting the qualified path of the job.xml as the first
- // element in the arguments
- BSPApplicationMaster master = null;
- try {
- master = new BSPApplicationMaster(args);
- master.start();
- } catch (Exception e) {
- LOG.fatal("Error starting BSPApplicationMaster", e);
- } finally {
- if (master != null) {
- master.cleanup();
- }
- }
- }
-
- /**
- * Reads the configuration from the given path.
- */
- private static Configuration getSubmitConfiguration(String path)
- throws IOException {
- Path jobSubmitPath = new Path(path);
- Configuration jobConf = new HamaConfiguration();
-
- FileSystem fs = FileSystem.get(URI.create(path), jobConf);
-
- InputStream in =fs.open(jobSubmitPath);
- jobConf.addResource(in);
-
- return jobConf;
- }
-
- /**
- * Writes the current configuration to a given path to reflect changes. For
- * example the sync server address is put after the file has been written.
- */
- private static void rewriteSubmitConfiguration(String path, Configuration conf)
- throws IOException {
- Path jobSubmitPath = new Path(path);
- FileSystem fs = FileSystem.get(conf);
- FSDataOutputStream out = fs.create(jobSubmitPath);
- conf.writeXml(out);
- out.close();
-
- LOG.info("Written new configuration back to " + path);
- }
-
- @Override
- public long getProtocolVersion(String arg0, long arg1) throws IOException {
- return BSPClient.versionID;
- }
-
- @Override
- public LongWritable getCurrentSuperStep() {
- return new LongWritable(superstep);
- }
-
- @Override
- public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
- throws IOException, InterruptedException {
- if (taskStatus.getSuperstepCount() > superstep) {
- superstep = taskStatus.getSuperstepCount();
- LOG.info("Now in superstep " + superstep);
- }
-
- Counters counters = taskStatus.getCounters();
- globalCounter.incrAllCounters(counters);
-
- return true;
- }
-
- /**
- * most of the following methods are already handled over YARN and with the
- * JobImpl.
- */
-
- @Override
- public void close() throws IOException {
-
- }
-
- @Override
- public Task getTask(TaskAttemptID taskid) throws IOException {
- BSPJobClient.RawSplit assignedSplit = null;
- String splitName = NullInputFormat.NullInputSplit.class.getName();
- //String splitName = NullInputSplit.class.getCanonicalName();
- if (splits != null) {
- assignedSplit = splits[taskid.id];
- splitName = assignedSplit.getClassName();
- return new BSPTask(jobId, jobFile, taskid, taskid.id, splitName,
- assignedSplit.getBytes());
- } else {
- return new BSPTask(jobId, jobFile, taskid, taskid.id, splitName,
- new BytesWritable());
- }
- }
-
- @Override
- public boolean ping(TaskAttemptID taskid) throws IOException {
- return false;
- }
-
- @Override
- public void done(TaskAttemptID taskid) throws IOException {
-
- }
-
- @Override
- public void fsError(TaskAttemptID taskId, String message) throws IOException {
-
- }
-
- @Override
- public void fatalError(TaskAttemptID taskId, String message)
- throws IOException {
-
- }
-
- @Override
- public int getAssignedPortNum(TaskAttemptID taskid) {
- return 0;
- }
-
-}
diff --git a/yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java b/yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
deleted file mode 100644
index ce08603..0000000
--- a/yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.bsp;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.*;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.*;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
-
-public class BSPTaskLauncher {
-
- private static final Log LOG = LogFactory.getLog(BSPTaskLauncher.class);
-
- private final Container allocatedContainer;
- private final int id;
- private final ContainerManagementProtocol cm;
- private final Configuration conf;
- private String user;
- private final Path jobFile;
- private final BSPJobID jobId;
-
- private GetContainerStatusesRequest statusRequest;
-
- @Override
- protected void finalize() throws Throwable {
- stopAndCleanup();
- }
-
- public BSPTaskLauncher(int id, Container container, ContainerManagementProtocol cm,
- Configuration conf, Path jobFile, BSPJobID jobId)
- throws YarnException {
- this.id = id;
- this.cm = cm;
- this.conf = conf;
- this.allocatedContainer = container;
- this.jobFile = jobFile;
- this.jobId = jobId;
- // FIXME why does this contain mapreduce here?
- this.user = conf.get("bsp.user.name");
- if (this.user == null) {
- this.user = conf.get("mapreduce.job.user.name");
- }
- }
-
- public void stopAndCleanup() throws YarnException, IOException {
- StopContainersRequest stopRequest = Records.newRecord(StopContainersRequest.class);
- List<ContainerId> containerIds = new ArrayList<ContainerId>();
- containerIds.add(allocatedContainer.getId());
- LOG.info("getId : " + allocatedContainer.getId());
- stopRequest.setContainerIds(containerIds);
- LOG.info("StopContainer : " + stopRequest.getContainerIds());
- cm.stopContainers(stopRequest);
-
- }
-
- public void start() throws IOException, YarnException {
- LOG.info("Spawned task with id: " + this.id
- + " for allocated container id: "
- + this.allocatedContainer.getId().toString());
- statusRequest = setupContainer(allocatedContainer, cm, user, id);
- }
-
- /**
- * This polls the current container status from container manager. Null if the
- * container hasn't finished yet.
- *
- * @return
- * @throws Exception
- */
- public BSPTaskStatus poll() throws Exception {
-
- ContainerStatus lastStatus = null;
- GetContainerStatusesResponse getContainerStatusesResponse = cm.getContainerStatuses(statusRequest);
- List<ContainerStatus> containerStatuses = getContainerStatusesResponse.getContainerStatuses();
- if (containerStatuses.size() <= 0) {
- LOG.info("container Statuses size is zero");
- return null;
- }
-
- for (ContainerStatus containerStatus : containerStatuses) {
- LOG.info("Got container status for containerID=" + containerStatus
- .getContainerId() + ", state=" + containerStatus.getState()
- + ", exitStatus=" + containerStatus.getExitStatus() + ", diagnostics="
- + containerStatus.getDiagnostics());
-
- if (containerStatus.getContainerId().equals(allocatedContainer.getId())) {
- lastStatus = containerStatus;
- break;
- }
- }
-
- if (lastStatus.getState() != ContainerState.COMPLETE) {
- LOG.info("Not completed...");
- return null;
- }
- LOG.info(this.id + " Last report comes with exitstatus of " + lastStatus
- .getExitStatus() + " and diagnose string of " + lastStatus
- .getDiagnostics());
-
- return new BSPTaskStatus(id, lastStatus.getExitStatus());
- }
-
- private GetContainerStatusesRequest setupContainer(
- Container allocatedContainer, ContainerManagementProtocol cm, String user, int id) throws IOException, YarnException {
- LOG.info("Setting up a container for user " + user + " with id of " + id
- + " and containerID of " + allocatedContainer.getId() + " as " + user);
- // Now we setup a ContainerLaunchContext
- ContainerLaunchContext ctx = Records
- .newRecord(ContainerLaunchContext.class);
-
- // Set the local resources
- Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
- LocalResource packageResource = Records.newRecord(LocalResource.class);
- FileSystem fs = FileSystem.get(conf);
- Path packageFile = new Path(System.getenv(YARNBSPConstants.HAMA_YARN_LOCATION));
- URL packageUrl = ConverterUtils.getYarnUrlFromPath(packageFile
- .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
- LOG.info("PackageURL has been composed to " + packageUrl.toString());
- try {
- LOG.info("Reverting packageURL to path: "
- + ConverterUtils.getPathFromYarnURL(packageUrl));
- } catch (URISyntaxException e) {
- LOG.fatal("If you see this error the workarround does not work", e);
- }
-
- packageResource.setResource(packageUrl);
- packageResource.setSize(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_YARN_SIZE)));
- packageResource.setTimestamp(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_YARN_TIMESTAMP)));
- packageResource.setType(LocalResourceType.FILE);
- packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
-
- localResources.put(YARNBSPConstants.APP_MASTER_JAR_PATH, packageResource);
-
- Path hamaReleaseFile = new Path(System.getenv(YARNBSPConstants.HAMA_LOCATION));
- URL hamaReleaseUrl = ConverterUtils.getYarnUrlFromPath(hamaReleaseFile
- .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
- LOG.info("Hama release URL has been composed to " + hamaReleaseUrl.toString());
-
- RemoteIterator<LocatedFileStatus> fileStatusListIterator = fs.listFiles(
- hamaReleaseFile, true);
-
- while(fileStatusListIterator.hasNext()) {
- LocatedFileStatus lfs = fileStatusListIterator.next();
- LocalResource localRsrc = LocalResource.newInstance(
- ConverterUtils.getYarnUrlFromPath(lfs.getPath()),
- LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
- lfs.getLen(), lfs.getModificationTime());
- localResources.put(lfs.getPath().getName(), localRsrc);
- }
-
- ctx.setLocalResources(localResources);
-
- /*
- * TODO Package classpath seems not to work if you're in pseudo distributed
- * mode, because the resource must not be moved, it will never be unpacked.
- * So we will check if our jar file has the file:// prefix and put it into
- * the CP directly
- */
-
- StringBuilder classPathEnv = new StringBuilder(
- ApplicationConstants.Environment.CLASSPATH.$()).append(File.pathSeparatorChar)
- .append("./*");
- for (String c : conf.getStrings(
- YarnConfiguration.YARN_APPLICATION_CLASSPATH,
- YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
- classPathEnv.append(File.pathSeparatorChar);
- classPathEnv.append(c.trim());
- }
-
- Vector<CharSequence> vargs = new Vector<CharSequence>();
- vargs.add("${JAVA_HOME}/bin/java");
- vargs.add("-cp " + classPathEnv + "");
- vargs.add(BSPRunner.class.getCanonicalName());
-
- vargs.add(jobId.getJtIdentifier());
- vargs.add(Integer.toString(id));
- vargs.add(this.jobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory())
- .toString());
-
- vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/bsp.stdout");
- vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/bsp.stderr");
-
- // Get final commmand
- StringBuilder command = new StringBuilder();
- for (CharSequence str : vargs) {
- command.append(str).append(" ");
- }
-
- List<String> commands = new ArrayList<String>();
- commands.add(command.toString());
-
- ctx.setCommands(commands);
- LOG.info("Starting command: " + commands);
-
- StartContainerRequest startReq = Records
- .newRecord(StartContainerRequest.class);
- startReq.setContainerLaunchContext(ctx);
- startReq.setContainerToken(allocatedContainer.getContainerToken());
-
- List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
- list.add(startReq);
- StartContainersRequest requestList = StartContainersRequest.newInstance(list);
- cm.startContainers(requestList);
-
- GetContainerStatusesRequest statusReq = Records
- .newRecord(GetContainerStatusesRequest.class);
- List<ContainerId> containerIds = new ArrayList<ContainerId>();
- containerIds.add(allocatedContainer.getId());
- statusReq.setContainerIds(containerIds);
- return statusReq;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + id;
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- BSPTaskLauncher other = (BSPTaskLauncher) obj;
- if (id != other.id)
- return false;
- return true;
- }
-
- public static class BSPTaskStatus {
- private final int id;
- private final int exitStatus;
-
- public BSPTaskStatus(int id, int exitStatus) {
- super();
- this.id = id;
- this.exitStatus = exitStatus;
- }
-
- public int getId() {
- return id;
- }
-
- public int getExitStatus() {
- return exitStatus;
- }
- }
-
-}
diff --git a/yarn/src/main/java/org/apache/hama/bsp/Job.java b/yarn/src/main/java/org/apache/hama/bsp/Job.java
deleted file mode 100644
index 7745301..0000000
--- a/yarn/src/main/java/org/apache/hama/bsp/Job.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.bsp;
-
-import org.apache.hadoop.yarn.exceptions.YarnException;
-
-import java.io.IOException;
-
-/**
- * Main interface to interact with the job. Provides only getters.
- */
-public interface Job {
-
- public enum JobState {
- NEW, RUNNING, SUCCESS, FAILED, KILLED
- }
-
- public enum BSPPhase {
- COMPUTATION, COMMUNICATION
- }
-
- public JobState startJob() throws Exception;
-
- public void cleanup() throws YarnException, IOException;
-
- JobState getState();
-
- BSPPhase getBSPPhase();
-
- int getTotalBSPTasks();
-
-}
diff --git a/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java b/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java
deleted file mode 100644
index 650295c..0000000
--- a/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.bsp;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.security.PrivilegedAction;
-import java.util.*;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
-import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.client.api.NMTokenCache;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.hama.bsp.BSPTaskLauncher.BSPTaskStatus;
-
-public class JobImpl implements Job {
-
- private static final Log LOG = LogFactory.getLog(JobImpl.class);
- private static final int DEFAULT_MEMORY_MB = 256;
-
- private Configuration conf;
- private BSPJobID jobId;
- private int numBSPTasks;
- private int priority = 0;
- private String childOpts;
- private int taskMemoryInMb;
- private Path jobFile;
-
- private JobState state;
- private BSPPhase phase;
-
- private ApplicationAttemptId appAttemptId;
- private YarnRPC yarnRPC;
- private ApplicationMasterProtocol resourceManager;
-
- private List<Container> allocatedContainers;
- private List<ContainerId> releasedContainers = Collections.emptyList();
-
- private Map<Integer, BSPTaskLauncher> launchers = new HashMap<Integer, BSPTaskLauncher>();
- private Deque<BSPTaskLauncher> completionQueue = new LinkedList<BSPTaskLauncher>();
-
- private int lastResponseID = 0;
-
- private int getMemoryRequirements() {
- String newMemoryProperty = conf.get("bsp.child.mem.in.mb");
- if (newMemoryProperty == null) {
- LOG.warn("\"bsp.child.mem.in.mb\" was not set! Try parsing the child opts...");
- return getMemoryFromOptString(childOpts);
- } else {
- return Integer.valueOf(newMemoryProperty);
- }
- }
-
- public JobImpl(ApplicationAttemptId appAttemptId,
- Configuration jobConfiguration, YarnRPC yarnRPC, ApplicationMasterProtocol amrmRPC,
- String jobFile, BSPJobID jobId) {
- super();
- this.appAttemptId = appAttemptId;
- this.yarnRPC = yarnRPC;
- this.resourceManager = amrmRPC;
- this.jobFile = new Path(jobFile);
- this.state = JobState.NEW;
- this.jobId = jobId;
- this.conf = jobConfiguration;
- this.numBSPTasks = conf.getInt("bsp.peers.num", 1);
- this.childOpts = conf.get("bsp.child.java.opts");
-
- this.taskMemoryInMb = getMemoryRequirements();
- }
-
- // This really needs a testcase
- private static int getMemoryFromOptString(String opts) {
- if (opts == null) {
- return DEFAULT_MEMORY_MB;
- }
-
- if (!opts.contains("-Xmx")) {
- LOG.info("No \"-Xmx\" option found in child opts, using default amount of memory!");
- return DEFAULT_MEMORY_MB;
- } else {
- // e.G: -Xmx512m
-
- int startIndex = opts.indexOf("-Xmx") + 4;
- String xmxString = opts.substring(startIndex);
- char qualifier = xmxString.charAt(xmxString.length() - 1);
- int memory = Integer.valueOf(xmxString.substring(0,
- xmxString.length() - 1));
- if (qualifier == 'm') {
- return memory;
- } else if (qualifier == 'g') {
- return memory * 1024;
- } else {
- throw new IllegalArgumentException(
- "Memory Limit in child opts was not set! \"bsp.child.java.opts\" String was: "
- + opts);
- }
- }
- }
-
- @Override
- public JobState startJob() throws Exception {
-
- this.allocatedContainers = new ArrayList<Container>(numBSPTasks);
- NMTokenCache nmTokenCache = new NMTokenCache();
- while (allocatedContainers.size() < numBSPTasks) {
- AllocateRequest req = AllocateRequest.newInstance(lastResponseID, 0.0f,
- createBSPTaskRequest(numBSPTasks - allocatedContainers.size(), taskMemoryInMb,
- priority), releasedContainers, null);
-
- AllocateResponse allocateResponse = resourceManager.allocate(req);
- for (NMToken token : allocateResponse.getNMTokens()) {
- nmTokenCache.setToken(token.getNodeId().toString(), token.getToken());
- }
-
- LOG.info("Got response ID: " + allocateResponse.getResponseId()
- + " with num of containers: "
- + allocateResponse.getAllocatedContainers().size()
- + " and following resources: "
- + allocateResponse.getAvailableResources().getMemory() + "mb");
- this.lastResponseID = allocateResponse.getResponseId();
-
- this.allocatedContainers.addAll(allocateResponse.getAllocatedContainers());
-
- LOG.info("Waiting to allocate " + (numBSPTasks - allocatedContainers.size()) + " more containers...");
-
- Thread.sleep(1000l);
- }
-
- LOG.info("Got " + allocatedContainers.size() + " containers!");
-
- int id = 0;
- for (Container allocatedContainer : allocatedContainers) {
- LOG.info("Launching task on a new container." + ", containerId="
- + allocatedContainer.getId() + ", containerNode="
- + allocatedContainer.getNodeId().getHost() + ":"
- + allocatedContainer.getNodeId().getPort() + ", containerNodeURI="
- + allocatedContainer.getNodeHttpAddress() + ", containerResourceMemory"
- + allocatedContainer.getResource().getMemory());
-
- // Connect to ContainerManager on the allocated container
- String user = conf.get("bsp.user.name");
- if (user == null) {
- user = System.getenv(ApplicationConstants.Environment.USER.name());
- }
-
- ContainerManagementProtocol cm = null;
- try {
- cm = getContainerManagementProtocolProxy(yarnRPC,
- nmTokenCache.getToken(allocatedContainer.getNodeId().toString()), allocatedContainer.getNodeId(), user);
- } catch (Exception e) {
- LOG.error("Failed to create ContainerManager...");
- if (cm != null)
- yarnRPC.stopProxy(cm, conf);
- e.printStackTrace();
- }
-
- BSPTaskLauncher runnableLaunchContainer = new BSPTaskLauncher(id,
- allocatedContainer, cm, conf, jobFile, jobId);
-
- launchers.put(id, runnableLaunchContainer);
- runnableLaunchContainer.start();
- completionQueue.add(runnableLaunchContainer);
- id++;
- }
-
- LOG.info("Waiting for tasks to finish...");
- state = JobState.RUNNING;
- int completed = 0;
-
- while (completed != numBSPTasks) {
- for (BSPTaskLauncher task : completionQueue) {
- BSPTaskStatus returnedTask = task.poll();
- if(returnedTask != null && returnedTask.getExitStatus() == 0) {
- LOG.info("Task \"" + returnedTask.getId()
- + "\" sucessfully finished!");
- completed++;
- LOG.info("Waiting for " + (numBSPTasks - completed)
- + " tasks to finish!");
- }
-
- if(returnedTask != null && returnedTask.getExitStatus() != 0) {
- LOG.error("Task with id \"" + returnedTask.getId() + "\" failed!");
- completionQueue.add(task);
- state = JobState.FAILED;
- return state;
- }
- }
- Thread.sleep(1000L);
- }
-
- state = JobState.SUCCESS;
- return state;
- }
-
- /**
- *
- * @param rpc
- * @param nmToken
- * @param nodeId
- * @param user
- * @return
- */
- protected ContainerManagementProtocol getContainerManagementProtocolProxy(
- final YarnRPC rpc, Token nmToken, NodeId nodeId, String user) {
- ContainerManagementProtocol proxy;
- UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
- final InetSocketAddress addr =
- NetUtils.createSocketAddr(nodeId.getHost(), nodeId.getPort());
- if (nmToken != null) {
- ugi.addToken(ConverterUtils.convertFromYarn(nmToken, addr));
- }
-
- proxy = ugi
- .doAs(new PrivilegedAction<ContainerManagementProtocol>() {
- @Override
- public ContainerManagementProtocol run() {
- return (ContainerManagementProtocol) rpc.getProxy(
- ContainerManagementProtocol.class,
- addr, conf);
- }
- });
- return proxy;
- }
-
- /**
- * Makes a lookup for the taskid and stops its container and task. It also
- * removes the task from the launcher so that we won't have to stop it twice.
- *
- * @param id
- * @throws YarnException
- */
- private void cleanupTask(int id) throws YarnException, IOException {
- BSPTaskLauncher bspTaskLauncher = launchers.get(id);
- bspTaskLauncher.stopAndCleanup();
- launchers.remove(id);
- completionQueue.remove(bspTaskLauncher);
- }
-
- private List<ResourceRequest> createBSPTaskRequest(int numTasks,
- int memoryInMb, int priority) {
-
- List<ResourceRequest> reqList = new ArrayList<ResourceRequest>(numTasks);
- for (int i = 0; i < numTasks; i++) {
- // Resource Request
- ResourceRequest rsrcRequest = Records.newRecord(ResourceRequest.class);
-
- // setup requirements for hosts
- // whether a particular rack/host is needed
- // useful for applications that are sensitive
- // to data locality
- rsrcRequest.setResourceName("*");
-
- // set the priority for the request
- Priority pri = Records.newRecord(Priority.class);
- pri.setPriority(priority);
- rsrcRequest.setPriority(pri);
-
- // Set up resource type requirements
- // For now, only memory is supported so we set memory requirements
- Resource capability = Records.newRecord(Resource.class);
- capability.setMemory(memoryInMb);
- rsrcRequest.setCapability(capability);
-
- // set no. of containers needed
- // matching the specifications
- rsrcRequest.setNumContainers(numBSPTasks);
- reqList.add(rsrcRequest);
- }
- return reqList;
- }
-
- @Override
- public void cleanup() throws YarnException, IOException {
- for (BSPTaskLauncher launcher : completionQueue) {
- LOG.info("cleanup tasks !!!");
- launcher.stopAndCleanup();
- }
- }
-
- @Override
- public JobState getState() {
- return state;
- }
-
- @Override
- public int getTotalBSPTasks() {
- return numBSPTasks;
- }
-
- @Override
- public BSPPhase getBSPPhase() {
- return phase;
- }
-
-}
diff --git a/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java b/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
index 153baa1..4d23384 100644
--- a/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
+++ b/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
@@ -33,10 +33,9 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -131,7 +130,6 @@
yarnConf = new YarnConfiguration(conf);
yarnClient = YarnClient.createYarnClient();
yarnClient.init(yarnConf);
- yarnClient.start();
}
@Override
@@ -155,6 +153,7 @@
LOG.debug("Retrieved username: " + s);
}
+ yarnClient.start();
try {
YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics();
LOG.info("Got Cluster metric info from ASM"
@@ -188,14 +187,16 @@
}
}
- GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class);
- GetNewApplicationResponse response = job.getApplicationsManager().getNewApplication(request);
- id = response.getApplicationId();
+ // Get a new application id
+ YarnClientApplication app = yarnClient.createApplication();
+
// Create a new ApplicationSubmissionContext
- ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
- // set the ApplicationId
- appContext.setApplicationId(this.id);
+ //ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
+ ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
+
+ id = appContext.getApplicationId();
+
// set the application name
appContext.setApplicationName(job.getJobName());
@@ -227,7 +228,11 @@
localResources.put(YARNBSPConstants.APP_MASTER_JAR_PATH, amJarRsrc);
// add hama related jar files to localresources for container
- List<File> hamaJars = localJarfromPath(System.getProperty("hama.home.dir"));
+ List<File> hamaJars;
+ if (System.getProperty("hama.home.dir") != null)
+ hamaJars = localJarfromPath(System.getProperty("hama.home.dir"));
+ else
+ hamaJars = localJarfromPath(getConf().get("hama.home.dir"));
String hamaPath = getSystemDir() + "/hama";
for (File fileEntry : hamaJars) {
addToLocalResources(fs, fileEntry.getCanonicalPath(),
@@ -266,7 +271,7 @@
Vector<CharSequence> vargs = new Vector<CharSequence>(5);
vargs.add("${JAVA_HOME}/bin/java");
vargs.add("-cp " + classPathEnv + "");
- vargs.add(BSPApplicationMaster.class.getCanonicalName());
+ vargs.add(ApplicationMaster.class.getCanonicalName());
vargs.add(submitJobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString());
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/hama-appmaster.stdout");
diff --git a/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java b/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java
index 4d8eea8..7d74557 100644
--- a/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java
+++ b/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java
@@ -74,12 +74,13 @@
}
}
- //fs.delete(OUTPUT_PATH, true);
+ fs.delete(OUTPUT_PATH, true);
}
public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
HamaConfiguration conf = new HamaConfiguration();
+ conf.set("hama.home.dir", System.getenv().get("HAMA_HOME"));
YARNBSPJob job = new YARNBSPJob(conf);
job.setBspClass(HelloBSP.class);