blob: 5f00d889f5d7aa95d338c1129813fff917f4adb7 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <assert.h>
#include <limits.h>
#include <math.h>
#include <algorithm>
#include <sstream>
#include <iostream>
#include "udf/udf.h"
#include "common/names.h"
using std::max;
using namespace impala_udf;
// This sample UDA implements the hyperloglog distinct estimate aggregate
// function.
// See these papers for more details.
// 1) Hyperloglog: The analysis of a near-optimal cardinality estimation algorithm (2007)
// 2) HyperLogLog in Practice
// Precision taken from the paper. Doesn't seem to matter very much when between [6,12].
// HllInit() allocates 2^HLL_PRECISION one-byte registers, and HllUpdate() shifts the
// hash right by this many bits before looking for the first set bit.
const int HLL_PRECISION = 10;
// Init step of the UDA: allocates the intermediate state, one zeroed byte per register
// (2^HLL_PRECISION of them), and stores it in 'dst'. If the allocation fails, 'dst' is
// set to a NULL StringVal instead.
IMPALA_UDF_EXPORT
void HllInit(FunctionContext* ctx, StringVal* dst) {
  const int num_registers = 1 << HLL_PRECISION;
  uint8_t* registers = ctx->Allocate(num_registers);
  if (registers == NULL) {
    *dst = StringVal::null();
    return;
  }
  // Every register starts out empty.
  memset(registers, 0, num_registers);
  dst->is_null = false;
  dst->len = num_registers;
  dst->ptr = registers;
}
// Parameters of the 64-bit FNV hash: the multiplier applied after every byte and the
// initial hash value that callers pass in as the seed.
static const uint64_t FNV64_PRIME = 1099511628211UL;
static const uint64_t FNV64_SEED = 14695981039346656037UL;

