HAMA-939: Refactoring which was implement using out-of-date status response

git-svn-id: https://svn.apache.org/repos/asf/hama/trunk@1683197 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index e914bb1..b689b5d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -26,6 +26,7 @@
 
   IMPROVEMENTS
   
+   HAMA-939: Refactoring which was implement using out-of-date status response (Minho Kim via edwardyoon)
    HAMA-955: Support UnsafeByteArrayInputStream and UnSafeByteArrayOutputStream (Minho Kim via edwardyoon)
    HAMA-944: Add JSON format option to fastgen command (Minho Kim via edwardyoon)
    HAMA-919: Manage messages per Vertex (edwardyoon)
diff --git a/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java b/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java
new file mode 100644
index 0000000..62663fd
--- /dev/null
+++ b/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java
@@ -0,0 +1,1011 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.sync.SyncServer;
+import org.apache.hama.bsp.sync.SyncServiceFactory;
+import org.apache.hama.ipc.BSPPeerProtocol;
+import org.apache.hama.ipc.RPC;
+import org.apache.hama.ipc.Server;
+import org.apache.hama.util.BSPNetUtils;
+import org.apache.log4j.LogManager;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ApplicationMaster  implements BSPClient, BSPPeerProtocol {
+  private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
+
+  // Configuration
+  private Configuration localConf;
+  private Configuration jobConf;
+
+  private String jobFile;
+  private String applicationName;
+  // RPC info where the AM receive client side requests
+  private String hostname;
+  private int clientPort;
+  private FileSystem fs;
+  private static int id = 0;
+
+  private volatile long superstep;
+  private Counters globalCounter = new Counters();
+  private BSPJobClient.RawSplit[] splits;
+
+  private BSPJobID jobId;
+
+  // SyncServer for Zookeeper
+  private SyncServer syncServer;
+
+  // Zookeeper thread pool
+  private static final ExecutorService threadPool = Executors
+      .newFixedThreadPool(1);
+
+  // RPC info where the AM receive client side requests
+  private int taskServerPort;
+
+  private Server clientServer;
+  private Server taskServer;
+
+  // Handle to communicate with the Resource Manager
+  @SuppressWarnings("rawtypes")
+  private AMRMClientAsync amRMClient;
+
+  // In both secure and non-secure modes, this points to the job-submitter.
+  @VisibleForTesting
+  UserGroupInformation appSubmitterUgi;
+
+  // Handle to communicate with the Node Manager
+  private NMClientAsync nmClientAsync;
+  // Listen to process the response from the Node Manager
+  private NMCallbackHandler containerListener;
+
+  // Application Attempt Id ( combination of attemptId and fail count )
+  @VisibleForTesting
+  protected ApplicationAttemptId appAttemptID;
+
+
+  // TODO
+  // For status update for clients - yet to be implemented
+  // Hostname of the container
+  private String appMasterHostname = "";
+  // Port on which the app master listens for status updates from clients
+  private int appMasterRpcPort = -1;
+  // Tracking url to which app master publishes info for clients to monitor
+  private String appMasterTrackingUrl = "";
+
+  // App Master configuration
+  // No. of containers to run shell command on
+  @VisibleForTesting
+  protected int numTotalContainers;
+  // Memory to request for the container on which the shell command will run
+  private int containerMemory;
+  // VirtualCores to request for the container on which the shell command will run
+  private int containerVirtualCores = 1;
+
+  // Priority of the request
+  private int requestPriority = 0;
+
+  // Counter for completed containers ( complete denotes successful or failed )
+  private AtomicInteger numCompletedContainers = new AtomicInteger();
+  // Allocated container count so that we know how many containers has the RM
+  // allocated to us
+  @VisibleForTesting
+  protected AtomicInteger numAllocatedContainers = new AtomicInteger();
+  // Count of failed containers
+  private AtomicInteger numFailedContainers = new AtomicInteger();
+  // Count of containers already requested from the RM
+  // Needed as once requested, we should not request for containers again.
+  // Only request for more if the original requirement changes.
+  @VisibleForTesting
+  protected AtomicInteger numRequestedContainers = new AtomicInteger();
+
+  private volatile boolean done;
+  private ByteBuffer allTokens;
+
+  // Launch threads
+  private List<Thread> launchThreads = new ArrayList<Thread>();
+
+  @VisibleForTesting
+  protected final Set<ContainerId> launchedContainers =
+      Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
+
+  public ApplicationMaster() {
+    // Set up the configuration
+    this.localConf = new YarnConfiguration();
+  }
+
+  public static void main(String[] args) throws IOException {
+    boolean result = false;
+    ApplicationMaster appMaster = new ApplicationMaster();
+    
+    try {
+      LOG.info("Initializing ApplicationMaster");
+      boolean doRun = appMaster.init(args);
+      if (!doRun) {
+        System.exit(0);
+      }
+      appMaster.run();
+      result = appMaster.finish();
+    } catch (Throwable t) {
+      LOG.fatal("Error running ApplicationMaster", t);
+      LogManager.shutdown();
+      ExitUtil.terminate(1, t);
+    } finally {
+      appMaster.close();
+    }
+    
+    if (result) {
+      LOG.info("Application Master completed successfully. exiting");
+      System.exit(0);
+    } else {
+      LOG.info("Application Master failed. exiting");
+      System.exit(2);
+    }
+  }
+
+  public boolean init(String[] args) throws Exception {
+    if (args.length != 1) {
+      throw new IllegalArgumentException();
+    }
+    this.jobFile = args[0];
+    this.jobConf = getSubmitConfiguration(jobFile);
+    localConf.addResource(localConf);
+    fs = FileSystem.get(jobConf);
+
+    this.applicationName = jobConf.get("bsp.job.name",
+        "<no bsp job name defined>");
+    if (applicationName.isEmpty()) {
+      this.applicationName = "<no bsp job name defined>";
+    }
+
+    appAttemptID = getApplicationAttemptId();
+    this.jobId = new BSPJobID(appAttemptID.toString(), 0);
+    this.appMasterHostname = BSPNetUtils.getCanonicalHostname();
+    this.appMasterTrackingUrl = "http://localhost:8088";
+    this.numTotalContainers = this.jobConf.getInt("bsp.peers.num", 1);
+    this.containerMemory = getMemoryRequirements(jobConf);
+
+    this.hostname = BSPNetUtils.getCanonicalHostname();
+    this.clientPort = BSPNetUtils.getFreePort(12000);
+
+    // Set configuration for starting SyncServer which run Zookeeper
+    this.jobConf.set(Constants.ZOOKEEPER_QUORUM, appMasterHostname);
+
+    // start our synchronization service
+    startSyncServer();
+
+    // start RPC server
+    startRPCServers();
+
+    /*
+     * Make sure that this executes after the start the RPC servers, because we
+     * are readjusting the configuration.
+     */
+    rewriteSubmitConfiguration(jobFile, jobConf);
+
+    String jobSplit = jobConf.get("bsp.job.split.file");
+    splits = null;
+    if (jobSplit != null) {
+      DataInputStream splitFile = fs.open(new Path(jobSplit));
+      try {
+        splits = BSPJobClient.readSplitFile(splitFile);
+      } finally {
+        splitFile.close();
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Main run function for the application master
+   *
+   * @throws org.apache.hadoop.yarn.exceptions.YarnException
+   * @throws IOException
+   */
+  @SuppressWarnings({ "unchecked" })
+  public void run() throws YarnException, IOException, InterruptedException {
+    LOG.info("Starting ApplicationMaster");
+
+    // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class
+    // are marked as LimitedPrivate
+    Credentials credentials =
+        UserGroupInformation.getCurrentUser().getCredentials();
+    DataOutputBuffer dob = new DataOutputBuffer();
+    credentials.writeTokenStorageToStream(dob);
+    // Now remove the AM->RM token so that containers cannot access it.
+    Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
+    LOG.info("Executing with tokens:");
+    while (iter.hasNext()) {
+      Token<?> token = iter.next();
+      LOG.info(token);
+      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+        iter.remove();
+      }
+    }
+    allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+
+    // Create appSubmitterUgi and add original tokens to it
+    String appSubmitterUserName =
+        System.getenv(ApplicationConstants.Environment.USER.name());
+    appSubmitterUgi =
+        UserGroupInformation.createRemoteUser(appSubmitterUserName);
+    appSubmitterUgi.addCredentials(credentials);
+
+
+    AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
+    amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
+    amRMClient.init(localConf);
+    amRMClient.start();
+
+    containerListener = createNMCallbackHandler();
+    nmClientAsync = new NMClientAsyncImpl(containerListener);
+    nmClientAsync.init(localConf);
+    nmClientAsync.start();
+
+    // Setup local RPC Server to accept status requests directly from clients
+    // TODO need to setup a protocol for client to be able to communicate to
+    // the RPC server
+    // TODO use the rpc port info to register with the RM for the client to
+    // send requests to this app master
+
+    // Register self with ResourceManager
+    // This will start heartbeating to the RM
+    appMasterHostname = NetUtils.getHostname();
+    RegisterApplicationMasterResponse response = amRMClient
+        .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
+            appMasterTrackingUrl);
+    // Dump out information about cluster capability as seen by the
+    // resource manager
+    int maxMem = response.getMaximumResourceCapability().getMemory();
+    LOG.info("Max mem capability of resources in this cluster " + maxMem);
+
+    int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
+    LOG.info("Max vcores capability of resources in this cluster " + maxVCores);
+
+    // A resource ask cannot exceed the max.
+    if (containerMemory > maxMem) {
+      LOG.info("Container memory specified above max threshold of cluster."
+          + " Using max value." + ", specified=" + containerMemory + ", max="
+          + maxMem);
+      containerMemory = maxMem;
+    }
+
+    if (containerVirtualCores > maxVCores) {
+      LOG.info("Container virtual cores specified above max threshold of cluster."
+          + " Using max value." + ", specified=" + containerVirtualCores + ", max="
+          + maxVCores);
+      containerVirtualCores = maxVCores;
+    }
+
+    List<Container> previousAMRunningContainers =
+        response.getContainersFromPreviousAttempts();
+    LOG.info(appAttemptID + " received " + previousAMRunningContainers.size()
+        + " previous attempts' running containers on AM registration.");
+    for(Container container: previousAMRunningContainers) {
+      launchedContainers.add(container.getId());
+    }
+    numAllocatedContainers.addAndGet(previousAMRunningContainers.size());
+
+
+    int numTotalContainersToRequest =
+        numTotalContainers - previousAMRunningContainers.size();
+    // Setup ask for containers from RM
+    // Send request for containers to RM
+    // Until we get our fully allocated quota, we keep on polling RM for
+    // containers
+    // Keep looping until all the containers are launched and shell script
+    // executed on them ( regardless of success/failure).
+    for (int i = 0; i < numTotalContainersToRequest; ++i) {
+      AMRMClient.ContainerRequest containerAsk = setupContainerAskForRM();
+      amRMClient.addContainerRequest(containerAsk);
+    }
+    numRequestedContainers.set(numTotalContainers);
+  }
+
+  @VisibleForTesting
+  NMCallbackHandler createNMCallbackHandler() {
+    return new NMCallbackHandler(this);
+  }
+
+  @VisibleForTesting
+  protected boolean finish() {
+    // wait for completion.
+    while (!done
+        && (numCompletedContainers.get() != numTotalContainers)) {
+      try {
+        Thread.sleep(200);
+      } catch (InterruptedException ex) {}
+    }
+
+    // Join all launched threads
+    // needed for when we time out
+    // and we need to release containers
+    for (Thread launchThread : launchThreads) {
+      try {
+        launchThread.join(10000);
+      } catch (InterruptedException e) {
+        LOG.info("Exception thrown in thread join: " + e.getMessage());
+        e.printStackTrace();
+      }
+    }
+
+    // When the application completes, it should stop all running containers
+    LOG.info("Application completed. Stopping running containers");
+    nmClientAsync.stop();
+
+    // When the application completes, it should send a finish application
+    // signal to the RM
+    LOG.info("Application completed. Signalling finish to RM");
+
+    FinalApplicationStatus appStatus;
+    String appMessage = null;
+    boolean success = true;
+    if (numFailedContainers.get() == 0 &&
+        numCompletedContainers.get() == numTotalContainers) {
+      appStatus = FinalApplicationStatus.SUCCEEDED;
+    } else {
+      appStatus = FinalApplicationStatus.FAILED;
+      appMessage = "Diagnostics." + ", total=" + numTotalContainers
+          + ", completed=" + numCompletedContainers.get() + ", allocated="
+          + numAllocatedContainers.get() + ", failed="
+          + numFailedContainers.get();
+      LOG.info(appMessage);
+      success = false;
+    }
+    try {
+      amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
+    } catch (YarnException ex) {
+      LOG.error("Failed to unregister application", ex);
+    } catch (IOException e) {
+      LOG.error("Failed to unregister application", e);
+    }
+
+    amRMClient.stop();
+
+    return success;
+  }
+
+  private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
+    @SuppressWarnings("unchecked")
+    @Override
+    public void onContainersCompleted(List<ContainerStatus> completedContainers) {
+      LOG.info("Got response from RM for container ask, completedCnt="
+          + completedContainers.size());
+      for (ContainerStatus containerStatus : completedContainers) {
+        LOG.info(appAttemptID + " got container status for containerID="
+            + containerStatus.getContainerId() + ", state="
+            + containerStatus.getState() + ", exitStatus="
+            + containerStatus.getExitStatus() + ", diagnostics="
+            + containerStatus.getDiagnostics());
+
+        // non complete containers should not be here
+        assert (containerStatus.getState() == ContainerState.COMPLETE);
+        // ignore containers we know nothing about - probably from a previous
+        // attempt
+        if (!launchedContainers.contains(containerStatus.getContainerId())) {
+          LOG.info("Ignoring completed status of "
+              + containerStatus.getContainerId()
+              + "; unknown container(probably launched by previous attempt)");
+          continue;
+        }
+
+        // increment counters for completed/failed containers
+        int exitStatus = containerStatus.getExitStatus();
+        if (0 != exitStatus) {
+          // container failed
+          if (ContainerExitStatus.ABORTED != exitStatus) {
+            // shell script failed
+            // counts as completed
+            numCompletedContainers.incrementAndGet();
+            numFailedContainers.incrementAndGet();
+          } else {
+            // container was killed by framework, possibly preempted
+            // we should re-try as the container was lost for some reason
+            numAllocatedContainers.decrementAndGet();
+            numRequestedContainers.decrementAndGet();
+            // we do not need to release the container as it would be done
+            // by the RM
+          }
+        } else {
+          // nothing to do
+          // container completed successfully
+          numCompletedContainers.incrementAndGet();
+          LOG.info("Container completed successfully." + ", containerId="
+              + containerStatus.getContainerId());
+        }
+      }
+
+      // ask for more containers if any failed
+      int askCount = numTotalContainers - numRequestedContainers.get();
+      numRequestedContainers.addAndGet(askCount);
+
+      if (askCount > 0) {
+        for (int i = 0; i < askCount; ++i) {
+          AMRMClient.ContainerRequest containerAsk = setupContainerAskForRM();
+          amRMClient.addContainerRequest(containerAsk);
+        }
+      }
+
+      if (numCompletedContainers.get() == numTotalContainers) {
+        done = true;
+      }
+    }
+
+    @Override
+    public void onContainersAllocated(List<Container> allocatedContainers) {
+      LOG.info("Got response from RM for container ask, allocatedCnt="
+          + allocatedContainers.size());
+      numAllocatedContainers.addAndGet(allocatedContainers.size());
+      for (Container allocatedContainer : allocatedContainers) {
+        LOG.info("Launching shell command on a new container."
+            + ", containerId=" + allocatedContainer.getId()
+            + ", containerNode=" + allocatedContainer.getNodeId().getHost()
+            + ":" + allocatedContainer.getNodeId().getPort()
+            + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
+            + ", containerResourceMemory"
+            + allocatedContainer.getResource().getMemory()
+            + ", containerResourceVirtualCores"
+            + allocatedContainer.getResource().getVirtualCores());
+        // + ", containerToken"
+        // +allocatedContainer.getContainerToken().getIdentifier().toString());
+
+        Thread launchThread = createLaunchContainerThread(allocatedContainer);
+
+        // launch and start the container on a separate thread to keep
+        // the main thread unblocked
+        // as all containers may not be allocated at one go.
+        launchThreads.add(launchThread);
+        launchedContainers.add(allocatedContainer.getId());
+        launchThread.start();
+        id++;
+      }
+    }
+
+    @Override
+    public void onShutdownRequest() {
+      done = true;
+    }
+
+    @Override
+    public void onNodesUpdated(List<NodeReport> list) {
+
+    }
+
+    @Override
+    public float getProgress() {
+      // set progress to deliver to RM on next heartbeat
+      float progress = (float) numCompletedContainers.get()
+          / numTotalContainers;
+      return progress;
+    }
+
+    @Override
+    public void onError(Throwable throwable) {
+      done = true;
+      amRMClient.stop();
+    }
+  }
+
+  @VisibleForTesting
+  static class NMCallbackHandler
+      implements NMClientAsync.CallbackHandler {
+
+    private ConcurrentMap<ContainerId, Container> containers =
+        new ConcurrentHashMap<ContainerId, Container>();
+    private final ApplicationMaster applicationMaster;
+
+    public NMCallbackHandler(ApplicationMaster applicationMaster) {
+      this.applicationMaster = applicationMaster;
+    }
+
+    public void addContainer(ContainerId containerId, Container container) {
+      containers.putIfAbsent(containerId, container);
+    }
+
+    @Override
+    public void onContainerStopped(ContainerId containerId) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Succeeded to stop Container " + containerId);
+      }
+      containers.remove(containerId);
+    }
+
+    @Override
+    public void onContainerStatusReceived(ContainerId containerId,
+        ContainerStatus containerStatus) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Container Status: id=" + containerId + ", status=" +
+            containerStatus);
+      }
+    }
+
+    @Override
+    public void onContainerStarted(ContainerId containerId,
+        Map<String, ByteBuffer> allServiceResponse) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Succeeded to start Container " + containerId);
+      }
+      Container container = containers.get(containerId);
+      if (container != null) {
+        applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
+      }
+    }
+
+    @Override
+    public void onStartContainerError(ContainerId containerId, Throwable t) {
+      LOG.error("Failed to start Container " + containerId);
+      containers.remove(containerId);
+      applicationMaster.numCompletedContainers.incrementAndGet();
+      applicationMaster.numFailedContainers.incrementAndGet();
+    }
+
+    @Override
+    public void onGetContainerStatusError(
+        ContainerId containerId, Throwable t) {
+      LOG.error("Failed to query the status of Container " + containerId);
+    }
+
+    @Override
+    public void onStopContainerError(ContainerId containerId, Throwable t) {
+      LOG.error("Failed to stop Container " + containerId);
+      containers.remove(containerId);
+    }
+  }
+
+  /**
+   * Thread to connect to the {@link ContainerManagementProtocol} and launch the container
+   * that will execute the shell command.
+   */
+  private class LaunchContainerRunnable implements Runnable {
+
+    // Allocated container
+    Container container;
+
+    NMCallbackHandler containerListener;
+
+    Configuration conf;
+
+    /**
+     * @param lcontainer        Allocated container
+     * @param containerListener Callback handler of the container
+     */
+    public LaunchContainerRunnable(
+        Container lcontainer, NMCallbackHandler containerListener, Configuration conf) {
+      this.container = lcontainer;
+      this.containerListener = containerListener;
+      this.conf = conf;
+    }
+
+    /**
+     * Connects to CM, sets up container launch context
+     * for shell command and eventually dispatches the container
+     * start request to the CM.
+     */
+    @Override
+    public void run() {
+      LOG.info("Setting up container launch container for containerid="
+          + container.getId());
+      // Now we setup a ContainerLaunchContext
+      ContainerLaunchContext ctx = Records
+          .newRecord(ContainerLaunchContext.class);
+
+      // Set the local resources
+      Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+      LocalResource packageResource = Records.newRecord(LocalResource.class);
+      FileSystem fs = null;
+      try {
+        fs = FileSystem.get(conf);
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+      Path packageFile = new Path(System.getenv(YARNBSPConstants.HAMA_YARN_LOCATION));
+      URL packageUrl = null;
+      try {
+        packageUrl = ConverterUtils.getYarnUrlFromPath(packageFile
+          .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
+        LOG.info("PackageURL has been composed to " + packageUrl.toString());
+        LOG.info("Reverting packageURL to path: "
+            + ConverterUtils.getPathFromYarnURL(packageUrl));
+      } catch (URISyntaxException e) {
+        LOG.fatal("If you see this error the workarround does not work", e);
+        numCompletedContainers.incrementAndGet();
+        numFailedContainers.incrementAndGet();
+        return;
+      }
+
+      packageResource.setResource(packageUrl);
+      packageResource.setSize(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_YARN_SIZE)));
+      packageResource.setTimestamp(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_YARN_TIMESTAMP)));
+      packageResource.setType(LocalResourceType.FILE);
+      packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
+
+      localResources.put(YARNBSPConstants.APP_MASTER_JAR_PATH, packageResource);
+
+      Path hamaReleaseFile = new Path(System.getenv(YARNBSPConstants.HAMA_LOCATION));
+      URL hamaReleaseUrl = ConverterUtils.getYarnUrlFromPath(hamaReleaseFile
+          .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
+      LOG.info("Hama release URL has been composed to " + hamaReleaseUrl
+          .toString());
+
+      RemoteIterator<LocatedFileStatus> fileStatusListIterator = null;
+      try {
+        fileStatusListIterator = fs.listFiles(
+            hamaReleaseFile, true);
+
+        while(fileStatusListIterator.hasNext()) {
+          LocatedFileStatus lfs = fileStatusListIterator.next();
+          LocalResource localRsrc = LocalResource.newInstance(
+              ConverterUtils.getYarnUrlFromPath(lfs.getPath()),
+              LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
+              lfs.getLen(), lfs.getModificationTime());
+          localResources.put(lfs.getPath().getName(), localRsrc);
+        }
+      } catch (IOException e) {
+        LOG.fatal("The error has occured to RemoteIterator  " + e);
+      }
+
+      ctx.setLocalResources(localResources);
+
+    /*
+     * TODO Package classpath seems not to work if you're in pseudo distributed
+     * mode, because the resource must not be moved, it will never be unpacked.
+     * So we will check if our jar file has the file:// prefix and put it into
+     * the CP directly
+     */
+
+      StringBuilder classPathEnv = new StringBuilder(
+          ApplicationConstants.Environment.CLASSPATH.$()).append(File.pathSeparatorChar)
+          .append("./*");
+      for (String c : conf.getStrings(
+          YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+          YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+        classPathEnv.append(File.pathSeparatorChar);
+        classPathEnv.append(c.trim());
+      }
+
+      Vector<CharSequence> vargs = new Vector<CharSequence>();
+      vargs.add("${JAVA_HOME}/bin/java");
+      vargs.add("-cp " + classPathEnv + "");
+      vargs.add(BSPRunner.class.getCanonicalName());
+
+      vargs.add(jobId.getJtIdentifier());
+      vargs.add(Integer.toString(id));
+      vargs.add(
+          new Path(jobFile).makeQualified(fs.getUri(), fs.getWorkingDirectory())
+              .toString());
+
+      vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/hama-worker.stdout");
+      vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/hama-worker.stderr");
+
+      // Get final commmand
+      StringBuilder command = new StringBuilder();
+      for (CharSequence str : vargs) {
+        command.append(str).append(" ");
+      }
+
+      List<String> commands = new ArrayList<String>();
+      commands.add(command.toString());
+
+      ctx.setCommands(commands);
+      ctx.setTokens(allTokens.duplicate());
+      LOG.info("Starting commands: " + commands);
+
+      containerListener.addContainer(container.getId(), container);
+      nmClientAsync.startContainerAsync(container, ctx);
+    }
+  }
+
+  /**
+   * Setup the request that will be sent to the RM for the container ask.
+   *
+   * @return the setup ResourceRequest to be sent to RM
+   */
+  private AMRMClient.ContainerRequest setupContainerAskForRM() {
+    // setup requirements for hosts
+    // using * as any host will do for the distributed shell app
+    // set the priority for the request
+    // TODO - what is the range for priority? how to decide?
+    Priority pri = Priority.newInstance(requestPriority);
+
+    // Set up resource type requirements
+    // For now, memory and CPU are supported so we set memory and cpu requirements
+    Resource capability = Resource.newInstance(containerMemory,
+        containerVirtualCores);
+
+    AMRMClient.ContainerRequest request = new AMRMClient.ContainerRequest(capability, null, null,
+        pri);
+    LOG.info("Requested container ask: " + request.toString());
+    return request;
+  }
+
+  /**
+   * Reads the configuration from the given path.
+   */
+  private static Configuration getSubmitConfiguration(String path)
+      throws IOException {
+    Path jobSubmitPath = new Path(path);
+    Configuration jobConf = new HamaConfiguration();
+
+    FileSystem fs = FileSystem.get(URI.create(path), jobConf);
+
+    InputStream in =fs.open(jobSubmitPath);
+    jobConf.addResource(in);
+
+    return jobConf;
+  }
+
+  /**
+   * Gets the application attempt ID from the environment. This should be set by
+   * YARN when the container has been launched.
+   *
+   * @return a new ApplicationAttemptId which is unique and identifies this
+   *         task.
+   */
+  private static ApplicationAttemptId getApplicationAttemptId()
+      throws IOException {
+    Map<String, String> envs = System.getenv();
+    if (!envs.containsKey(ApplicationConstants.Environment.CONTAINER_ID.name())) {
+      throw new IllegalArgumentException(
+          "ApplicationAttemptId not set in the environment");
+    }
+
+    LOG.info("app attempt id!!!");
+    ContainerId containerId = ConverterUtils.toContainerId(envs
+        .get(ApplicationConstants.Environment.CONTAINER_ID.name()));
+    return containerId.getApplicationAttemptId();
+  }
+
+  /**
+   * This method starts the sync server on a specific port and waits for it to
+   * come up. Be aware that this method adds the "bsp.sync.server.address" that
+   * is needed for a task to connect to the service.
+   *
+   * @throws IOException
+   */
+  private void startSyncServer() throws Exception {
+    syncServer = SyncServiceFactory.getSyncServer(jobConf);
+    syncServer.init(jobConf);
+
+    ZKServerThread serverThread = new ZKServerThread(syncServer);
+    threadPool.submit(serverThread);
+  }
+
+  /**
+   * This method is to run Zookeeper in order to coordinates between BSPMaster and Groomservers
+   * using Runnable interface in java.
+   */
+  private static class ZKServerThread implements Runnable {
+    SyncServer server;
+
+    ZKServerThread(SyncServer s) {
+      server = s;
+    }
+
+    @Override
+    public void run() {
+      try {
+        server.start();
+      } catch (Exception e) {
+        LOG.error("Error running SyncServer.", e);
+      }
+    }
+  }
+
+  /**
+   * This method starts the needed RPC servers: client server and the task
+   * server. This method manipulates the configuration and therefore needs to be
+   * executed BEFORE the submitconfiguration gets rewritten.
+   *
+   * @throws IOException
+   */
+  private void startRPCServers() throws IOException {
+    // start the RPC server which talks to the client
+    this.clientServer = RPC.getServer(BSPClient.class, hostname, clientPort, jobConf);
+    this.clientServer.start();
+
+    // start the RPC server which talks to the tasks
+    this.taskServerPort = BSPNetUtils.getFreePort(10000);
+    this.taskServer = RPC.getServer(this, hostname, taskServerPort, jobConf);
+    this.taskServer.start();
+
+    // readjusting the configuration to let the tasks know where we are.
+    this.jobConf.set("hama.umbilical.address", hostname + ":" + taskServerPort);
+  }
+
+  /**
+   * Writes the current configuration to a given path to reflect changes. For
+   * example the sync server address is put after the file has been written.
+   */
+  private static void rewriteSubmitConfiguration(String path, Configuration conf)
+      throws IOException {
+    Path jobSubmitPath = new Path(path);
+    FileSystem fs = FileSystem.get(conf);
+    FSDataOutputStream out = fs.create(jobSubmitPath);
+    conf.writeXml(out);
+    out.close();
+
+    LOG.info("Written new configuration back to " + path);
+  }
+
+  /**
+   * Get container memory from "bsp.child.mem.in.mb" set on Hama configuration
+   * @return The memory of container.
+   */
+  private int getMemoryRequirements(Configuration conf) {
+    String newMemoryProperty = conf.get("bsp.child.mem.in.mb");
+    if (newMemoryProperty == null) {
+      LOG.warn("\"bsp.child.mem.in.mb\" was not set! Try parsing the child opts...");
+      return getMemoryFromOptString(conf.get("bsp.child.java.opts"));
+    } else {
+      return Integer.valueOf(newMemoryProperty);
+    }
+  }
+
+  // This really needs a testcase
+  private static int getMemoryFromOptString(String opts) {
+    final int DEFAULT_MEMORY_MB = 256;
+
+    if (opts == null) {
+      return DEFAULT_MEMORY_MB;
+    }
+
+    if (!opts.contains("-Xmx")) {
+      LOG.info(
+          "No \"-Xmx\" option found in child opts, using default amount of memory!");
+      return DEFAULT_MEMORY_MB;
+    } else {
+      // e.G: -Xmx512m
+
+      int startIndex = opts.indexOf("-Xmx") + 4;
+      String xmxString = opts.substring(startIndex);
+      char qualifier = xmxString.charAt(xmxString.length() - 1);
+      int memory = Integer
+          .valueOf(xmxString.substring(0, xmxString.length() - 1));
+      if (qualifier == 'm') {
+        return memory;
+      } else if (qualifier == 'g') {
+        return memory * 1024;
+      } else {
+        throw new IllegalArgumentException(
+            "Memory Limit in child opts was not set! \"bsp.child.java.opts\" String was: "
+                + opts);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  Thread createLaunchContainerThread(Container allocatedContainer) {
+    LaunchContainerRunnable runnableLaunchContainer =
+        new LaunchContainerRunnable(allocatedContainer, containerListener, jobConf);
+    return new Thread(runnableLaunchContainer);
+  }
+
+  @Override
+  public LongWritable getCurrentSuperStep() {
+    return new LongWritable(superstep);
+  }
+
+  @Override
+  public Task getTask(TaskAttemptID taskid) throws IOException {
+    BSPJobClient.RawSplit assignedSplit = null;
+    String splitName = NullInputFormat.NullInputSplit.class.getName();
+    //String splitName = NullInputSplit.class.getCanonicalName();
+    if (splits != null) {
+      assignedSplit = splits[taskid.id];
+      splitName = assignedSplit.getClassName();
+      return new BSPTask(jobId, jobFile, taskid, taskid.id, splitName,
+          assignedSplit.getBytes());
+    } else {
+      return new BSPTask(jobId, jobFile, taskid, taskid.id, splitName,
+          new BytesWritable());
+    }
+  }
+
+  @Override
+  public boolean ping(TaskAttemptID taskid) throws IOException {
+    return false;
+  }
+
+  @Override
+  public void done(TaskAttemptID taskid) throws IOException {
+
+  }
+
+  @Override
+  public void fsError(TaskAttemptID taskId, String message) throws IOException {
+
+  }
+
+  @Override
+  public void fatalError(TaskAttemptID taskId, String message)
+      throws IOException {
+
+  }
+
+  @Override
+  public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
+      throws IOException, InterruptedException {
+    if (taskStatus.getSuperstepCount() > superstep) {
+      superstep = taskStatus.getSuperstepCount();
+      LOG.info("Now in superstep " + superstep);
+    }
+
+    Counters counters = taskStatus.getCounters();
+    globalCounter.incrAllCounters(counters);
+
+    return true;
+  }
+
+  @Override
+  public int getAssignedPortNum(TaskAttemptID taskid) {
+    return 0;
+  }
+
+  @Override
+  public void close() throws IOException {
+    this.clientServer.stop();
+    this.taskServer.stop();
+  }
+
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion)
+      throws IOException {
+    return BSPClient.versionID;
+  }
+}
diff --git a/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java b/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
deleted file mode 100644
index b032fe2..0000000
--- a/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
+++ /dev/null
@@ -1,483 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.bsp;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.security.PrivilegedAction;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.hama.Constants;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.Job.JobState;
-import org.apache.hama.bsp.sync.SyncServer;
-import org.apache.hama.bsp.sync.SyncServiceFactory;
-import org.apache.hama.ipc.BSPPeerProtocol;
-import org.apache.hama.ipc.RPC;
-import org.apache.hama.ipc.Server;
-import org.apache.hama.util.BSPNetUtils;
-
-
-/**
- * BSPApplicationMaster is an application master for Apache Hamas BSP Engine.
- */
-public class BSPApplicationMaster implements BSPClient, BSPPeerProtocol {
-
-  private static final Log LOG = LogFactory.getLog(BSPApplicationMaster.class);
-  private static final ExecutorService threadPool = Executors
-      .newFixedThreadPool(1);
-
-  private Configuration localConf;
-  private Configuration jobConf;
-  private String jobFile;
-
-  private Clock clock;
-  private YarnRPC yarnRPC;
-
-  private ApplicationMasterProtocol amrmRPC;
-
-  private ApplicationAttemptId appAttemptId;
-  private String applicationName;
-  private long startTime;
-
-  private JobImpl job;
-  private BSPJobID jobId;
-
-  // RPC info where the AM receive client side requests
-  private String hostname;
-  private int clientPort;
-  private int taskServerPort;
-
-  private Server clientServer;
-  private Server taskServer;
-
-  private volatile long superstep;
-  //private SyncServerRunner syncServer;
-  private SyncServer syncServer;
-
-  private Counters globalCounter = new Counters();
-
-  private FileSystem fs;
-  private BSPJobClient.RawSplit[] splits;
-
-  private BSPApplicationMaster(String[] args) throws Exception {
-    if (args.length != 1) {
-      throw new IllegalArgumentException();
-    }
-
-    this.jobFile = args[0];
-
-    this.jobConf = getSubmitConfiguration(jobFile);
-
-    this.localConf = new YarnConfiguration();
-    localConf.addResource(localConf);
-    fs = FileSystem.get(jobConf);
-
-    this.applicationName = jobConf.get("bsp.job.name",
-        "<no bsp job name defined>");
-    if (applicationName.isEmpty()) {
-      this.applicationName = "<no bsp job name defined>";
-    }
-
-    this.appAttemptId = getApplicationAttemptId();
-
-    this.yarnRPC = YarnRPC.create(localConf);
-    this.clock = new SystemClock();
-    this.startTime = clock.getTime();
-
-    this.jobId = new BSPJobID(appAttemptId.toString(), 0);
-
-    this.hostname = BSPNetUtils.getCanonicalHostname();
-    this.clientPort = BSPNetUtils.getFreePort(12000);
-
-    // Set configuration for starting SyncServer which run Zookeeper
-    this.jobConf.set(Constants.ZOOKEEPER_QUORUM, hostname);
-
-    // start our synchronization service
-    startSyncServer();
-
-    startRPCServers();
-
-    /*
-     * Make sure that this executes after the start the RPC servers, because we
-     * are readjusting the configuration.
-     */
-
-    rewriteSubmitConfiguration(jobFile, jobConf);
-
-    String jobSplit = jobConf.get("bsp.job.split.file");
-    splits = null;
-    if (jobSplit != null) {
-      DataInputStream splitFile = fs.open(new Path(jobSplit));
-      try {
-        splits = BSPJobClient.readSplitFile(splitFile);
-      } finally {
-        splitFile.close();
-      }
-    }
-
-    this.amrmRPC = getYarnRPCConnection(localConf);
-    registerApplicationMaster(amrmRPC, hostname, clientPort,
-        "http://localhost:8080");
-  }
-
-  /**
-   * This method starts the needed RPC servers: client server and the task
-   * server. This method manipulates the configuration and therefore needs to be
-   * executed BEFORE the submitconfiguration gets rewritten.
-   * 
-   * @throws IOException
-   */
-  private void startRPCServers() throws IOException {
-    // start the RPC server which talks to the client
-    this.clientServer = RPC.getServer(BSPClient.class, hostname, clientPort, jobConf);
-    this.clientServer.start();
-
-    // start the RPC server which talks to the tasks
-    this.taskServerPort = BSPNetUtils.getFreePort(10000);
-    this.taskServer = RPC.getServer(this, hostname, taskServerPort, jobConf);
-    this.taskServer.start();
-
-    // readjusting the configuration to let the tasks know where we are.
-    this.jobConf.set("hama.umbilical.address", hostname + ":" + taskServerPort);
-  }
-
-  /**
-   * This method starts the sync server on a specific port and waits for it to
-   * come up. Be aware that this method adds the "bsp.sync.server.address" that
-   * is needed for a task to connect to the service.
-   * 
-   * @throws IOException
-   */
-  private void startSyncServer() throws Exception {
-    syncServer = SyncServiceFactory.getSyncServer(jobConf);
-    syncServer.init(jobConf);
-
-    ZKServerThread serverThread = new ZKServerThread(syncServer);
-    threadPool.submit(serverThread);
-  }
-
-  private static class ZKServerThread implements Runnable {
-    SyncServer server;
-
-    ZKServerThread(SyncServer s) {
-      server = s;
-    }
-
-    @Override
-    public void run() {
-      try {
-        server.start();
-      } catch (Exception e) {
-        LOG.error("Error running SyncServer.", e);
-      }
-    }
-  }
-
-  /**
-   * Connects to the Resource Manager.
-   * 
-   * @param yarnConf
-   * @return a new RPC connection to the Resource Manager.
-   */
-  private ApplicationMasterProtocol getYarnRPCConnection(Configuration yarnConf) throws IOException {
-    // Connect to the Scheduler of the ResourceManager.
-    UserGroupInformation currentUser = UserGroupInformation.createRemoteUser(appAttemptId.toString());
-    Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
-
-    final InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
-        YarnConfiguration.RM_SCHEDULER_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
-
-    Token<? extends TokenIdentifier> amRMToken = setupAndReturnAMRMToken(rmAddress, credentials.getAllTokens());
-    currentUser.addToken(amRMToken);
-
-    final Configuration conf = yarnConf;
-
-    ApplicationMasterProtocol client = currentUser
-        .doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
-          @Override
-          public ApplicationMasterProtocol run() {
-            return (ApplicationMasterProtocol) yarnRPC.getProxy(ApplicationMasterProtocol.class, rmAddress, conf);
-          }
-        });
-    LOG.info("Connecting to ResourceManager at " + rmAddress);
-    return client;
-  }
-
-  private Token<? extends TokenIdentifier> setupAndReturnAMRMToken(
-      InetSocketAddress rmBindAddress,
-      Collection<Token<? extends TokenIdentifier>> allTokens) {
-    for (Token<? extends TokenIdentifier> token : allTokens) {
-      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
-        SecurityUtil.setTokenService(token, rmBindAddress);
-        return token;
-      }
-    }
-    return null;
-  }
-
-
-  /**
-   * Registers this application master with the Resource Manager and retrieves a
-   * response which is used to launch additional containers.
-   */
-  private static RegisterApplicationMasterResponse registerApplicationMaster(
-      ApplicationMasterProtocol resourceManager, String appMasterHostName, int appMasterRpcPort,
-      String appMasterTrackingUrl) throws YarnException, IOException {
-
-    RegisterApplicationMasterRequest appMasterRequest = Records
-        .newRecord(RegisterApplicationMasterRequest.class);
-    appMasterRequest.setHost(appMasterHostName);
-    appMasterRequest.setRpcPort(appMasterRpcPort);
-    // TODO tracking URL
-    appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
-    RegisterApplicationMasterResponse response = resourceManager
-        .registerApplicationMaster(appMasterRequest);
-    LOG.info("ApplicationMaster has maximum resource capability of: "
-        + response.getMaximumResourceCapability().getMemory());
-    return response;
-  }
-
-  /**
-   * Gets the application attempt ID from the environment. This should be set by
-   * YARN when the container has been launched.
-   * 
-   * @return a new ApplicationAttemptId which is unique and identifies this
-   *         task.
-   */
-  private static ApplicationAttemptId getApplicationAttemptId()
-      throws IOException {
-    Map<String, String> envs = System.getenv();
-    if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
-      throw new IllegalArgumentException(
-          "ApplicationAttemptId not set in the environment");
-    }
-
-    return ConverterUtils.toContainerId(
-        envs.get(Environment.CONTAINER_ID.name()))
-        .getApplicationAttemptId();
-  }
-
-  private void start() throws IOException, YarnException /*throws Exception*/ {
-    JobState finalState = null;
-    try {
-      job = new JobImpl(appAttemptId, jobConf, yarnRPC, amrmRPC, jobFile, jobId);
-      finalState = job.startJob();
-    } catch (Exception e) {
-      LOG.error("error was occured. cleaning up");
-      e.printStackTrace();
-    } finally {
-      if (finalState != null) {
-        LOG.info("Job \"" + applicationName + "\"'s state after completion: "
-            + finalState.toString());
-        LOG.info("Job took " + ((clock.getTime() - startTime) / 1000L)
-            + "s to finish!");
-      }
-      LOG.info("job is cleaning up");
-      job.cleanup();
-    }
-  }
-
-  private void cleanup() throws YarnException, IOException {
-    //syncServer.stop();
-    syncServer.stopServer();
-
-    if (threadPool != null && !threadPool.isShutdown()) {
-      threadPool.shutdownNow();
-    }
-
-    clientServer.stop();
-    taskServer.stop();
-    FinishApplicationMasterRequest finishReq = Records
-        .newRecord(FinishApplicationMasterRequest.class);
-    switch (job.getState()) {
-      case SUCCESS:
-        finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
-        break;
-      case KILLED:
-        finishReq.setFinalApplicationStatus(FinalApplicationStatus.KILLED);
-        break;
-      case FAILED:
-        finishReq.setFinalApplicationStatus(FinalApplicationStatus.FAILED);
-        break;
-      default:
-        finishReq.setFinalApplicationStatus(FinalApplicationStatus.FAILED);
-    }
-    this.amrmRPC.finishApplicationMaster(finishReq);
-  }
-
-  public static void main(String[] args) throws YarnException, IOException {
-    // we expect getting the qualified path of the job.xml as the first
-    // element in the arguments
-    BSPApplicationMaster master = null;
-    try {
-      master = new BSPApplicationMaster(args);
-      master.start();
-    } catch (Exception e) {
-      LOG.fatal("Error starting BSPApplicationMaster", e);
-    } finally {
-      if (master != null) {
-        master.cleanup();
-      }
-    }
-  }
-
-  /**
-   * Reads the configuration from the given path.
-   */
-  private static Configuration getSubmitConfiguration(String path)
-      throws IOException {
-    Path jobSubmitPath = new Path(path);
-    Configuration jobConf = new HamaConfiguration();
-
-    FileSystem fs = FileSystem.get(URI.create(path), jobConf);
-
-    InputStream in =fs.open(jobSubmitPath);
-    jobConf.addResource(in);
-
-    return jobConf;
-  }
-
-  /**
-   * Writes the current configuration to a given path to reflect changes. For
-   * example the sync server address is put after the file has been written.
-   */
-  private static void rewriteSubmitConfiguration(String path, Configuration conf)
-      throws IOException {
-    Path jobSubmitPath = new Path(path);
-    FileSystem fs = FileSystem.get(conf);
-    FSDataOutputStream out = fs.create(jobSubmitPath);
-    conf.writeXml(out);
-    out.close();
-
-    LOG.info("Written new configuration back to " + path);
-  }
-
-  @Override
-  public long getProtocolVersion(String arg0, long arg1) throws IOException {
-    return BSPClient.versionID;
-  }
-
-  @Override
-  public LongWritable getCurrentSuperStep() {
-    return new LongWritable(superstep);
-  }
-
-  @Override
-  public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
-      throws IOException, InterruptedException {
-    if (taskStatus.getSuperstepCount() > superstep) {
-      superstep = taskStatus.getSuperstepCount();
-      LOG.info("Now in superstep " + superstep);
-    }
-
-    Counters counters = taskStatus.getCounters();
-    globalCounter.incrAllCounters(counters);
-
-    return true;
-  }
-
-  /**
-   * most of the following methods are already handled over YARN and with the
-   * JobImpl.
-   */
-
-  @Override
-  public void close() throws IOException {
-
-  }
-
-  @Override
-  public Task getTask(TaskAttemptID taskid) throws IOException {
-    BSPJobClient.RawSplit assignedSplit = null;
-    String splitName = NullInputFormat.NullInputSplit.class.getName();
-    //String splitName = NullInputSplit.class.getCanonicalName();
-    if (splits != null) {
-      assignedSplit = splits[taskid.id];
-      splitName = assignedSplit.getClassName();
-      return new BSPTask(jobId, jobFile, taskid, taskid.id, splitName,
-          assignedSplit.getBytes());
-    } else {
-      return new BSPTask(jobId, jobFile, taskid, taskid.id, splitName,
-          new BytesWritable());
-    }
-  }
-
-  @Override
-  public boolean ping(TaskAttemptID taskid) throws IOException {
-    return false;
-  }
-
-  @Override
-  public void done(TaskAttemptID taskid) throws IOException {
-
-  }
-
-  @Override
-  public void fsError(TaskAttemptID taskId, String message) throws IOException {
-
-  }
-
-  @Override
-  public void fatalError(TaskAttemptID taskId, String message)
-      throws IOException {
-
-  }
-
-  @Override
-  public int getAssignedPortNum(TaskAttemptID taskid) {
-    return 0;
-  }
-
-}
diff --git a/yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java b/yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
deleted file mode 100644
index ce08603..0000000
--- a/yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.bsp;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.*;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.*;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
-
-public class BSPTaskLauncher {
-
-  private static final Log LOG = LogFactory.getLog(BSPTaskLauncher.class);
-
-  private final Container allocatedContainer;
-  private final int id;
-  private final ContainerManagementProtocol cm;
-  private final Configuration conf;
-  private String user;
-  private final Path jobFile;
-  private final BSPJobID jobId;
-
-  private GetContainerStatusesRequest statusRequest;
-  
-  @Override
-  protected void finalize() throws Throwable {
-    stopAndCleanup();
-  }
-
-  public BSPTaskLauncher(int id, Container container, ContainerManagementProtocol cm,
-                         Configuration conf, Path jobFile, BSPJobID jobId)
-      throws YarnException {
-    this.id = id;
-    this.cm = cm;
-    this.conf = conf;
-    this.allocatedContainer = container;
-    this.jobFile = jobFile;
-    this.jobId = jobId;
-    // FIXME why does this contain mapreduce here?
-    this.user = conf.get("bsp.user.name");
-    if (this.user == null) {
-      this.user = conf.get("mapreduce.job.user.name");
-    }
-  }
-
-  public void stopAndCleanup() throws YarnException, IOException {
-    StopContainersRequest stopRequest = Records.newRecord(StopContainersRequest.class);
-    List<ContainerId> containerIds = new ArrayList<ContainerId>();
-    containerIds.add(allocatedContainer.getId());
-    LOG.info("getId : " + allocatedContainer.getId());
-    stopRequest.setContainerIds(containerIds);
-    LOG.info("StopContainer : " + stopRequest.getContainerIds());
-    cm.stopContainers(stopRequest);
-
-  }
-
-  public void start() throws IOException, YarnException {
-    LOG.info("Spawned task with id: " + this.id
-        + " for allocated container id: "
-        + this.allocatedContainer.getId().toString());
-    statusRequest = setupContainer(allocatedContainer, cm, user, id);
-  }
-
-  /**
-   * This polls the current container status from container manager. Null if the
-   * container hasn't finished yet.
-   * 
-   * @return
-   * @throws Exception
-   */
-  public BSPTaskStatus poll() throws Exception {
-
-    ContainerStatus lastStatus = null;
-    GetContainerStatusesResponse getContainerStatusesResponse = cm.getContainerStatuses(statusRequest);
-    List<ContainerStatus> containerStatuses = getContainerStatusesResponse.getContainerStatuses();
-    if (containerStatuses.size() <= 0) {
-      LOG.info("container Statuses size is zero");
-      return null;
-    }
-
-    for (ContainerStatus containerStatus : containerStatuses) {
-      LOG.info("Got container status for containerID=" + containerStatus
-          .getContainerId() + ", state=" + containerStatus.getState()
-          + ", exitStatus=" + containerStatus.getExitStatus() + ", diagnostics="
-          + containerStatus.getDiagnostics());
-
-      if (containerStatus.getContainerId().equals(allocatedContainer.getId())) {
-        lastStatus = containerStatus;
-        break;
-      }
-    }
-
-    if (lastStatus.getState() != ContainerState.COMPLETE) {
-      LOG.info("Not completed...");
-      return null;
-    }
-    LOG.info(this.id + " Last report comes with exitstatus of " + lastStatus
-        .getExitStatus() + " and diagnose string of " + lastStatus
-        .getDiagnostics());
-
-    return new BSPTaskStatus(id, lastStatus.getExitStatus());
-  }
-
-  private GetContainerStatusesRequest setupContainer(
-      Container allocatedContainer, ContainerManagementProtocol cm, String user, int id) throws IOException, YarnException {
-    LOG.info("Setting up a container for user " + user + " with id of " + id
-        + " and containerID of " + allocatedContainer.getId() + " as " + user);
-    // Now we setup a ContainerLaunchContext
-    ContainerLaunchContext ctx = Records
-        .newRecord(ContainerLaunchContext.class);
-
-    // Set the local resources
-    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
-    LocalResource packageResource = Records.newRecord(LocalResource.class);
-    FileSystem fs = FileSystem.get(conf);
-    Path packageFile = new Path(System.getenv(YARNBSPConstants.HAMA_YARN_LOCATION));
-    URL packageUrl = ConverterUtils.getYarnUrlFromPath(packageFile
-        .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
-    LOG.info("PackageURL has been composed to " + packageUrl.toString());
-    try {
-      LOG.info("Reverting packageURL to path: "
-          + ConverterUtils.getPathFromYarnURL(packageUrl));
-    } catch (URISyntaxException e) {
-      LOG.fatal("If you see this error the workarround does not work", e);
-    }
-
-    packageResource.setResource(packageUrl);
-    packageResource.setSize(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_YARN_SIZE)));
-    packageResource.setTimestamp(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_YARN_TIMESTAMP)));
-    packageResource.setType(LocalResourceType.FILE);
-    packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
-
-    localResources.put(YARNBSPConstants.APP_MASTER_JAR_PATH, packageResource);
-
-    Path hamaReleaseFile = new Path(System.getenv(YARNBSPConstants.HAMA_LOCATION));
-    URL hamaReleaseUrl = ConverterUtils.getYarnUrlFromPath(hamaReleaseFile
-        .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
-    LOG.info("Hama release URL has been composed to " + hamaReleaseUrl.toString());
-
-    RemoteIterator<LocatedFileStatus> fileStatusListIterator = fs.listFiles(
-        hamaReleaseFile, true);
-
-    while(fileStatusListIterator.hasNext()) {
-      LocatedFileStatus lfs = fileStatusListIterator.next();
-      LocalResource localRsrc = LocalResource.newInstance(
-          ConverterUtils.getYarnUrlFromPath(lfs.getPath()),
-          LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
-          lfs.getLen(), lfs.getModificationTime());
-      localResources.put(lfs.getPath().getName(), localRsrc);
-    }
-
-    ctx.setLocalResources(localResources);
-
-    /*
-     * TODO Package classpath seems not to work if you're in pseudo distributed
-     * mode, because the resource must not be moved, it will never be unpacked.
-     * So we will check if our jar file has the file:// prefix and put it into
-     * the CP directly
-     */
-
-    StringBuilder classPathEnv = new StringBuilder(
-        ApplicationConstants.Environment.CLASSPATH.$()).append(File.pathSeparatorChar)
-        .append("./*");
-    for (String c : conf.getStrings(
-        YarnConfiguration.YARN_APPLICATION_CLASSPATH,
-        YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
-      classPathEnv.append(File.pathSeparatorChar);
-      classPathEnv.append(c.trim());
-    }
-
-    Vector<CharSequence> vargs = new Vector<CharSequence>();
-    vargs.add("${JAVA_HOME}/bin/java");
-    vargs.add("-cp " + classPathEnv + "");
-    vargs.add(BSPRunner.class.getCanonicalName());
-    
-    vargs.add(jobId.getJtIdentifier());
-    vargs.add(Integer.toString(id));
-    vargs.add(this.jobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory())
-        .toString());
-
-    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/bsp.stdout");
-    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/bsp.stderr");
-
-    // Get final commmand
-    StringBuilder command = new StringBuilder();
-    for (CharSequence str : vargs) {
-      command.append(str).append(" ");
-    }
-
-    List<String> commands = new ArrayList<String>();
-    commands.add(command.toString());
-
-    ctx.setCommands(commands);
-    LOG.info("Starting command: " + commands);
-
-    StartContainerRequest startReq = Records
-        .newRecord(StartContainerRequest.class);
-    startReq.setContainerLaunchContext(ctx);
-    startReq.setContainerToken(allocatedContainer.getContainerToken());
-
-    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
-    list.add(startReq);
-    StartContainersRequest requestList = StartContainersRequest.newInstance(list);
-    cm.startContainers(requestList);
-
-    GetContainerStatusesRequest statusReq = Records
-        .newRecord(GetContainerStatusesRequest.class);
-    List<ContainerId> containerIds = new ArrayList<ContainerId>();
-    containerIds.add(allocatedContainer.getId());
-    statusReq.setContainerIds(containerIds);
-    return statusReq;
-  }
-
-  @Override
-  public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + id;
-    return result;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    BSPTaskLauncher other = (BSPTaskLauncher) obj;
-    if (id != other.id)
-      return false;
-    return true;
-  }
-
-  public static class BSPTaskStatus {
-    private final int id;
-    private final int exitStatus;
-
-    public BSPTaskStatus(int id, int exitStatus) {
-      super();
-      this.id = id;
-      this.exitStatus = exitStatus;
-    }
-
-    public int getId() {
-      return id;
-    }
-
-    public int getExitStatus() {
-      return exitStatus;
-    }
-  }
-
-}
diff --git a/yarn/src/main/java/org/apache/hama/bsp/Job.java b/yarn/src/main/java/org/apache/hama/bsp/Job.java
deleted file mode 100644
index 7745301..0000000
--- a/yarn/src/main/java/org/apache/hama/bsp/Job.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.bsp;
-
-import org.apache.hadoop.yarn.exceptions.YarnException;
-
-import java.io.IOException;
-
-/**
- * Main interface to interact with the job. Provides only getters.
- */
-public interface Job {
-
-  public enum JobState {
-    NEW, RUNNING, SUCCESS, FAILED, KILLED
-  }
-
-  public enum BSPPhase {
-    COMPUTATION, COMMUNICATION
-  }
-
-  public JobState startJob() throws Exception;
-
-  public void cleanup() throws YarnException, IOException;
-
-  JobState getState();
-
-  BSPPhase getBSPPhase();
-
-  int getTotalBSPTasks();
-
-}
diff --git a/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java b/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java
deleted file mode 100644
index 650295c..0000000
--- a/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * regarding copyright ownership.  The ASF licenses this file
- * distributed with this work for additional information
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.bsp;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.security.PrivilegedAction;
-import java.util.*;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
-import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.client.api.NMTokenCache;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.hama.bsp.BSPTaskLauncher.BSPTaskStatus;
-
-public class JobImpl implements Job {
-
-  private static final Log LOG = LogFactory.getLog(JobImpl.class);
-  private static final int DEFAULT_MEMORY_MB = 256;
-
-  private Configuration conf;
-  private BSPJobID jobId;
-  private int numBSPTasks;
-  private int priority = 0;
-  private String childOpts;
-  private int taskMemoryInMb;
-  private Path jobFile;
-
-  private JobState state;
-  private BSPPhase phase;
-
-  private ApplicationAttemptId appAttemptId;
-  private YarnRPC yarnRPC;
-  private ApplicationMasterProtocol resourceManager;
-
-  private List<Container> allocatedContainers;
-  private List<ContainerId> releasedContainers = Collections.emptyList();
-
-  private Map<Integer, BSPTaskLauncher> launchers = new HashMap<Integer, BSPTaskLauncher>();
-  private Deque<BSPTaskLauncher> completionQueue = new LinkedList<BSPTaskLauncher>();
-
-  private int lastResponseID = 0;
-
-  private int getMemoryRequirements() {
-    String newMemoryProperty = conf.get("bsp.child.mem.in.mb");
-    if (newMemoryProperty == null) {
-      LOG.warn("\"bsp.child.mem.in.mb\" was not set! Try parsing the child opts...");
-      return getMemoryFromOptString(childOpts);
-    } else {
-      return Integer.valueOf(newMemoryProperty);
-    }
-  }
-
-  public JobImpl(ApplicationAttemptId appAttemptId,
-                 Configuration jobConfiguration, YarnRPC yarnRPC, ApplicationMasterProtocol amrmRPC,
-                 String jobFile, BSPJobID jobId) {
-    super();
-    this.appAttemptId = appAttemptId;
-    this.yarnRPC = yarnRPC;
-    this.resourceManager = amrmRPC;
-    this.jobFile = new Path(jobFile);
-    this.state = JobState.NEW;
-    this.jobId = jobId;
-    this.conf = jobConfiguration;
-    this.numBSPTasks = conf.getInt("bsp.peers.num", 1);
-    this.childOpts = conf.get("bsp.child.java.opts");
-
-    this.taskMemoryInMb = getMemoryRequirements();
-  }
-
-  // This really needs a testcase
-  private static int getMemoryFromOptString(String opts) {
-    if (opts == null) {
-      return DEFAULT_MEMORY_MB;
-    }
-
-    if (!opts.contains("-Xmx")) {
-      LOG.info("No \"-Xmx\" option found in child opts, using default amount of memory!");
-      return DEFAULT_MEMORY_MB;
-    } else {
-      // e.G: -Xmx512m
-
-      int startIndex = opts.indexOf("-Xmx") + 4;
-      String xmxString = opts.substring(startIndex);
-      char qualifier = xmxString.charAt(xmxString.length() - 1);
-      int memory = Integer.valueOf(xmxString.substring(0,
-          xmxString.length() - 1));
-      if (qualifier == 'm') {
-        return memory;
-      } else if (qualifier == 'g') {
-        return memory * 1024;
-      } else {
-        throw new IllegalArgumentException(
-            "Memory Limit in child opts was not set! \"bsp.child.java.opts\" String was: "
-                + opts);
-      }
-    }
-  }
-
-  @Override
-  public JobState startJob() throws Exception {
-
-    this.allocatedContainers = new ArrayList<Container>(numBSPTasks);
-    NMTokenCache nmTokenCache = new NMTokenCache();
-    while (allocatedContainers.size() < numBSPTasks) {
-      AllocateRequest req = AllocateRequest.newInstance(lastResponseID, 0.0f,
-          createBSPTaskRequest(numBSPTasks - allocatedContainers.size(), taskMemoryInMb,
-              priority), releasedContainers, null);
-
-      AllocateResponse allocateResponse = resourceManager.allocate(req);
-      for (NMToken token : allocateResponse.getNMTokens()) {
-        nmTokenCache.setToken(token.getNodeId().toString(), token.getToken());
-      }
-
-      LOG.info("Got response ID: " + allocateResponse.getResponseId()
-          + " with num of containers: "
-          + allocateResponse.getAllocatedContainers().size()
-          + " and following resources: "
-          + allocateResponse.getAvailableResources().getMemory() + "mb");
-      this.lastResponseID = allocateResponse.getResponseId();
-
-      this.allocatedContainers.addAll(allocateResponse.getAllocatedContainers());
-
-      LOG.info("Waiting to allocate " + (numBSPTasks - allocatedContainers.size()) + " more containers...");
-
-      Thread.sleep(1000l);
-    }
-
-    LOG.info("Got " + allocatedContainers.size() + " containers!");
-
-    int id = 0;
-    for (Container allocatedContainer : allocatedContainers) {
-      LOG.info("Launching task on a new container." + ", containerId="
-          + allocatedContainer.getId() + ", containerNode="
-          + allocatedContainer.getNodeId().getHost() + ":"
-          + allocatedContainer.getNodeId().getPort() + ", containerNodeURI="
-          + allocatedContainer.getNodeHttpAddress() + ", containerResourceMemory"
-          + allocatedContainer.getResource().getMemory());
-
-      // Connect to ContainerManager on the allocated container
-      String user = conf.get("bsp.user.name");
-      if (user == null) {
-        user = System.getenv(ApplicationConstants.Environment.USER.name());
-      }
-
-      ContainerManagementProtocol cm = null;
-      try {
-        cm = getContainerManagementProtocolProxy(yarnRPC,
-            nmTokenCache.getToken(allocatedContainer.getNodeId().toString()), allocatedContainer.getNodeId(), user);
-      } catch (Exception e) {
-        LOG.error("Failed to create ContainerManager...");
-        if (cm != null)
-          yarnRPC.stopProxy(cm, conf);
-        e.printStackTrace();
-      }
-
-      BSPTaskLauncher runnableLaunchContainer = new BSPTaskLauncher(id,
-          allocatedContainer, cm, conf, jobFile, jobId);
-
-      launchers.put(id, runnableLaunchContainer);
-      runnableLaunchContainer.start();
-      completionQueue.add(runnableLaunchContainer);
-      id++;
-    }
-
-    LOG.info("Waiting for tasks to finish...");
-    state = JobState.RUNNING;
-    int completed = 0;
-
-    while (completed != numBSPTasks) {
-      for (BSPTaskLauncher task : completionQueue) {
-        BSPTaskStatus returnedTask = task.poll();
-        if(returnedTask != null && returnedTask.getExitStatus() == 0) {
-          LOG.info("Task \"" + returnedTask.getId()
-              + "\" sucessfully finished!");
-          completed++;
-          LOG.info("Waiting for " + (numBSPTasks - completed)
-              + " tasks to finish!");
-        }
-
-        if(returnedTask != null && returnedTask.getExitStatus() != 0) {
-          LOG.error("Task with id \"" + returnedTask.getId() + "\" failed!");
-          completionQueue.add(task);
-          state = JobState.FAILED;
-          return state;
-        }
-      }
-      Thread.sleep(1000L);
-    }
-
-    state = JobState.SUCCESS;
-    return state;
-  }
-
-  /**
-   *
-   * @param rpc
-   * @param nmToken
-   * @param nodeId
-   * @param user
-   * @return
-   */
-  protected ContainerManagementProtocol getContainerManagementProtocolProxy(
-      final YarnRPC rpc, Token nmToken, NodeId nodeId, String user) {
-    ContainerManagementProtocol proxy;
-    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
-    final InetSocketAddress addr =
-        NetUtils.createSocketAddr(nodeId.getHost(), nodeId.getPort());
-    if (nmToken != null) {
-      ugi.addToken(ConverterUtils.convertFromYarn(nmToken, addr));
-    }
-
-    proxy = ugi
-        .doAs(new PrivilegedAction<ContainerManagementProtocol>() {
-          @Override
-          public ContainerManagementProtocol run() {
-            return (ContainerManagementProtocol) rpc.getProxy(
-                ContainerManagementProtocol.class,
-                addr, conf);
-          }
-        });
-    return proxy;
-  }
-
-  /**
-   * Makes a lookup for the taskid and stops its container and task. It also
-   * removes the task from the launcher so that we won't have to stop it twice.
-   * 
-   * @param id
-   * @throws YarnException
-   */
-  private void cleanupTask(int id) throws YarnException, IOException {
-    BSPTaskLauncher bspTaskLauncher = launchers.get(id);
-    bspTaskLauncher.stopAndCleanup();
-    launchers.remove(id);
-    completionQueue.remove(bspTaskLauncher);
-  }
-
-  private List<ResourceRequest> createBSPTaskRequest(int numTasks,
-      int memoryInMb, int priority) {
-
-    List<ResourceRequest> reqList = new ArrayList<ResourceRequest>(numTasks);
-    for (int i = 0; i < numTasks; i++) {
-      // Resource Request
-      ResourceRequest rsrcRequest = Records.newRecord(ResourceRequest.class);
-
-      // setup requirements for hosts
-      // whether a particular rack/host is needed
-      // useful for applications that are sensitive
-      // to data locality
-      rsrcRequest.setResourceName("*");
-
-      // set the priority for the request
-      Priority pri = Records.newRecord(Priority.class);
-      pri.setPriority(priority);
-      rsrcRequest.setPriority(pri);
-
-      // Set up resource type requirements
-      // For now, only memory is supported so we set memory requirements
-      Resource capability = Records.newRecord(Resource.class);
-      capability.setMemory(memoryInMb);
-      rsrcRequest.setCapability(capability);
-
-      // set no. of containers needed
-      // matching the specifications
-      rsrcRequest.setNumContainers(numBSPTasks);
-      reqList.add(rsrcRequest);
-    }
-    return reqList;
-  }
-
-  @Override
-  public void cleanup() throws YarnException, IOException {
-    for (BSPTaskLauncher launcher : completionQueue) {
-      LOG.info("cleanup tasks !!!");
-      launcher.stopAndCleanup();
-    }
-  }
-
-  @Override
-  public JobState getState() {
-    return state;
-  }
-
-  @Override
-  public int getTotalBSPTasks() {
-    return numBSPTasks;
-  }
-
-  @Override
-  public BSPPhase getBSPPhase() {
-    return phase;
-  }
-
-}
diff --git a/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java b/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
index 153baa1..4d23384 100644
--- a/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
+++ b/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
@@ -33,10 +33,9 @@
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -131,7 +130,6 @@
     yarnConf = new YarnConfiguration(conf);
     yarnClient = YarnClient.createYarnClient();
     yarnClient.init(yarnConf);
