package org.apache.helix.tools;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.util.*;
import java.util.zip.CRC32;

public class RUSHrHash {
  /**
   * @var int holds the value for how many replicas to create for an object
   */
  protected int replicationDegree = 1;

  /**
   * an array of hash maps where each hash map holds info on the sub cluster
   * that corresponds to the array indices meaning that array element 0 holds
   * data for server 0
   * that is the total number of nodes in the cluster this property is populated
   * at construction time only
   * @var
   */

  protected HashMap[] clusters;

  /**
   * an array of hash maps where each element holds data for a sub cluster
   */
  protected HashMap[] clusterConfig;

  /**
   * total number of sub-clusters in our data configuration this property is
   * populated at construction time only
   * @var integer
   */
  protected int totalClusters = 0;

  /**
   * the total number of nodes in all of the subClusters this property is
   * populated at construction time only
   * @var integer
   */
  protected int totalNodes = 0;

  /**
   * the total number of nodes in all of the clusters this property is populated
   * at construction time only
   * @var integer
   */
  protected int totalNodesW = 0;

  /**
   * an array of HashMaps where each HashMap holds the data for a single node
   */
  protected HashMap[] nodes = null;

  /**
   * @var integer value used to help seed the random number generator
   */
  protected final int SEED_PARAM = 1560;

  /**
   * random number generator
   */

  Random ran = new Random();

  /**
   * maximum value we can have from the ran generator
   */
  float ranMax = (float) Math.pow(2.0, 16.0);

  /**
   * The constructor analyzes the passed config to obtain the fundamental values
   * and data structures for locating a node. Each of those values is described
   * in detail above with each property. briefly:
   * this.clusters this.totalClusters this.totalNodes
   * The values above are derived from the HashMap[] oonfig passed to the
   * locator.
   * @param conf
   *          dataConfig
   * @throws Exception
   */

  public RUSHrHash(HashMap<String, Object> conf) throws Exception {

    clusterConfig = (HashMap[]) conf.get("subClusters");
    replicationDegree = (Integer) conf.get("replicationDegree");

    HashMap[] subClusters = (HashMap[]) conf.get("subClusters");
    totalClusters = subClusters.length;
    clusters = new HashMap[totalClusters];
    // check the confg for all of the params
    // throw a exception if they are not there
    if (totalClusters <= 0) {
      throw new Exception(
          "data config to the RUSHr locator does not contain a valid clusters property");
    }

    int nodeCt = 0;
    HashMap[] nodeData = null;
    ArrayList<HashMap> tempNodes = new ArrayList<HashMap>();
    HashMap<String, Object> subCluster = null, clusterData = null;
    Integer clusterDataList[] = null;
    for (int i = 0; i < totalClusters; i++) {
      subCluster = subClusters[i];
      nodeData = (HashMap[]) subCluster.get("nodes");

      nodeCt = nodeData.length;
      clusterDataList = new Integer[nodeCt];
      for (int n = 0; n < nodeCt; n++) {
        tempNodes.add(nodeData[n]);
        clusterDataList[n] = n;
      }
      totalNodes += nodeCt;
      totalNodesW += nodeCt * (Integer) subCluster.get("weight");

      clusterData = new HashMap<String, Object>();
      clusterData.put("count", nodeCt);
      clusterData.put("list", clusterDataList);
      clusters[i] = clusterData;
    }
    nodes = new HashMap[totalNodes];
    tempNodes.toArray(nodes);
  }