// Folds the 'bytes' bytes starting at 'data' into 'hash': each byte is XORed into the
// running value, which is then multiplied by FNV64_PRIME. Returns the updated hash.
// A zero or negative 'bytes' hashes nothing and returns 'hash' unchanged; 'data' is not
// dereferenced in that case.
static uint64_t FnvHash(const void* data, int32_t bytes, uint64_t hash) {
  const uint8_t* ptr = reinterpret_cast<const uint8_t*>(data);
  // Count upwards so that a negative length cannot run past the end of the buffer.
  for (int32_t i = 0; i < bytes; ++i) {
    hash = (ptr[i] ^ hash) * FNV64_PRIME;
  }
  return hash;
}
// Hashes the sizeof(int32_t) bytes stored in v.val with FnvHash(), starting from
// FNV64_SEED. The is_null flag of 'v' is not consulted here.
static uint64_t Hash(const IntVal& v) {
  const void* payload = &v.val;
  const int32_t payload_size = sizeof(int32_t);
  return FnvHash(payload, payload_size, FNV64_SEED);
}
// Update step of the UDA: folds one input value into the register array held in 'dst'.
// NULL inputs are ignored. A NULL 'dst' (which HllInit() produces when its allocation
// fails) is also left untouched, so that builds with asserts compiled out neither take a
// modulo by a zero length nor write through a NULL pointer.
IMPALA_UDF_EXPORT
void HllUpdate(FunctionContext* ctx, const IntVal& src, StringVal* dst) {
  if (src.is_null) return;
  assert(dst != NULL);
  if (dst->is_null) return;
  assert(dst->len == (1 << HLL_PRECISION));
  uint64_t hash_value = Hash(src);
  // Use the lower bits to index into the number of streams and then find the first 1 bit
  // after the index bits.
  int idx = hash_value % dst->len;
  const uint64_t hash_top_bits = hash_value >> HLL_PRECISION;
  // When no bit is set above the index bits, fall back to the count of remaining bits.
  uint8_t first_one_bit =
      1 + ((hash_top_bits != 0) ? __builtin_ctzll(hash_top_bits) :
          (sizeof(hash_value) * CHAR_BIT - HLL_PRECISION));
  // Each register keeps the largest position seen so far.
  dst->ptr[idx] = ::max(dst->ptr[idx], first_one_bit);
}
// Merge step of the UDA: combines the registers of 'src' into 'dst' by taking the
// element-wise maximum. If either intermediate is NULL (HllInit() yields NULL when its
// allocation fails) nothing is merged and 'dst' is left unchanged, instead of
// dereferencing a NULL buffer when asserts are compiled out.
IMPALA_UDF_EXPORT
void HllMerge(FunctionContext* ctx, const StringVal& src, StringVal* dst) {
  assert(dst != NULL);
  if (src.is_null || dst->is_null) return;
  assert(dst->len == (1 << HLL_PRECISION));
  assert(src.len == (1 << HLL_PRECISION));
  for (int i = 0; i < src.len; ++i) {
    dst->ptr[i] = ::max(dst->ptr[i], src.ptr[i]);
  }
}
// Serialize step of the UDA: copies the intermediate 'src' into the buffer of a newly
// constructed StringVal and releases the original buffer with ctx->Free(). A NULL 'src'
// is returned as is. The copy is skipped when the new StringVal has no buffer, so a
// failed allocation is not followed by a memcpy() into a NULL pointer.
IMPALA_UDF_EXPORT
StringVal HllSerialize(FunctionContext* ctx, const StringVal& src) {
  if (src.is_null) return src;
  StringVal result(ctx, src.len);
  // NOTE(review): assumes StringVal(ctx, len) signals an allocation failure through
  // is_null and/or a NULL ptr -- confirm against udf.h.
  if (!result.is_null && result.ptr != NULL) {
    memcpy(result.ptr, src.ptr, src.len);
  }
  ctx->Free(src.ptr);
  return result;
}
// Finalize step of the UDA: turns the register array in 'src' into a cardinality
// estimate and returns it as an ASCII decimal string. The buffer behind 'src' is
// released with ctx->Free(). A NULL 'src' (HllInit() yields NULL when its allocation
// fails) produces a NULL result rather than a NULL dereference when asserts are compiled
// out.
IMPALA_UDF_EXPORT
StringVal HllFinalize(FunctionContext* ctx, const StringVal& src) {
  if (src.is_null) return StringVal::null();
  const int num_streams = 1 << HLL_PRECISION;
  assert(src.len == num_streams);
  // Empirical constants for the algorithm.
  float alpha = 0;
  if (num_streams == 16) {
    alpha = 0.673f;
  } else if (num_streams == 32) {
    alpha = 0.697f;
  } else if (num_streams == 64) {
    alpha = 0.709f;
  } else {
    alpha = 0.7213f / (1 + 1.079f / num_streams);
  }
  // Accumulate the sum of 2^-register over all registers (inverted below) and count the
  // registers that were never written.
  float harmonic_mean = 0;
  int num_zero_registers = 0;
  for (int i = 0; i < src.len; ++i) {
    harmonic_mean += powf(2.0f, -src.ptr[i]);
    if (src.ptr[i] == 0) ++num_zero_registers;
  }
  harmonic_mean = 1.0f / harmonic_mean;
  int64_t estimate = alpha * num_streams * num_streams * harmonic_mean;
  if (num_zero_registers != 0) {
    // Estimated cardinality is too low. Hll is too inaccurate here, instead use
    // linear counting. This branch is taken whenever at least one register is empty.
    estimate = num_streams * log(static_cast<float>(num_streams) / num_zero_registers);
  }
  // Output the estimate as ascii string
  stringstream out;
  out << estimate;
  string out_str = out.str();
  StringVal result_str(ctx, out_str.size());
  // Skip the copy if the result buffer could not be allocated.
  // NOTE(review): assumes StringVal(ctx, len) signals an allocation failure through
  // is_null and/or a NULL ptr -- confirm against udf.h.
  if (!result_str.is_null && result_str.ptr != NULL) {
    memcpy(result_str.ptr, out_str.c_str(), result_str.len);
  }
  ctx->Free(src.ptr);
  return result_str;
}