/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.parquet.column.values.bloomfilter;

import org.apache.parquet.Preconditions;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.io.api.Binary;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.IntBuffer;

/*
 * This Bloom filter is implemented using block-based Bloom filter algorithm from Putze et al.'s
 * "Cache-, Hash- and Space-Efficient Bloom filters". The basic idea is to hash the item to a tiny
 * Bloom filter which size fit a single cache line or smaller. This implementation sets 8 bits in
 * each tiny Bloom filter. Each tiny Bloom filter is 32 bytes to take advantage of 32-byte SIMD
 * instruction.
 */
public class BlockSplitBloomFilter implements BloomFilter {
  // Bytes in a tiny Bloom filter block.
  private static final int BYTES_PER_BLOCK = 32;

  // Bits in a tiny Bloom filter block.
  private static final int BITS_PER_BLOCK = 256;

  // Default minimum Bloom filter size, set to the size of a tiny Bloom filter block
  public static final int DEFAULT_MINIMUM_BYTES = 32;

  // Default Maximum Bloom filter size, set to 1MB which should cover most cases.
  public static final int DEFAULT_MAXIMUM_BYTES = 1024 * 1024;

  // The number of bits to set in a tiny Bloom filter
  private static final int BITS_SET_PER_BLOCK = 8;

  // The metadata in the header of a serialized Bloom filter is four four-byte values: the number of bytes,
  // the filter algorithm, the hash algorithm, and the compression.
  public static final int HEADER_SIZE = 16;

  // The default false positive probability value
  public static final double DEFAULT_FPP = 0.01;

  // Hash strategy used in this Bloom filter.
  private final HashStrategy hashStrategy;

  // The underlying byte array for Bloom filter bitset.
  private byte[] bitset;

  // A integer array buffer of underlying bitset to help setting bits.
  private IntBuffer intBuffer;

  // Hash function use to compute hash for column value.
  private HashFunction hashFunction;

  private int maximumBytes = DEFAULT_MAXIMUM_BYTES;
  private int minimumBytes = DEFAULT_MINIMUM_BYTES;