-    yarnClient.start();
   }
 
   @Override
@@ -155,6 +153,7 @@
       LOG.debug("Retrieved username: " + s);
     }
 
+    yarnClient.start();
     try {
       YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics();
       LOG.info("Got Cluster metric info from ASM"
@@ -188,14 +187,16 @@
         }
       }
 
-      GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class);
-      GetNewApplicationResponse response = job.getApplicationsManager().getNewApplication(request);
-      id = response.getApplicationId();
+      // Get a new application id
+      YarnClientApplication app = yarnClient.createApplication();
+
 
       // Create a new ApplicationSubmissionContext
-      ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
-      // set the ApplicationId
-      appContext.setApplicationId(this.id);
+      //ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
+      ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
+
+      id = appContext.getApplicationId();
+
       // set the application name
       appContext.setApplicationName(job.getJobName());
 
@@ -227,7 +228,11 @@
       localResources.put(YARNBSPConstants.APP_MASTER_JAR_PATH, amJarRsrc);
 
       // add hama related jar files to localresources for container
-      List<File> hamaJars = localJarfromPath(System.getProperty("hama.home.dir"));
+      List<File> hamaJars;
+      if (System.getProperty("hama.home.dir") != null)
+        hamaJars = localJarfromPath(System.getProperty("hama.home.dir"));
+      else
+        hamaJars = localJarfromPath(getConf().get("hama.home.dir"));
       String hamaPath = getSystemDir() + "/hama";
       for (File fileEntry : hamaJars) {
         addToLocalResources(fs, fileEntry.getCanonicalPath(),
@@ -266,7 +271,7 @@
       Vector<CharSequence> vargs = new Vector<CharSequence>(5);
       vargs.add("${JAVA_HOME}/bin/java");
       vargs.add("-cp " + classPathEnv + "");
-      vargs.add(BSPApplicationMaster.class.getCanonicalName());
+      vargs.add(ApplicationMaster.class.getCanonicalName());
       vargs.add(submitJobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString());
 
       vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/hama-appmaster.stdout");
diff --git a/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java b/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java
index 4d8eea8..7d74557 100644
--- a/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java
+++ b/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java
@@ -74,12 +74,13 @@
       }
     }
 
-    //fs.delete(OUTPUT_PATH, true);
+    fs.delete(OUTPUT_PATH, true);
   }
 
   public static void main(String[] args) throws IOException,
       InterruptedException, ClassNotFoundException {
     HamaConfiguration conf = new HamaConfiguration();
+    conf.set("hama.home.dir", System.getenv().get("HAMA_HOME"));
 
     YARNBSPJob job = new YARNBSPJob(conf);
     job.setBspClass(HelloBSP.class);