/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.runtime.util;
import org.apache.flink.core.memory.DataInputDeserializer;
import java.util.Arrays;
import static org.apache.flink.table.dataformat.util.BinaryRowUtil.BYTE_ARRAY_BASE_OFFSET;
import static org.apache.flink.table.dataformat.util.BinaryRowUtil.LONG_ARRAY_OFFSET;
import static org.apache.flink.table.dataformat.util.BinaryRowUtil.UNSAFE;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
* BloomFilter backed by a long array on the Java heap; serialization and merging are based on Unsafe.
*
* <p>Part of this class refers to the implementation from Apache Hive project
* https://github.com/apache/hive/blob/master/common/src/java/org/apache/hive/common/util/BloomFilter.java.
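*
* <p>A minimal usage sketch (the capacity and key below are illustrative only):
* <pre>{@code
* BloomFilter filter = new BloomFilter(1_000_000, BloomFilter.DEFAULT_FPP);
* filter.addHash(BloomFilter.getLongHash(42L));
* boolean mightContain = filter.testHash(BloomFilter.getLongHash(42L));
* }</pre>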
*/
public class BloomFilter {
private static final long MIN_BLOOM_FILTER_ENTRIES = 500000L;
/**
* Default false positive probability for BloomFilter.
*/
public static final double DEFAULT_FPP = 0.03;
private final int numBits;
private final int numHashFunctions;
private final BitSet bitSet;
public BloomFilter(long maxNumEntries) {
this(maxNumEntries, DEFAULT_FPP);
}
/**
* Constructor. The maximum number of entries and the false positive probability together
* determine the size of the bloom filter.
* @param maxNumEntries maximum number of entries expected in this bloom filter.
* @param fpp false positive probability.
*/
public BloomFilter(long maxNumEntries, double fpp) {
checkArgument(maxNumEntries > 0, "expectedEntries should be > 0");
int nb = optimalNumOfBits(maxNumEntries, fpp);
// pad the bit count so the backing bit set consists of whole 64-bit words
this.numBits = nb + (Long.SIZE - (nb % Long.SIZE));
this.numHashFunctions = optimalNumOfHashFunctions(maxNumEntries, numBits);
this.bitSet = new BitSet(this.numBits);
}
private BloomFilter(long[] bits, int numFuncs) {
this.numBits = bits.length * Long.SIZE;
this.numHashFunctions = numFuncs;
this.bitSet = new BitSet(bits);
}
// Thomas Wang's integer hash function
// http://web.archive.org/web/20071223173210/http://www.concentric.net/~Ttwang/tech/inthash.htm
public static long getLongHash(long key) {
key = (~key) + (key << 21); // key = (key << 21) - key - 1;
key = key ^ (key >> 24);
key = (key + (key << 3)) + (key << 8); // key * 265
key = key ^ (key >> 14);
key = (key + (key << 2)) + (key << 4); // key * 21
key = key ^ (key >> 28);
key = key + (key << 31);
return key;
}
public static long getLongHash(double key) {
return getLongHash(Double.doubleToLongBits(key));
}
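/**
 * Adds the given 64-bit hash to the filter. The low and high 32 bits of {@code hash64} are
 * combined via double hashing to derive {@code numHashFunctions} bit positions, which are then
 * set in the underlying bit set.
 *
 * <p>Typical call pattern (illustrative only):
 * <pre>{@code
 * filter.addHash(BloomFilter.getLongHash(42L));
 * }</pre>
 */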
public void addHash(long hash64) {
int hash1 = (int) hash64;
int hash2 = (int) (hash64 >>> 32);
for (int i = 1; i <= numHashFunctions; i++) {
int combinedHash = hash1 + ((i + 1) * hash2);
// hashcode should be positive, flip all the bits if it's negative
if (combinedHash < 0) {
combinedHash = ~combinedHash;
}
int pos = combinedHash % numBits;
bitSet.set(pos);
}
}
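/**
 * Tests whether an element with the given 64-bit hash might have been added. A {@code false}
 * result is definitive (the hash was never added); a {@code true} result may be a false
 * positive with probability up to the configured fpp.
 */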
public boolean testHash(long hash64) {
int hash1 = (int) hash64;
int hash2 = (int) (hash64 >>> 32);
for (int i = 1; i <= numHashFunctions; i++) {
int combinedHash = hash1 + ((i + 1) * hash2);
// hashcode should be positive, flip all the bits if it's negative
if (combinedHash < 0) {
combinedHash = ~combinedHash;
}
int pos = combinedHash % numBits;
if (!bitSet.get(pos)) {
return false;
}
}
return true;
}
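/**
 * Returns the underlying long array of the bit set. The array is not copied, so callers should
 * treat it as read-only.
 */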
public long[] getBitSet() {
return bitSet.getData();
}
/**
* Merge the specified bloom filter into the current bloom filter.
*
* @param that - bloom filter to merge
*/
public void merge(BloomFilter that) {
if (this != that && this.numBits == that.numBits && this.numHashFunctions == that.numHashFunctions) {
this.bitSet.putAll(that.bitSet);
} else {
throw new IllegalArgumentException("BloomKFilters are not compatible for merging." +
" this - " + this.toString() + " that - " + that.toString());
}
}
public void reset() {
this.bitSet.clear();
}
@Override
public String toString() {
return "numBits: " + numBits + " numHashFunctions: " + numHashFunctions;
}
/**
* Serializes the given BloomFilter to bytes with high performance, using Unsafe and native
* byte order. Compare {@link DataInputDeserializer#readLong()}: in LITTLE_ENDIAN it has to
* reverse the bytes of each long, which is slower.
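*
* <p>The serialized layout is: 1 byte for the number of hash functions, 4 bytes for the length
* of the long array, followed by the raw bits ({@code longLen * 8} bytes).
*
* <p>Round-trip sketch (illustrative only):
* <pre>{@code
* byte[] serialized = BloomFilter.toBytes(filter);
* BloomFilter copy = BloomFilter.fromBytes(serialized);
* }</pre>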
*/
public static byte[] toBytes(BloomFilter filter) {
long[] bitSet = filter.getBitSet();
int longLen = bitSet.length;
byte[] bytes = new byte[1 + 4 + longLen * 8];
UNSAFE.putByte(bytes, BYTE_ARRAY_BASE_OFFSET, (byte) filter.numHashFunctions);
UNSAFE.putInt(bytes, BYTE_ARRAY_BASE_OFFSET + 1, longLen);
UNSAFE.copyMemory(bitSet, LONG_ARRAY_OFFSET,
bytes, BYTE_ARRAY_BASE_OFFSET + 5, longLen * 8);
return bytes;
}
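/**
 * Deserializes a BloomFilter previously serialized by {@link #toBytes(BloomFilter)}.
 */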
public static BloomFilter fromBytes(byte[] bytes) {
byte numHashFunc = UNSAFE.getByte(bytes, BYTE_ARRAY_BASE_OFFSET);
int longLen = UNSAFE.getInt(bytes, BYTE_ARRAY_BASE_OFFSET + 1);
long[] data = new long[longLen];
UNSAFE.copyMemory(bytes, BYTE_ARRAY_BASE_OFFSET + 5,
data, LONG_ARRAY_OFFSET, longLen * 8);
return new BloomFilter(data, numHashFunc);
}
/**
* For code generation: ensures the entry estimate is at least {@code MIN_BLOOM_FILTER_ENTRIES}.
*/
public static long suitableMaxNumEntries(long maxNumEntries) {
return Math.max(maxNumEntries, BloomFilter.MIN_BLOOM_FILTER_ENTRIES);
}
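/**
 * Optimal number of hash functions k for n expected entries and m bits:
 * k = round(m / n * ln(2)).
 */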
private static int optimalNumOfHashFunctions(long n, long m) {
return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
}
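/**
 * Optimal number of bits m for n expected entries and false positive probability p:
 * m = -n * ln(p) / (ln(2))^2.
 */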
private static int optimalNumOfBits(long maxNumEntries, double fpp) {
return (int) (-maxNumEntries * Math.log(fpp) / (Math.log(2) * Math.log(2)));
}
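/**
 * Starting from {@link #DEFAULT_FPP} and increasing in steps of 0.01, finds the first false
 * positive probability whose optimal bit count fits within {@code maxNumOfBits}; returns 1 if
 * none fits.
 */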
public static double findSuitableFpp(long entries, double maxNumOfBits) {
for (double f = DEFAULT_FPP; f < 1.0f; f += 0.01f) {
long bits = optimalNumOfBits(entries, f);
if (bits < maxNumOfBits) {
return f;
}
}
return 1f;
}
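/**
 * Merges two serialized bloom filters (as produced by {@link #toBytes(BloomFilter)}) by OR-ing
 * their bits, writing the result into {@code bf1Bytes} in place.
 *
 * <p>Sketch of the intended use, assuming both filters were built with the same configuration
 * (illustrative only):
 * <pre>{@code
 * byte[] merged = BloomFilter.toBytes(filter1);
 * BloomFilter.mergeBloomFilterBytes(merged, BloomFilter.toBytes(filter2));
 * BloomFilter result = BloomFilter.fromBytes(merged);
 * }</pre>
 */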
public static void mergeBloomFilterBytes(byte[] bf1Bytes, byte[] bf2Bytes) {
mergeBloomFilterBytes(bf1Bytes, 0, bf1Bytes.length, bf2Bytes, 0, bf2Bytes.length);
}
public static void mergeBloomFilterBytes(
byte[] bf1Bytes, int bf1Start, int bf1Length,
byte[] bf2Bytes, int bf2Start, int bf2Length) {
if (bf1Length != bf2Length) {
throw new IllegalArgumentException("bf1Length " + bf1Length + " does not match bf2Length " + bf2Length);
}
// Validate that the number of hash functions and the bitset length match.
int longLen1 = UNSAFE.getInt(bf1Bytes, BYTE_ARRAY_BASE_OFFSET + bf1Start + 1);
if (UNSAFE.getByte(bf1Bytes, BYTE_ARRAY_BASE_OFFSET + bf1Start) !=
UNSAFE.getByte(bf2Bytes, BYTE_ARRAY_BASE_OFFSET + bf2Start) ||
longLen1 != UNSAFE.getInt(bf2Bytes, BYTE_ARRAY_BASE_OFFSET + bf2Start + 1)) {
throw new IllegalArgumentException("bf1 NumHashFunctions/NumBits does not match bf2");
}
for (int idx = 5 + BYTE_ARRAY_BASE_OFFSET; idx < bf1Length + BYTE_ARRAY_BASE_OFFSET; idx += 8) {
long l1 = UNSAFE.getLong(bf1Bytes, bf1Start + idx);
long l2 = UNSAFE.getLong(bf2Bytes, bf2Start + idx);
UNSAFE.putLong(bf1Bytes, bf1Start + idx, l1 | l2);
}
}
/**
* Bare metal bit set implementation. For performance reasons, this implementation does not
* check for index bounds nor expand the bit set size if the specified index is greater than
* the size.
*/
public static class BitSet {
private final long[] data;
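/**
 * Creates an empty bit set with at least {@code bits} bits, backed by
 * {@code ceil(bits / 64)} longs.
 */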
BitSet(long bits) {
this(new long[(int) Math.ceil((double) bits / (double) Long.SIZE)]);
}
/**
* Wraps an existing long array (e.g. a deserialized one) as a bit set.
*
* @param data - bit array
*/
BitSet(long[] data) {
assert data.length > 0 : "data length is zero!";
this.data = data;
}
/**
* Sets the bit at specified index.
*
* @param index - position
*/
public void set(int index) {
data[index >>> 6] |= (1L << index);
}
/**
* Returns true if the bit is set in the specified index.
*
* @param index - position
* @return - value at the bit position
*/
public boolean get(int index) {
return (data[index >>> 6] & (1L << index)) != 0;
}
public long[] getData() {
return data;
}
/**
* Combines the two BitArrays using bitwise OR.
*/
public void putAll(BloomFilter.BitSet array) {
assert data.length == array.data.length :
"BitArrays must be of equal length (" + data.length + "!= " + array.data.length + ")";
for (int i = 0; i < data.length; i++) {
data[i] |= array.data[i];
}
}
/**
* Clear the bit set.
*/
public void clear() {
Arrays.fill(data, 0);
}
}
}