  /**
   * This function is an implementation of a RUSHr algorithm as described by R J
   * Honicky and Ethan Miller
   * @param objKey
   * @throws Exception
   * @return
   */
  public ArrayList<HashMap> findNode(long objKey) throws Exception {

    HashMap[] c = this.clusters;
    int sumRemainingNodes = this.totalNodes;
    int sumRemainingNodesW = this.totalNodesW;
    int repDeg = this.replicationDegree;
    int totClu = this.totalClusters;
    int totNod = this.totalNodes;
    HashMap[] clusConfig = this.clusterConfig;

    // throw an exception if the data is no good
    if ((totNod <= 0) || (totClu <= 0)) {
      throw new Exception("the total nodes or total clusters is negative or 0.  bad joo joos!");
    }

    // get the starting cluster
    int currentCluster = totClu - 1;

    /**
     * this loop is an implementation of the RUSHr algorithm for fast placement
     * and location of objects in a distributed storage system
     * j = current cluster m = disks in current cluster n = remaining nodes
     */
    ArrayList<HashMap> nodeData = new ArrayList<HashMap>();
    while (true) {

      // prevent an infinite loop, in case there is a bug
      if (currentCluster < 0) {
        throw new Exception(
            "the cluster index became negative while we were looking for the following id: objKey.  This should never happen with any key.  There is a bug or maybe your joo joos are BAD!");
      }

      HashMap clusterData = clusConfig[currentCluster];
      Integer weight = (Integer) clusterData.get("weight");

      Integer disksInCurrentCluster = (Integer) c[currentCluster].get("count");
      sumRemainingNodes -= disksInCurrentCluster;

      Integer disksInCurrentClusterW = disksInCurrentCluster * weight;
      sumRemainingNodesW -= disksInCurrentClusterW;

      // set the seed to our set id
      long seed = objKey + currentCluster;
      ran.setSeed(seed);
      int t = (repDeg - sumRemainingNodes) > 0 ? (repDeg - sumRemainingNodes) : 0;

      int u =
          t
              + drawWHG(repDeg - t, disksInCurrentClusterW - t, disksInCurrentClusterW
                  + sumRemainingNodesW - t, weight);
      if (u > 0) {
        if (u > disksInCurrentCluster) {
          u = disksInCurrentCluster;
        }
        ran.setSeed(objKey + currentCluster + SEED_PARAM);
        choose(u, currentCluster, sumRemainingNodes, nodeData);
        reset(u, currentCluster);
        repDeg -= u;
      }
      if (repDeg == 0) {
        break;
      }
      currentCluster--;
    }
    return nodeData;
  }

  /**
   * This function is an implementation of a RUSH algorithm as described by R J
   * Honicky and Ethan Miller
   * @param objKey
   *          - an int used as the prng seed. this int is usually derived from a
   *          string hash
   * @return node - holds three values: abs_node - an int which is the absolute
   *         position of the located node in relation to all nodes on all
   *         subClusters rel_node - an int which is the relative postion located
   *         node within the located cluster cluster - an int which is the
   *         located cluster
   * @throws Exception
   */
  public ArrayList<HashMap> findNode(String objKey) throws Exception {
    // turn a string identifier into an integer for the random seed
    CRC32 crc32 = new CRC32();
    byte[] bytes = objKey.getBytes();
    crc32.update(bytes);
    long crc32Value = crc32.getValue();
    long objKeyLong = (crc32Value >> 16) & 0x7fff;
    return findNode(objKeyLong);
  }

  public void reset(int nodesToRetrieve, int currentCluster) {
    Integer[] list = (Integer[]) clusters[currentCluster].get("list");
    Integer count = (Integer) clusters[currentCluster].get("count");

    int listIdx;
    int val;
    for (int nodeIdx = 0; nodeIdx < nodesToRetrieve; nodeIdx++) {
      listIdx = count - nodesToRetrieve + nodeIdx;
      val = list[listIdx];
      if (val < (count - nodesToRetrieve)) {
        list[val] = val;
      }
      list[listIdx] = listIdx;
    }
  }

  public void choose(int nodesToRetrieve, int currentCluster, int remainingNodes,
      ArrayList<HashMap> nodeData) {
    Integer[] list = (Integer[]) clusters[currentCluster].get("list");
    Integer count = (Integer) clusters[currentCluster].get("count");

    int maxIdx;
    int randNode;
    int chosen;
    for (int nodeIdx = 0; nodeIdx < nodesToRetrieve; nodeIdx++) {
      maxIdx = count - nodeIdx - 1;
      randNode = ran.nextInt(maxIdx + 1);
      // swap
      chosen = list[randNode];
      list[randNode] = list[maxIdx];
      list[maxIdx] = chosen;
      // add the remaining nodes so we can find the node data when we are done
      nodeData.add(nodes[remainingNodes + chosen]);
    }
  }

  /**
   * @param objKey
   * @return
   * @throws com.targetnode.data.locator.Exception
   */
  public ArrayList<HashMap> findNodes(String objKey) throws Exception {
    return findNode(objKey);
  }

  public int getReplicationDegree() {
    return replicationDegree;
  }

  public int getTotalNodes() {
    return totalNodes;
  }

  public int drawWHG(int replicas, int disksInCurrentCluster, int totalDisks, int weight) {
    int found = 0;
    float z;
    float prob;
    int ranInt;

    for (int i = 0; i < replicas; i++) {
      if (totalDisks != 0) {
        ranInt = ran.nextInt((int) (ranMax + 1));
        z = ((float) ranInt / ranMax);
        prob = ((float) disksInCurrentCluster / (float) totalDisks);
        if (z <= prob) {
          found++;
          disksInCurrentCluster -= weight;
        }
        totalDisks -= weight;
      }
    }
    return found;
  }
}