  // The block-based algorithm needs 8 odd SALT values to calculate eight indexes
  // of bits to set, one per 32-bit word.
  private static final int[] SALT = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d,
    0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31};

  /**
   * Constructor of block-based Bloom filter.
   *
   * @param numBytes The number of bytes for Bloom filter bitset. The range of num_bytes should be within
   *                 [DEFAULT_MINIMUM_BYTES, DEFAULT_MAXIMUM_BYTES], it will be rounded up/down
   *                 to lower/upper bound if num_bytes is out of range. It will also be rounded up to a power
   *                 of 2. It uses XXH64 as its default hash function.
   */
  public BlockSplitBloomFilter(int numBytes) {
    this(numBytes, DEFAULT_MAXIMUM_BYTES, HashStrategy.XXH64);
  }

  /**
   * Constructor of block-based Bloom filter.
   *
   * @param numBytes The number of bytes for Bloom filter bitset. The range of num_bytes should be within
   *                 [DEFAULT_MINIMUM_BYTES, DEFAULT_MAXIMUM_BYTES], it will be rounded up/down
   *                 to lower/upper bound if num_bytes is out of range. It will also be rounded up to a power
   *                 of 2. It uses XXH64 as its default hash function.
   * @param maximumBytes The maximum bytes of the Bloom filter.
   */
  public BlockSplitBloomFilter(int numBytes, int maximumBytes) {
    this(numBytes, maximumBytes, HashStrategy.XXH64);
  }

  /**
   * Constructor of block-based Bloom filter.
   *
   * @param numBytes The number of bytes for Bloom filter bitset
   * @param hashStrategy The hash strategy of Bloom filter.
   */
  private BlockSplitBloomFilter(int numBytes, HashStrategy hashStrategy) {
    this(numBytes, DEFAULT_MAXIMUM_BYTES, hashStrategy);
  }

  /**
   * Constructor of block-based Bloom filter.
   *
   * @param numBytes The number of bytes for Bloom filter bitset. The range of num_bytes should be within
   *                 [DEFAULT_MINIMUM_BYTES, maximumBytes], it will be rounded up/down to lower/upper bound if
   *                 num_bytes is out of range. It will also be rounded up to a power of 2.
   * @param maximumBytes The maximum bytes of the Bloom filter.
   * @param hashStrategy The adopted hash strategy of the Bloom filter.
   */
  public BlockSplitBloomFilter(int numBytes, int maximumBytes, HashStrategy hashStrategy) {
    if (maximumBytes > DEFAULT_MINIMUM_BYTES) {
      this.maximumBytes = maximumBytes;
    }
    initBitset(numBytes);

    switch (hashStrategy) {
      case XXH64:
        this.hashStrategy = hashStrategy;
        hashFunction = new XxHash();
        break;
      default:
        throw new RuntimeException("Unsupported hash strategy");
    }
  }


  /**
   * Construct the Bloom filter with given bitset, it is used when reconstructing
   * Bloom filter from parquet file. It use XXH64 as its default hash
   * function.
   *
   * @param bitset The given bitset to construct Bloom filter.
   */
  public BlockSplitBloomFilter(byte[] bitset) {
    this(bitset, HashStrategy.XXH64);
  }

  /**
   * Construct the Bloom filter with given bitset, it is used when reconstructing
   * Bloom filter from parquet file.
   *
   * @param bitset The given bitset to construct Bloom filter.
   * @param hashStrategy The hash strategy Bloom filter apply.
   */
  private BlockSplitBloomFilter(byte[] bitset, HashStrategy hashStrategy) {
    if (bitset == null) {
      throw new RuntimeException("Given bitset is null");
    }

    this.bitset = bitset;
    this.intBuffer = ByteBuffer.wrap(bitset).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer();
    switch (hashStrategy) {
      case XXH64:
        this.hashStrategy = hashStrategy;
        hashFunction = new XxHash();
        break;
      default:
        throw new RuntimeException("Unsupported hash strategy");
    }
  }

  /**
   * Create a new bitset for Bloom filter.
   *
   * @param numBytes The number of bytes for Bloom filter bitset. The range of num_bytes should be within
   *                 [minimumBytes, maximumBytes], it will be rounded up/down
   *                 to lower/upper bound if num_bytes is out of range and also will rounded up to a power
   *                 of 2. It uses XXH64 as its default hash function and block-based algorithm
   *                 as default algorithm.
   */
  private void initBitset(int numBytes) {
    if (numBytes < minimumBytes) {
      numBytes = minimumBytes;
    }
    // Get next power of 2 if it is not power of 2.
    if ((numBytes & (numBytes - 1)) != 0) {
      numBytes = Integer.highestOneBit(numBytes) << 1;
    }
    if (numBytes > maximumBytes || numBytes < 0) {
      numBytes = maximumBytes;
    }
    this.bitset = new byte[numBytes];
    this.intBuffer = ByteBuffer.wrap(bitset).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer();
  }

  @Override
  public void writeTo(OutputStream out) throws IOException {
    // Write number of bytes of bitset.
    out.write(BytesUtils.intToBytes(bitset.length));
    // Write hash strategy
    out.write(BytesUtils.intToBytes(hashStrategy.value));
    // Write algorithm
    out.write(BytesUtils.intToBytes(Algorithm.BLOCK.value));
    // Write compression
    out.write(BytesUtils.intToBytes(Compression.UNCOMPRESSED.value));
    // Write bitset
    out.write(bitset);
  }

  private int[] setMask(int key) {
    int[] mask = new int[BITS_SET_PER_BLOCK];

    for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) {
      mask[i] = key * SALT[i];
    }
    for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) {
      mask[i] = mask[i] >>> 27;
    }
    for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) {
      mask[i] = 0x1 << mask[i];
    }

    return mask;
  }

  @Override
  public void insertHash(long hash) {
    long numBlocks = bitset.length / BYTES_PER_BLOCK;
    long lowHash = hash >>> 32;
    int blockIndex = (int)((lowHash * numBlocks) >> 32);
    int key = (int)hash;

    // Calculate mask for bucket.
    int[] mask = setMask(key);
    for (int i = 0; i < BITS_SET_PER_BLOCK; i++) {
      int value = intBuffer.get(blockIndex * (BYTES_PER_BLOCK / 4) + i);
      value |= mask[i];
      intBuffer.put(blockIndex * (BYTES_PER_BLOCK / 4) + i, value);
    }
  }

  @Override
  public boolean findHash(long hash) {
    long numBlocks = bitset.length / BYTES_PER_BLOCK;
    long lowHash = hash >>> 32;
    int blockIndex = (int)((lowHash * numBlocks) >> 32);
    int key = (int)hash;

    // Calculate mask for the tiny Bloom filter.
    int[] mask = setMask(key);
    for (int i = 0; i < BITS_SET_PER_BLOCK; i++) {
      if (0 == (intBuffer.get(blockIndex * (BYTES_PER_BLOCK / 4) + i) & mask[i])) {
        return false;
      }
    }

    return true;
  }

  /**
   * Calculate optimal size according to the number of distinct values and false positive probability.
   *
   * @param n: The number of distinct values.
   * @param p: The false positive probability.
   * @return optimal number of bits of given n and p.
   */
  public static int optimalNumOfBits(long n, double p) {
    Preconditions.checkArgument((p > 0.0 && p < 1.0),
      "FPP should be less than 1.0 and great than 0.0");
    final double m = -8 * n / Math.log(1 - Math.pow(p, 1.0 / 8));
    final double MAX = DEFAULT_MAXIMUM_BYTES << 3;
    int numBits = (int)m;

    // Handle overflow.
    if (m > MAX || m < 0) {
      numBits = (int)MAX;
    }

    // Round to BITS_PER_BLOCK
    numBits = (numBits + BITS_PER_BLOCK -1) & ~BITS_PER_BLOCK;

    if (numBits < (DEFAULT_MINIMUM_BYTES << 3)) {
      numBits = DEFAULT_MINIMUM_BYTES << 3;
    }

    return numBits;
  }

  @Override
  public long getBitsetSize() {
    return this.bitset.length;
  }

  @Override
  public long hash(Object value) {
    ByteBuffer plain;

    if (value instanceof Binary) {
      return hashFunction.hashBytes(((Binary) value).getBytes());
    }

    if (value instanceof Integer) {
      plain = ByteBuffer.allocate(Integer.SIZE/Byte.SIZE);
      plain.order(ByteOrder.LITTLE_ENDIAN).putInt(((Integer)value));
    } else if (value instanceof Long) {
      plain = ByteBuffer.allocate(Long.SIZE/Byte.SIZE);
      plain.order(ByteOrder.LITTLE_ENDIAN).putLong(((Long)value));
    } else if (value instanceof Float) {
      plain = ByteBuffer.allocate(Float.SIZE/Byte.SIZE);
      plain.order(ByteOrder.LITTLE_ENDIAN).putFloat(((Float)value));
    } else if (value instanceof Double) {
      plain = ByteBuffer.allocate(Double.SIZE/ Byte.SIZE);
      plain.order(ByteOrder.LITTLE_ENDIAN).putDouble(((Double)value));
    } else {
      throw new RuntimeException("Parquet Bloom filter: Not supported type");
    }

    return hashFunction.hashByteBuffer(plain);
  }

  @Override
  public long hash(int value) {
    ByteBuffer plain = ByteBuffer.allocate(Integer.SIZE/Byte.SIZE);
    plain.order(ByteOrder.LITTLE_ENDIAN).putInt(value);
    return hashFunction.hashByteBuffer(plain);
  }

  @Override
  public long hash(long value) {
    ByteBuffer plain = ByteBuffer.allocate(Long.SIZE/Byte.SIZE);
    plain.order(ByteOrder.LITTLE_ENDIAN).putLong(value);
    return hashFunction.hashByteBuffer(plain);
  }

  @Override
  public long hash(double value) {
    ByteBuffer plain = ByteBuffer.allocate(Double.SIZE/Byte.SIZE);
    plain.order(ByteOrder.LITTLE_ENDIAN).putDouble(value);
    return hashFunction.hashByteBuffer(plain);
  }

  @Override
  public long hash(float value) {
    ByteBuffer plain = ByteBuffer.allocate(Float.SIZE/Byte.SIZE);
    plain.order(ByteOrder.LITTLE_ENDIAN).putFloat(value);
    return hashFunction.hashByteBuffer(plain);
  }

  @Override
  public long hash(Binary value) {
    return hashFunction.hashBytes(value.getBytes());
  }
}
