/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.resourcemanager;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.Task.State;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;

import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;


import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
    .TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.Resources;

@Private
public class Application {
  private static final Logger LOG =
      LoggerFactory.getLogger(Application.class);
  
  private AtomicInteger taskCounter = new AtomicInteger(0);

  private AtomicInteger numAttempts = new AtomicInteger(0);
  final private String user;
  final private String queue;
  final private ApplicationId applicationId;
  final private ApplicationAttemptId applicationAttemptId;
  final private ResourceManager resourceManager;
  private final static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);

  final private Map<SchedulerRequestKey, Resource> requestSpec =
      new TreeMap<>();

  final private Map<SchedulerRequestKey, Map<String, ResourceRequest>>
      requests = new TreeMap<>();

  final Map<SchedulerRequestKey, Set<Task>> tasks = new TreeMap<>();

  final private Set<ResourceRequest> ask =
      new TreeSet<>(
          new org.apache.hadoop.yarn.api.records.ResourceRequest
              .ResourceRequestComparator());

  final private Map<String, NodeManager> nodes = new HashMap<>();
  
  Resource used = recordFactory.newRecordInstance(Resource.class);
  
  public Application(String user, ResourceManager resourceManager) 
      throws YarnException {
    this(user, "default", resourceManager);
  }
  
  public Application(String user, String queue, ResourceManager resourceManager) 
      throws YarnException {
    this.user = user;
    this.queue = queue;
    this.resourceManager = resourceManager;
    // register an application
    GetNewApplicationRequest request =
            Records.newRecord(GetNewApplicationRequest.class);
    GetNewApplicationResponse newApp = 
        this.resourceManager.getClientRMService().getNewApplication(request);
    this.applicationId = newApp.getApplicationId();
  
    this.applicationAttemptId =
        ApplicationAttemptId.newInstance(this.applicationId,
          this.numAttempts.getAndIncrement());
  }

  public String getUser() {
    return user;
  }

  public String getQueue() {
    return queue;
  }

  public ApplicationId getApplicationId() {
    return applicationId;
  }
  
  public ApplicationAttemptId getApplicationAttemptId() {
    return applicationAttemptId;
  }

  public static String resolve(String hostName) {
    return NetworkTopology.DEFAULT_RACK;
  }
  
  public int getNextTaskId() {
    return taskCounter.incrementAndGet();
  }
  
  public Resource getUsedResources() {
    return used;
  }
  
  @SuppressWarnings("deprecation")
  public synchronized void submit() throws IOException, YarnException {
    ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
    context.setApplicationId(this.applicationId);
    context.setQueue(this.queue);
    
    // Set up the container launch context for the application master
    ContainerLaunchContext amContainer
        = Records.newRecord(ContainerLaunchContext.class);
    context.setAMContainerSpec(amContainer);
    context.setResource(Resources.createResource(
        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
    
    SubmitApplicationRequest request = recordFactory
        .newRecordInstance(SubmitApplicationRequest.class);
    request.setApplicationSubmissionContext(context);
    final ResourceScheduler scheduler = resourceManager.getResourceScheduler();
    
    resourceManager.getClientRMService().submitApplication(request);

    RMAppEvent event =
        new RMAppEvent(this.applicationId, RMAppEventType.START);
    resourceManager.getRMContext().getRMApps().get(applicationId).handle(event);
    event =
        new RMAppEvent(this.applicationId, RMAppEventType.APP_NEW_SAVED);
    resourceManager.getRMContext().getRMApps().get(applicationId).handle(event);
    event =
        new RMAppEvent(this.applicationId, RMAppEventType.APP_ACCEPTED);
    resourceManager.getRMContext().getRMApps().get(applicationId).handle(event);

    // Notify scheduler
    AppAddedSchedulerEvent addAppEvent =
        new AppAddedSchedulerEvent(this.applicationId, this.queue, "user");
    scheduler.handle(addAppEvent);
    AppAttemptAddedSchedulerEvent addAttemptEvent =
        new AppAttemptAddedSchedulerEvent(this.applicationAttemptId, false);
    scheduler.handle(addAttemptEvent);
  }

  public synchronized void addResourceRequestSpec(
      Priority priority, Resource capability) {
    addResourceRequestSpec(TestUtils.toSchedulerKey(priority.getPriority()),
        capability);
  }
  public synchronized void addResourceRequestSpec(
      SchedulerRequestKey schedulerKey, Resource capability) {
    Resource currentSpec = requestSpec.put(schedulerKey, capability);
    if (currentSpec != null) {
      throw new IllegalStateException("Resource spec already exists for " +
          "priority " + schedulerKey.getPriority().getPriority()
          + " - " + currentSpec.getMemorySize());
    }
  }
  
  public synchronized void addNodeManager(String host,
      int containerManagerPort, NodeManager nodeManager) {
    nodes.put(host + ":" + containerManagerPort, nodeManager);
  }
  
  private synchronized NodeManager getNodeManager(String host) {
    return nodes.get(host);
  }
  
  public synchronized void addTask(Task task) {
    SchedulerRequestKey schedulerKey = task.getSchedulerKey();
    Map<String, ResourceRequest> requests = this.requests.get(schedulerKey);
    if (requests == null) {
      requests = new HashMap<String, ResourceRequest>();
      this.requests.put(schedulerKey, requests);
      LOG.debug("Added priority={} application={}", schedulerKey.getPriority(),
          applicationId);
    }
    
    final Resource capability = requestSpec.get(schedulerKey);
    
    // Note down the task
    Set<Task> tasks = this.tasks.get(schedulerKey);
    if (tasks == null) {
      tasks = new HashSet<Task>();
      this.tasks.put(schedulerKey, tasks);
    }
    tasks.add(task);
    
    LOG.info("Added task " + task.getTaskId() + " to application " + 
        applicationId + " at priority " + schedulerKey.getPriority());
    
    LOG.debug("addTask: application={} #asks={}", applicationId, ask.size());
    
    // Create resource requests
    for (String host : task.getHosts()) {
      // Data-local
      addResourceRequest(schedulerKey, requests, host, capability);
    }
        
    // Rack-local
    for (String rack : task.getRacks()) {
      addResourceRequest(schedulerKey, requests, rack, capability);
    }
      
    // Off-switch
    addResourceRequest(schedulerKey, requests, ResourceRequest.ANY, capability);
  }
  
  public synchronized void finishTask(Task task) throws IOException,
      YarnException {
    Set<Task> tasks = this.tasks.get(task.getSchedulerKey());
    if (!tasks.remove(task)) {
      throw new IllegalStateException(
          "Finishing unknown task " + task.getTaskId() + 
          " from application " + applicationId);
    }
    
    NodeManager nodeManager = task.getNodeManager();
    ContainerId containerId = task.getContainerId();
    task.stop();
    List<ContainerId> containerIds = new ArrayList<ContainerId>();
    containerIds.add(containerId);
    StopContainersRequest stopRequest =
        StopContainersRequest.newInstance(containerIds);
    nodeManager.stopContainers(stopRequest);
    
    Resources.subtractFrom(used, requestSpec.get(task.getSchedulerKey()));
    
    LOG.info("Finished task " + task.getTaskId() + 
        " of application " + applicationId + 
        " on node " + nodeManager.getHostName() + 
        ", currently using " + used + " resources");
  }
  
  private synchronized void addResourceRequest(
      SchedulerRequestKey schedulerKey, Map<String, ResourceRequest> requests,
      String resourceName, Resource capability) {
    ResourceRequest request = requests.get(resourceName);
    if (request == null) {
      request = 
        org.apache.hadoop.yarn.server.utils.BuilderUtils.newResourceRequest(
            schedulerKey.getPriority(), resourceName, capability, 1);
      requests.put(resourceName, request);
    } else {
      request.setNumContainers(request.getNumContainers() + 1);
    }
    if (request.getNodeLabelExpression() == null) {
      request.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
    }
    
    // Note this down for next interaction with ResourceManager
    ask.remove(request);
    // clone to ensure the RM doesn't manipulate the same obj
    ask.add(ResourceRequest.clone(request));

    if (LOG.isDebugEnabled()) {
      LOG.debug("addResourceRequest: applicationId=" + applicationId.getId()
          + " priority=" + schedulerKey.getPriority().getPriority()
          + " resourceName=" + resourceName + " capability=" + capability
          + " numContainers=" + request.getNumContainers()
          + " #asks=" + ask.size());
    }
  }
  
  public synchronized List<Container> getResources() throws IOException {
    if(LOG.isDebugEnabled()) {
      LOG.debug("getResources begin: application={} #ask={}",
          applicationId, ask.size());

      for (ResourceRequest request : ask) {
        LOG.debug("getResources: application={} ask-request={}",
            applicationId, request);
      }
    }
    
    // Get resources from the ResourceManager
    Allocation allocation = resourceManager.getResourceScheduler().allocate(
        applicationAttemptId, new ArrayList<ResourceRequest>(ask), null, new ArrayList<ContainerId>(), null, null,
        new ContainerUpdates());

    if (LOG.isInfoEnabled()) {
      LOG.info("-=======" + applicationAttemptId + System.lineSeparator() +
          "----------" + resourceManager.getRMContext().getRMApps()
              .get(applicationId).getRMAppAttempt(applicationAttemptId));
    }

    List<Container> containers = allocation.getContainers();

    // Clear state for next interaction with ResourceManager
    ask.clear();
    
    if(LOG.isDebugEnabled()) {
      LOG.debug("getResources() for {}: ask={} received={}",
          applicationId, ask.size(), containers.size());
    }
    
    return containers;
  }
  
  public synchronized void assign(List<Container> containers) 
  throws IOException, YarnException {
    
    int numContainers = containers.size();
    // Schedule in priority order
    for (SchedulerRequestKey schedulerKey: requests.keySet()) {
      assign(schedulerKey, NodeType.NODE_LOCAL, containers);
      assign(schedulerKey, NodeType.RACK_LOCAL, containers);
      assign(schedulerKey, NodeType.OFF_SWITCH, containers);

      if (containers.isEmpty()) { 
        break;
      }
    }
    
    int assignedContainers = numContainers - containers.size();
    LOG.info("Application " + applicationId + " assigned " + 
        assignedContainers + "/" + numContainers);
  }
  
  public synchronized void schedule() throws IOException, YarnException {
    assign(getResources());
  }
  
  private synchronized void assign(SchedulerRequestKey schedulerKey,
      NodeType type, List<Container> containers)
      throws IOException, YarnException {
    for (Iterator<Container> i=containers.iterator(); i.hasNext();) {
      Container container = i.next();
      String host = container.getNodeId().toString();
      
      if (Resources.equals(requestSpec.get(schedulerKey),
          container.getResource())) {
        // See which task can use this container
        for (Iterator<Task> t=tasks.get(schedulerKey).iterator();
             t.hasNext();) {
          Task task = t.next();
          if (task.getState() == State.PENDING && task.canSchedule(type, host)) {
            NodeManager nodeManager = getNodeManager(host);
            
            task.start(nodeManager, container.getId());
            i.remove();
            
            // Track application resource usage
            Resources.addTo(used, container.getResource());

            LOG.info("Assigned container (" + container + ") of type " + type +
                " to task " + task.getTaskId() + " at priority " +
                schedulerKey.getPriority() +
                " on node " + nodeManager.getHostName() +
                ", currently using " + used + " resources");

            // Update resource requests
            updateResourceRequests(requests.get(schedulerKey), type, task);

            // Launch the container
            StartContainerRequest scRequest =
                StartContainerRequest.newInstance(createCLC(),
                  container.getContainerToken());
            List<StartContainerRequest> list =
                new ArrayList<StartContainerRequest>();
            list.add(scRequest);
            StartContainersRequest allRequests =
                StartContainersRequest.newInstance(list);
            nodeManager.startContainers(allRequests);
            break;
          }
        }
      }
    }
  }

  private void updateResourceRequests(Map<String, ResourceRequest> requests, 
      NodeType type, Task task) {
    if (type == NodeType.NODE_LOCAL) {
      for (String host : task.getHosts()) {
        if(LOG.isDebugEnabled()) {
          LOG.debug("updateResourceDemands:" + " application=" + applicationId
            + " type=" + type + " host=" + host
            + " request=" + ((requests == null) ? "null" : requests.get(host)));
        }
        updateResourceRequest(requests.get(host));
      }
    }
    
    if (type == NodeType.NODE_LOCAL || type == NodeType.RACK_LOCAL) {
      for (String rack : task.getRacks()) {
        if(LOG.isDebugEnabled()) {
          LOG.debug("updateResourceDemands:" + " application=" + applicationId
            + " type=" + type + " rack=" + rack
            + " request=" + ((requests == null) ? "null" : requests.get(rack)));
        }
        updateResourceRequest(requests.get(rack));
      }
    }
    
    updateResourceRequest(requests.get(ResourceRequest.ANY));
    
    LOG.debug("updateResourceDemands: application={} #asks={}",
        applicationId, ask.size());
  }
  
  private void updateResourceRequest(ResourceRequest request) {
    request.setNumContainers(request.getNumContainers() - 1);

    // Note this for next interaction with ResourceManager
    ask.remove(request);
    // clone to ensure the RM doesn't manipulate the same obj
    ask.add(ResourceRequest.clone(request));

    LOG.debug("updateResourceRequest: application={} request={}",
        applicationId, request);
  }

  private ContainerLaunchContext createCLC() {
    ContainerLaunchContext clc = recordFactory.newRecordInstance(ContainerLaunchContext.class);
    return clc;
  }
}
