/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hama.bsp;

import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.*;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.*;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;

public class BSPTaskLauncher {

  private static final Log LOG = LogFactory.getLog(BSPTaskLauncher.class);

  private final Container allocatedContainer;
  private final int id;
  private final ContainerManagementProtocol cm;
  private final Configuration conf;
  private String user;
  private final Path jobFile;
  private final BSPJobID jobId;

  private GetContainerStatusesRequest statusRequest;
  
  @Override
  protected void finalize() throws Throwable {
    stopAndCleanup();
  }

  public BSPTaskLauncher(int id, Container container, ContainerManagementProtocol cm,
                         Configuration conf, Path jobFile, BSPJobID jobId)
      throws YarnException {
    this.id = id;
    this.cm = cm;
    this.conf = conf;
    this.allocatedContainer = container;
    this.jobFile = jobFile;
    this.jobId = jobId;
    // FIXME why does this contain mapreduce here?
    this.user = conf.get("bsp.user.name");
    if (this.user == null) {
      this.user = conf.get("mapreduce.job.user.name");
    }
  }

  public void stopAndCleanup() throws YarnException, IOException {
    StopContainersRequest stopRequest = Records.newRecord(StopContainersRequest.class);
    List<ContainerId> containerIds = new ArrayList<ContainerId>();
    containerIds.add(allocatedContainer.getId());
    LOG.info("getId : " + allocatedContainer.getId());
    stopRequest.setContainerIds(containerIds);
    LOG.info("StopContainer : " + stopRequest.getContainerIds());
    cm.stopContainers(stopRequest);

  }

  public void start() throws IOException, YarnException {
    LOG.info("Spawned task with id: " + this.id
        + " for allocated container id: "
        + this.allocatedContainer.getId().toString());
    statusRequest = setupContainer(allocatedContainer, cm, user, id);
  }

  /**
   * This polls the current container status from container manager. Null if the
   * container hasn't finished yet.
   * 
   * @return
   * @throws Exception
   */
  public BSPTaskStatus poll() throws Exception {

    ContainerStatus lastStatus = null;
    GetContainerStatusesResponse getContainerStatusesResponse = cm.getContainerStatuses(statusRequest);
    List<ContainerStatus> containerStatuses = getContainerStatusesResponse.getContainerStatuses();
    if (containerStatuses.size() <= 0) {
      LOG.info("container Statuses size is zero");
      return null;
    }

    for (ContainerStatus containerStatus : containerStatuses) {
      LOG.info("Got container status for containerID=" + containerStatus
          .getContainerId() + ", state=" + containerStatus.getState()
          + ", exitStatus=" + containerStatus.getExitStatus() + ", diagnostics="
          + containerStatus.getDiagnostics());

      if (containerStatus.getContainerId().equals(allocatedContainer.getId())) {
        lastStatus = containerStatus;
        break;
      }
    }

    if (lastStatus.getState() != ContainerState.COMPLETE) {
      LOG.info("Not completed...");
      return null;
    }
    LOG.info(this.id + " Last report comes with exitstatus of " + lastStatus
        .getExitStatus() + " and diagnose string of " + lastStatus
        .getDiagnostics());

    return new BSPTaskStatus(id, lastStatus.getExitStatus());
  }

