package org.apache.helix.task.assigner;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.PriorityQueue;

import java.util.Set;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.task.AssignableInstanceManager;
import org.apache.helix.task.TaskConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ThreadCountBasedTaskAssigner implements TaskAssigner {
  private static final Logger logger = LoggerFactory.getLogger(ThreadCountBasedTaskAssigner.class);
  private static final int SCHED_QUEUE_INIT_CAPACITY = 200;

  private AssignableInstanceManager _assignableInstanceManager;

  /**
   * Assigns given tasks to given AssignableInstances assuming the DEFAULT quota type for all tasks.
   * @param assignableInstances AssignableInstances
   * @param tasks TaskConfigs of the same quota type
   * @return taskID -> TaskAssignmentResult mappings
   */
  public Map<String, TaskAssignResult> assignTasks(Iterable<AssignableInstance> assignableInstances,
      Iterable<TaskConfig> tasks) {
    return assignTasks(assignableInstances, tasks, AssignableInstance.DEFAULT_QUOTA_TYPE);
  }

  /**
   * This is a simple task assigning algorithm that uses the following assumptions to achieve
   * efficiency in assigning tasks:
   * 1. All tasks have same quota type
   * 2. All tasks only need 1 thread for assignment, no other things to consider
   * The algorithm ensures the spread-out of tasks with same quota type or tasks from same job, with
   * best effort.
   * NOTE: once we have more things to consider during scheduling, we will need to come up with
   * a more generic task assignment algorithm.
   * @param assignableInstances AssignableInstances
   * @param tasks TaskConfigs of the same quota type
   * @param quotaType quota type of the tasks
   * @return taskID -> TaskAssignmentResult mappings
   */
  @Override
  public Map<String, TaskAssignResult> assignTasks(Iterable<AssignableInstance> assignableInstances,
      Iterable<TaskConfig> tasks, String quotaType) {
    throw new UnsupportedOperationException();
  }

  @Override
  public Map<String, TaskAssignResult> assignTasks(
      AssignableInstanceManager assignableInstanceManager, Collection<String> instances,
      Iterable<TaskConfig> tasks, String quotaType) {
    Set<AssignableInstance> assignableInstances = new HashSet<>();
    // Only add the AssignableInstances that are also in instances
    for (String instance : instances) {
      assignableInstances.add(assignableInstanceManager.getAssignableInstance(instance));
    }

    if (tasks == null || !tasks.iterator().hasNext()) {
      return Collections.emptyMap();
    }
    if (assignableInstances.isEmpty()) {
      return buildNoInstanceAssignment(tasks, quotaType);
    }
    if (quotaType == null || quotaType.equals("") || quotaType.equals("null")) {
      // Sometimes null is stored as a String literal
      logger.warn("Quota type is null. Assigning it as DEFAULT type!");
      quotaType = AssignableInstance.DEFAULT_QUOTA_TYPE;
    }

    logger.info("Assigning tasks with quota type {}", quotaType);

    // Build a sched queue
    PriorityQueue<AssignableInstance> queue = buildSchedQueue(quotaType, assignableInstances);

    // Assign
    Map<String, TaskAssignResult> assignResults = new HashMap<>();
    for (TaskConfig task : tasks) {

      // Dedup
      if (assignResults.containsKey(task.getId())) {
        logger.warn("Duplicated task assignment {}", task);
        continue;
      }

      // Try to assign the task to least used instance
      AssignableInstance instance = queue.poll();
      TaskAssignResult result = instance.tryAssign(task, quotaType);
      assignResults.put(task.getId(), result);

      if (result.isSuccessful()) {
        // If the task is successfully accepted by the instance, assign it to the instance
        assignableInstanceManager.assign(instance.getInstanceName(), result);
      }

      // requeue the instance to rank again
      queue.offer(instance);
    }
    logger.info("Finished assigning tasks with quota type {}", quotaType);
    return assignResults;
  }

  private PriorityQueue<AssignableInstance> buildSchedQueue(String quotaType,
      Iterable<AssignableInstance> instances) {
    AssignableInstanceComparator comparator = new AssignableInstanceComparator(quotaType);
    PriorityQueue<AssignableInstance> queue =
        new PriorityQueue<>(SCHED_QUEUE_INIT_CAPACITY, comparator);
    for (AssignableInstance assignableInstance : instances) {
      queue.offer(assignableInstance);
    }
    return queue;
  }

  private Map<String, TaskAssignResult> buildNoInstanceAssignment(Iterable<TaskConfig> tasks,
      String quotaType) {
    Map<String, TaskAssignResult> result = new HashMap<>();
    for (TaskConfig taskConfig : tasks) {
      result.put(taskConfig.getId(), new TaskAssignResult(taskConfig, quotaType, null, false, 0,
          TaskAssignResult.FailureReason.INSUFFICIENT_QUOTA, "No assignable instance to assign"));
    }
    return result;
  }

  private static class AssignableInstanceComparator implements Comparator<AssignableInstance> {

    /**
     * Resource type this comparator needs to compare
     */
    private final String RESOURCE_TYPE = LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name();

    /**
     * Resource quota type this comparator needs to compare
     */
    private final String _quotaType;

    public AssignableInstanceComparator(String quotaType) {
      _quotaType = quotaType;
    }

    /**
     * Using this comparator, AssignableInstance will be sorted based on availability of
     * quota given job type in the priority queue. Top of the queue will be the one with
     * highest priority
     * @return a negative integer, zero, or a positive integer as the
     *         first argument is less than, equal to, or greater than the
     *         second
     */
    @Override
    public int compare(AssignableInstance o1, AssignableInstance o2) {
      Integer o1RemainingCapacity = getRemainingUsage(o1.getTotalCapacity(), o1.getUsedCapacity());
      Integer o2RemainingCapacity = getRemainingUsage(o2.getTotalCapacity(), o2.getUsedCapacity());
      return o2RemainingCapacity - o1RemainingCapacity;
    }

    private Integer getRemainingUsage(Map<String, Map<String, Integer>> capacity,
        Map<String, Map<String, Integer>> used) {
      if (capacity.containsKey(RESOURCE_TYPE)) {
        String quotaType = AssignableInstance.DEFAULT_QUOTA_TYPE;
        if (capacity.get(RESOURCE_TYPE).containsKey(_quotaType)) {
          // If the quotaType is not supported, sort as DEFAULT because it will be assigned as
          // DEFAULT
          quotaType = _quotaType;
        }
        return capacity.get(RESOURCE_TYPE).get(quotaType) - used.get(RESOURCE_TYPE).get(quotaType);
      }
      return 0;
    }
  }

  public void init(AssignableInstanceManager assignableInstanceManager) {
    _assignableInstanceManager = assignableInstanceManager;
  }
}
