/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.samza.diagnostics;

import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import org.apache.samza.util.TimestampedValue;


/**
 * A {@link BoundedList} buffers multiple instances of a type T in a list.
 * {@link BoundedList}s are useful for maintaining, recording, or collecting values over time.
 * For example, a set of specific logging-events (e.g., errors).
 *
 * Eviction is controlled by parameters (maxNumberOfItems and maxStaleness), which are set during instantiation.
 * Eviction happens during element addition or during reads of the BoundedList (getValues).
 *
 * All public methods are thread-safe.
 *
 */
public class BoundedList<T> {
  private final String name;
  private final Queue<TimestampedValue<T>> elements;

  private final int maxNumberOfItems;
  private final Duration maxStaleness;
  private final static int DEFAULT_MAX_NITEMS = 1000;
  private final static Duration DEFAULT_MAX_STALENESS = Duration.ofMinutes(60);

  /**
   * Create a new {@link BoundedList} that auto evicts based on the given maxNumberOfItems, maxStaleness, and period parameters.
   *
   * @param name Name to be assigned
   * @param maxNumberOfItems The max number of items that can remain in the list
   * @param maxStaleness The max staleness of items permitted in the list
   */
  public BoundedList(String name, int maxNumberOfItems, Duration maxStaleness) {
    this.name = name;
    this.elements = new ConcurrentLinkedQueue<TimestampedValue<T>>();
    this.maxNumberOfItems = maxNumberOfItems;
    this.maxStaleness = maxStaleness;
  }

  /**
   * Create a new {@link BoundedList} that auto evicts upto a max of 100 items and a max-staleness of 60 minutes.
   * @param name Name to be assigned
   */
  public BoundedList(String name) {
    this(name, DEFAULT_MAX_NITEMS, DEFAULT_MAX_STALENESS);
  }

  /**
   * Get the name assigned to this {@link BoundedList}
   * @return the assigned name
   */
  public String getName() {
    return this.name;
  }

  /**
   * Get the Collection of values currently in the list.
   * @return the collection of values
   */
  public Collection<T> getValues() {
    this.evict();
    return Collections.unmodifiableList(this.elements.stream().map(x -> x.getValue()).collect(Collectors.toList()));
  }

  /**
   * Add a value to the list.
   * (Timestamp assigned to this value is the current timestamp.)
   * @param value The Gauge value to be added
   */
  public void add(T value) {
    this.elements.add(new TimestampedValue<T>(value, Instant.now().toEpochMilli()));

    // perform any evictions that may be needed.
    this.evict();
  }

  /**
   * Removes the given elements from the list-gauge.
   * @param elementsToRemove collection of elements to remove.
   */
  public void remove(Collection<T> elementsToRemove) {
    Iterator<TimestampedValue<T>> iterator = this.elements.iterator();
    while (iterator.hasNext()) {
      TimestampedValue<T> value = iterator.next();
      if (elementsToRemove.contains(value.getValue())) {
        iterator.remove();
      }
    }
  }

  /**
   * Evicts entries from the elements list, based on the given item-size and durationThreshold.
   * Concurrent eviction threads can cause incorrectness (when reading elements.size or elements.peek).
   */
  private synchronized void evict() {
    this.evictBasedOnSize();
    this.evictBasedOnTimestamp();
  }

  /**
   * Evicts entries from elements in FIFO order until it has maxNumberOfItems
   */
  private void evictBasedOnSize() {
    int numToEvict = this.elements.size() - this.maxNumberOfItems;
    while (numToEvict > 0) {
      this.elements.poll(); // remove head
      numToEvict--;
    }
  }

  /**
   * Removes entries from elements to ensure no element has a timestamp more than maxStaleness before current timestamp.
   */
  private void evictBasedOnTimestamp() {
    Instant currentTimestamp = Instant.now();
    TimestampedValue<T> valueInfo = this.elements.peek();

    // continue remove-head if currenttimestamp - head-element's timestamp > durationThreshold
    while (valueInfo != null
        && currentTimestamp.toEpochMilli() - valueInfo.getTimestamp() > this.maxStaleness.toMillis()) {
      this.elements.poll();
      valueInfo = this.elements.peek();
    }
  }
}
