/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.crypto.key.kms;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.classification.InterfaceAudience;

/**
 * A Utility class that maintains a Queue of entries for a given key. It tries
 * to ensure that there is are always at-least <code>numValues</code> entries
 * available for the client to consume for a particular key.
 * It also uses an underlying Cache to evict queues for keys that have not been
 * accessed for a configurable period of time.
 * Implementing classes are required to implement the
 * <code>QueueRefiller</code> interface that exposes a method to refill the
 * queue, when empty
 */
@InterfaceAudience.Private
public class ValueQueue <E> {

  /**
   * QueueRefiller interface a client must implement to use this class
   */
  public interface QueueRefiller <E> {
    /**
     * Method that has to be implemented by implementing classes to fill the
     * Queue.
     * @param keyName Key name
     * @param keyQueue Queue that needs to be filled
     * @param numValues number of Values to be added to the queue.
     * @throws IOException
     */
    public void fillQueueForKey(String keyName,
        Queue<E> keyQueue, int numValues) throws IOException;
  }

  private static final String REFILL_THREAD =
      ValueQueue.class.getName() + "_thread";

  private final LoadingCache<String, LinkedBlockingQueue<E>> keyQueues;
  private final ThreadPoolExecutor executor;
  private final UniqueKeyBlockingQueue queue = new UniqueKeyBlockingQueue();
  private final QueueRefiller<E> refiller;
  private final SyncGenerationPolicy policy;

  private final int numValues;
  private final float lowWatermark;

  private volatile boolean executorThreadsStarted = false;

  /**
   * A <code>Runnable</code> which takes a string name.
   */
  private abstract static class NamedRunnable implements Runnable {
    final String name;
    private NamedRunnable(String keyName) {
      this.name = keyName;
    }
  }

  /**
   * This backing blocking queue used in conjunction with the
   * <code>ThreadPoolExecutor</code> used by the <code>ValueQueue</code>. This
   * Queue accepts a task only if the task is not currently in the process
   * of being run by a thread which is implied by the presence of the key
   * in the <code>keysInProgress</code> set.
   *
   * NOTE: Only methods that ware explicitly called by the
   * <code>ThreadPoolExecutor</code> need to be over-ridden.
   */
  private static class UniqueKeyBlockingQueue extends
      LinkedBlockingQueue<Runnable> {

    private static final long serialVersionUID = -2152747693695890371L;
    private HashSet<String> keysInProgress = new HashSet<String>();

    @Override
    public synchronized void put(Runnable e) throws InterruptedException {
      if (keysInProgress.add(((NamedRunnable)e).name)) {
        super.put(e);
      }
    }

    @Override
    public Runnable take() throws InterruptedException {
      Runnable k = super.take();
      if (k != null) {
        keysInProgress.remove(((NamedRunnable)k).name);
      }
      return k;
    }

    @Override
    public Runnable poll(long timeout, TimeUnit unit)
        throws InterruptedException {
      Runnable k = super.poll(timeout, unit);
      if (k != null) {
        keysInProgress.remove(((NamedRunnable)k).name);
      }
      return k;
    }

  }

  /**
   * Policy to decide how many values to return to client when client asks for
   * "n" values and Queue is empty.
   * This decides how many values to return when client calls "getAtMost"
   */
  public static enum SyncGenerationPolicy {
    ATLEAST_ONE, // Return atleast 1 value
    LOW_WATERMARK, // Return min(n, lowWatermark * numValues) values
    ALL // Return n values
  }

  /**
   * Constructor takes the following tunable configuration parameters
   * @param numValues The number of values cached in the Queue for a
   *    particular key.
   * @param lowWatermark The ratio of (number of current entries/numValues)
   *    below which the <code>fillQueueForKey()</code> funciton will be
   *    invoked to fill the Queue.
   * @param expiry Expiry time after which the Key and associated Queue are
   *    evicted from the cache.
   * @param numFillerThreads Number of threads to use for the filler thread
   * @param policy The SyncGenerationPolicy to use when client
   *    calls "getAtMost"
   * @param refiller implementation of the QueueRefiller
   */
  public ValueQueue(final int numValues, final float lowWatermark,
      long expiry, int numFillerThreads, SyncGenerationPolicy policy,
      final QueueRefiller<E> refiller) {
    Preconditions.checkArgument(numValues > 0, "\"numValues\" must be > 0");
    Preconditions.checkArgument(((lowWatermark > 0)&&(lowWatermark <= 1)),
        "\"lowWatermark\" must be > 0 and <= 1");
    final int watermarkValue = (int) (numValues * lowWatermark);
    Preconditions.checkArgument(watermarkValue > 0,
        "(int) (\"numValues\" * \"lowWatermark\") must be > 0");
    Preconditions.checkArgument(expiry > 0, "\"expiry\" must be > 0");
    Preconditions.checkArgument(numFillerThreads > 0,
        "\"numFillerThreads\" must be > 0");
    Preconditions.checkNotNull(policy, "\"policy\" must not be null");
    this.refiller = refiller;
    this.policy = policy;
    this.numValues = numValues;
    this.lowWatermark = lowWatermark;
    keyQueues = CacheBuilder.newBuilder()
            .expireAfterAccess(expiry, TimeUnit.MILLISECONDS)
            .build(new CacheLoader<String, LinkedBlockingQueue<E>>() {
                  @Override
                  public LinkedBlockingQueue<E> load(String keyName)
                      throws Exception {
                    LinkedBlockingQueue<E> keyQueue =
                        new LinkedBlockingQueue<E>();
                    refiller.fillQueueForKey(keyName, keyQueue, watermarkValue);
                    return keyQueue;
                  }
                });

    executor =
        new ThreadPoolExecutor(numFillerThreads, numFillerThreads, 0L,
            TimeUnit.MILLISECONDS, queue, new ThreadFactoryBuilder()
                .setDaemon(true)
                .setNameFormat(REFILL_THREAD).build());
  }

