package org.apache.helix.task;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.helix.common.caches.TaskDataCache;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.task.assigner.AssignableInstance;
import org.apache.helix.task.assigner.TaskAssignResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AssignableInstanceManager {

  private static final Logger LOG = LoggerFactory.getLogger(AssignableInstanceManager.class);
  public static final int QUOTA_TYPE_NOT_EXIST = -1;
  private static ObjectMapper mapper = new ObjectMapper();
  // Instance name -> AssignableInstance
  private Map<String, AssignableInstance> _assignableInstanceMap;
  // TaskID -> TaskAssignResult TODO: Hunter: Move this if not needed
  private Map<String, TaskAssignResult> _taskAssignResultMap;

  // With one dimentional quota, and every task only needs 1 quota, this map will save some work
  // in case quota is full. But if there is multi-dimential quota, such aggregated quota map may not
  // help in most of the cases, as this global view does not mean that a single instance has all
  // these quota available.
  // This map is quota type -> remaining global quota
  private Map<String, Integer> _globalThreadBasedQuotaMap;

  /**
   * Basic constructor for AssignableInstanceManager to allow an empty instantiation.
   * buildAssignableInstances() must be explicitly called after instantiation.
   */
  public AssignableInstanceManager() {
    _assignableInstanceMap = new ConcurrentHashMap<>();
    _taskAssignResultMap = new ConcurrentHashMap<>();
    _globalThreadBasedQuotaMap = new ConcurrentHashMap<>();
  }

  /**
   * Builds AssignableInstances and restores TaskAssignResults from scratch by reading from
   * TaskDataCache. It re-computes current quota profile for each AssignableInstance.
   * @param clusterConfig
   * @param taskDataCache
   * @param liveInstances
   * @param instanceConfigs
   */
  public void buildAssignableInstances(ClusterConfig clusterConfig, TaskDataCache taskDataCache,
      Map<String, LiveInstance> liveInstances, Map<String, InstanceConfig> instanceConfigs) {
    // Reset all cached information
    _assignableInstanceMap.clear();
    _taskAssignResultMap.clear();

    // Create all AssignableInstance objects based on what's in liveInstances
    for (Map.Entry<String, LiveInstance> liveInstanceEntry : liveInstances.entrySet()) {
      // Prepare instance-specific metadata
      String instanceName = liveInstanceEntry.getKey();
      LiveInstance liveInstance = liveInstanceEntry.getValue();
      if (!instanceConfigs.containsKey(instanceName)) {
        continue; // Ill-formatted input; skip over this instance
      }
      InstanceConfig instanceConfig = instanceConfigs.get(instanceName);

      // Create an AssignableInstance
      AssignableInstance assignableInstance =
          new AssignableInstance(clusterConfig, instanceConfig, liveInstance);
      _assignableInstanceMap.put(instanceConfig.getInstanceName(), assignableInstance);
      LOG.debug("AssignableInstance created for instance: {}", instanceName);
    }

    // Update task profiles by traversing all TaskContexts
    Map<String, JobConfig> jobConfigMap = taskDataCache.getJobConfigMap();
    for (String jobName : jobConfigMap.keySet()) {
      JobConfig jobConfig = jobConfigMap.get(jobName);
      JobContext jobContext = taskDataCache.getJobContext(jobName);
      if (jobConfig == null || jobContext == null) {
        LOG.debug(
            "JobConfig or JobContext for this job is null. Skipping this job! Job name: {}, JobConfig: {}, JobContext: {}",
            jobName, jobConfig, jobContext);
        continue; // Ignore this job if either the config or context is null
      }

      // First, check that the workflow and job are in valid states. This is important because
      // sometimes aborted jobs do not get a proper update of their task states, meaning there could
      // be INIT and RUNNING tasks we want to ignore
      String workflowName = jobConfig.getWorkflow();
      WorkflowConfig workflowConfig = taskDataCache.getWorkflowConfig(workflowName);
      WorkflowContext workflowContext = taskDataCache.getWorkflowContext(workflowName);
      if (workflowConfig == null || workflowContext == null) {
        // There is no workflow config or context - meaning no tasks are currently scheduled and
        // invalid, so skip this job
        continue;
      }
      TaskState workflowState = workflowContext.getWorkflowState();
      TaskState jobState = workflowContext.getJobState(jobName);
      if (isResourceTerminalOrStopped(workflowState) || isResourceTerminalOrStopped(jobState)) {
        continue;
      }

      String quotaType = jobConfig.getJobType();
      if (quotaType == null) {
        quotaType = AssignableInstance.DEFAULT_QUOTA_TYPE;
      }
      Set<Integer> taskIndices = jobContext.getPartitionSet(); // Each integer represents a task in
      // this job (this is NOT taskId)
      for (int taskIndex : taskIndices) {
        TaskPartitionState taskState = jobContext.getPartitionState(taskIndex);
        if (taskState == TaskPartitionState.INIT || taskState == TaskPartitionState.RUNNING) {
          // Because task state is INIT or RUNNING, find the right AssignableInstance and subtract
          // the right amount of resources. STOPPED means it's been cancelled, so it will be
          // re-assigned and therefore does not use instances' resources

          String assignedInstance = jobContext.getAssignedParticipant(taskIndex);
          String taskId = jobContext.getTaskIdForPartition(taskIndex);
          if (taskId == null) {
            // For targeted tasks, taskId will be null
            // We instead use pName (see FixedTargetTaskAssignmentCalculator)
            taskId = String.format("%s_%s", jobConfig.getJobId(), taskIndex);
          }
          if (assignedInstance == null) {
            LOG.debug(
                "This task's TaskContext does not have an assigned instance! Task will be ignored. "
                    + "Job: {}, TaskId: {}, TaskIndex: {}",
                jobContext.getName(), taskId, taskIndex);
            continue;
          }
          if (_assignableInstanceMap.containsKey(assignedInstance)) {
            TaskConfig taskConfig = jobConfig.getTaskConfig(taskId);
            AssignableInstance assignableInstance = _assignableInstanceMap.get(assignedInstance);
            TaskAssignResult taskAssignResult =
                assignableInstance.restoreTaskAssignResult(taskId, taskConfig, quotaType);
            if (taskAssignResult.isSuccessful()) {
              _taskAssignResultMap.put(taskId, taskAssignResult);
              LOG.debug("TaskAssignResult restored for taskId: {}, assigned on instance: {}",
                  taskId, assignedInstance);
            }
          } else {
            LOG.debug(
                "While building AssignableInstance map, discovered that the instance a task is assigned to is no "
                    + "longer a LiveInstance! TaskAssignResult will not be created and no resource will be taken "
                    + "up for this task. Job: {}, TaskId: {}, TaskIndex: {}, Instance: {}",
                jobContext.getName(), taskId, taskIndex, assignedInstance);
          }
        }
      }
    }
    LOG.info(
        "AssignableInstanceManager built AssignableInstances from scratch based on contexts in TaskDataCache due to Controller switch or ClusterConfig change.");
    computeGlobalThreadBasedCapacity();
  }

  /**
   * Builds AssignableInstances and restores TaskAssignResults from scratch by reading from
   * CurrentState. It re-computes current quota profile for each AssignableInstance.
   * If a task current state is INIT or RUNNING or if there is a pending message which it's ToState
   * is RUNNING, the task/partition will be assigned to AssignableInstances of the instance.
   * @param clusterConfig
   * @param taskDataCache
   * @param liveInstances
   * @param instanceConfigs
   * @param currentStateOutput
   * @param resourceMap
   */
  public void buildAssignableInstancesFromCurrentState(ClusterConfig clusterConfig,
      TaskDataCache taskDataCache, Map<String, LiveInstance> liveInstances,
      Map<String, InstanceConfig> instanceConfigs, CurrentStateOutput currentStateOutput,
      Map<String, Resource> resourceMap) {
    _assignableInstanceMap.clear();
    _taskAssignResultMap.clear();

    // Create all AssignableInstance objects based on what's in liveInstances
    for (Map.Entry<String, LiveInstance> liveInstanceEntry : liveInstances.entrySet()) {
      // Prepare instance-specific metadata
      String instanceName = liveInstanceEntry.getKey();
      LiveInstance liveInstance = liveInstanceEntry.getValue();
      if (!instanceConfigs.containsKey(instanceName)) {
        continue; // Ill-formatted input; skip over this instance
      }
      InstanceConfig instanceConfig = instanceConfigs.get(instanceName);

      // Create an AssignableInstance
      AssignableInstance assignableInstance =
          new AssignableInstance(clusterConfig, instanceConfig, liveInstance);
      _assignableInstanceMap.put(instanceConfig.getInstanceName(), assignableInstance);
      LOG.debug("AssignableInstance created for instance: {}", instanceName);
    }

    Map<String, JobConfig> jobConfigMap = taskDataCache.getJobConfigMap();

    // Update task profiles by traversing all CurrentStates
    for (Map.Entry<String, Resource> resourceEntry : resourceMap.entrySet()) {
      String resourceName = resourceEntry.getKey();
      if (resourceEntry.getValue().getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME)) {
        JobConfig jobConfig = jobConfigMap.get(resourceName);
        JobContext jobContext = taskDataCache.getJobContext(resourceName);
        String quotaType = getQuotaType(jobConfig);
        Map<Partition, Map<String, String>> currentStateMap =
            currentStateOutput.getCurrentStateMap(resourceName);
        for (Map.Entry<Partition, Map<String, String>> currentStateMapEntry : currentStateMap
            .entrySet()) {
          Partition partition = currentStateMapEntry.getKey();
          String taskId = getTaskID(jobConfig, jobContext, partition);
          for (Map.Entry<String, String> instanceCurrentStateEntry : currentStateMapEntry.getValue()
              .entrySet()) {
            String assignedInstance = instanceCurrentStateEntry.getKey();
            String taskState = instanceCurrentStateEntry.getValue();
            // If a task in in INIT or RUNNING state on the instance, this task should occupy one
            // quota from this instance.
            if (taskState == null) {
              LOG.warn("CurrentState is null for job {}, task {} on instance {}", resourceName,
                  taskId, assignedInstance);
            }
            if (TaskPartitionState.INIT.name().equals(taskState)
                || TaskPartitionState.RUNNING.name().equals(taskState)) {
              assignTaskToInstance(assignedInstance, jobConfig, taskId, quotaType);
            }
          }
        }
        Map<Partition, Map<String, Message>> pendingMessageMap =
            currentStateOutput.getPendingMessageMap(resourceName);
        for (Map.Entry<Partition, Map<String, Message>> pendingMessageMapEntry : pendingMessageMap
            .entrySet()) {
          Partition partition = pendingMessageMapEntry.getKey();
          String taskId = getTaskID(jobConfig, jobContext, partition);
          for (Map.Entry<String, Message> instancePendingMessageEntry : pendingMessageMapEntry
              .getValue().entrySet()) {
            String assignedInstance = instancePendingMessageEntry.getKey();
            String messageToState = instancePendingMessageEntry.getValue().getToState();
            // If there is a pending message on the instance which has ToState of RUNNING, the task
            // will run on the instance soon. So the task needs to occupy one quota on this instance.
            if (TaskPartitionState.RUNNING.name().equals(messageToState)
                && !TaskPartitionState.INIT.name().equals(
                    currentStateOutput.getCurrentState(resourceName, partition, assignedInstance))
                && !TaskPartitionState.RUNNING.name().equals(currentStateOutput
                    .getCurrentState(resourceName, partition, assignedInstance))) {
              assignTaskToInstance(assignedInstance, jobConfig, taskId, quotaType);
            }
          }
        }
      }
    }
    LOG.info(
        "AssignableInstanceManager built AssignableInstances from scratch based on CurrentState.");
    computeGlobalThreadBasedCapacity();
  }

  /**
   * Assign the task to the instance's Assignable Instance
   * @param instance
   * @param jobConfig
   * @param taskId
   * @param quotaType
   */
  private void assignTaskToInstance(String instance, JobConfig jobConfig, String taskId,
      String quotaType) {
    if (_assignableInstanceMap.containsKey(instance)) {
      TaskConfig taskConfig = getTaskConfig(jobConfig, taskId);
      AssignableInstance assignableInstance = _assignableInstanceMap.get(instance);
      TaskAssignResult taskAssignResult =
          assignableInstance.restoreTaskAssignResult(taskId, taskConfig, quotaType);
      if (taskAssignResult.isSuccessful()) {
        _taskAssignResultMap.put(taskId, taskAssignResult);
        LOG.debug("TaskAssignResult restored for taskId: {}, assigned on instance: {}", taskId,
            instance);
      }
    } else {
      LOG.debug(
          "While building AssignableInstance map, discovered that the instance a task is assigned to is no "
              + "longer a LiveInstance! TaskAssignResult will not be created and no resource will be taken "
              + "up for this task. TaskId: {}, Instance: {}",
          taskId, instance);
    }
  }

  /**
   * Extract the quota type information of the Job
   * @param jobConfig
   * @return
   */
  private String getQuotaType(JobConfig jobConfig) {
    // If jobConfig is null (job has been deleted but participant has not dropped the task yet), use
    // default quota for the task
    if (jobConfig == null || jobConfig.getJobType() == null) {
      return AssignableInstance.DEFAULT_QUOTA_TYPE;
    }
    return jobConfig.getJobType();
  }

  /**
   * Calculate the TaskID based on the JobConfig and JobContext information
   * @param jobConfig
   * @param jobContext
   * @param partition
   * @return
   */
  private String getTaskID(JobConfig jobConfig, JobContext jobContext, Partition partition) {
    if (jobConfig == null || jobContext == null) {
      // If JobConfig or JobContext is null, use the partition name
      return partition.getPartitionName();
    }
    int taskIndex = TaskUtil.getPartitionId(partition.getPartitionName());
    String taskId = jobContext.getTaskIdForPartition(taskIndex);
    if (taskId == null) {
      // For targeted tasks, taskId will be null
      // We instead use pName (see FixedTargetTaskAssignmentCalculator)
      taskId = String.format("%s_%s", jobConfig.getJobId(), taskIndex);
    }
    return taskId;
  }

  /**
   * A method that return the task config a task based on the JonConfig information
   * @param jobConfig
   * @param taskId
   * @return
   */
  private TaskConfig getTaskConfig (JobConfig jobConfig, String taskId) {
    if (jobConfig == null){
      return new TaskConfig(null, null, taskId, null);
    }
    return jobConfig.getTaskConfig(taskId);
  }

  /**
   * Updates AssignableInstances when there are changes in LiveInstances or InstanceConfig. This
   * update only keeps an up-to-date count of AssignableInstances and does NOT re-build tasks
   * (because it's costly).
   * Call this when there is only LiveInstance/InstanceConfig change.
   * @param clusterConfig
   * @param liveInstances
   * @param instanceConfigs
   */
  public void updateAssignableInstances(ClusterConfig clusterConfig,
      Map<String, LiveInstance> liveInstances, Map<String, InstanceConfig> instanceConfigs) {
    // Keep a collection to determine what's no longer a LiveInstance, in which case the
    // corresponding AssignableInstance must be removed
    Collection<AssignableInstance> staleAssignableInstances =
        new HashSet<>(_assignableInstanceMap.values());

    // Loop over new LiveInstances
    for (Map.Entry<String, LiveInstance> liveInstanceEntry : liveInstances.entrySet()) {
      // Prepare instance-specific metadata
      String instanceName = liveInstanceEntry.getKey();
      LiveInstance liveInstance = liveInstanceEntry.getValue();
      if (!instanceConfigs.containsKey(instanceName)) {
        continue; // Ill-formatted input; skip over this instance
      }
      InstanceConfig instanceConfig = instanceConfigs.get(instanceName);

      // Update configs for currently existing instance
      if (_assignableInstanceMap.containsKey(instanceName)) {
        _assignableInstanceMap.get(instanceName).updateConfigs(clusterConfig, instanceConfig,
            liveInstance);
      } else {
        // create a new AssignableInstance for a newly added LiveInstance; this is a new
        // LiveInstance so TaskAssignResults are not re-created and no tasks are assigned
        AssignableInstance assignableInstance =
            new AssignableInstance(clusterConfig, instanceConfig, liveInstance);
        _assignableInstanceMap.put(instanceName, assignableInstance);
        LOG.debug("AssignableInstance created for instance: {} during updateAssignableInstances",
            instanceName);
      }
      // Remove because we've confirmed that this AssignableInstance is a LiveInstance as well
      staleAssignableInstances.remove(_assignableInstanceMap.get(instanceName));
    }

    // AssignableInstances that are not live need to be removed from the map because they are not
    // live
    for (AssignableInstance instanceToBeRemoved : staleAssignableInstances) {
      // Remove all tasks on this instance first
      for (String taskToRemove : instanceToBeRemoved.getCurrentAssignments()) {
        // Check that AssignableInstances match
        if (_taskAssignResultMap.containsKey(taskToRemove)) {
          if (_taskAssignResultMap.get(taskToRemove).getAssignableInstance().getInstanceName()
              .equals(instanceToBeRemoved.getInstanceName())) {
            _taskAssignResultMap.remove(taskToRemove); // TODO: Hunter: Move this if necessary
            LOG.debug(
                "TaskAssignResult removed because its assigned instance is no longer live. TaskID: {}, instance: {}",
                taskToRemove, instanceToBeRemoved.getInstanceName());
          }
        }
      }
      _assignableInstanceMap.remove(instanceToBeRemoved.getInstanceName());
      LOG.debug(
          "Non-live AssignableInstance removed for instance: {} during updateAssignableInstances",
          instanceToBeRemoved.getInstanceName());
    }
    LOG.info(
        "AssignableInstanceManager updated AssignableInstances due to LiveInstance/InstanceConfig change.");
    computeGlobalThreadBasedCapacity();
  }

  /**
   * Returns all instanceName -> AssignableInstance mappings.
   * @return assignableInstanceMap
   */
  public Map<String, AssignableInstance> getAssignableInstanceMap() {
    return Collections.unmodifiableMap(_assignableInstanceMap);
  }

  /**
   * Returns an AssignableInstance object by name.
   * @param instanceName
   * @return
   */
  public AssignableInstance getAssignableInstance(String instanceName) {
    return _assignableInstanceMap.get(instanceName);
  }

  /**
   * Returns all AssignableInstances that support a given quota type.
   * @param quotaType
   * @return unmodifiable set of AssignableInstances
   */
  public Set<AssignableInstance> getAssignableInstancesForQuotaType(String quotaType) {
    // TODO: Currently, quota types are global settings across all AssignableInstances. When this
    // TODO: becomes customizable, we need to actually implement this so that it doesn't return all
    // TODO: AssignableInstances
    return Collections.unmodifiableSet(new HashSet<>(_assignableInstanceMap.values()));
  }

  /**
   * Returns taskId -> TaskAssignResult mappings.
   * @return taskAssignResultMap
   */
  public Map<String, TaskAssignResult> getTaskAssignResultMap() {
    return _taskAssignResultMap;
  }

  /**
   * Returns a mapping of: jobType -> available threads in all instances for this jobType
   * @return globalThreadBasedQuotaMap
   */
  public Map<String, Integer> getGlobalCapacityMap() {
    return Collections.unmodifiableMap(_globalThreadBasedQuotaMap);
  }

  /**
   * Check remained global quota of certain quota type for skipping redundant computation
   * @param quotaType
   * @return
   */
  public boolean hasGlobalCapacity(String quotaType) {
    return _globalThreadBasedQuotaMap.containsKey(quotaType)
        && _globalThreadBasedQuotaMap.get(quotaType) > 0;
  }

  /**
   * Check whether quota maps contains the quota type or not
   * @param quotaType
   * @return
   */
  public boolean hasQuotaType(String quotaType) {
    return _globalThreadBasedQuotaMap.containsKey(quotaType);
  }

  /**
   * Wrapper for AssignableInstance release
   * @param instanceName
   * @param taskConfig
   * @param quotaType
   */
  public void release(String instanceName, TaskConfig taskConfig, String quotaType) {
    if (quotaType == null) {
      LOG.debug("Task {}'s quotaType is null. Trying to release as DEFAULT type.",
          taskConfig.getId());
      quotaType = AssignableInstance.DEFAULT_QUOTA_TYPE;
    }
    if (_assignableInstanceMap.containsKey(instanceName)) {
      _assignableInstanceMap.get(instanceName).release(taskConfig, quotaType);
    }

    if (_globalThreadBasedQuotaMap.containsKey(quotaType)) {
      _globalThreadBasedQuotaMap.put(quotaType, _globalThreadBasedQuotaMap.get(quotaType) + 1);
    }
  }

  /**
   * Wrapper for AssignableInstance tryAssign
   * @param instanceName
   * @param task
   * @param quotaType
   * @return
   * @throws IllegalArgumentException
   */
  public TaskAssignResult tryAssign(String instanceName, TaskConfig task, String quotaType)
      throws IllegalArgumentException {
    if (_assignableInstanceMap.containsKey(instanceName)) {
      return _assignableInstanceMap.get(instanceName).tryAssign(task, quotaType);
    }
    return null;
  }

  /**
   * Wrapper for AssignableInstance assign
   * @param instanceName
   * @param result
   * @throws IllegalStateException
   */
  public void assign(String instanceName, TaskAssignResult result) throws IllegalStateException {
    if (result != null && _assignableInstanceMap.containsKey(instanceName)) {
      _assignableInstanceMap.get(instanceName).assign(result);
      _taskAssignResultMap.put(result.getTaskConfig().getId(), result);
    }

    if (_globalThreadBasedQuotaMap.containsKey(result.getQuotaType())) {
      _globalThreadBasedQuotaMap
          .put(result.getQuotaType(), _globalThreadBasedQuotaMap.get(result.getQuotaType()) - 1);
    }
  }

  /**
   * Get all the AssignableInstance names
   * @return
   */
  public Set<String> getAssignableInstanceNames() {
    return Collections.unmodifiableSet(_assignableInstanceMap.keySet());
  }

  /**
   * Determines whether it's possible for a given workflow or a job to have any running tasks. In
   * other words, rule out all resources that are in terminal states or have been stopped.
   * @param state
   * @return
   */
  private boolean isResourceTerminalOrStopped(TaskState state) {
    if (state == null) {
      // If the state is null, it cannot have currently-running tasks either, so consider it
      // inactive
      return true;
    }
    switch (state) {
    case ABORTED:
    case FAILED:
    case STOPPED:
    case COMPLETED:
    case TIMED_OUT:
    case NOT_STARTED:
      return true;
    }
    return false;
  }

  /*
   * Creates a JSON-style String that shows the quota profile and logs it.
   * TODO: Make this with an associated event ID if this becomes a performance bottleneck
   * @param onlyDisplayIfFull if true, this String will only contain the profile for instances whose
   * quota capacity is at its full to avoid cluttering up the log
   */
  public void logQuotaProfileJSON(boolean onlyDisplayIfFull) {
    // Create a String to use as the log for quota status
    JsonNode instanceNode = mapper.createObjectNode();

    // Loop through all instances
    for (Map.Entry<String, AssignableInstance> instanceEntry : _assignableInstanceMap.entrySet()) {
      AssignableInstance assignableInstance = instanceEntry.getValue();
      boolean capacityFull = false;
      JsonNode resourceTypeNode = mapper.createObjectNode();
      for (Map.Entry<String, Map<String, Integer>> capacityEntry : assignableInstance
          .getTotalCapacity().entrySet()) {
        String resourceType = capacityEntry.getKey();
        Map<String, Integer> quotaTypeMap = capacityEntry.getValue();
        JsonNode quotaTypeNode = mapper.createObjectNode();
        for (Map.Entry<String, Integer> typeEntry : quotaTypeMap.entrySet()) {
          String quotaType = typeEntry.getKey();
          int totalCapacity = typeEntry.getValue();
          int usedCapacity = assignableInstance.getUsedCapacity().get(resourceType).get(quotaType);
          if (!capacityFull) {
            capacityFull = totalCapacity <= usedCapacity;
          }
          String capacityString = String.format("%d/%d", usedCapacity, totalCapacity);
          ((ObjectNode) quotaTypeNode).put(quotaType, capacityString);
        }
        ((ObjectNode) resourceTypeNode).put(resourceType, quotaTypeNode);
      }
      // If onlyDisplayIfFull, do not add the JsonNode to the parent node
      if (onlyDisplayIfFull && !capacityFull) {
        continue;
      }
      ((ObjectNode) instanceNode).put(instanceEntry.getKey(), resourceTypeNode);
    }
    if (instanceNode.size() > 0) {
      LOG.info("Current quota capacity: {}", instanceNode.toString());
    }
  }

  private void computeGlobalThreadBasedCapacity() {
    _globalThreadBasedQuotaMap.clear();
    for (AssignableInstance assignableInstance : _assignableInstanceMap.values()) {
      Map<String, Map<String, Integer>> capacityMap = assignableInstance.getTotalCapacity();
      for (Map.Entry<String, Integer> entry : capacityMap
          .get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()).entrySet()) {
        int value = entry.getValue();
        if (_globalThreadBasedQuotaMap.containsKey(entry.getKey())) {
          value += _globalThreadBasedQuotaMap.get(entry.getKey());
        }
        _globalThreadBasedQuotaMap.put(entry.getKey(), value);
      }
    }
  }
}
