/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.reservation;

import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

/**
 * This is a run length encoded sparse data structure that maintains resource
 * allocations over time.
 */
public class RLESparseResourceAllocation {

  private static final int THRESHOLD = 100;
  private static final Resource ZERO_RESOURCE = Resources.none();

  @SuppressWarnings("checkstyle:visibilitymodifier")
  protected NavigableMap<Long, Resource> cumulativeCapacity =
      new TreeMap<Long, Resource>();

  private final ReentrantReadWriteLock readWriteLock =
      new ReentrantReadWriteLock();
  @SuppressWarnings("checkstyle:visibilitymodifier")
  protected final Lock readLock = readWriteLock.readLock();
  private final Lock writeLock = readWriteLock.writeLock();

  private final ResourceCalculator resourceCalculator;

  public RLESparseResourceAllocation(ResourceCalculator resourceCalculator) {
    this.resourceCalculator = resourceCalculator;
  }

  public RLESparseResourceAllocation(NavigableMap<Long, Resource> out,
      ResourceCalculator resourceCalculator) {
    // miss check for repeated entries
    this.cumulativeCapacity = out;
    this.resourceCalculator = resourceCalculator;
  }

  /**
   * Add a resource for the specified interval.
   *
   * @param reservationInterval the interval for which the resource is to be
   *          added
   * @param totCap the resource to be added
   * @return true if addition is successful, false otherwise
   */
  public boolean addInterval(ReservationInterval reservationInterval,
      Resource totCap) {
    if (totCap.equals(ZERO_RESOURCE)) {
      return true;
    }
    writeLock.lock();
    try {
      NavigableMap<Long, Resource> addInt = new TreeMap<Long, Resource>();
      addInt.put(reservationInterval.getStartTime(), totCap);
      addInt.put(reservationInterval.getEndTime(), ZERO_RESOURCE);
      try {
        cumulativeCapacity =
            merge(resourceCalculator, totCap, cumulativeCapacity, addInt,
                Long.MIN_VALUE, Long.MAX_VALUE, RLEOperator.add);
      } catch (PlanningException e) {
        // never happens for add
      }
      return true;
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Removes a resource for the specified interval.
   *
   * @param reservationInterval the interval for which the resource is to be
   *          removed
   * @param totCap the resource to be removed
   * @return true if removal is successful, false otherwise
   */
  public boolean removeInterval(ReservationInterval reservationInterval,
      Resource totCap) {
    if (totCap.equals(ZERO_RESOURCE)) {
      return true;
    }
    writeLock.lock();
    try {

      NavigableMap<Long, Resource> removeInt = new TreeMap<Long, Resource>();
      removeInt.put(reservationInterval.getStartTime(), totCap);
      removeInt.put(reservationInterval.getEndTime(), ZERO_RESOURCE);
      try {
        cumulativeCapacity =
            merge(resourceCalculator, totCap, cumulativeCapacity, removeInt,
                Long.MIN_VALUE, Long.MAX_VALUE, RLEOperator.subtract);
      } catch (PlanningException e) {
        // never happens for subtract
      }
      return true;
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Returns the capacity, i.e. total resources allocated at the specified point
   * of time.
   *
   * @param tick timeStap at which resource needs to be known
   * @return the resources allocated at the specified time
   */
  public Resource getCapacityAtTime(long tick) {
    readLock.lock();
    try {
      Entry<Long, Resource> closestStep = cumulativeCapacity.floorEntry(tick);
      if (closestStep != null) {
        return Resources.clone(closestStep.getValue());
      }
      return Resources.clone(ZERO_RESOURCE);
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Get the timestamp of the earliest resource allocation.
   *
   * @return the timestamp of the first resource allocation
   */
  public long getEarliestStartTime() {
    readLock.lock();
    try {
      if (cumulativeCapacity.isEmpty()) {
        return -1;
      } else {
        return cumulativeCapacity.firstKey();
      }
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Get the timestamp of the latest non-null resource allocation.
   *
   * @return the timestamp of the last resource allocation
   */
  public long getLatestNonNullTime() {
    readLock.lock();
    try {
      if (cumulativeCapacity.isEmpty()) {
        return -1;
      } else {
        // the last entry might contain null (to terminate
        // the sequence)... return previous one.
        Entry<Long, Resource> last = cumulativeCapacity.lastEntry();
        if (last.getValue() == null) {
          return cumulativeCapacity.floorKey(last.getKey() - 1);
        } else {
          return last.getKey();
        }
      }
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Returns true if there are no non-zero entries.
   *
   * @return true if there are no allocations or false otherwise
   */
  public boolean isEmpty() {
    readLock.lock();
    try {
      if (cumulativeCapacity.isEmpty()) {
        return true;
      }
      // Deletion leaves a single zero entry with a null at the end so check for
      // that
      if (cumulativeCapacity.size() == 2) {
        return cumulativeCapacity.firstEntry().getValue().equals(ZERO_RESOURCE)
            && cumulativeCapacity.lastEntry().getValue() == null;
      }
      return false;
    } finally {
      readLock.unlock();
    }
  }

  @Override
  public String toString() {
    StringBuilder ret = new StringBuilder();
    readLock.lock();
    try {
      if (cumulativeCapacity.size() > THRESHOLD) {
        ret.append("Number of steps: ").append(cumulativeCapacity.size())
            .append(" earliest entry: ").append(cumulativeCapacity.firstKey())
            .append(" latest entry: ").append(cumulativeCapacity.lastKey());
      } else {
        for (Map.Entry<Long, Resource> r : cumulativeCapacity.entrySet()) {
          ret.append(r.getKey()).append(": ").append(r.getValue())
              .append("\n ");
        }
      }
      return ret.toString();
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Returns the representation of the current resources allocated over time as
   * an interval map (in the defined non-null range).
   *
   * @return the representation of the current resources allocated over time as
   *         an interval map.
   */
  public Map<ReservationInterval, Resource> toIntervalMap() {

    readLock.lock();
    try {
      Map<ReservationInterval, Resource> allocations =
          new TreeMap<ReservationInterval, Resource>();

      // Empty
      if (isEmpty()) {
        return allocations;
      }

      Map.Entry<Long, Resource> lastEntry = null;
      for (Map.Entry<Long, Resource> entry : cumulativeCapacity.entrySet()) {

        if (lastEntry != null && entry.getValue() != null) {
          ReservationInterval interval =
              new ReservationInterval(lastEntry.getKey(), entry.getKey());
          Resource resource = lastEntry.getValue();

          allocations.put(interval, resource);
        }

        lastEntry = entry;
      }
      return allocations;
    } finally {
      readLock.unlock();
    }
  }

  public NavigableMap<Long, Resource> getCumulative() {
    readLock.lock();
    try {
      return cumulativeCapacity;
    } finally {
      readLock.unlock();
    }
  }

  public ResourceCalculator getResourceCalculator() {
    return resourceCalculator;
  }

  /**
   * Merges the range start to end of two {@code RLESparseResourceAllocation}
   * using a given {@code RLEOperator}.
   *
   * @param resCalc the resource calculator
   * @param clusterResource the total cluster resources (for DRF)
   * @param a the left operand
   * @param b the right operand
   * @param operator the operator to be applied during merge
   * @param start the start-time of the range to be considered
   * @param end the end-time of the range to be considered
   * @return the a merged RLESparseResourceAllocation, produced by applying
   *         "operator" to "a" and "b"
   * @throws PlanningException in case the operator is subtractTestPositive and
   *           the result would contain a negative value
   */
  public static RLESparseResourceAllocation merge(ResourceCalculator resCalc,
      Resource clusterResource, RLESparseResourceAllocation a,
      RLESparseResourceAllocation b, RLEOperator operator, long start, long end)
      throws PlanningException {
    NavigableMap<Long, Resource> cumA =
        a.getRangeOverlapping(start, end).getCumulative();
    NavigableMap<Long, Resource> cumB =
        b.getRangeOverlapping(start, end).getCumulative();
    NavigableMap<Long, Resource> out =
        merge(resCalc, clusterResource, cumA, cumB, start, end, operator);
    return new RLESparseResourceAllocation(out, resCalc);
  }

  private static NavigableMap<Long, Resource> merge(ResourceCalculator resCalc,
      Resource clusterResource, NavigableMap<Long, Resource> a,
      NavigableMap<Long, Resource> b, long start, long end,
      RLEOperator operator) throws PlanningException {

    // handle special cases of empty input
    if (a == null || a.isEmpty()) {
      if (operator == RLEOperator.subtract
          || operator == RLEOperator.subtractTestNonNegative) {
        return negate(operator, b);
      } else {
        return b;
      }
    }
    if (b == null || b.isEmpty()) {
      return a;
    }

    // define iterators and support variables
    Iterator<Entry<Long, Resource>> aIt = a.entrySet().iterator();
    Iterator<Entry<Long, Resource>> bIt = b.entrySet().iterator();
    Entry<Long, Resource> curA = aIt.next();
    Entry<Long, Resource> curB = bIt.next();
    Entry<Long, Resource> lastA = null;
    Entry<Long, Resource> lastB = null;
    boolean aIsDone = false;
    boolean bIsDone = false;

    TreeMap<Long, Resource> out = new TreeMap<Long, Resource>();

    while (!(curA.equals(lastA) && curB.equals(lastB))) {

      Resource outRes;
      long time = -1;

      // curA is smaller than curB
      if (bIsDone || (curA.getKey() < curB.getKey() && !aIsDone)) {
        outRes = combineValue(operator, resCalc, clusterResource, curA, lastB);
        time = (curA.getKey() < start) ? start : curA.getKey();
        lastA = curA;
        if (aIt.hasNext()) {
          curA = aIt.next();
        } else {
          aIsDone = true;
        }

      } else {
        // curB is smaller than curA
        if (aIsDone || (curA.getKey() > curB.getKey() && !bIsDone)) {
          outRes =
              combineValue(operator, resCalc, clusterResource, lastA, curB);
          time = (curB.getKey() < start) ? start : curB.getKey();
          lastB = curB;
          if (bIt.hasNext()) {
            curB = bIt.next();
          } else {
            bIsDone = true;
          }

        } else {
          // curA is equal to curB
          outRes = combineValue(operator, resCalc, clusterResource, curA, curB);
          time = (curA.getKey() < start) ? start : curA.getKey();
          lastA = curA;
          if (aIt.hasNext()) {
            curA = aIt.next();
          } else {
            aIsDone = true;
          }
          lastB = curB;
          if (bIt.hasNext()) {
            curB = bIt.next();
          } else {
            bIsDone = true;
          }
        }
      }

      // add to out if not redundant
      addIfNeeded(out, time, outRes);
    }
    addIfNeeded(out, end, null);

    return out;
  }

  private static NavigableMap<Long, Resource> negate(RLEOperator operator,
      NavigableMap<Long, Resource> a) throws PlanningException {

    TreeMap<Long, Resource> out = new TreeMap<Long, Resource>();
    for (Entry<Long, Resource> e : a.entrySet()) {
      Resource val = Resources.negate(e.getValue());
      // test for negative value and throws
      if (operator == RLEOperator.subtractTestNonNegative
          && (Resources.fitsIn(val, ZERO_RESOURCE)
              && !Resources.equals(val, ZERO_RESOURCE))) {
        throw new PlanningException(
            "RLESparseResourceAllocation: merge failed as the "
                + "resulting RLESparseResourceAllocation would be negative");
      }
      out.put(e.getKey(), val);
    }

    return out;
  }

  private static void addIfNeeded(TreeMap<Long, Resource> out, long time,
      Resource outRes) {

    if (out.isEmpty() || (out.lastEntry() != null && outRes == null)
        || (out.lastEntry().getValue() != null
            && !Resources.equals(out.lastEntry().getValue(), outRes))) {
      out.put(time, outRes);
    }

  }

  private static Resource combineValue(RLEOperator op,
      ResourceCalculator resCalc, Resource clusterResource,
      Entry<Long, Resource> eA, Entry<Long, Resource> eB)
      throws PlanningException {

    // deal with nulls
    if (eA == null || eA.getValue() == null) {
      if (eB == null || eB.getValue() == null) {
        return null;
      }
      if (op == RLEOperator.subtract) {
        return Resources.negate(eB.getValue());
      } else {
        return eB.getValue();
      }
    }
    if (eB == null || eB.getValue() == null) {
      return eA.getValue();
    }

    Resource a = eA.getValue();
    Resource b = eB.getValue();
    switch (op) {
    case add:
      return Resources.add(a, b);
    case subtract:
      return Resources.subtract(a, b);
    case subtractTestNonNegative:
      if (!Resources.fitsIn(b, a)) {
        throw new PlanningException(
            "RLESparseResourceAllocation: merge failed as the "
                + "resulting RLESparseResourceAllocation would "
                + "be negative, when testing: (" + eB + ") > (" + eA + ")");
      } else {
        return Resources.subtract(a, b);
      }
    case min:
      return Resources.min(resCalc, clusterResource, a, b);
    case max:
      return Resources.max(resCalc, clusterResource, a, b);
    default:
      return null;
    }

  }

  /**
   * Get a {@link RLESparseResourceAllocation} view of the {@link Resource}
   * allocations between the specified start and end times.
   *
   * @param start the time from which the {@link Resource} allocations are
   *          required
   * @param end the time upto which the {@link Resource} allocations are
   *          required
   * @return the overlapping allocations
   */
  public RLESparseResourceAllocation getRangeOverlapping(long start, long end) {
    readLock.lock();
    try {
      NavigableMap<Long, Resource> a = this.getCumulative();
      if (a != null && !a.isEmpty()) {
        // include the portion of previous entry that overlaps start
        if (start > a.firstKey()) {
          long previous = a.floorKey(start);
          a = a.tailMap(previous, true);
        }
        if (end < a.lastKey()) {
          a = a.headMap(end, true);
        }
      }
      RLESparseResourceAllocation ret =
          new RLESparseResourceAllocation(a, resourceCalculator);
      return ret;
    } finally {
      readLock.unlock();
    }
  }

  /**
   * This method shifts all the timestamp of the {@link Resource} entries by the
   * specified "delta".
   *
   * @param delta the time by which to shift the {@link Resource} allocations
   */
  public void shift(long delta) {
    writeLock.lock();
    try {
      TreeMap<Long, Resource> newCum = new TreeMap<>();
      long start;
      for (Map.Entry<Long, Resource> entry : cumulativeCapacity.entrySet()) {
        if (delta > 0) {
          start = (entry.getKey() == Long.MAX_VALUE) ? Long.MAX_VALUE
              : entry.getKey() + delta;
        } else {
          start = (entry.getKey() == Long.MIN_VALUE) ? Long.MIN_VALUE
              : entry.getKey() + delta;
        }
        newCum.put(start, entry.getValue());
      }
      cumulativeCapacity = newCum;
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * The set of operators that can be applied to two
   * {@code RLESparseResourceAllocation} during a merge operation.
   */
  public enum RLEOperator {
    add, subtract, min, max, subtractTestNonNegative
  }

  /**
   * Get the maximum capacity across specified time instances. The search-space
   * is specified using the starting value, tick, and the periodic interval for
   * search. Maximum resource allocation across tick, tick + period, tick + 2 *
   * period,..., tick + n * period .. is returned.
   *
   * @param tick the starting time instance
   * @param period interval at which capacity is evaluated
   * @return maximum resource allocation
   */
  public Resource getMaximumPeriodicCapacity(long tick, long period) {
    Resource maxCapacity = ZERO_RESOURCE;
    readLock.lock();
    try {
      if (!cumulativeCapacity.isEmpty()) {
        Long lastKey = cumulativeCapacity.lastKey();
        for (long t = tick; t <= lastKey; t = t + period) {
          maxCapacity = Resources.componentwiseMax(maxCapacity,
              cumulativeCapacity.floorEntry(t).getValue());
        }
      }
      return maxCapacity;
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Get the minimum capacity in the specified time range.
   *
   * @param interval the {@link ReservationInterval} to be searched
   * @return minimum resource allocation
   */
  public Resource getMinimumCapacityInInterval(ReservationInterval interval) {
    Resource minCapacity =
        Resource.newInstance(Integer.MAX_VALUE, Integer.MAX_VALUE);
    long start = interval.getStartTime();
    long end = interval.getEndTime();
    NavigableMap<Long, Resource> capacityRange =
        getRangeOverlapping(start, end).getCumulative();
    if (!capacityRange.isEmpty()) {
      for (Map.Entry<Long, Resource> entry : capacityRange.entrySet()) {
        if (entry.getValue() != null) {
          minCapacity =
              Resources.componentwiseMin(minCapacity, entry.getValue());
        }
      }
    }
    return minCapacity;
  }

}
