/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hama.bsp;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.sync.SyncServer;
import org.apache.hama.bsp.sync.SyncServiceFactory;
import org.apache.hama.ipc.BSPPeerProtocol;
import org.apache.hama.ipc.RPC;
import org.apache.hama.ipc.Server;
import org.apache.hama.util.BSPNetUtils;
import org.apache.log4j.LogManager;

import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class ApplicationMaster  implements BSPClient, BSPPeerProtocol {
  private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);

  // Configuration
  private Configuration localConf;
  private Configuration jobConf;

  private String jobFile;
  private String applicationName;
  // Host and port of the RPC server on which the AM receives client side requests
  private String hostname;
  private int clientPort;
  private FileSystem fs;
  private static int id = 0;

  private volatile long superstep;
  private Counters globalCounter = new Counters();
  private BSPJobClient.RawSplit[] splits;

  private BSPJobID jobId;

  // SyncServer for Zookeeper
  private SyncServer syncServer;

  // Zookeeper thread pool
  private static final ExecutorService threadPool = Executors
      .newFixedThreadPool(1);

  // Port of the RPC server on which the AM receives requests from the tasks
  private int taskServerPort;

  private Server clientServer;
  private Server taskServer;

  // Handle to communicate with the Resource Manager
  @SuppressWarnings("rawtypes")
  private AMRMClientAsync amRMClient;

  // In both secure and non-secure modes, this points to the job-submitter.
  @VisibleForTesting
  UserGroupInformation appSubmitterUgi;

  // Handle to communicate with the Node Manager
  private NMClientAsync nmClientAsync;
  // Listen to process the response from the Node Manager
  private NMCallbackHandler containerListener;

  // Application Attempt Id ( combination of attemptId and fail count )
  @VisibleForTesting
  protected ApplicationAttemptId appAttemptID;


  // TODO
  // For status update for clients - yet to be implemented
  // Hostname of the container
  private String appMasterHostname = "";
  // Port on which the app master listens for status updates from clients
  private int appMasterRpcPort = -1;
  // Tracking url to which app master publishes info for clients to monitor
  private String appMasterTrackingUrl = "";

  // App Master configuration
  // No. of containers to run BSP tasks on
  @VisibleForTesting
  protected int numTotalContainers;
  // Memory to request for each container in which a BSP task will run
  private int containerMemory;
  // VirtualCores to request for each container in which a BSP task will run
  private int containerVirtualCores = 1;

  // Priority of the request
  private int requestPriority = 0;

  // Counter for completed containers ( complete denotes successful or failed )
  private AtomicInteger numCompletedContainers = new AtomicInteger();
  // Allocated container count so that we know how many containers has the RM
  // allocated to us
  @VisibleForTesting
  protected AtomicInteger numAllocatedContainers = new AtomicInteger();
  // Count of failed containers
  private AtomicInteger numFailedContainers = new AtomicInteger();
  // Count of containers already requested from the RM
  // Needed as once requested, we should not request for containers again.
  // Only request for more if the original requirement changes.
  @VisibleForTesting
  protected AtomicInteger numRequestedContainers = new AtomicInteger();

  private volatile boolean done;
  private ByteBuffer allTokens;

  // Launch threads
  private List<Thread> launchThreads = new ArrayList<Thread>();

  @VisibleForTesting
  protected final Set<ContainerId> launchedContainers =
      Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());

  /**
   * Creates an application master whose local configuration is a fresh
   * {@link YarnConfiguration}.
   */
  public ApplicationMaster() {
    localConf = new YarnConfiguration();
  }

  /**
   * Process entry point: initializes the application master, runs it, waits
   * for it to finish and terminates the JVM.
   * <p>
   * Exit codes: {@code 0} when {@link #init(String[])} declines to run or the
   * application finished successfully, {@code 1} (through {@link ExitUtil})
   * when anything was thrown, {@code 2} when {@link #finish()} reported a
   * failure.
   *
   * @param args command line arguments, handed to {@link #init(String[])}
   * @throws IOException if closing the application master fails
   */
  public static void main(String[] args) throws IOException {
    final ApplicationMaster master = new ApplicationMaster();
    boolean succeeded = false;

    try {
      LOG.info("Initializing ApplicationMaster");
      if (!master.init(args)) {
        System.exit(0);
      }
      master.run();
      succeeded = master.finish();
    } catch (Throwable failure) {
      LOG.fatal("Error running ApplicationMaster", failure);
      LogManager.shutdown();
      ExitUtil.terminate(1, failure);
    } finally {
      master.close();
    }

    if (!succeeded) {
      LOG.info("Application Master failed. exiting");
      System.exit(2);
    } else {
      LOG.info("Application Master completed successfully. exiting");
      System.exit(0);
    }
  }

  /**
   * Prepares the application master: loads the submitted job configuration,
   * derives the job id from the application attempt id, starts the sync server
   * and the RPC servers, writes the adjusted configuration back to the job file
   * and reads the input splits if the job defines a split file.
   *
   * @param args command line arguments; must contain exactly one element, the
   *          path of the job file
   * @return always {@code true}
   * @throws IllegalArgumentException if {@code args} does not hold exactly one
   *           element
   * @throws Exception if the configuration, the sync server, the RPC servers
   *           or the split file cannot be set up
   */
  public boolean init(String[] args) throws Exception {
    if (args == null || args.length != 1) {
      throw new IllegalArgumentException(
          "Expected exactly one argument (the path of the job file) but got: "
              + (args == null ? "null" : Arrays.toString(args)));
    }
    this.jobFile = args[0];
    this.jobConf = getSubmitConfiguration(jobFile);
    // NOTE(review): this adds localConf to itself as a resource; presumably
    // jobConf was meant here - confirm before changing.
    localConf.addResource(localConf);
    fs = FileSystem.get(jobConf);

    this.applicationName = jobConf.get("bsp.job.name",
        "<no bsp job name defined>");
    if (applicationName.isEmpty()) {
      this.applicationName = "<no bsp job name defined>";
    }

    appAttemptID = getApplicationAttemptId();
    this.jobId = new BSPJobID(appAttemptID.toString(), 0);
    this.appMasterHostname = BSPNetUtils.getCanonicalHostname();
    this.appMasterTrackingUrl = "http://localhost:8088";
    this.numTotalContainers = this.jobConf.getInt("bsp.peers.num", 1);
    this.containerMemory = getMemoryRequirements(jobConf);

    this.hostname = BSPNetUtils.getCanonicalHostname();
    this.clientPort = BSPNetUtils.getFreePort(12000);

    // Set configuration for starting SyncServer which run Zookeeper
    this.jobConf.set(Constants.ZOOKEEPER_QUORUM, appMasterHostname);

    // start our synchronization service
    startSyncServer();

    // start RPC server
    startRPCServers();

    /*
     * Make sure that this executes after the start the RPC servers, because we
     * are readjusting the configuration.
     */
    rewriteSubmitConfiguration(jobFile, jobConf);

    // The splits stay null when the job does not define a split file.
    String jobSplit = jobConf.get("bsp.job.split.file");
    splits = null;
    if (jobSplit != null) {
      DataInputStream splitFile = fs.open(new Path(jobSplit));
      try {
        splits = BSPJobClient.readSplitFile(splitFile);
      } finally {
        splitFile.close();
      }
    }

    return true;
  }

  /**
   * Main run function for the application master.
   * <p>
   * Captures the current user's credentials for the containers, starts the
   * asynchronous ResourceManager and NodeManager clients, registers this
   * application master with the ResourceManager, caps the memory and virtual
   * core ask at the maximum reported by the ResourceManager and finally
   * requests the containers that are not already running from a previous
   * attempt. The method returns once the requests have been handed to the
   * ResourceManager client.
   *
   * @throws org.apache.hadoop.yarn.exceptions.YarnException
   * @throws IOException
   * @throws InterruptedException
   */
  @SuppressWarnings({ "unchecked" })
  public void run() throws YarnException, IOException, InterruptedException {
    LOG.info("Starting ApplicationMaster");

    // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class
    // are marked as LimitedPrivate
    Credentials credentials =
        UserGroupInformation.getCurrentUser().getCredentials();
    DataOutputBuffer dob = new DataOutputBuffer();
    credentials.writeTokenStorageToStream(dob);
    // Now remove the AM->RM token so that containers cannot access it.
    // NOTE(review): the credentials were already serialized into dob above, so
    // the buffer handed to the containers presumably still contains the AM->RM
    // token - confirm whether the removal should happen before serializing.
    Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
    LOG.info("Executing with tokens:");
    while (iter.hasNext()) {
      Token<?> token = iter.next();
      LOG.info(token);
      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
        iter.remove();
      }
    }
    // Serialized tokens; each container launch context gets a duplicate of it.
    allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

    // Create appSubmitterUgi and add original tokens to it
    String appSubmitterUserName =
        System.getenv(ApplicationConstants.Environment.USER.name());
    appSubmitterUgi =
        UserGroupInformation.createRemoteUser(appSubmitterUserName);
    appSubmitterUgi.addCredentials(credentials);


    // Client towards the ResourceManager; 1000 is the first argument of
    // createAMRMClientAsync (presumably the heartbeat interval in ms - confirm).
    AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
    amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
    amRMClient.init(localConf);
    amRMClient.start();

    // Client towards the NodeManagers, used to start the allocated containers
    containerListener = createNMCallbackHandler();
    nmClientAsync = new NMClientAsyncImpl(containerListener);
    nmClientAsync.init(localConf);
    nmClientAsync.start();

    // Setup local RPC Server to accept status requests directly from clients
    // TODO need to setup a protocol for client to be able to communicate to
    // the RPC server
    // TODO use the rpc port info to register with the RM for the client to
    // send requests to this app master

    // Register self with ResourceManager
    // This will start heartbeating to the RM
    appMasterHostname = NetUtils.getHostname();
    RegisterApplicationMasterResponse response = amRMClient
        .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
            appMasterTrackingUrl);
    // Dump out information about cluster capability as seen by the
    // resource manager
    int maxMem = response.getMaximumResourceCapability().getMemory();
    LOG.info("Max mem capability of resources in this cluster " + maxMem);

    int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
    LOG.info("Max vcores capability of resources in this cluster " + maxVCores);

    // A resource ask cannot exceed the max.
    if (containerMemory > maxMem) {
      LOG.info("Container memory specified above max threshold of cluster."
          + " Using max value." + ", specified=" + containerMemory + ", max="
          + maxMem);
      containerMemory = maxMem;
    }

    if (containerVirtualCores > maxVCores) {
      LOG.info("Container virtual cores specified above max threshold of cluster."
          + " Using max value." + ", specified=" + containerVirtualCores + ", max="
          + maxVCores);
      containerVirtualCores = maxVCores;
    }

    // Containers still running from a previous attempt count as launched and
    // allocated, so they are not requested again below.
    List<Container> previousAMRunningContainers =
        response.getContainersFromPreviousAttempts();
    LOG.info(appAttemptID + " received " + previousAMRunningContainers.size()
        + " previous attempts' running containers on AM registration.");
    for(Container container: previousAMRunningContainers) {
      launchedContainers.add(container.getId());
    }
    numAllocatedContainers.addAndGet(previousAMRunningContainers.size());


    int numTotalContainersToRequest =
        numTotalContainers - previousAMRunningContainers.size();
    // Setup ask for containers from RM
    // Send one request per container that is still missing; allocations and
    // completions are then reported asynchronously to the RMCallbackHandler.
    for (int i = 0; i < numTotalContainersToRequest; ++i) {
      AMRMClient.ContainerRequest containerAsk = setupContainerAskForRM();
      amRMClient.addContainerRequest(containerAsk);
    }
    // The containers of previous attempts are included in the requested count.
    numRequestedContainers.set(numTotalContainers);
  }

  /**
   * Factory for the NodeManager callback handler, kept overridable for tests.
   *
   * @return a new {@link NMCallbackHandler} bound to this application master
   */
  @VisibleForTesting
  NMCallbackHandler createNMCallbackHandler() {
    final NMCallbackHandler handler = new NMCallbackHandler(this);
    return handler;
  }

  /**
   * Waits until the application is done or every container has completed,
   * joins the container launch threads, stops the NodeManager client,
   * unregisters from the ResourceManager and stops the ResourceManager client.
   * <p>
   * An interrupt received while waiting does not abort the shutdown sequence;
   * it is remembered and the interrupt status of the calling thread is
   * restored before this method returns.
   *
   * @return {@code true} if no container failed and all containers completed,
   *         {@code false} otherwise
   */
  @VisibleForTesting
  protected boolean finish() {
    // Interrupts are deferred so that the RM still gets the final status.
    boolean interrupted = false;
    try {
      // wait for completion.
      while (!done
          && (numCompletedContainers.get() != numTotalContainers)) {
        try {
          Thread.sleep(200);
        } catch (InterruptedException ex) {
          interrupted = true;
        }
      }

      // Join all launched threads
      // needed for when we time out
      // and we need to release containers
      for (Thread launchThread : launchThreads) {
        try {
          launchThread.join(10000);
        } catch (InterruptedException e) {
          LOG.info("Exception thrown in thread join: " + e.getMessage(), e);
          interrupted = true;
        }
      }

      // When the application completes, it should stop all running containers
      LOG.info("Application completed. Stopping running containers");
      nmClientAsync.stop();

      // When the application completes, it should send a finish application
      // signal to the RM
      LOG.info("Application completed. Signalling finish to RM");

      FinalApplicationStatus appStatus;
      String appMessage = null;
      boolean success = true;
      if (numFailedContainers.get() == 0 &&
          numCompletedContainers.get() == numTotalContainers) {
        appStatus = FinalApplicationStatus.SUCCEEDED;
      } else {
        appStatus = FinalApplicationStatus.FAILED;
        appMessage = "Diagnostics." + ", total=" + numTotalContainers
            + ", completed=" + numCompletedContainers.get() + ", allocated="
            + numAllocatedContainers.get() + ", failed="
            + numFailedContainers.get();
        LOG.info(appMessage);
        success = false;
      }
      try {
        amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
      } catch (YarnException ex) {
        LOG.error("Failed to unregister application", ex);
      } catch (IOException e) {
        LOG.error("Failed to unregister application", e);
      }

      amRMClient.stop();

      return success;
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Receives the asynchronous notifications of the ResourceManager client:
   * container completions and allocations, shutdown requests, progress
   * queries and errors. It keeps the container counters of the enclosing
   * application master up to date and flips {@code done} when the
   * application should stop waiting.
   */
  private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
    /**
     * Accounts for completed containers and re-requests containers that were
     * aborted.
     *
     * @param completedContainers statuses of the containers that completed
     */
    @SuppressWarnings("unchecked")
    @Override
    public void onContainersCompleted(List<ContainerStatus> completedContainers) {
      LOG.info("Got response from RM for container ask, completedCnt="
          + completedContainers.size());
      for (ContainerStatus containerStatus : completedContainers) {
        LOG.info(appAttemptID + " got container status for containerID="
            + containerStatus.getContainerId() + ", state="
            + containerStatus.getState() + ", exitStatus="
            + containerStatus.getExitStatus() + ", diagnostics="
            + containerStatus.getDiagnostics());

        // non complete containers should not be here
        assert (containerStatus.getState() == ContainerState.COMPLETE);
        // ignore containers we know nothing about - probably from a previous
        // attempt
        if (!launchedContainers.contains(containerStatus.getContainerId())) {
          LOG.info("Ignoring completed status of "
              + containerStatus.getContainerId()
              + "; unknown container(probably launched by previous attempt)");
          continue;
        }

        // increment counters for completed/failed containers
        int exitStatus = containerStatus.getExitStatus();
        if (0 != exitStatus) {
          // container failed
          if (ContainerExitStatus.ABORTED != exitStatus) {
            // the task failed
            // counts as completed
            numCompletedContainers.incrementAndGet();
            numFailedContainers.incrementAndGet();
          } else {
            // container was killed by framework, possibly preempted
            // we should re-try as the container was lost for some reason
            numAllocatedContainers.decrementAndGet();
            numRequestedContainers.decrementAndGet();
            // we do not need to release the container as it would be done
            // by the RM
          }
        } else {
          // nothing to do
          // container completed successfully
          numCompletedContainers.incrementAndGet();
          LOG.info("Container completed successfully." + ", containerId="
              + containerStatus.getContainerId());
        }
      }

      // ask for more containers if any were aborted
      int askCount = numTotalContainers - numRequestedContainers.get();
      numRequestedContainers.addAndGet(askCount);

      if (askCount > 0) {
        for (int i = 0; i < askCount; ++i) {
          AMRMClient.ContainerRequest containerAsk = setupContainerAskForRM();
          amRMClient.addContainerRequest(containerAsk);
        }
      }

      if (numCompletedContainers.get() == numTotalContainers) {
        done = true;
      }
    }

    /**
     * Starts one launch thread per newly allocated container.
     *
     * @param allocatedContainers containers granted by the ResourceManager
     */
    @Override
    public void onContainersAllocated(List<Container> allocatedContainers) {
      LOG.info("Got response from RM for container ask, allocatedCnt="
          + allocatedContainers.size());
      numAllocatedContainers.addAndGet(allocatedContainers.size());
      for (Container allocatedContainer : allocatedContainers) {
        LOG.info("Launching shell command on a new container."
            + ", containerId=" + allocatedContainer.getId()
            + ", containerNode=" + allocatedContainer.getNodeId().getHost()
            + ":" + allocatedContainer.getNodeId().getPort()
            + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
            + ", containerResourceMemory"
            + allocatedContainer.getResource().getMemory()
            + ", containerResourceVirtualCores"
            + allocatedContainer.getResource().getVirtualCores());

        Thread launchThread = createLaunchContainerThread(allocatedContainer);

        // launch and start the container on a separate thread to keep
        // the main thread unblocked
        // as all containers may not be allocated at one go.
        launchThreads.add(launchThread);
        launchedContainers.add(allocatedContainer.getId());
        launchThread.start();
        // next container gets the next task id
        id++;
      }
    }

    /** Marks the application as done so that the wait for containers ends. */
    @Override
    public void onShutdownRequest() {
      done = true;
    }

    /** Node updates are ignored. */
    @Override
    public void onNodesUpdated(List<NodeReport> list) {

    }

    /**
     * @return the fraction of completed containers, or {@code 0} when no
     *         containers are expected at all
     */
    @Override
    public float getProgress() {
      // Dividing by zero would yield NaN instead of a progress value.
      if (numTotalContainers <= 0) {
        return 0f;
      }
      // set progress to deliver to RM on next heartbeat
      return (float) numCompletedContainers.get() / numTotalContainers;
    }

    /**
     * Logs the error, marks the application as done and stops the
     * ResourceManager client.
     *
     * @param throwable the error reported by the ResourceManager client
     */
    @Override
    public void onError(Throwable throwable) {
      LOG.error("Error reported by the ResourceManager client, stopping", throwable);
      done = true;
      amRMClient.stop();
    }
  }

  @VisibleForTesting
  static class NMCallbackHandler
      implements NMClientAsync.CallbackHandler {

    private ConcurrentMap<ContainerId, Container> containers =
        new ConcurrentHashMap<ContainerId, Container>();
    private final ApplicationMaster applicationMaster;

    public NMCallbackHandler(ApplicationMaster applicationMaster) {
      this.applicationMaster = applicationMaster;
    }

    public void addContainer(ContainerId containerId, Container container) {
      containers.putIfAbsent(containerId, container);
    }

    @Override
    public void onContainerStopped(ContainerId containerId) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Succeeded to stop Container " + containerId);
      }
      containers.remove(containerId);
    }

    @Override
    public void onContainerStatusReceived(ContainerId containerId,
        ContainerStatus containerStatus) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Container Status: id=" + containerId + ", status=" +
            containerStatus);
      }
    }

    @Override
    public void onContainerStarted(ContainerId containerId,
        Map<String, ByteBuffer> allServiceResponse) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Succeeded to start Container " + containerId);
      }
      Container container = containers.get(containerId);
      if (container != null) {
        applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
      }
    }

    @Override
    public void onStartContainerError(ContainerId containerId, Throwable t) {
      LOG.error("Failed to start Container " + containerId);
      containers.remove(containerId);
      applicationMaster.numCompletedContainers.incrementAndGet();
      applicationMaster.numFailedContainers.incrementAndGet();
    }

    @Override
    public void onGetContainerStatusError(
        ContainerId containerId, Throwable t) {
      LOG.error("Failed to query the status of Container " + containerId);
    }

    @Override
    public void onStopContainerError(ContainerId containerId, Throwable t) {
      LOG.error("Failed to stop Container " + containerId);
      containers.remove(containerId);
    }
  }

  /**
   * Runnable that sets up the {@link ContainerLaunchContext} of one allocated
   * container (see {@link ContainerManagementProtocol}) and asks the
   * NodeManager client to start a {@link BSPRunner} in it.
   */
  private class LaunchContainerRunnable implements Runnable {

    // Allocated container
    Container container;

    NMCallbackHandler containerListener;

    Configuration conf;

    // Task id handed to the BSPRunner. It is captured when the runnable is
    // created because the shared counter keeps changing while the launch
    // threads are running.
    private final int taskId;

    /**
     * @param lcontainer        Allocated container
     * @param containerListener Callback handler of the container
     * @param conf              Configuration used to reach the file system and
     *                          to build the class path
     */
    public LaunchContainerRunnable(
        Container lcontainer, NMCallbackHandler containerListener, Configuration conf) {
      this.container = lcontainer;
      this.containerListener = containerListener;
      this.conf = conf;
      this.taskId = id;
    }

    /**
     * Sets up the local resources, the class path and the start command of
     * the container and dispatches the asynchronous start request. When the
     * file system or the package URL cannot be obtained the container is
     * counted as completed and failed and nothing is started.
     */
    @Override
    public void run() {
      LOG.info("Setting up container launch container for containerid="
          + container.getId());
      // Now we setup a ContainerLaunchContext
      ContainerLaunchContext ctx = Records
          .newRecord(ContainerLaunchContext.class);

      // Set the local resources
      Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
      LocalResource packageResource = Records.newRecord(LocalResource.class);
      FileSystem fs;
      try {
        fs = FileSystem.get(conf);
      } catch (IOException e) {
        // Nothing below works without a file system.
        LOG.fatal("Could not obtain the file system for container "
            + container.getId(), e);
        numCompletedContainers.incrementAndGet();
        numFailedContainers.incrementAndGet();
        return;
      }
      Path packageFile = new Path(System.getenv(YARNBSPConstants.HAMA_YARN_LOCATION));
      URL packageUrl = null;
      try {
        packageUrl = ConverterUtils.getYarnUrlFromPath(packageFile
          .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
        LOG.info("PackageURL has been composed to " + packageUrl.toString());
        LOG.info("Reverting packageURL to path: "
            + ConverterUtils.getPathFromYarnURL(packageUrl));
      } catch (URISyntaxException e) {
        LOG.fatal("If you see this error the workarround does not work", e);
        numCompletedContainers.incrementAndGet();
        numFailedContainers.incrementAndGet();
        return;
      }

      packageResource.setResource(packageUrl);
      packageResource.setSize(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_YARN_SIZE)));
      packageResource.setTimestamp(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_YARN_TIMESTAMP)));
      packageResource.setType(LocalResourceType.FILE);
      packageResource.setVisibility(LocalResourceVisibility.APPLICATION);

      localResources.put(YARNBSPConstants.APP_MASTER_JAR_PATH, packageResource);

      Path hamaReleaseFile = new Path(System.getenv(YARNBSPConstants.HAMA_LOCATION));
      URL hamaReleaseUrl = ConverterUtils.getYarnUrlFromPath(hamaReleaseFile
          .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
      LOG.info("Hama release URL has been composed to " + hamaReleaseUrl
          .toString());

      // Every file below the Hama release location becomes a local resource
      // named after the file.
      try {
        RemoteIterator<LocatedFileStatus> fileStatusListIterator = fs.listFiles(
            hamaReleaseFile, true);

        while(fileStatusListIterator.hasNext()) {
          LocatedFileStatus lfs = fileStatusListIterator.next();
          LocalResource localRsrc = LocalResource.newInstance(
              ConverterUtils.getYarnUrlFromPath(lfs.getPath()),
              LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
              lfs.getLen(), lfs.getModificationTime());
          localResources.put(lfs.getPath().getName(), localRsrc);
        }
      } catch (IOException e) {
        // The launch goes on with the resources collected so far.
        LOG.fatal("The error has occured to RemoteIterator  " + e, e);
      }

      ctx.setLocalResources(localResources);

    /*
     * TODO Package classpath seems not to work if you're in pseudo distributed
     * mode, because the resource must not be moved, it will never be unpacked.
     * So we will check if our jar file has the file:// prefix and put it into
     * the CP directly
     */

      StringBuilder classPathEnv = new StringBuilder(
          ApplicationConstants.Environment.CLASSPATH.$()).append(File.pathSeparatorChar)
          .append("./*");
      for (String c : conf.getStrings(
          YarnConfiguration.YARN_APPLICATION_CLASSPATH,
          YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
        classPathEnv.append(File.pathSeparatorChar);
        classPathEnv.append(c.trim());
      }

      List<CharSequence> vargs = new ArrayList<CharSequence>();
      vargs.add("${JAVA_HOME}/bin/java");
      vargs.add("-cp " + classPathEnv);
      vargs.add(BSPRunner.class.getCanonicalName());

      vargs.add(jobId.getJtIdentifier());
      vargs.add(Integer.toString(taskId));
      vargs.add(
          new Path(jobFile).makeQualified(fs.getUri(), fs.getWorkingDirectory())
              .toString());

      vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/hama-worker.stdout");
      vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/hama-worker.stderr");

      // Get final commmand
      StringBuilder command = new StringBuilder();
      for (CharSequence str : vargs) {
        command.append(str).append(" ");
      }

      List<String> commands = new ArrayList<String>();
      commands.add(command.toString());

      ctx.setCommands(commands);
      ctx.setTokens(allTokens.duplicate());
      LOG.info("Starting commands: " + commands);

      // Register the container before the start request so that the start
      // callback finds it.
      containerListener.addContainer(container.getId(), container);
      nmClientAsync.startContainerAsync(container, ctx);
    }
  }

  /**
   * Builds a single container request from the configured memory, virtual
   * cores and priority. Neither nodes nor racks are specified, so the request
   * is not bound to a location.
   *
   * @return the container request to be added to the ResourceManager client
   */
  private AMRMClient.ContainerRequest setupContainerAskForRM() {
    // TODO - what is the range for priority? how to decide?
    final Priority askPriority = Priority.newInstance(requestPriority);

    // memory and CPU are the resource dimensions that are asked for
    final Resource askResource = Resource.newInstance(containerMemory,
        containerVirtualCores);

    final AMRMClient.ContainerRequest containerAsk =
        new AMRMClient.ContainerRequest(askResource, null, null, askPriority);
    LOG.info("Requested container ask: " + containerAsk.toString());
    return containerAsk;
  }

  /**
   * Creates a {@link HamaConfiguration} and adds the file at the given path
   * to it as a resource.
   *
   * @param path location of the submitted job configuration
   * @return the configuration with the job file added as a resource
   * @throws IOException if the file system cannot be obtained or the file
   *           cannot be opened
   */
  private static Configuration getSubmitConfiguration(String path)
      throws IOException {
    final Path submitPath = new Path(path);
    final Configuration submitted = new HamaConfiguration();

    // the file system is resolved from the path itself
    final InputStream resource =
        FileSystem.get(URI.create(path), submitted).open(submitPath);
    submitted.addResource(resource);

    return submitted;
  }

  /**
   * Derives the application attempt id from the container id that is found in
   * the process environment.
   *
   * @return the application attempt id of the container this process runs in
   * @throws IllegalArgumentException if the environment holds no container id
   */
  private static ApplicationAttemptId getApplicationAttemptId()
      throws IOException {
    final String containerIdKey =
        ApplicationConstants.Environment.CONTAINER_ID.name();
    final Map<String, String> environment = System.getenv();

    if (!environment.containsKey(containerIdKey)) {
      throw new IllegalArgumentException(
          "ApplicationAttemptId not set in the environment");
    }

    LOG.info("app attempt id!!!");
    return ConverterUtils.toContainerId(environment.get(containerIdKey))
        .getApplicationAttemptId();
  }

  /**
   * Obtains the sync server for the job configuration, initializes it and
   * hands it to the thread pool, where it is started in the background. The
   * method returns right after submitting the server.
   *
   * @throws Exception if the sync server cannot be obtained or initialized
   */
  private void startSyncServer() throws Exception {
    syncServer = SyncServiceFactory.getSyncServer(jobConf);
    syncServer.init(jobConf);

    // the server is started on the pool thread
    threadPool.submit(new ZKServerThread(syncServer));
  }

  /**
   * This method is to run Zookeeper in order to coordinates between BSPMaster and Groomservers
   * using Runnable interface in java.
   */
  private static class ZKServerThread implements Runnable {
    SyncServer server;

    ZKServerThread(SyncServer s) {
      server = s;
    }

    @Override
    public void run() {
      try {
        server.start();
      } catch (Exception e) {
        LOG.error("Error running SyncServer.", e);
      }
    }
  }

  /**
   * Starts the two RPC servers of the application master, the one for clients
   * and the one for tasks, and publishes the address of the task server in
   * the job configuration as "hama.umbilical.address". Because the job
   * configuration is modified, this has to run BEFORE the submit
   * configuration gets rewritten.
   *
   * @throws IOException if a server cannot be created or started
   */
  private void startRPCServers() throws IOException {
    // client facing server
    // NOTE(review): BSPClient.class is passed where the task server below
    // passes this - confirm against the RPC.getServer signature.
    clientServer = RPC.getServer(BSPClient.class, hostname, clientPort, jobConf);
    clientServer.start();

    // task facing server on a free port
    taskServerPort = BSPNetUtils.getFreePort(10000);
    taskServer = RPC.getServer(this, hostname, taskServerPort, jobConf);
    taskServer.start();

    // let the tasks know where we are
    final String umbilicalAddress = hostname + ":" + taskServerPort;
    jobConf.set("hama.umbilical.address", umbilicalAddress);
  }

  /**
   * Writes the given configuration as XML to the given path, replacing the
   * job file so that it reflects the changes made to the configuration.
   *
   * @param path location of the job file to write
   * @param conf configuration to write
   * @throws IOException if the file cannot be created, written or closed
   */
  private static void rewriteSubmitConfiguration(String path, Configuration conf)
      throws IOException {
    Path jobSubmitPath = new Path(path);
    FileSystem fs = FileSystem.get(conf);
    FSDataOutputStream out = fs.create(jobSubmitPath);
    try {
      conf.writeXml(out);
    } finally {
      // close the stream even when writing fails
      out.close();
    }

    LOG.info("Written new configuration back to " + path);
  }

  /**
   * Determines the container memory: the value of "bsp.child.mem.in.mb" if it
   * is set, otherwise whatever can be derived from "bsp.child.java.opts".
   *
   * @param conf configuration to read the two properties from
   * @return The memory of container.
   */
  private int getMemoryRequirements(Configuration conf) {
    final String configuredMemory = conf.get("bsp.child.mem.in.mb");
    if (configuredMemory != null) {
      return Integer.parseInt(configuredMemory);
    }

    LOG.warn("\"bsp.child.mem.in.mb\" was not set! Try parsing the child opts...");
    return getMemoryFromOptString(conf.get("bsp.child.java.opts"));
  }

  /**
   * Extracts the maximum heap size in megabytes from a JVM option string such
   * as {@code "-Xmx512m -Dsome.option=value"}.
   * <p>
   * Only the value of the first "-Xmx" option is looked at and it ends at the
   * next whitespace. The suffixes k, m and g are understood in lower and
   * upper case; a kilobyte value is rounded down to whole megabytes.
   *
   * @param opts the JVM options, may be {@code null}
   * @return the heap size in megabytes, or 256 when {@code opts} is
   *         {@code null} or holds no "-Xmx" option
   * @throws IllegalArgumentException if the "-Xmx" value is not a number
   *           followed by one of the understood suffixes
   */
  // This really needs a testcase
  private static int getMemoryFromOptString(String opts) {
    final int DEFAULT_MEMORY_MB = 256;
    final String xmxFlag = "-Xmx";

    if (opts == null) {
      return DEFAULT_MEMORY_MB;
    }

    int flagIndex = opts.indexOf(xmxFlag);
    if (flagIndex < 0) {
      LOG.info(
          "No \"-Xmx\" option found in child opts, using default amount of memory!");
      return DEFAULT_MEMORY_MB;
    }

    // e.G: -Xmx512m, possibly followed by further options
    int startIndex = flagIndex + xmxFlag.length();
    int endIndex = startIndex;
    while (endIndex < opts.length()
        && !Character.isWhitespace(opts.charAt(endIndex))) {
      endIndex++;
    }
    String xmxString = opts.substring(startIndex, endIndex);

    String errorMessage =
        "Memory Limit in child opts was not set! \"bsp.child.java.opts\" String was: "
            + opts;
    // at least one digit and the suffix are needed
    if (xmxString.length() < 2) {
      throw new IllegalArgumentException(errorMessage);
    }

    char qualifier = Character.toLowerCase(
        xmxString.charAt(xmxString.length() - 1));
    int memory;
    try {
      memory = Integer.parseInt(
          xmxString.substring(0, xmxString.length() - 1));
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException(errorMessage, e);
    }

    if (qualifier == 'm') {
      return memory;
    } else if (qualifier == 'g') {
      return memory * 1024;
    } else if (qualifier == 'k') {
      return memory / 1024;
    } else {
      throw new IllegalArgumentException(errorMessage);
    }
  }

  /**
   * Wraps the launch of the given container into a new, not yet started
   * thread.
   *
   * @param allocatedContainer the container to launch
   * @return the thread that launches the container once it is started
   */
  @VisibleForTesting
  Thread createLaunchContainerThread(Container allocatedContainer) {
    return new Thread(
        new LaunchContainerRunnable(allocatedContainer, containerListener, jobConf));
  }

  /**
   * @return the highest superstep count reported so far through
   *         {@link #statusUpdate(TaskAttemptID, TaskStatus)}
   */
  @Override
  public LongWritable getCurrentSuperStep() {
    final LongWritable current = new LongWritable(superstep);
    return current;
  }

  /**
   * Creates the task description for the given task attempt. When the job has
   * input splits the split at the index of the task id is assigned, otherwise
   * the task gets a null input split with empty split bytes.
   *
   * @param taskid the task attempt asking for its task
   * @return the task to execute
   * @throws IOException if the job has splits but none at the index of the
   *           task id
   */
  @Override
  public Task getTask(TaskAttemptID taskid) throws IOException {
    final int taskIndex = taskid.id;

    if (splits == null) {
      return new BSPTask(jobId, jobFile, taskid, taskIndex,
          NullInputFormat.NullInputSplit.class.getName(), new BytesWritable());
    }

    // Report an unknown index with context instead of a bare
    // ArrayIndexOutOfBoundsException.
    if (taskIndex < 0 || taskIndex >= splits.length) {
      throw new IOException("No input split for task " + taskid + ": index "
          + taskIndex + " is outside of the " + splits.length
          + " available splits");
    }

    BSPJobClient.RawSplit assignedSplit = splits[taskIndex];
    return new BSPTask(jobId, jobFile, taskid, taskIndex,
        assignedSplit.getClassName(), assignedSplit.getBytes());
  }

  /**
   * Ping from a task; this implementation does not look at the task id and
   * always answers {@code false}.
   * NOTE(review): presumably callers interpret the result as "task is known" -
   * confirm that a constant false is intended.
   */
  @Override
  public boolean ping(TaskAttemptID taskid) throws IOException {
    return false;
  }

  /**
   * Completion report of a task; a no-op in this implementation, nothing is
   * recorded for the given task.
   */
  @Override
  public void done(TaskAttemptID taskid) throws IOException {

  }

  /**
   * File system error report of a task. The report is written to the log so
   * that it is not lost; no further action is taken.
   *
   * @param taskId the reporting task attempt
   * @param message the error message sent by the task
   */
  @Override
  public void fsError(TaskAttemptID taskId, String message) throws IOException {
    LOG.error("Task " + taskId + " reported a file system error: " + message);
  }

  /**
   * Fatal error report of a task. The report is written to the log so that it
   * is not lost; no further action is taken.
   *
   * @param taskId the reporting task attempt
   * @param message the error message sent by the task
   */
  @Override
  public void fatalError(TaskAttemptID taskId, String message)
      throws IOException {
    LOG.fatal("Task " + taskId + " reported a fatal error: " + message);
  }

  /**
   * Takes a status report of a task: advances the current superstep when the
   * task reports a higher one and adds the counters of the task to the global
   * counters.
   *
   * @param taskId the reporting task attempt
   * @param taskStatus the reported status
   * @return always {@code true}
   */
  @Override
  public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
      throws IOException, InterruptedException {
    final boolean advanced = taskStatus.getSuperstepCount() > superstep;
    if (advanced) {
      superstep = taskStatus.getSuperstepCount();
      LOG.info("Now in superstep " + superstep);
    }

    globalCounter.incrAllCounters(taskStatus.getCounters());

    return true;
  }

  /**
   * Port lookup for a task; this implementation does not look at the task id
   * and always returns {@code 0}.
   */
  @Override
  public int getAssignedPortNum(TaskAttemptID taskid) {
    return 0;
  }

  /**
   * Stops the client and the task RPC server. Servers that were never started
   * are skipped, and the task server is stopped even when stopping the client
   * server fails.
   *
   * @throws IOException declared by the implemented interface
   */
  @Override
  public void close() throws IOException {
    try {
      // null when initialization failed before the servers were started
      if (this.clientServer != null) {
        this.clientServer.stop();
      }
    } finally {
      if (this.taskServer != null) {
        this.taskServer.stop();
      }
    }
  }

  /**
   * Returns {@code BSPClient.versionID} regardless of the protocol name and
   * the client version that are passed in.
   */
  @Override
  public long getProtocolVersion(String protocol, long clientVersion)
      throws IOException {
    return BSPClient.versionID;
  }
}
