/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.heron.packing.roundrobin;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;

import com.google.common.annotations.VisibleForTesting;

import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.api.utils.TopologyUtils;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.common.basics.CPUShare;
import org.apache.heron.common.basics.ResourceMeasure;
import org.apache.heron.spi.common.Config;
import org.apache.heron.spi.common.Context;
import org.apache.heron.spi.packing.IPacking;
import org.apache.heron.spi.packing.IRepacking;
import org.apache.heron.spi.packing.InstanceId;
import org.apache.heron.spi.packing.PackingException;
import org.apache.heron.spi.packing.PackingPlan;
import org.apache.heron.spi.packing.Resource;

/**
 * Round-robin packing algorithm
 * <p>
 * This IPacking implementation generates PackingPlan: instances of the component are assigned
 * to each container one by one in circular order, without any priority. Each container receives
 * an equal number of instances when the # of instances is a multiple of the # of containers.
 * <p>
 * Following semantics are guaranteed:
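 * 1. Containers are sized uniformly where possible: the default disk and CPU requests are derived
 * from the container holding the most instances, so smaller containers are aligned to the largest
 * one. RAM, however, is computed per container as the sum of its instances' RAM plus the RAM
 * padding, so it can differ between containers.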
 * <p>
 * 2. The size of resource required by the whole topology is equal to
 * ((# of containers specified in config) + 1) * (size of resource required for a single container).
 * The extra 1 is considered for Heron internal container,
 * i.e. the one containing Scheduler and TMaster.
 * <p>
 * 3. The disk required for a container is calculated as:
 * the value of org.apache.heron.api.Config.TOPOLOGY_CONTAINER_DISK_REQUESTED if it exists;
 * otherwise (default instance disk) * (# of instances in the largest container)
 * + (disk padding for heron internal processes)
 * <p>
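 * 4. The CPU required for a container is calculated as:
 * the value of org.apache.heron.api.Config.TOPOLOGY_CONTAINER_CPU_REQUESTED if it exists;
 * otherwise (# of instances in the largest container) + (CPU padding for heron internal
 * processes), i.e. one CPU per instance. If the CPU of the container's instances plus the
 * padding exceeds this value, the larger amount is used.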
 * <p>
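 * 5. The RAM required for a container is calculated as:
 * (RAM for instances in container) + (RAM padding for heron internal processes).
 * When org.apache.heron.api.Config.TOPOLOGY_CONTAINER_RAM_REQUESTED exists, the RAM of instances
 * without an explicit setting is derived from it (see 6). The RAM padding is
 * TOPOLOGY_CONTAINER_RAM_PADDING if set, otherwise the sum of the stream manager, metrics manager
 * and (for effectively-once topologies) checkpoint manager RAM.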
 * <p>
 * 6. The RAM required for one instance is calculated as:
 * value in org.apache.heron.api.Config.TOPOLOGY_COMPONENT_RAMMAP if exists, otherwise,
 * - if org.apache.heron.api.Config.TOPOLOGY_CONTAINER_RAM_REQUESTED not exists:
 * the default RAM value for one instance
 * - if org.apache.heron.api.Config.TOPOLOGY_CONTAINER_RAM_REQUESTED exists:
 * ((TOPOLOGY_CONTAINER_RAM_REQUESTED) - (RAM padding for heron internal process)
 * - (RAM used by instances within TOPOLOGY_COMPONENT_RAMMAP config))) /
 * (the # of instances in container not specified in TOPOLOGY_COMPONENT_RAMMAP config)
 * <p>
 * 7. pack() throws a PackingException if the generated PackingPlan fails the safety check,
 * for instance, when the RAM of an instance is less than the minimum required value.
 */
public class RoundRobinPacking implements IPacking, IRepacking {
  private static final Logger LOG = Logger.getLogger(RoundRobinPacking.class.getName());

  @VisibleForTesting
  static final ByteAmount DEFAULT_RAM_PADDING_PER_CONTAINER = ByteAmount.fromGigabytes(2);
  @VisibleForTesting
  static final double DEFAULT_CPU_PADDING_PER_CONTAINER = 1.0;
  private static final ByteAmount DEFAULT_DISK_PADDING_PER_CONTAINER = ByteAmount.fromGigabytes(12);

  @VisibleForTesting
  static final ByteAmount DEFAULT_DAEMON_PROCESS_RAM_PADDING = ByteAmount.fromGigabytes(1);
  private static final ByteAmount MIN_RAM_PER_INSTANCE = ByteAmount.fromMegabytes(192);

  // Sentinels meaning "no container-level value configured". They are compared with equals()
  // against the configured hint, so they must never be a legitimate resource amount.
  private static final ByteAmount NOT_SPECIFIED_BYTE_AMOUNT = ByteAmount.fromBytes(-1);
  private static final double NOT_SPECIFIED_CPU_SHARE = -1.0;

  // Resource labels used in PackingException messages (DISK is currently unused).
  private static final String RAM = "RAM";
  private static final String CPU = "CPU";
  private static final String DISK = "DISK";

  private TopologyAPI.Topology topology;

  // Per-instance defaults, read from the packing Config in initialize().
  private ByteAmount instanceRamDefault;
  private double instanceCpuDefault;
  private ByteAmount instanceDiskDefault;
  // Per-container resources reserved for Heron daemon processes. The RAM padding is recomputed
  // from the topology config in initialize(); the CPU padding is not configurable.
  private ByteAmount containerRamPadding = DEFAULT_RAM_PADDING_PER_CONTAINER;
  private double containerCpuPadding = DEFAULT_CPU_PADDING_PER_CONTAINER;

  /**
   * Captures the topology and reads the per-instance resource defaults from {@code config}.
   * The per-container RAM padding is derived from the topology's own configuration.
   *
   * @param config packing configuration providing the instance CPU, RAM and disk defaults
   * @param inputTopology the topology to pack
   */
  @Override
  public void initialize(Config config, TopologyAPI.Topology inputTopology) {
    this.topology = inputTopology;
    this.instanceCpuDefault = Context.instanceCpu(config);
    this.instanceRamDefault = Context.instanceRam(config);
    this.instanceDiskDefault = Context.instanceDisk(config);
    this.containerRamPadding = getContainerRamPadding(topology.getTopologyConfig().getKvsList());
    LOG.info(String.format("Initalizing RoundRobinPacking. "
        + "CPU default: %f, RAM default: %s, DISK default: %s, RAM padding: %s.",
        this.instanceCpuDefault,
        this.instanceRamDefault.toString(),
        this.instanceDiskDefault.toString(),
        this.containerRamPadding.toString()));
  }

  /**
   * Packs the topology into the number of containers configured for it, using each
   * component's configured parallelism.
   *
   * @return the validated packing plan
   * @throws PackingException if the plan violates a resource constraint
   */
  @Override
  public PackingPlan pack() {
    int numContainer = TopologyUtils.getNumContainers(topology);
    Map<String, Integer> parallelismMap = TopologyUtils.getComponentParallelism(topology);

    return packInternal(numContainer, parallelismMap);
  }

  /**
   * Builds a packing plan that spreads the given component parallelism over
   * {@code numContainer} containers in round-robin order. Shared by pack() and repack().
   *
   * @param numContainer number of containers to pack into
   * @param parallelismMap component name -&gt; number of instances
   * @return the validated packing plan
   * @throws PackingException if instance resources exceed a container-level constraint, the
   * container count is invalid, or an instance would receive less than the minimum RAM
   */
  private PackingPlan packInternal(int numContainer, Map<String, Integer> parallelismMap) {
    // Get the instances' round-robin allocation
    Map<Integer, List<InstanceId>> roundRobinAllocation =
        getRoundRobinAllocation(numContainer, parallelismMap);

    // Container-level hints only read topology config, so each is computed once and reused
    // both for the per-instance split and for the final container resources / logging.
    ByteAmount containerRamHint = getContainerRamHint(roundRobinAllocation);

    // Get the RAM map for every instance
    Map<Integer, Map<InstanceId, ByteAmount>> instancesRamMap =
        calculateInstancesResourceMapInContainer(
        roundRobinAllocation,
        TopologyUtils.getComponentRamMapConfig(topology),
        containerRamHint,
        instanceRamDefault,
        containerRamPadding,
        ByteAmount.ZERO,
        NOT_SPECIFIED_BYTE_AMOUNT,
        RAM);

    double containerCpuHint = getContainerCpuHint(roundRobinAllocation);

    // Get the CPU map for every instance
    Map<Integer, Map<InstanceId, CPUShare>> instancesCpuMap =
        calculateInstancesResourceMapInContainer(
        roundRobinAllocation,
        CPUShare.convertDoubleMapToCpuShareMap(TopologyUtils.getComponentCpuMapConfig(topology)),
        CPUShare.fromDouble(containerCpuHint),
        CPUShare.fromDouble(instanceCpuDefault),
        CPUShare.fromDouble(containerCpuPadding),
        CPUShare.fromDouble(0.0),
        CPUShare.fromDouble(NOT_SPECIFIED_CPU_SHARE),
        CPU);

    ByteAmount containerDiskInBytes = getContainerDiskHint(roundRobinAllocation);

    LOG.info(String.format("Pack internal: container CPU hint: %.3f, RAM hint: %s, disk hint: %s.",
        containerCpuHint,
        containerRamHint.toString(),
        containerDiskInBytes.toString()));

    // Construct the PackingPlan
    Set<PackingPlan.ContainerPlan> containerPlans = new HashSet<>();
    for (Map.Entry<Integer, List<InstanceId>> allocationEntry : roundRobinAllocation.entrySet()) {
      int containerId = allocationEntry.getKey();
      Map<InstanceId, ByteAmount> containerRamMap = instancesRamMap.get(containerId);
      Map<InstanceId, CPUShare> containerCpuMap = instancesCpuMap.get(containerId);

      // Container totals start from the daemon padding and accumulate each instance's share.
      Map<InstanceId, PackingPlan.InstancePlan> instancePlanMap = new HashMap<>();
      ByteAmount containerRam = containerRamPadding;
      double containerCpu = containerCpuPadding;

      for (InstanceId instanceId : allocationEntry.getValue()) {
        ByteAmount instanceRam = containerRamMap.get(instanceId);
        double instanceCpu = containerCpuMap.get(instanceId).getValue();

        // Currently not yet support disk config for different components, just use the default.
        ByteAmount instanceDisk = instanceDiskDefault;

        Resource resource = new Resource(instanceCpu, instanceRam, instanceDisk);
        instancePlanMap.put(instanceId, new PackingPlan.InstancePlan(instanceId, resource));
        containerRam = containerRam.plus(instanceRam);
        containerCpu += instanceCpu;
      }

      // CPU is never below the container hint; RAM is the exact per-container sum.
      Resource resource = new Resource(Math.max(containerCpu, containerCpuHint),
          containerRam, containerDiskInBytes);
      PackingPlan.ContainerPlan containerPlan = new PackingPlan.ContainerPlan(
          containerId, new HashSet<>(instancePlanMap.values()), resource);

      containerPlans.add(containerPlan);

      LOG.info(String.format("Pack internal finalized: container#%d CPU: %f, RAM: %s, disk: %s.",
          containerId,
          resource.getCpu(),
          resource.getRam().toString(),
          resource.getDisk().toString()));
    }

    PackingPlan plan = new PackingPlan(topology.getId(), containerPlans);

    validatePackingPlan(plan);
    return plan;
  }

  /**
   * Releases resources held by this packing instance. Nothing is held, so this is a no-op.
   */
  @Override
  public void close() {
    // Intentionally empty: no resources are acquired in initialize().
  }

  /**
   * Computes the per-container RAM reserved for Heron daemon processes.
   * <p>
   * Returns TOPOLOGY_CONTAINER_RAM_PADDING when set; otherwise the sum of the stream manager
   * RAM, the metrics manager RAM and, for EFFECTIVELY_ONCE topologies, the checkpoint manager
   * RAM (each falling back to DEFAULT_DAEMON_PROCESS_RAM_PADDING when not configured; the
   * checkpoint manager falls back to zero for other reliability modes).
   *
   * @param topologyConfig the topology's key/value configuration
   * @return RAM padding per container
   */
  private ByteAmount getContainerRamPadding(List<TopologyAPI.Config.KeyValue> topologyConfig) {
    ByteAmount stmgrRam = TopologyUtils.getConfigWithDefault(topologyConfig,
        org.apache.heron.api.Config.TOPOLOGY_STMGR_RAM,
        DEFAULT_DAEMON_PROCESS_RAM_PADDING);
    ByteAmount metricsmgrRam = TopologyUtils.getConfigWithDefault(topologyConfig,
        org.apache.heron.api.Config.TOPOLOGY_METRICSMGR_RAM,
        DEFAULT_DAEMON_PROCESS_RAM_PADDING);
    String reliabilityMode = TopologyUtils.getConfigWithDefault(topologyConfig,
        org.apache.heron.api.Config.TOPOLOGY_RELIABILITY_MODE,
        org.apache.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE.name());
    // Only stateful (effectively-once) topologies run a checkpoint manager by default.
    boolean isStateful =
        org.apache.heron.api.Config.TopologyReliabilityMode
            .EFFECTIVELY_ONCE.name().equals(reliabilityMode);
    ByteAmount ckptmgrRam = TopologyUtils.getConfigWithDefault(topologyConfig,
        org.apache.heron.api.Config.TOPOLOGY_STATEFUL_CKPTMGR_RAM,
        isStateful ? DEFAULT_DAEMON_PROCESS_RAM_PADDING : ByteAmount.ZERO);

    ByteAmount daemonProcessPadding = stmgrRam.plus(metricsmgrRam).plus(ckptmgrRam);

    // return the container padding if it's set, otherwise return the total daemon request ram
    return TopologyUtils.getConfigWithDefault(topologyConfig,
        org.apache.heron.api.Config.TOPOLOGY_CONTAINER_RAM_PADDING,
        daemonProcessPadding);
  }

  /**
   * Assigns a resource amount (RAM or CPU) to every instance of every container.
   * <p>
   * Instances whose component appears in {@code resMap} get that value. The remaining
   * instances get {@code instanceResDefault} when no container-level hint is configured;
   * otherwise they split (hint - padding - explicitly assigned amount) evenly.
   *
   * @param allocation containerId -&gt; instances in that container
   * @param resMap component name -&gt; explicitly configured per-instance amount
   * @param containerResHint container-level amount, or {@code notSpecified} if not configured
   * @param instanceResDefault per-instance amount used when no hint is configured
   * @param containerResPadding amount reserved per container for daemon processes
   * @param zero the zero value of the resource type
   * @param notSpecified sentinel meaning "hint not configured"
   * @param resourceType label used in error messages
   * @return containerId -&gt; (instance -&gt; amount)
   * @throws PackingException if explicitly configured amounts exceed the hint minus padding,
   * or nothing is left for the unconfigured instances
   */
  // Unchecked: ResourceMeasure arithmetic returns the raw base type; each result is of the
  // same concrete type T as its operands, so the casts back to T are safe.
  @SuppressWarnings("unchecked")
  private <T extends ResourceMeasure> Map<Integer, Map<InstanceId, T>>
            calculateInstancesResourceMapInContainer(
                Map<Integer, List<InstanceId>> allocation,
                Map<String, T> resMap,
                T containerResHint,
                T instanceResDefault,
                T containerResPadding,
                T zero,
                T notSpecified,
                String resourceType) {
    Map<Integer, Map<InstanceId, T>> instancesResMapInContainer = new HashMap<>();
    boolean isHintSpecified = !containerResHint.equals(notSpecified);

    for (Map.Entry<Integer, List<InstanceId>> allocationEntry : allocation.entrySet()) {
      Map<InstanceId, T> resInsideContainer = new HashMap<>();
      instancesResMapInContainer.put(allocationEntry.getKey(), resInsideContainer);
      List<InstanceId> unspecifiedInstances = new ArrayList<>();

      // Register the instance resource allocation and calculate the used resource so far
      T usedRes = zero;
      for (InstanceId instanceId : allocationEntry.getValue()) {
        String componentName = instanceId.getComponentName();
        if (resMap.containsKey(componentName)) {
          T res = resMap.get(componentName);
          resInsideContainer.put(instanceId, res);
          usedRes = (T) usedRes.plus(res);
        } else {
          unspecifiedInstances.add(instanceId);
        }
      }

      // Validate instance resources specified so far don't violate container-level constraint
      if (isHintSpecified
          && usedRes.greaterThan(containerResHint.minus(containerResPadding))) {
        throw new PackingException(String.format("Invalid packing plan generated. "
                + "Total instance %s in a container (%s) + padding(%s) have exceeded "
                + "the container-level constraint of %s.",
            resourceType, usedRes.toString(), containerResPadding.toString(), containerResHint));
      }

      if (unspecifiedInstances.isEmpty()) {
        continue;
      }

      // calculate resource for the remaining unspecified instances
      T individualInstanceRes = instanceResDefault;
      if (isHintSpecified) {
        // discount resource for heron internal process (padding) and used (usedRes)
        T remainingRes = (T) containerResHint.minus(containerResPadding).minus(usedRes);

        if (remainingRes.lessOrEqual(zero)) {
          throw new PackingException(String.format("Invalid packing plan generated. "
              + "No enough %s to allocate for unspecified instances", resourceType));
        }

        // Split remaining resource evenly
        individualInstanceRes = (T) remainingRes.divide(unspecifiedInstances.size());
      }

      for (InstanceId instanceId : unspecifiedInstances) {
        resInsideContainer.put(instanceId, individualInstanceRes);
      }
    }

    return instancesResMapInContainer;
  }

  /**
   * Get the instances' allocation basing on round robin algorithm.
   * <p>
   * Containers are numbered 1..numContainer. Instances are visited component by component and
   * dealt to the containers in turn. Task ids are assigned globally starting at 1, and
   * component indexes start at 0 within each component.
   *
   * @param numContainer number of containers to fill
   * @param parallelismMap component name -&gt; number of instances
   * @return containerId -&gt; list of InstanceId belonging to this container
   * @throws PackingException if there are more containers than instances, or there are
   * instances but fewer than one container
   */
  private Map<Integer, List<InstanceId>> getRoundRobinAllocation(
      int numContainer, Map<String, Integer> parallelismMap) {
    Map<Integer, List<InstanceId>> allocation = new HashMap<>();
    int totalInstance = TopologyUtils.getTotalInstance(parallelismMap);
    if (numContainer > totalInstance) {
      throw new PackingException("More containers allocated than instance.");
    }
    // Without this guard, allocation.get(1) below would be null and fail with an NPE.
    if (numContainer < 1 && totalInstance > 0) {
      throw new PackingException(String.format(
          "Invalid number of containers %d: at least one container is required for %d instances.",
          numContainer, totalInstance));
    }

    for (int i = 1; i <= numContainer; ++i) {
      allocation.put(i, new ArrayList<>());
    }

    int index = 1;
    int globalTaskIndex = 1;
    for (Map.Entry<String, Integer> componentEntry : parallelismMap.entrySet()) {
      String component = componentEntry.getKey();
      int numInstance = componentEntry.getValue();
      for (int i = 0; i < numInstance; ++i) {
        allocation.get(index).add(new InstanceId(component, globalTaskIndex, i));
        // Wrap around to the first container after the last one.
        index = (index == numContainer) ? 1 : index + 1;
        globalTaskIndex++;
      }
    }
    return allocation;
  }

  /**
   * Get # of instances in the largest container
   *
   * @param allocation the instances' allocation
   * @return # of instances in the largest container, or 0 if there are no containers
   */
  private int getLargestContainerSize(Map<Integer, List<InstanceId>> allocation) {
    int max = 0;
    for (List<InstanceId> instances : allocation.values()) {
      max = Math.max(max, instances.size());
    }
    return max;
  }

  /**
   * Provide CPU per container: TOPOLOGY_CONTAINER_CPU_REQUESTED if configured, otherwise
   * DEFAULT_CPU_PADDING_PER_CONTAINER plus one CPU per instance in the largest container.
   * <p>
   * NOTE(review): the default uses one CPU per instance rather than instanceCpuDefault —
   * confirm this is intended.
   *
   * @param allocation packing output.
   * @return CPU per container.
   */
  private double getContainerCpuHint(Map<Integer, List<InstanceId>> allocation) {
    List<TopologyAPI.Config.KeyValue> topologyConfig = topology.getTopologyConfig().getKvsList();
    double defaultContainerCpu =
        DEFAULT_CPU_PADDING_PER_CONTAINER + getLargestContainerSize(allocation);

    String cpuHint = TopologyUtils.getConfigWithDefault(
        topologyConfig, org.apache.heron.api.Config.TOPOLOGY_CONTAINER_CPU_REQUESTED,
        Double.toString(defaultContainerCpu));

    return Double.parseDouble(cpuHint);
  }

  /**
   * Provide disk per container: TOPOLOGY_CONTAINER_DISK_REQUESTED if configured, otherwise
   * the default instance disk times the size of the largest container, plus
   * DEFAULT_DISK_PADDING_PER_CONTAINER.
   *
   * @param allocation packing output.
   * @return disk per container.
   */
  private ByteAmount getContainerDiskHint(Map<Integer, List<InstanceId>> allocation) {
    ByteAmount defaultContainerDisk = instanceDiskDefault
        .multiply(getLargestContainerSize(allocation))
        .plus(DEFAULT_DISK_PADDING_PER_CONTAINER);

    List<TopologyAPI.Config.KeyValue> topologyConfig = topology.getTopologyConfig().getKvsList();

    return TopologyUtils.getConfigWithDefault(topologyConfig,
        org.apache.heron.api.Config.TOPOLOGY_CONTAINER_DISK_REQUESTED,
        defaultContainerDisk);
  }

  /**
   * Provide RAM per container: TOPOLOGY_CONTAINER_RAM_REQUESTED if configured, otherwise the
   * NOT_SPECIFIED_BYTE_AMOUNT sentinel.
   *
   * @param allocation packing (unused; kept for symmetry with the CPU and disk hints)
   * @return Container RAM requirement, or NOT_SPECIFIED_BYTE_AMOUNT when not configured
   */
  private ByteAmount getContainerRamHint(Map<Integer, List<InstanceId>> allocation) {
    List<TopologyAPI.Config.KeyValue> topologyConfig = topology.getTopologyConfig().getKvsList();

    return TopologyUtils.getConfigWithDefault(
        topologyConfig, org.apache.heron.api.Config.TOPOLOGY_CONTAINER_RAM_REQUESTED,
        NOT_SPECIFIED_BYTE_AMOUNT);
  }

  /**
   * Check whether the PackingPlan generated is valid: every instance must have at least
   * MIN_RAM_PER_INSTANCE of RAM.
   *
   * @param plan The PackingPlan to check
   * @throws PackingException if it's not a valid plan
   */
  private void validatePackingPlan(PackingPlan plan) throws PackingException {
    for (PackingPlan.ContainerPlan containerPlan : plan.getContainers()) {
      for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) {
        // Safe check
        if (instancePlan.getResource().getRam().lessThan(MIN_RAM_PER_INSTANCE)) {
          throw new PackingException(String.format("Invalid packing plan generated. A minimum of "
                  + "%s RAM is required, but InstancePlan for component '%s' has %s",
              MIN_RAM_PER_INSTANCE, instancePlan.getComponentName(),
              instancePlan.getResource().getRam()));
        }
      }
    }
  }

  /**
   * Read the current packing plan with updated parallelism to calculate a new packing plan.
   * This method determines a new number of containers based on the updated parallelism
   * while keeping the average number of instances per container of the topology's initial
   * configuration, i.e. ceil(newInstances * initialContainers / initialInstances).
   * It delegates to packInternal(), which is shared with pack().
   *
   * @param currentPackingPlan Existing packing plan
   * @param componentChanges Map &lt; componentName, parallelism change &gt;; each value is
   * added to the component's current parallelism (negative values scale down).
   * @return new packing plan
   * @throws PackingException if a changed component is not in the current plan, the initial
   * topology has no instances, or the new plan violates a resource constraint
   */
  @Override
  public PackingPlan repack(PackingPlan currentPackingPlan, Map<String, Integer> componentChanges)
      throws PackingException {
    int initialNumContainer = TopologyUtils.getNumContainers(topology);
    int initialNumInstance = TopologyUtils.getTotalInstance(topology);
    if (initialNumInstance <= 0) {
      throw new PackingException(String.format("Cannot repack: the topology's initial instance "
          + "count is %d, so the instances-per-container ratio is undefined.",
          initialNumInstance));
    }

    Map<String, Integer> newComponentParallelism =
        getNewComponentParallelism(currentPackingPlan, componentChanges);
    int newNumInstance = TopologyUtils.getTotalInstance(newComponentParallelism);

    // Exact integer ceiling division. The floating-point form
    // ceil(newNumInstance / ((double) initialNumInstance / initialNumContainer)) can round just
    // above an integer and over-allocate (65 / (13.0 / 3) evaluates to slightly more than 15).
    long scaledInstances = (long) newNumInstance * initialNumContainer;
    int newNumContainer =
        Math.toIntExact(-Math.floorDiv(-scaledInstances, (long) initialNumInstance));
    return packInternal(newNumContainer, newComponentParallelism);
  }

  /**
   * Applies parallelism deltas to the component counts of the current packing plan.
   *
   * @param currentPackingPlan Existing packing plan
   * @param componentChanges Map &lt; componentName, parallelism change &gt; to add to the
   * current counts
   * @return a new map of component name -&gt; updated parallelism; the plan is not modified
   * @throws PackingException if a change names a component that is not in the current plan
   */
  public Map<String, Integer> getNewComponentParallelism(PackingPlan currentPackingPlan,
                                                         Map<String, Integer> componentChanges) {
    // Work on a copy so the plan's own count map is never mutated.
    Map<String, Integer> newComponentParallelism =
        new HashMap<>(currentPackingPlan.getComponentCounts());
    for (Map.Entry<String, Integer> change : componentChanges.entrySet()) {
      Integer currentParallelism = newComponentParallelism.get(change.getKey());
      if (currentParallelism == null) {
        throw new PackingException(String.format("Cannot change parallelism of component '%s': "
            + "it is not present in the current packing plan.", change.getKey()));
      }
      newComponentParallelism.put(change.getKey(), currentParallelism + change.getValue());
    }
    return newComponentParallelism;
  }

  /**
   * Read the current packing plan with updated parallelism and number of containers
   * to calculate a new packing plan.
   * If {@code containers} equals the current container count, the container count is derived
   * as in {@link #repack(PackingPlan, Map)}; otherwise exactly {@code containers} are used.
   * The packing algorithm packInternal() is shared with pack().
   *
   * @param currentPackingPlan Existing packing plan
   * @param containers the new number of containers for the topology
   * specified by the user
   * @param componentChanges Map &lt; componentName, parallelism change &gt;; each value is
   * added to the component's current parallelism.
   * @return new packing plan
   * @throws PackingException if a changed component is not in the current plan, the container
   * count is invalid for the new instance count, or a resource constraint is violated
   */
  @Override
  public PackingPlan repack(PackingPlan currentPackingPlan, int containers, Map<String, Integer>
      componentChanges) throws PackingException {
    if (containers == currentPackingPlan.getContainers().size()) {
      return repack(currentPackingPlan, componentChanges);
    }
    Map<String, Integer> newComponentParallelism = getNewComponentParallelism(currentPackingPlan,
        componentChanges);
    return packInternal(containers, newComponentParallelism);
  }
}