  public ValueQueue(final int numValues, final float lowWaterMark, long expiry,
      int numFillerThreads, QueueRefiller<E> fetcher) {
    this(numValues, lowWaterMark, expiry, numFillerThreads,
        SyncGenerationPolicy.ALL, fetcher);
  }

  /**
   * Initializes the Value Queues for the provided keys by calling the
   * fill Method with "numInitValues" values
   * @param keyNames Array of key Names
   * @throws ExecutionException
   */
  public void initializeQueuesForKeys(String... keyNames)
      throws ExecutionException {
    for (String keyName : keyNames) {
      keyQueues.get(keyName);
    }
  }

  /**
   * This removes the value currently at the head of the Queue for the
   * provided key. Will immediately fire the Queue filler function if key
   * does not exist.
   * If Queue exists but all values are drained, It will ask the generator
   * function to add 1 value to Queue and then drain it.
   * @param keyName String key name
   * @return E the next value in the Queue
   * @throws IOException
   * @throws ExecutionException
   */
  public E getNext(String keyName)
      throws IOException, ExecutionException {
    return getAtMost(keyName, 1).get(0);
  }

  /**
   * Drains the Queue for the provided key.
   *
   * @param keyName the key to drain the Queue for
   */
  public void drain(String keyName ) {
    try {
      keyQueues.get(keyName).clear();
    } catch (ExecutionException ex) {
      //NOP
    }
  }

  /**
   * Get size of the Queue for keyName. This is only used in unit tests.
   * @param keyName the key name
   * @return int queue size
   */
  public int getSize(String keyName) {
    // We can't do keyQueues.get(keyName).size() here,
    // since that will have the side effect of populating the cache.
    Map<String, LinkedBlockingQueue<E>> map =
        keyQueues.getAllPresent(Arrays.asList(keyName));
    if (map.get(keyName) == null) {
      return 0;
    }
    return map.get(keyName).size();
  }

  /**
   * This removes the "num" values currently at the head of the Queue for the
   * provided key. Will immediately fire the Queue filler function if key
   * does not exist
   * How many values are actually returned is governed by the
   * <code>SyncGenerationPolicy</code> specified by the user.
   * @param keyName String key name
   * @param num Minimum number of values to return.
   * @return List<E> values returned
   * @throws IOException
   * @throws ExecutionException
   */
  public List<E> getAtMost(String keyName, int num) throws IOException,
      ExecutionException {
    LinkedBlockingQueue<E> keyQueue = keyQueues.get(keyName);
    // Using poll to avoid race condition..
    LinkedList<E> ekvs = new LinkedList<E>();
    try {
      for (int i = 0; i < num; i++) {
        E val = keyQueue.poll();
        // If queue is empty now, Based on the provided SyncGenerationPolicy,
        // figure out how many new values need to be generated synchronously
        if (val == null) {
          // Synchronous call to get remaining values
          int numToFill = 0;
          switch (policy) {
          case ATLEAST_ONE:
            numToFill = (ekvs.size() < 1) ? 1 : 0;
            break;
          case LOW_WATERMARK:
            numToFill =
                Math.min(num, (int) (lowWatermark * numValues)) - ekvs.size();
            break;
          case ALL:
            numToFill = num - ekvs.size();
            break;
          }
          // Synchronous fill if not enough values found
          if (numToFill > 0) {
            refiller.fillQueueForKey(keyName, ekvs, numToFill);
          }

          break;
        } else {
          ekvs.add(val);
        }
      }
      // Schedule a refill task in case queue has gone below the watermark
      if (keyQueue.size() < (int) (lowWatermark * numValues)) {
        submitRefillTask(keyName, keyQueue);
      }
    } catch (Exception e) {
      throw new IOException("Exception while contacting value generator ", e);
    }
    return ekvs;
  }

  private void submitRefillTask(final String keyName,
      final Queue<E> keyQueue) throws InterruptedException {
    if (!executorThreadsStarted) {
      synchronized (this) {
        if (!executorThreadsStarted) {
          // To ensure all requests are first queued, make coreThreads =
          // maxThreads
          // and pre-start all the Core Threads.
          executor.prestartAllCoreThreads();
          executorThreadsStarted = true;
        }
      }
    }
    // The submit/execute method of the ThreadPoolExecutor is bypassed and
    // the Runnable is directly put in the backing BlockingQueue so that we
    // can control exactly how the runnable is inserted into the queue.
    queue.put(
        new NamedRunnable(keyName) {
          @Override
          public void run() {
            int cacheSize = numValues;
            int threshold = (int) (lowWatermark * (float) cacheSize);
            // Need to ensure that only one refill task per key is executed
            try {
              if (keyQueue.size() < threshold) {
                refiller.fillQueueForKey(name, keyQueue,
                    cacheSize - keyQueue.size());
              }
            } catch (final Exception e) {
              throw new RuntimeException(e);
            }
          }
        }
        );
  }

  /**
   * Cleanly shutdown
   */
  public void shutdown() {
    executor.shutdownNow();
  }

}