  private GetContainerStatusesRequest setupContainer(
      Container allocatedContainer, ContainerManagementProtocol cm, String user, int id) throws IOException, YarnException {
    LOG.info("Setting up a container for user " + user + " with id of " + id
        + " and containerID of " + allocatedContainer.getId() + " as " + user);
    // Now we setup a ContainerLaunchContext
    ContainerLaunchContext ctx = Records
        .newRecord(ContainerLaunchContext.class);

    // Set the local resources
    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
    LocalResource packageResource = Records.newRecord(LocalResource.class);
    FileSystem fs = FileSystem.get(conf);
    Path packageFile = new Path(System.getenv(YARNBSPConstants.HAMA_YARN_LOCATION));
    URL packageUrl = ConverterUtils.getYarnUrlFromPath(packageFile
        .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
    LOG.info("PackageURL has been composed to " + packageUrl.toString());
    try {
      LOG.info("Reverting packageURL to path: "
          + ConverterUtils.getPathFromYarnURL(packageUrl));
    } catch (URISyntaxException e) {
      LOG.fatal("If you see this error the workarround does not work", e);
    }

    packageResource.setResource(packageUrl);
    packageResource.setSize(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_YARN_SIZE)));
    packageResource.setTimestamp(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_YARN_TIMESTAMP)));
    packageResource.setType(LocalResourceType.FILE);
    packageResource.setVisibility(LocalResourceVisibility.APPLICATION);

    localResources.put(YARNBSPConstants.APP_MASTER_JAR_PATH, packageResource);

    Path hamaReleaseFile = new Path(System.getenv(YARNBSPConstants.HAMA_LOCATION));
    URL hamaReleaseUrl = ConverterUtils.getYarnUrlFromPath(hamaReleaseFile
        .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
    LOG.info("Hama release URL has been composed to " + hamaReleaseUrl.toString());

    RemoteIterator<LocatedFileStatus> fileStatusListIterator = fs.listFiles(
        hamaReleaseFile, true);

    while(fileStatusListIterator.hasNext()) {
      LocatedFileStatus lfs = fileStatusListIterator.next();
      LocalResource localRsrc = LocalResource.newInstance(
          ConverterUtils.getYarnUrlFromPath(lfs.getPath()),
          LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
          lfs.getLen(), lfs.getModificationTime());
      localResources.put(lfs.getPath().getName(), localRsrc);
    }

    ctx.setLocalResources(localResources);

    /*
     * TODO Package classpath seems not to work if you're in pseudo distributed
     * mode, because the resource must not be moved, it will never be unpacked.
     * So we will check if our jar file has the file:// prefix and put it into
     * the CP directly
     */

    StringBuilder classPathEnv = new StringBuilder(
        ApplicationConstants.Environment.CLASSPATH.$()).append(File.pathSeparatorChar)
        .append("./*");
    for (String c : conf.getStrings(
        YarnConfiguration.YARN_APPLICATION_CLASSPATH,
        YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
      classPathEnv.append(File.pathSeparatorChar);
      classPathEnv.append(c.trim());
    }

    Vector<CharSequence> vargs = new Vector<CharSequence>();
    vargs.add("${JAVA_HOME}/bin/java");
    vargs.add("-cp " + classPathEnv + "");
    vargs.add(BSPRunner.class.getCanonicalName());
    
    vargs.add(jobId.getJtIdentifier());
    vargs.add(Integer.toString(id));
    vargs.add(this.jobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory())
        .toString());

    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/bsp.stdout");
    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/bsp.stderr");

    // Get final commmand
    StringBuilder command = new StringBuilder();
    for (CharSequence str : vargs) {
      command.append(str).append(" ");
    }

    List<String> commands = new ArrayList<String>();
    commands.add(command.toString());

    ctx.setCommands(commands);
    LOG.info("Starting command: " + commands);

    StartContainerRequest startReq = Records
        .newRecord(StartContainerRequest.class);
    startReq.setContainerLaunchContext(ctx);
    startReq.setContainerToken(allocatedContainer.getContainerToken());

    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
    list.add(startReq);
    StartContainersRequest requestList = StartContainersRequest.newInstance(list);
    cm.startContainers(requestList);

    GetContainerStatusesRequest statusReq = Records
        .newRecord(GetContainerStatusesRequest.class);
    List<ContainerId> containerIds = new ArrayList<ContainerId>();
    containerIds.add(allocatedContainer.getId());
    statusReq.setContainerIds(containerIds);
    return statusReq;
  }

  @Override
  public int hashCode() {
    final int prime = 31;
    int result = 1;
    result = prime * result + id;
    return result;
  }

  @Override
  public boolean equals(Object obj) {
    if (this == obj)
      return true;
    if (obj == null)
      return false;
    if (getClass() != obj.getClass())
      return false;
    BSPTaskLauncher other = (BSPTaskLauncher) obj;
    if (id != other.id)
      return false;
    return true;
  }

  public static class BSPTaskStatus {
    private final int id;
    private final int exitStatus;

    public BSPTaskStatus(int id, int exitStatus) {
      super();
      this.id = id;
      this.exitStatus = exitStatus;
    }

    public int getId() {
      return id;
    }

    public int getExitStatus() {
      return exitStatus;
    }
  }

}
