blob: 4143b8bb1d71bc4cbc658deef448bb33b5d21940 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// @@@ END COPYRIGHT @@@
#include "BloomFilter.h"
#include "exp_function.h"
#include "NAMemory.h"
#include "ComASSERT.h"
#include <math.h>
#include <fstream>
// Returns the base-2 logarithm of x, computed as log(x) / log(2.0).
// NOTE: despite its name this is not the natural logarithm; ln(2) is 1.
static float ln(float x)
{
  return (float)(log(x) / log(2.0));
}
// Returns the width, in bits, of a counter able to hold maxVal distinct
// values plus one extra bit (see below). The result is never less than 2.
static UInt32 bitsNeeded(UInt64 maxVal)
{
  // ln(0) is -infinity, and converting that to UInt32 is undefined.
  // ln(1) is 0. Both cases need the minimum width of 1 bit plus the extra bit.
  if ( maxVal <= 1 )
    return 2;

  UInt32 bits = (UInt32)ceil(ln((float)maxVal));

  // Since the MAX value in freqsL_ is used as
  // the indicator for overflow, we need extra 1
  // bit to cover maxVal # of distinct values.
  return MAXOF(bits, 1) + 1;
}
// Computes the Bloom filter size m (number of slots) and the number of hash
// functions k for n distinct elements and a target false-positive
// probability p. k is capped at maxHashFuncs; when the cap applies, m is
// recomputed for the capped k.
//
// NOTE(review): the formulas are stated in terms of Ln, but the file-local
// ln() helper evaluates log(x)/log(2.0), i.e. a base-2 logarithm (ln(2) is 1).
// Confirm which base is intended before relying on the resulting m.
// NOTE(review): n == 0 makes the k computation divide by zero.
void computeParams(UInt32 maxHashFuncs, // max hash funcs
                   UInt32 n,            // Input: # of distinct elements
                   float p,             // Input: probability of false positive
                   UInt32& m,           // output
                   UInt16& k            // output
                  )
{
  // m (the number of bits for hash table, given n, p and optimal k):
  // m = -(n Ln(p)) / (Ln(2))^2
  m = UInt32(-(n * ln(p)) / pow(ln(2),2));

  // k (the optimal # of hash functions, given m and n) = (m/n) Ln(2)
  k = (UInt16) ceil(( m * ln(2) ) / n);

  if ( k > maxHashFuncs ) {
    k = maxHashFuncs;
    // recompute m for the capped k:
    m = UInt32(- (float)k * (float)n / ln( 1 - powf(p, 1/(float)k)));
  }
}
// n = # of distinct elements (INPUT)
// p = the probability of false positives (INPUT)
// m (the number of bits for hash table, given n and p) = -(n Ln(p)) / (Ln(2))^2
// k (the optimal # of hash functions, given m and n) = (m/n) Ln(2)
// If k is capped at MAX_NUM_HASH_FUNCS, then m is computed as
// m = -MAX_NUM_HASH_FUNCS * n / (Ln(1-p^(1/MAX_NUM_HASH_FUNCS)))
// Parameters for the count table (freqsL_)
// #entries = m
// #bits per entry = BitsCT = ceil(Ln(non-overflow frequency (INPUT)))
// Parameters for freq of freq array (freqFreqC_)
// #entries = 2^(bitsCT)
// #bits per entry = ceil(Ln(m))
// freqFreqC_ only records freqFreq stored in the count table that has not been
// overflowed. The true freq of freq needs to consider frequency stored in the
// overflow table.
// Parameters for overflow table
// #entries = # of skewed elements in n (INPUT)
// #bits per entry = UInt32
// Builds a filter sized for n distinct elements and false-positive
// probability p. m_ and k_ are not in the initializer list: they are filled
// in by computeParams() through its output parameters.
BloomFilter::BloomFilter(NAHeap* heap, UInt32 maxHashFuncs,
                         UInt32 n, // # of distinct elements
                         float p   // probability of false positives
                        ) :
  heap_(heap), keyLenInfo_(0)
{
  computeParams(maxHashFuncs, n, p, m_, k_);
}
// Builds an empty filter (m_ = 0, k_ = 0) that only remembers its heap.
// m_ and k_ are later overwritten by unpackBuffer().
BloomFilter::BloomFilter(NAHeap* heap) :
  heap_(heap), keyLenInfo_(0), m_(0), k_(0)
{
}
// Derives the slot index for hash function number hashFunc from the common
// hash value of a key: the common hash is rotated left by one bit, XOR-ed with
// the hash of the (one byte) function number, and reduced modulo m_.
// NOTE(review): m_ is 0 after the heap-only constructor, which would make the
// modulo below divide by zero -- confirm callers only hash after m_ is set.
UInt32 BloomFilter::computeFinalHash(UInt32 hash, Lng32 hashFunc)
{
  UInt32 flags = ExHDPHash::NO_FLAGS;
  char hashNum = (char) hashFunc;
  UInt32 hashValue2 = ExHDPHash::hash((char*)&hashNum, flags, sizeof(hashNum));

  return (UInt32)(((hash<<1 | hash>>31) ^ hashValue2) % m_);
}
// Placeholder: not implemented yet, always returns 0.
double BloomFilter::density()
{
  // TBD
  return 0;
}
// Packs m_, k_ and keyLenInfo_ (in that order) into buffer and returns the
// sum of the values returned by the individual pack() calls.
ULng32 BloomFilter::packIntoBuffer(char*& buffer, NABoolean swapBytes)
{
  ULng32 size = pack(buffer, m_, swapBytes);
  size += pack(buffer, k_, swapBytes);
  size += pack(buffer, keyLenInfo_, swapBytes);
  return size;
}
// Reads m_, k_ and keyLenInfo_ back from buffer, in the order used by
// packIntoBuffer(), and returns the sum of the values returned by unpack().
ULng32 BloomFilter::unpackBuffer(char*& buffer)
{
  ULng32 size = unpack(buffer, m_);
  size += unpack(buffer, k_);
  size += unpack(buffer, keyLenInfo_);
  return size;
}
// Hash function for simple_cbf_key: hashes the key bytes with no flags.
ULng32 scbfHashFunc(const simple_cbf_key& key)
{
  return ExHDPHash::hash(key.key_, ExHDPHash::NO_FLAGS, key.keyLen_);
}
// Hash function for cbf_key: hashes the key bytes with no flags. The bucket
// and MFV fields of the key do not take part in the hash.
ULng32 cbfHashFunc(const cbf_key& key)
{
  return ExHDPHash::hash(key.key_, ExHDPHash::NO_FLAGS, key.keyLen_);
}
// #entries = 2^(bitsCT)
// #bits per entry = ceil(Ln(m))
// Estimates, in bytes, the memory needed by a CountingBloomFilter built with
// these parameters. The filter size m comes from computeParams(); the total is
// accumulated from VarUIntArray::estimateMemoryInBytes() calls for the member
// arrays named in the comments below, plus sizeof(CountingBloomFilter).
// avgSkewedElements is not referenced in the visible body.
// NOTE(review): some argument lines of the calls below (and the function's
// braces) appear to be missing from this copy of the file -- confirm against
// the original source before editing.
UInt64 CountingBloomFilter::estimateMemoryInBytes(
UInt32 maxHashFuncs,
UInt32 n, // # of distinct elements
float p, // probability of false positives
// non-overflow freq of n elements
UInt32 nonOverflowFreq,
UInt32 avgSkewedElements,
// # of buckets that keys will fall in.
// Assume each key maps to one
// bucket only
UInt32 numBuckets
UInt32 m;
UInt16 k;
computeParams(maxHashFuncs, n, p, m, k);
// for freqsL_
UInt64 totalMem = VarUIntArray::estimateMemoryInBytes(
m, (UInt32)bitsNeeded(nonOverflowFreq));
// for uecs_
totalMem += VarUIntArray::estimateMemoryInBytes(
numBuckets, (UInt32)bitsNeeded(n));
// for totalFreqs_
// NOTE(review): nonOverflowFreq * n is evaluated in UInt32 and may wrap; the
// expression also divides by numBuckets, so numBuckets must not be 0.
totalMem += VarUIntArray::estimateMemoryInBytes(
(UInt32)MINOF(32, nonOverflowFreq * n / numBuckets));
// for freq2sL_ array
totalMem += sizeof(VarUIntArrayPtr) * numBuckets;
// for each VarUIntArray pointed by each element of the array
totalMem += numBuckets * VarUIntArray::estimateMemoryInBytes(
(1 << (UInt32)bitsNeeded(nonOverflowFreq))+1,
totalMem += sizeof(CountingBloomFilter);
// for bucketNums_
totalMem += VarUIntArray::estimateMemoryInBytes(
m, (UInt32)bitsNeeded(numBuckets));
return totalMem;
// Serializes the filter into buffer: the BloomFilter base fields, freqsL_,
// numBuckets_, one freq2sL_ array per bucket, uecs_, totalFreqs_,
// actualOverflowF2s_ and bucketNums_. Returns the sum of the values returned
// by the individual pack calls.
ULng32 CountingBloomFilter::packIntoBuffer(char*& buffer, NABoolean swapBytes)
{
  ULng32 sz = BloomFilter::packIntoBuffer(buffer, swapBytes);
  sz += freqsL_.packIntoBuffer(buffer, swapBytes);

  // numBuckets_ precedes the per-bucket arrays; unpackBuffer() reads it first
  // to know how many arrays follow.
  sz += pack(buffer, numBuckets_, swapBytes);
  for ( CollIndex i=0; i<numBuckets_; i++ )
    sz += freq2sL_[i]->packIntoBuffer(buffer, swapBytes);

  sz += uecs_.packIntoBuffer(buffer, swapBytes);
  sz += totalFreqs_.packIntoBuffer(buffer, swapBytes);
  sz += pack(buffer, actualOverflowF2s_, swapBytes);
  sz += bucketNums_.packIntoBuffer(buffer, swapBytes);

  // Not packed in this class. Subclass CBFWithKnownSkew packs freqsH_.
  // freqStruct* freqsH_;

  // Not packed in this class. Subclass GeneralCountingBloomFilter packs freq2sH_.
  // UInt64* freq2sH_;

  // Not packed, since saveFi() is not used.
  // VarUIntArrayPtr* savedFreq2sL_;

  return sz;
}
// Restores the state written by packIntoBuffer(), reading the fields in the
// same order. freq2sL_ is allocated from heap_ with numBuckets_ entries and
// each entry is a freshly allocated VarUIntArray that unpacks itself.
// Returns the sum of the values returned by the individual unpack calls.
// NOTE(review): the closing brace of the loop and whatever followed the
// "Rebuild" comment are not present in this copy of the file.
// NOTE(review): any array freq2sL_ pointed to before this call is not
// released here -- confirm this is only called on a freshly built object.
ULng32 CountingBloomFilter::unpackBuffer(char*& buffer)
ULng32 sz = BloomFilter::unpackBuffer(buffer);
sz += freqsL_.unpackBuffer(buffer);
// numBuckets_ must be read before the per-bucket arrays can be allocated.
sz += unpack(buffer, numBuckets_);
freq2sL_ = new (heap_) VarUIntArrayPtr[numBuckets_];
for ( CollIndex i=0; i<numBuckets_; i++ ) {
freq2sL_[i] = new (heap_) VarUIntArray(heap_);
sz += freq2sL_[i]->unpackBuffer(buffer);
sz += uecs_.unpackBuffer(buffer);
sz += totalFreqs_.unpackBuffer(buffer);
sz += unpack(buffer, actualOverflowF2s_);
sz += bucketNums_.unpackBuffer(buffer);
// Rebuild freqsH_ and freq2sH_
return sz;
// Builds a counting Bloom filter. The base class computes m_ and k_; the
// member arrays are then sized from m_, n, nonOverflowFreq and numBuckets,
// one frequency-of-frequency array is allocated per bucket, and the
// construction parameters are formatted into paramBuf.
// NOTE(review): the end of the initializer list and the body braces are not
// present in this copy of the file.
CountingBloomFilter::CountingBloomFilter(NAHeap* heap,
UInt32 maxHashFuncs,
UInt32 n, // # of distinct elements
float p, // probability of false positives
UInt32 nonOverflowFreq, // non-overflow freq of n elements
UInt32 avgSkewedElements,
UInt32 numBuckets
BloomFilter(heap, maxHashFuncs, n, p),
freqsL_(m_, (UInt32)bitsNeeded(nonOverflowFreq), heap_),
uecs_(numBuckets, (UInt32)bitsNeeded(n), heap_),
// NOTE(review): nonOverflowFreq * n is evaluated in UInt32 and may wrap;
// numBuckets must not be 0.
totalFreqs_(numBuckets, (UInt32)MINOF(32, nonOverflowFreq * n / numBuckets), heap_),
// total freq per bucket is estimated as
// avg. freq * n / numBuckets
bucketNums_(m_, (UInt32)bitsNeeded(numBuckets), heap_),
freq2sL_ = new (heap) VarUIntArrayPtr[numBuckets];
for ( CollIndex i=0; i<numBuckets; i++ )
// NOTE(review): this uses plain new, whereas unpackBuffer() allocates the
// same objects with new (heap_) -- confirm which allocation is intended.
freq2sL_[i] = new VarUIntArray((1 << (UInt32)bitsNeeded(nonOverflowFreq))+1,
(UInt32)bitsNeeded(m_), heap_);
// NOTE(review): sprintf does not bound the write; the size of paramBuf is not
// visible here. The UInt32 arguments are printed with %d.
sprintf(paramBuf, "maxHash=%d, n=%d, falsePosProb=%f, lfkeys=%d, skews=%d, buckets=%d, #slots=%d",
maxHashFuncs, n, p, nonOverflowFreq, avgSkewedElements, numBuckets, m_);
// Heap-only constructor; the visible body just makes paramBuf an empty string.
// NOTE(review): the initializer list and braces are not present in this copy
// of the file.
CountingBloomFilter::CountingBloomFilter(NAHeap* heap) :
paramBuf[0] = '\0';
// Resets the filter's counters. The visible code zeroes the overflow arrays
// freqsH_ and freq2sH_ (getOverflowEntries() elements each) when they exist.
// NOTE(review): the body of the per-bucket loop is not present in this copy
// of the file, and the three deallocation statements at the end appear to
// belong to a separate definition whose header is missing -- confirm against
// the original source.
void CountingBloomFilter::clear()
for ( CollIndex i=0; i<numBuckets_; i++ )
if ( freqsH_ )
memset(freqsH_, 0, sizeof(freqStruct) * getOverflowEntries());
if ( freq2sH_ )
memset(freq2sH_, 0, sizeof(UInt64) * getOverflowEntries());
NADELETEARRAY(freq2sL_, numBuckets_, VarUIntArrayPtr, heap_);
NADELETEBASIC(freqsH_, heap_);
NADELETEBASIC(freq2sH_, heap_);
// qsort() comparator for freqStruct elements. It forwards to
// freqStruct::operator==, which returns -1 / 0 / 1 like a three-way compare.
Int32 compareTwoFreqStructs(const void* x, const void* y)
{
  return ((freqStruct*)x)->operator==(*((freqStruct*)y));
}
// Three-way comparison, ordered by bucket first and frequency second.
// Returns 0 when both fields are equal, -1 when *this sorts before y and 1
// otherwise.
// NOTE: because 0 means "equal", using the result directly as a boolean
// yields true when the two structs DIFFER.
Int32 freqStruct::operator==(freqStruct& y)
{
  if ( getBucket() != y.getBucket() )
    return ( getBucket() < y.getBucket() ) ? -1 : 1;

  if ( getFreq() != y.getFreq() )
    return ( getFreq() < y.getFreq() ) ? -1 : 1;

  return 0;
}
// Packs freq_ and then bucket_ into buffer; returns the sum of the values
// returned by the two pack() calls.
// NOTE(review): buffer is taken by value here, while the other packers in
// this file take char*& -- confirm the caller does not rely on the pointer
// being advanced.
ULng32 freqStruct::pack(char* buffer, NABoolean swapBytes)
{
  ULng32 sz = pack(buffer, freq_, swapBytes);
  sz += pack(buffer, bucket_, swapBytes);
  return sz;
}
// Reads freq_ and then bucket_ from buffer; returns the sum of the values
// returned by the two unpack() calls.
// NOTE(review): buffer is taken by value here, while the other unpackers in
// this file take char*& -- confirm the caller does not rely on the pointer
// being advanced.
ULng32 freqStruct::unpack(char* buffer)
{
  ULng32 sz = unpack(buffer, freq_);
  sz += unpack(buffer, bucket_);
  return sz;
}
// Debug dump to cout. For every bucket it prints the UEC, the total frequency
// and the non-zero entries of the low-frequency freq-of-freq array; it then
// prints every entry of the high-frequency (overflow) area as reported by
// highF2().
// NOTE(review): closing braces are not present in this copy of the file, so
// the exact nesting of the statements below cannot be confirmed from here.
void CountingBloomFilter::printfreqfreq()
cout << endl;
cout << "***** CBF statistics ***********" << endl;
cout << "In low freq area:" << std::endl;
for (UInt32 b=0; b < numBuckets_; b++) {
cout << "In bucket " << b << ":" << endl;
UInt32 uc = uec(b);
UInt32 tf = totalFreq(b);
if ( tf > 0 )
cout << "uec=" << uc << ", total_freq=" << tf << endl;
// Index 0 of the freq-of-freq array is skipped.
for (UInt32 i=1; i < freq2sL_[b]->entries(); i++)
if ((*freq2sL_[b])[i] >0)
std::cout << "number of low freq. entries f" << i << "="
<< (*freq2sL_[b])[i] << std::endl;
UInt32 highF2s = getOverflowEntries();
std::cout << endl << "In high freq area:" << std::endl;
cout << "#highF2s=" << highF2s << endl;
for ( CollIndex i = 0 ; i < highF2s; i++) {
UInt64 freq;
UInt32 bucket;
// highF2() returns the freq-of-freq and fills in freq and bucket.
UInt64 f2 = highF2(i, freq, bucket);
if ( freq > 0 )
std::cout << "In bucket " << bucket
<< ", number of entries with high freq. f"
<< freq << "=" << f2 << endl;
cout << endl;
// Returns the frequency of frequency of the k-th entry in the overflow area
// and reports that entry's frequency through i and its bucket through bucket.
UInt64 CountingBloomFilter::highF2(UInt32 k, UInt64& i, UInt32& bucket)
{
  // Only entries 0 .. actualOverflowF2s_-1 are filled in, so k must be
  // strictly smaller than the count.
  ComASSERT(k < actualOverflowF2s_);

  i = freqsH_[k].getFreq();
  bucket = freqsH_[k].getBucket();
  return freq2sH_[k];
}
// Debug tracing macros: the first group forwards to the display_* member
// functions, the second group expands to nothing.
// NOTE(review): the #else / #endif lines separating the two groups (and an
// empty DISPLAY_HASH_ENTRY definition) are not present in this copy of the
// file.
#if 0
#define DISPLAY_KEY(x,y,z) display_key(x,y,z)
#define DISPLAY_KEY4(x,y,z, u) display_key(x,y,z, u)
#define DISPLAY_HASH_ENTRY(x,y) display_hash_entry(x,y)
#define DISPLAY_MSG(x) display_msg(x)
#define DISPLAY_KEY(x,y,z)
#define DISPLAY_KEY4(x,y,z,u)
#define DISPLAY_MSG(x)
void CountingBloomFilter::display_key(const char* msg, char * key, UInt32 key_len, UInt32 bucket)
cout << msg << endl;
cout << "bucket=" << bucket << ", " << "key=" ;
for (UInt32 k=0;k<key_len; k++)
cout << key[k];
cout << endl;
void CountingBloomFilter::display_hash_entry(const char* msg, UInt32 hash)
cout << msg << endl << "hash=" << hash << ", count[" << hash << "] =" << freqsL_[hash] << endl;
// Builds a key that owns a private copy of the key_len bytes at key. The
// copy is allocated from heap.
simple_cbf_key::simple_cbf_key(char* key, UInt32 key_len, NAHeap* heap) :
  keyLen_(key_len), heap_(heap)
{
  key_ = new (heap_) char[keyLen_];
  memmove(key_, key, keyLen_);
}
// Copy constructor: deep-copies the key bytes, allocating the new buffer from
// the same heap as the source key.
simple_cbf_key::simple_cbf_key(const simple_cbf_key& key) :
  heap_(key.heap_), keyLen_(key.keyLen_)
{
  key_ = new (heap_) char[keyLen_];
  memmove(key_, key.key_, keyLen_);
}
NADELETEBASIC(key_, heap_);
// Copy assignment: replaces this key with a deep copy of other.
// Self-assignment is a no-op. The previously owned buffer is released with
// the heap it was allocated from, so repeated assignment does not leak.
simple_cbf_key& simple_cbf_key::operator=(const simple_cbf_key& other)
{
  if ( this == &other )
    return *this;

  // Build the copy first so *this is untouched until the new buffer exists.
  char* fresh = new (other.heap_) char[other.keyLen_];
  memmove(fresh, other.key_, other.keyLen_);

  // Release the old buffer through the old heap before heap_ is overwritten.
  NADELETEBASIC(key_, heap_);

  heap_ = other.heap_;
  keyLen_ = other.keyLen_;
  key_ = fresh;
  return *this;
}
void CountingBloomFilter::display_msg(const char* msg)
cout << msg << endl;
// Builds a key carrying the bucket number and MFV classification alongside a
// private copy of the key bytes. heap is forwarded to the base class so the
// key bytes are allocated from the heap the caller asked for.
cbf_key::cbf_key(char* key, UInt32 key_len, UInt32 bucket, MFV_ENUM mfv, NAHeap* heap) :
  simple_cbf_key(key, key_len, heap), bucket_(bucket), mfv_(mfv)
{
}
// Copy constructor: deep-copies the key bytes through the base class and
// copies the bucket number and MFV classification.
cbf_key::cbf_key(const cbf_key& key) :
  simple_cbf_key(key), bucket_(key.bucket_), mfv_(key.mfv_)
{
}
// Copy assignment: copies the key bytes (through the base-class assignment)
// as well as the bucket number and MFV classification. Self-assignment is a
// no-op.
cbf_key& cbf_key::operator=(const cbf_key& other)
{
  if ( this == &other )
    return *this;

  // Without this call only bucket_ and mfv_ would change and the key bytes
  // of *this would keep their old value.
  simple_cbf_key::operator=(other);

  bucket_ = other.bucket_;
  mfv_ = other.mfv_;
  return *this;
}
// For each bucket, compute the sum of all frequencies squared.
void CountingBloomFilter::computeSumOfFrequencySquared(double * sumSq, Int32 sz)
sz = MINOF(sz, (Int32)numBuckets_);
for ( Int32 i=0; i<sz; i++ )
sumSq[i] = 0.0;
// Use freqsL_ which records the frequencies for all low freq keys
// Use bucketNums_ which records the bucket number for all low freq keys
// Accumulate low frequencies per bucket
for ( Int32 b=1; b<sz; b++ ) {
VarUIntArray& lowf2s = lowF2s(b);
for (UInt32 j=1; j<lowf2s.entries(); j++)
// fj is the total # of keys each appearing j times.
// For these fj keys, Sum freq^2 = j * j * fj.
UInt32 fj = lowf2s[j];
sumSq[b] += j * j * fj;
// Not implemented for the general filter: always fails the assertion.
void GeneralCountingBloomFilter::computeSumOfFrequencySquaredHighFreq(double * sumSq)
{
  assert(0); // not implemented yet.
}
void CountingBloomFilterWithKnownSkews::computeSumOfFrequencySquaredHighFreq(double * sumSq)
// Accumulate high frequencies per bucket.
// For bucket i, its MFV is stored at 2*i, and 2MFV at 2*i+1.
for ( UInt32 i=0; i<getOverflowEntries()/2; i++ ) {
// MFV's index is 2*i
double fd = (double)freq2sH_[2*i];
sumSq[i] += fd * fd;
// 2MFV's index is 2*i+1
fd = (double)freq2sH_[2*i+1];
sumSq[i] += fd * fd;
// Inserts one occurrence of key (key_len bytes) on behalf of bucket.
//
// For each of the k_ hash functions a slot index is derived from one common
// hash of the key via computeFinalHash(). A slot whose bucketNums_ entry is 0
// (free) or bucket+1 (already owned by this bucket) has its freqsL_ counter
// incremented, and a free slot is tagged with bucket+1. f tracks the smallest
// resulting counter among the usable slots.
//
// Return values visible here: NO_SLOT when no usable slot was found, NEW_MFV
// when the counter reached freqsL_.getMaxVal() for a key not flagged as an
// MFV and canHandleArbitrarySkewedValue() is false, NORMAL otherwise.
//
// NOTE(review): the return-type line, several statements, closing braces and
// the #endif lines of the "#if 0" trace blocks are not present in this copy
// of the file -- confirm against the original source before editing.
CountingBloomFilter::insert(char * key, UInt32 key_len, UInt32 bucket, cbf_key::MFV_ENUM mfv)
DISPLAY_KEY4("insert start", key, key_len, bucket);
if (bucket >= numBuckets_)
// flags is not used by the visible code; the hash call below passes
// keyLenInfo_ as its second argument.
UInt32 flags = ExHDPHash::NO_FLAGS;
UInt32 hashValueCommon = ExHDPHash::hash(key, keyLenInfo_, key_len);
UInt32 f = 0xFFFFFFFF;
ULng32 newFreq=0;
NABoolean slotFound = FALSE;
for(Lng32 i = 0; i < k_; i++)
UInt32 hash_index = computeFinalHash(hashValueCommon, i);
#if 0
if ( filename_ && (bucket == 44)) {
ofstream fileout(filename_, ios::app);
UInt32 flags = ExHDPHash::NO_FLAGS;
char hashNum = (char) i;
UInt32 hashValue2 = ExHDPHash::hash((char*)&hashNum, flags, sizeof(hashNum));
UInt32 hash = ((hashValueCommon<<1 | hashValueCommon>>31) ^ hashValue2) % m_;
fileout << " {(" << (hashValueCommon<<1 | hashValueCommon>>31)
<< " ^ " << hashValue2 << ") % "
<< m_ << " = " << hash << "}" << endl;
// bucketNums_ stores bucket+1 for an occupied slot; 0 means the slot is free.
UInt32 b= bucketNums_[hash_index];
// Compute the min frequency in those slots that have
// not been occupied or occupied by keys from the same
// bucket.
#if 0
if ( filename_ && (bucket == 44)) {
ofstream fileout(filename_, ios::app);
fileout << "\t[" << i << "]" << "\t" << hash << "," << b;
if ( b == 0 || b-1 == bucket ) {
freqsL_.add(hash_index, 1, newFreq);
if ( b==0 )
bucketNums_.put(hash_index, bucket+1);
if ( i == 0 || f > newFreq ) {
f = newFreq;
slotFound = TRUE;
if ( !slotFound ) {
#if 0
if ( filename_ && (bucket == 44)) {
ofstream fileout(filename_, ios::app);
fileout << "\tfail to insert" << endl;
return NO_SLOT;
#if 0
if ( filename_ && (bucket == 44)) {
ofstream fileout(filename_, ios::app);
fileout << endl;
if ( f >= freqsL_.getMaxVal() ) {
// The key overflows
// If this is not a MFV and we can not handle arbitrary skews, bail out
if ( mfv == cbf_key::NONE && !canHandleArbitrarySkewedValue() )
return NEW_MFV;
// Check if it is the first time to insert the key in the high freq area
cbf_key ckey(key, key_len, bucket, mfv, heap_);
UInt64* freq = searchOverflowTable(ckey);
if ( freq ) {
(*freq)++; // yes, increment the frequency
} else {
// No, insert the key
// Remove the freqfreq influence from the key
(*freq2sL_[bucket]).sub(f-1,1, newFreq);
// The key stays in the low freq area.
// Adjust frequency of frequency.
(*freq2sL_[bucket]).sub(f-1,1, newFreq);
// For first time inserted key, change the UEC
if ( f == 1 ) {
// Adjust total RC.
DISPLAY_MSG("insert complete");
return NORMAL;
// Removes one occurrence of key (key_len bytes) from bucket.
//
// Only slots tagged with bucket+1 in bucketNums_ are considered. The slot
// indexes are remembered in cached_hash so the counters can be rewritten if
// the key leaves the overflow area.
//
// Return status:
// TRUE: the item is in CBF before the deletion
// FALSE : the item is NOT in CBF before the deletion
//
// NOTE(review): the return-type line, several statements (including the
// counter decrement that fills currentFreq and the update of f) and closing
// braces are not present in this copy of the file.
CountingBloomFilter::remove(char * key, UInt32 key_len, UInt32 bucket, cbf_key::MFV_ENUM mfv)
DISPLAY_KEY4("remove start", key, key_len, bucket);
if (bucket >= numBuckets_)
return FALSE;
// flags is not used by the visible code; the hash call below passes
// keyLenInfo_ as its second argument.
UInt32 flags = ExHDPHash::NO_FLAGS;
UInt32 hashValueCommon = ExHDPHash::hash(key, keyLenInfo_, key_len);
ULng32 currentFreq =0;
UInt32 f = 0xFFFFFFFF;
NABoolean slotFound = FALSE;
// NOTE(review): cached_hash has 6 entries while the loop below runs k_ times
// -- confirm k_ can never exceed 6.
UInt32 cached_hash[6];
for(Lng32 i = 0; i < k_; i++)
UInt32 hash_index = cached_hash[i] = computeFinalHash(hashValueCommon, i);
UInt32 b= bucketNums_[hash_index];
// We only check hash slot that belongs to the bucket
if ( b > 0 && b-1 == bucket ) {
// If the content is zero, the key is not in CBF
if ( freqsL_[hash_index] == 0 )
return FALSE;
// Decrement the counter, if the counter already overflows, sub is a no-op.
// The minuend is saved in currentFreq.
// If the last occurrence of the key (before decrement) is 1, reset the bucket# to 0;
if ( currentFreq == 1 )
bucketNums_.put(hash_index, 0);
// Compute the min freq among slots occupied by the keys from the same bucket.
if ( i==0 || f > currentFreq ) {
slotFound = TRUE;
if ( !slotFound )
return FALSE;
ULng32 dummy;
NABoolean lowFreq = TRUE;
if ( f >= freqsL_.getMaxVal() ) {
cbf_key ckey(key, key_len, bucket, mfv, heap_);
// The overflow table is only searched for keys flagged as an MFV.
UInt64* freq = (mfv != cbf_key::NONE) ? searchOverflowTable(ckey) : NULL;
// Really high frequency key
if ( freq ) {
lowFreq = FALSE;
if ( *freq == freqsL_.getMaxVal() ) {
// The overflow condition is no longer true. remove it from overflow area
// Transfer f2 to low freq array
(*freq2sL_[bucket]).add(f-1,1, dummy);
// Set the count entry to max-1, as the content of the entry is
// sticky once reach the max value.
for(Lng32 i = 0; i < k_; i++)
freqsL_.put(cached_hash[i], freqsL_.getMaxVal()-1);
} else
(*freq) --;
if ( lowFreq ) {
// A low freq key
// Adjust frequency of frequency
(*freq2sL_[bucket]).add(f-1,1, dummy);
(*freq2sL_[bucket]).sub(f,1, dummy);
// Decrement #uec
if ( f == 1 )
// Decrement RC
DISPLAY_MSG("remove complete");
return TRUE;
// Tests whether key (key_len bytes) may be in the filter.
//
// Returns FALSE as soon as one of the k_ slots of the key holds a zero
// counter, TRUE otherwise. When freq is non-NULL it receives the key's
// frequency: the value from the overflow table when f equals
// freqsL_.getMaxVal(), f otherwise. When b is non-NULL it receives the
// bucketNums_ entry of the last slot probed.
//
// NOTE(review): several statements (including the assignment that updates f)
// and closing braces are not present in this copy of the file.
NABoolean CountingBloomFilter::contain(char *key, UInt32 key_len, UInt64* freq, UInt32* b)
DISPLAY_KEY("contain() start", key, key_len);
// flags is not used by the visible code; the hash call below passes
// keyLenInfo_ as its second argument.
UInt32 flags = ExHDPHash::NO_FLAGS;
UInt32 hashValueCommon = ExHDPHash::hash(key, keyLenInfo_, key_len);
UInt32 f = 0;
UInt32 hash_index = 0;
for(Lng32 i = 0; i < k_; i++)
hash_index = computeFinalHash(hashValueCommon, i);
DISPLAY_HASH_ENTRY("", hash_index);
if ( freqsL_[hash_index] == 0 )
DISPLAY_MSG("contain(): not found");
return FALSE;
if ( freq ) {
UInt32 currentFreq = freqsL_[hash_index];
if (i==0 || f > currentFreq) {
if ( freq ) {
if ( f == freqsL_.getMaxVal() ) {
// The lookup key is built with bucket 0 and cbf_key::NONE.
cbf_key ckey(key, key_len, 0, cbf_key::NONE, heap_);
UInt64* fptr = searchOverflowTable(ckey);
if ( fptr ) {
(*freq) = *fptr;
} else {
assert(fptr != 0);
} else {
*freq = f;
if ( b )
*b = bucketNums_[hash_index];
DISPLAY_MSG("contain(): found");
return TRUE;
// Methods for General CBF.

// Builds a general counting Bloom filter. All sizing is delegated to
// CountingBloomFilter; the overflow hash table is created on heap with
// avgSkewedElements as its initial size and uniqueness enforced.
GeneralCountingBloomFilter::GeneralCountingBloomFilter(NAHeap* heap,
                 UInt32 maxHashFuncs,
                 UInt32 n, // # of distinct elements
                 float p,  // probability of false positives
                 UInt32 nonOverflowFreq, // non-overflow freq of n elements
                 UInt32 avgSkewedElements,
                 UInt32 numFreqFreqBuckets
                 ) :
  CountingBloomFilter(heap, maxHashFuncs, n, p,nonOverflowFreq, avgSkewedElements, numFreqFreqBuckets),
  overflowCountTable_(cbfHashFunc, avgSkewedElements,
                      TRUE/*uniqueness enforced*/, heap)
{
}
void GeneralCountingBloomFilter::insertIntoOverflowTable(const cbf_key& key)
overflowCountTable_.insert(new (heap_)cbf_key(key), new UInt64(freqsL_.getMaxVal()));
void GeneralCountingBloomFilter::removeFromOverflowTable(const cbf_key& key)
// Rebuilds freqsH_ / freq2sH_ (the frequency-of-frequency summary of the
// overflow area) from overflowCountTable_: the old arrays are released, new
// ones with getOverflowEntries() elements are allocated, the table is walked
// with an iterator, the collected entries are sorted with qsort() and runs
// are then collapsed into (freqStruct, count) pairs; actualOverflowF2s_ ends
// up as the number of pairs written.
//
// NOTE(review): several statements (the early return, the body of the
// iterator loop, the end of the counting loop) and closing braces are not
// present in this copy of the file.
// NOTE(review): freqStruct::operator== in this file returns 0 when both
// operands are equal, so the inner while condition below is true when the
// two entries DIFFER -- confirm this is the intended test.
void GeneralCountingBloomFilter::computeOverflowF2s()
NADELETEBASIC(freqsH_, heap_);
NADELETEBASIC(freq2sH_, heap_);
freqsH_ = NULL;
freq2sH_ = NULL;
actualOverflowF2s_ = 0;
UInt32 overflowed = getOverflowEntries();
if ( overflowed == 0 )
// cout << "overflowed=" << overflowed << endl;
// Allocated as raw bytes; see the commented-out alternative below.
freqsH_ = (freqStruct*) (new (heap_) char[sizeof(freqStruct) * overflowed]);
//freqsH_ = new (heap_) freqStruct[overflowed]; // does not work with NADELETEARRAY
freq2sH_ = new (heap_) UInt64[overflowed];
NAHashDictionaryIterator<cbf_key, UInt64> cItor(overflowCountTable_);
cbf_key* key;
UInt64* freq;
CollIndex i;
for ( i = 0 ; i < cItor.entries() ; i++) {
cItor.getNext(key, freq);
// do a quick sort
qsort(freqsH_, cItor.entries(), sizeof(freqStruct), compareTwoFreqStructs);
// compute freqOfFreq by walking through the sorted list in one pass
UInt64 f = 0;
i = 0;
CollIndex j = 0;
while (i<overflowed)
while ( j<overflowed && freqsH_[i] == freqsH_[j] ) {
f++; j++;
freqsH_[actualOverflowF2s_] = freqsH_[i];
freq2sH_[actualOverflowF2s_++] = f;
// Methods for CBFWithKnownSkews.

// Estimates, in bytes, the memory needed by a CountingBloomFilterWithKnownSkews:
// the base-class estimate, plus one UInt64 per skewed element, plus
// sizeof(CountingBloomFilterWithKnownSkews).
// NOTE(review): several argument lines of the base-class call and the
// function's braces are not present in this copy of the file.
UInt64 CountingBloomFilterWithKnownSkews::estimateMemoryInBytes(
UInt32 maxHashFuncs,
UInt32 n, // # of distinct elements
float p, // probability of false positives
// non-overflow freq of n elements
UInt32 nonOverflowFreq,
UInt32 numSkewedElements,
UInt32 numFreqFreqBuckets
UInt64 sz = CountingBloomFilter::estimateMemoryInBytes(
n, // # of distinct elements
p, // probability of false positives
// non-overflow freq of n elements
// for freq2sH_
sz += numSkewedElements * sizeof(UInt64);
sz += sizeof(CountingBloomFilterWithKnownSkews);
return sz;
// Builds a counting Bloom filter whose skewed (high-frequency) keys are known
// in advance. freq2sH_ is allocated with numSkewedElements entries, all
// zeroed, and actualOverflowF2s_ is set to numSkewedElements.
// NOTE(review): the statement starting with "Lng32 sz =" is truncated, and
// the body braces are not present in this copy of the file.
CountingBloomFilterWithKnownSkews::CountingBloomFilterWithKnownSkews(NAHeap* heap,
UInt32 maxHashFuncs,
UInt32 n, // # of distinct elements
float p, // probability of false positives
UInt32 nonOverflowFreq, // non-overflow freq of n elements
UInt32 numSkewedElements,
UInt32 numFreqFreqBuckets
CountingBloomFilter(heap, maxHashFuncs, n, p,nonOverflowFreq, numSkewedElements, numFreqFreqBuckets)
// We use freq2sH_ to record frequency in overflow area for this class and
// make the following assumptions
// 1). MFV and 2MFV per interval are distinct
// 2). Assume no new MFV and new 2MFV to come
// 3). Frequency of frequency on MFV and 2MFV per interval is 1.
freq2sH_ = new (heap_) UInt64[numSkewedElements];
// A zero entry means "not in the overflow area" (see searchOverflowTable()).
for ( CollIndex i = 0 ; i < numSkewedElements; i++) {
freq2sH_[i] = 0;
actualOverflowF2s_ = numSkewedElements;
Lng32 sz =
n, // # of distinct elements
p, // probability of false positives
// non-overflow freq of n elements
CountingBloomFilterWithKnownSkews::CountingBloomFilterWithKnownSkews(NAHeap* heap) :
// Serializes the base-class state followed by the actualOverflowF2s_ entries
// of freq2sH_. Returns the sum of the values returned by the pack calls.
ULng32 CountingBloomFilterWithKnownSkews::packIntoBuffer(char*& buffer, NABoolean swapBytes)
{
  ULng32 sz = CountingBloomFilter::packIntoBuffer(buffer, swapBytes);

  for ( CollIndex i = 0 ; i < actualOverflowF2s_; i++) {
    sz += pack(buffer, freq2sH_[i], swapBytes);
  }

  return sz;
}
// Restores the base-class state and then the freq2sH_ array, which is
// re-allocated from heap_ with actualOverflowF2s_ entries. Returns the sum of
// the values returned by the unpack calls.
// NOTE(review): any array freq2sH_ pointed to before this call is not
// released here -- confirm this is only called on a freshly built object.
ULng32 CountingBloomFilterWithKnownSkews::unpackBuffer(char*& buffer)
{
  ULng32 sz = CountingBloomFilter::unpackBuffer(buffer);

  // actualOverflowF2s_ should be unpacked and available after
  // calling the above method.
  freq2sH_ = new (heap_) UInt64[actualOverflowF2s_];

  for ( CollIndex i = 0 ; i < actualOverflowF2s_; i++) {
    sz += unpack(buffer, freq2sH_[i]);
  }

  return sz;
}
// Returns a pointer to the overflow frequency of key, or NULL when the key's
// entry is 0 (i.e. the key is not in the overflow area).
UInt64* CountingBloomFilterWithKnownSkews::searchOverflowTable(const cbf_key& key)
{
  UInt32 idx = indexInOverflowArray(key);

  // freq2sH_ holds actualOverflowF2s_ entries, so the last valid index is
  // actualOverflowF2s_ - 1.
  ComASSERT(idx < actualOverflowF2s_);

  return ( freq2sH_[idx] == 0 ) ? NULL : &freq2sH_[idx];
}
void CountingBloomFilterWithKnownSkews::insertIntoOverflowTable(const cbf_key& key)
UInt32 idx = indexInOverflowArray(key);
ComASSERT(idx <= actualOverflowF2s_);
freq2sH_[idx] = freqsL_.getMaxVal();
void CountingBloomFilterWithKnownSkews::removeFromOverflowTable(const cbf_key& key)
UInt32 idx = indexInOverflowArray(key);
ComASSERT(idx <= actualOverflowF2s_);
freq2sH_[idx] = 0;
// Reports the k-th overflow entry: its frequency through i and its bucket
// (k/2, two entries per bucket) through bucket. The frequency of frequency
// returned is always 1 for this class.
UInt64 CountingBloomFilterWithKnownSkews::highF2(UInt32 k, UInt64& i, UInt32& bucket)
{
  // freq2sH_ holds actualOverflowF2s_ entries, so the last valid index is
  // actualOverflowF2s_ - 1.
  ComASSERT(k < actualOverflowF2s_);

  i = freq2sH_[k]; // freq2sH_ here stores the first order frequency
  bucket = k/2;
  return 1;
}
// Returns the total frequency recorded in the filter: the sum of totalFreq(b)
// over all buckets plus the frequency reported by highF2() for every entry in
// the overflow area.
// NOTE(review): closing braces are not present in this copy of the file, so
// whether the second loop is nested inside the first cannot be confirmed
// from here.
UInt64 CountingBloomFilter::totalFreqForAll()
UInt64 ct = 0;
for ( CollIndex b = 0; b<numBuckets(); b++ ) {
ct += totalFreq(b); // tf is total frequency in bucket b
UInt64 y;
UInt32 bkt;
for (CollIndex i=0; i<getOverflowEntries(); i++) {
// Only y (the frequency) is used; x and bkt are ignored.
UInt64 x=highF2(i, y, bkt); // x, y and bkt are freqFreq, freq and
// bucket# of the ith entry
// in overflow area, respectively.
ct += y;
return ct;
//-------------------------- code for faststats ---------------------------------
// Not implemented for the fast-stats filter: always fails the assertion.
void FastStatsCountingBloomFilter::computeSumOfFrequencySquaredHighFreq(double * sumSq)
{
  assert(0); // not implemented yet.
}
// Builds a fast-stats counting Bloom filter. The base class computes m_ and
// k_; counters_ gets m_ entries wide enough for maxNonOverflowFreq, the
// overflow hash table starts with room for 1000 elements, and keys_ starts
// with room for n elements.
FastStatsCountingBloomFilter::FastStatsCountingBloomFilter(NAHeap* heap,
                 UInt32 maxHashFuncs,
                 UInt32 n, // # of distinct elements
                 float p,  // probability of false positives
                 UInt32 maxNonOverflowFreq
                 ) :
  BloomFilter(heap, maxHashFuncs, n, p),
  overflowCountTable_(scbfHashFunc, 1000 /* initial # of elements */,
                      TRUE/*uniqueness enforced*/, heap),
  counters_(m_, (UInt32)bitsNeeded(maxNonOverflowFreq), heap_),
  keys_(heap, n/* initial # of elements*/)
{
}
// Returns a size, in bytes, for this filter: sizeof(BloomFilter), an estimate
// for counters_ based on the given parameters, the byte size of keys_ and of
// overflowCountTable_, and an adjustment for the key data itself.
// For character keys (isChar) the length of every key in keys_ is added; for
// other keys the per-key pointer size is replaced by actualFixAmount.
// NOTE(review): closing braces are not present in this copy of the file.
UInt64 FastStatsCountingBloomFilter::getSizeInBytes(
UInt32 maxHashFuncs,
UInt32 n, // # of distinct elements
float p, // probability of false positives
// non-overflow freq of n elements
UInt32 nonOverflowFreq,
NABoolean isChar, Int32 actualFixAmount)
UInt32 m;
UInt16 k;
// m is recomputed from the arguments rather than read from m_.
computeParams(maxHashFuncs, n, p, m, k);
UInt64 totalMem = sizeof(BloomFilter);
// for counters_
totalMem += VarUIntArray::estimateMemoryInBytes(
m, (UInt32)bitsNeeded(nonOverflowFreq));
// for keys_, covering all allocated memory
totalMem += keys_.getByteSize();
// include the keys
if ( isChar ) {
for (CollIndex i=0; i<keys_.entries(); i++ ) {
const simple_cbf_key& key = keys_[i];
totalMem += key.getKeyLen();
} else {
// if it is a non-character key, subtract the amount if the actual
// data is stored instead of the pointer.
// -sizeof(key*) + actualFixAmount
totalMem -= (keys_.entries() * sizeof(void*));
totalMem += (keys_.entries() * actualFixAmount);
// for hash directory
totalMem += overflowCountTable_.getByteSize();
return totalMem;
// Inserts key (key_len bytes) with a frequency of freq.
//
// Each of the k_ counters selected for the key is increased by freq; f tracks
// the smallest resulting counter. When f reaches counters_.getMaxVal() the
// key is handled through the overflow table: an existing entry is increased
// by freq, otherwise the key is inserted. The visible code returns TRUE.
//
// NOTE(review): the return-type line, several statements (for example the
// body of the "f == 1" branch) and closing braces are not present in this
// copy of the file.
FastStatsCountingBloomFilter::insert(char * key, UInt32 key_len, UInt32 freq)
UInt32 hashValueCommon = ExHDPHash::hash(key, keyLenInfo_, key_len);
UInt32 f = 0xFFFFFFFF;
ULng32 newFreq=0;
NABoolean slotFound = FALSE;
for(Lng32 i = 0; i < k_; i++)
UInt32 hash_index = computeFinalHash(hashValueCommon, i);
counters_.add(hash_index, freq, newFreq);
if ( i == 0 || f > newFreq ) {
f = newFreq;
slotFound = TRUE;
simple_cbf_key ckey(key, key_len, heap_);
if ( f == 1 ) { // a new key, insert into keys_
if ( f >= counters_.getMaxVal() ) {
// The key overflows
// Check if it is the first time to insert the key in the high freq area
UInt64* currentFreq = searchOverflowTable(ckey);
if ( currentFreq ) {
(*currentFreq) += freq; // yes, increment the frequency by freq
} else {
// No, insert the key with a frequency of freq-1
// NOTE(review): freq is unsigned, so freq == 0 would wrap here.
insertIntoOverflowTable(ckey, freq-1);
// The key stays in the low freq area. Do nothing as we have already
// increment the frequency for each bit touched by the key.
DISPLAY_MSG("insert complete");
return TRUE;
// Removes one occurrence of key (key_len bytes).
//
// Returns FALSE as soon as one of the key's k_ counters is zero (the key is
// not in the filter) and TRUE otherwise. The slot indexes are remembered in
// cached_hash so the counters can be rewritten to getMaxVal()-1 when the key
// leaves the overflow area.
//
// NOTE(review): several statements (including the counter decrement that
// fills currentFreq and the update of f) and closing braces are not present
// in this copy of the file.
NABoolean FastStatsCountingBloomFilter::remove(char * key, UInt32 key_len)
// NOTE(review): this trace call names "bucket", which is not a parameter or
// local of this function; it only compiles while DISPLAY_KEY4 expands to
// nothing.
DISPLAY_KEY4("remove start", key, key_len, bucket);
// flags is not used by the visible code; the hash call below passes
// keyLenInfo_ as its second argument.
UInt32 flags = ExHDPHash::NO_FLAGS;
UInt32 hashValueCommon = ExHDPHash::hash(key, keyLenInfo_, key_len);
ULng32 currentFreq =0;
UInt32 f = 0xFFFFFFFF;
NABoolean slotFound = FALSE;
// NOTE(review): cached_hash has 6 entries while the loop below runs k_ times
// -- confirm k_ can never exceed 6.
UInt32 cached_hash[6];
for(Lng32 i = 0; i < k_; i++)
UInt32 hash_index = cached_hash[i] = computeFinalHash(hashValueCommon, i);
// If the content is zero, the key is not in CBF
if ( counters_[hash_index] == 0 )
return FALSE;
// Decrement the counter, if the counter already overflows, sub is a no-op.
// The minuend is saved in currentFreq.
// Compute the min freq among slots occupied by the keys from the same bucket.
if ( i==0 || f > currentFreq ) {
slotFound = TRUE;
ULng32 dummy;
NABoolean lowFreq = TRUE;
if ( f >= counters_.getMaxVal() ) {
simple_cbf_key ckey(key, key_len, heap_);
UInt64* freq = searchOverflowTable(ckey);
if ( freq ) { // test if a high frequency key
lowFreq = FALSE;
if ( *freq == counters_.getMaxVal() ) {
// The overflow condition is no longer true. Remove it from overflow area
// Set the count entry to max-1, as the content of the entry is
// sticky once reach the max value.
for(Lng32 i = 0; i < k_; i++)
counters_.put(cached_hash[i], counters_.getMaxVal()-1);
} else
(*freq) --;
if ( lowFreq ) {
// A low freq key. Do nothing here as we have already decrement the
// counters.
return TRUE;
// Tests whether key (key_len bytes) may be in the filter.
//
// Returns FALSE as soon as one of the key's k_ counters is zero, TRUE
// otherwise. When freq is non-NULL it receives the key's frequency: the
// overflow-table value when f equals counters_.getMaxVal() and the key is
// found there, getMaxVal()-1 when it is not found, and f otherwise.
//
// NOTE(review): the return-type line, several statements (including the
// assignment that updates f) and closing braces are not present in this copy
// of the file.
FastStatsCountingBloomFilter::contain(char * key, UInt32 key_len, UInt64* freq)
DISPLAY_KEY("contain() start", key, key_len);
UInt32 hashValueCommon = ExHDPHash::hash(key, keyLenInfo_, key_len);
UInt32 f = 0;
UInt32 hash_index = 0;
for(Lng32 i = 0; i < k_; i++)
hash_index = computeFinalHash(hashValueCommon, i);
DISPLAY_HASH_ENTRY("", hash_index);
if ( counters_[hash_index] == 0 )
DISPLAY_MSG("contain(): not found");
return FALSE;
if ( freq ) {
UInt32 currentFreq = counters_[hash_index];
if (i==0 || f > currentFreq) {
if ( freq ) {
if ( f == counters_.getMaxVal() ) {
simple_cbf_key ckey(key, key_len, heap_);
UInt64* fptr = searchOverflowTable(ckey);
if ( fptr ) {
(*freq) = *fptr;
} else {
// the key is not in hash table, which means that
// all bits associated with the key is set by collisions.
// Here we have to guess the frequency to be the
// max value -1.
(*freq) = counters_.getMaxVal() -1;
} else {
*freq = f;
DISPLAY_MSG("contain(): found");
return TRUE;
const simple_cbf_key& key, UInt32 freq)
overflowCountTable_.insert(new (heap_)simple_cbf_key(key),
new UInt64(counters_.getMaxVal() + freq - 1)
void FastStatsCountingBloomFilter::removeFromOverflowTable(const simple_cbf_key& key)
// Debug dump: prints a header naming colNames and then, for each key looked
// at, the key (read as an Int32) and the frequency reported by contain().
// NOTE(review): the loop bound is the literal 100, with keys_.entries()
// commented out -- confirm keys_ always holds at least 100 entries.
// NOTE(review): the initializer of ckey and the closing braces are not
// present in this copy of the file.
void FastStatsCountingBloomFilter::printfreq(const char* colNames)
printf("\nCBF keys/frequencies for %s\n", colNames);
for (CollIndex i=0; i < 100;/*keys_.entries();*/ i++) {
const simple_cbf_key& ckey =;
UInt64 freq;
NABoolean ok = contain(ckey.getKey(), ckey.getKeyLen(), &freq);
if ( ok ) {
// The first bytes of the key are reinterpreted as an Int32 for printing.
printf("key=%d, freq=" PF64 "\n", *((Int32*)ckey.getKey()), freq);
// Rebuilds freqsH_ / freq2sH_ (the frequency-of-frequency summary of the
// overflow area) from overflowCountTable_, following the same steps as
// GeneralCountingBloomFilter::computeOverflowF2s(): release the old arrays,
// allocate getOverflowEntries() elements, walk the table, sort with qsort()
// and collapse runs into (freqStruct, count) pairs.
//
// NOTE(review): several statements and closing braces are not present in
// this copy of the file.
// NOTE(review): the iterator below is declared over cbf_key, while this
// class inserts simple_cbf_key objects into overflowCountTable_ -- confirm
// the intended key type.
// NOTE(review): freqStruct::operator== in this file returns 0 when both
// operands are equal, so the inner while condition below is true when the
// two entries DIFFER -- confirm this is the intended test.
void FastStatsCountingBloomFilter::computeOverflowF2s()
NADELETEBASIC(freqsH_, heap_);
NADELETEBASIC(freq2sH_, heap_);
freqsH_ = NULL;
freq2sH_ = NULL;
actualOverflowF2s_ = 0;
UInt32 overflowed = getOverflowEntries();
if ( overflowed == 0 )
// cout << "overflowed=" << overflowed << endl;
// Allocated as raw bytes; see the commented-out alternative below.
freqsH_ = (freqStruct*) (new (heap_) char[sizeof(freqStruct) * overflowed]);
//freqsH_ = new (heap_) freqStruct[overflowed]; // does not work with NADELETEARRAY
freq2sH_ = new (heap_) UInt64[overflowed];
NAHashDictionaryIterator<cbf_key, UInt64> cItor(overflowCountTable_);
cbf_key* key;
UInt64* freq;
CollIndex i;
for ( i = 0 ; i < cItor.entries() ; i++) {
cItor.getNext(key, freq);
// do a quick sort
qsort(freqsH_, cItor.entries(), sizeof(freqStruct), compareTwoFreqStructs);
// compute freqOfFreq by walking through the sorted list in one pass
UInt64 f = 0;
i = 0;
CollIndex j = 0;
while (i<overflowed)
while ( j<overflowed && freqsH_[i] == freqsH_[j] ) {
f++; j++;
freqsH_[actualOverflowF2s_] = freqsH_[i];
freq2sH_[actualOverflowF2s_++] = f;