| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <assert.h> |
| #include <limits.h> |
| #include <math.h> |
| #include <algorithm> |
| #include <sstream> |
| #include <iostream> |
| #include "udf/udf.h" |
| |
| #include "common/names.h" |
| |
| using std::max; |
| using namespace impala_udf; |
| |
| // This sample UDA implements the hyperloglog distinct estimate aggregate |
| // function. |
| // See these papers for more details. |
| // 1) Hyperloglog: The analysis of a near-optimal cardinality estimation algorithm (2007) |
| // 2) HyperLogLog in Practice |
| |
// Number of low-order hash bits used to pick a register; the sketch therefore has
// 2^HLL_PRECISION registers. The value is taken from the paper and does not seem to
// matter very much as long as it stays within [6,12].
static const int HLL_PRECISION = 10;
| |
| IMPALA_UDF_EXPORT |
| void HllInit(FunctionContext* ctx, StringVal* dst) { |
| int str_len = pow(2, HLL_PRECISION); |
| uint8_t* ptr = ctx->Allocate(str_len); |
| if (ptr != NULL) { |
| dst->ptr = ptr; |
| dst->len = str_len; |
| dst->is_null = false; |
| memset(ptr, 0, str_len); |
| } else { |
| *dst = StringVal::null(); |
| } |
| } |
| |
// 64-bit FNV parameters: the multiplier (1099511628211) and the initial hash value
// (14695981039346656037).
static const uint64_t FNV64_PRIME = 0x100000001b3ULL;
static const uint64_t FNV64_SEED = 0xcbf29ce484222325ULL;

// Folds 'bytes' bytes starting at 'data' into 'hash' and returns the updated value.
// Each input byte is XORed into the running hash, which is then multiplied by
// FNV64_PRIME. With bytes == 0 the incoming 'hash' is returned unchanged.
static uint64_t FnvHash(const void* data, int32_t bytes, uint64_t hash) {
  const uint8_t* const begin = reinterpret_cast<const uint8_t*>(data);
  const uint8_t* const end = begin + bytes;
  for (const uint8_t* cur = begin; cur != end; ++cur) {
    hash ^= *cur;
    hash *= FNV64_PRIME;
  }
  return hash;
}
| |
| static uint64_t Hash(const IntVal& v) { |
| return FnvHash(&v.val, sizeof(int32_t), FNV64_SEED); |
| } |
| |
| IMPALA_UDF_EXPORT |
| void HllUpdate(FunctionContext* ctx, const IntVal& src, StringVal* dst) { |
| if (src.is_null) return; |
| assert(dst != NULL); |
| assert(!dst->is_null); |
| assert(dst->len == pow(2, HLL_PRECISION)); |
| uint64_t hash_value = Hash(src); |
| // Use the lower bits to index into the number of streams and then find the first 1 bit |
| // after the index bits. |
| int idx = hash_value % dst->len; |
| const uint64_t hash_top_bits = hash_value >> HLL_PRECISION; |
| uint8_t first_one_bit = |
| 1 + ((hash_top_bits != 0) ? __builtin_ctzll(hash_top_bits) : |
| (sizeof(hash_value) * CHAR_BIT - HLL_PRECISION)); |
| dst->ptr[idx] = ::max(dst->ptr[idx], first_one_bit); |
| } |
| |
| IMPALA_UDF_EXPORT |
| void HllMerge(FunctionContext* ctx, const StringVal& src, StringVal* dst) { |
| assert(dst != NULL); |
| assert(!dst->is_null); |
| assert(!src.is_null); |
| assert(dst->len == pow(2, HLL_PRECISION)); |
| assert(src.len == pow(2, HLL_PRECISION)); |
| for (int i = 0; i < src.len; ++i) { |
| dst->ptr[i] = ::max(dst->ptr[i], src.ptr[i]); |
| } |
| } |
| |
| IMPALA_UDF_EXPORT |
| StringVal HllSerialize(FunctionContext* ctx, const StringVal& src) { |
| if (src.is_null) return src; |
| StringVal result(ctx, src.len); |
| memcpy(result.ptr, src.ptr, src.len); |
| ctx->Free(src.ptr); |
| return result; |
| } |
| |
| IMPALA_UDF_EXPORT |
| StringVal HllFinalize(FunctionContext* ctx, const StringVal& src) { |
| assert(!src.is_null); |
| assert(src.len == pow(2, HLL_PRECISION)); |
| |
| const int num_streams = pow(2, HLL_PRECISION); |
| // Empirical constants for the algorithm. |
| float alpha = 0; |
| if (num_streams == 16) { |
| alpha = 0.673f; |
| } else if (num_streams == 32) { |
| alpha = 0.697f; |
| } else if (num_streams == 64) { |
| alpha = 0.709f; |
| } else { |
| alpha = 0.7213f / (1 + 1.079f / num_streams); |
| } |
| |
| float harmonic_mean = 0; |
| int num_zero_registers = 0; |
| for (int i = 0; i < src.len; ++i) { |
| harmonic_mean += powf(2.0f, -src.ptr[i]); |
| if (src.ptr[i] == 0) ++num_zero_registers; |
| } |
| harmonic_mean = 1.0f / harmonic_mean; |
| int64_t estimate = alpha * num_streams * num_streams * harmonic_mean; |
| |
| if (num_zero_registers != 0) { |
| // Estimated cardinality is too low. Hll is too inaccurate here, instead use |
| // linear counting. |
| estimate = num_streams * log(static_cast<float>(num_streams) / num_zero_registers); |
| } |
| |
| // Output the estimate as ascii string |
| stringstream out; |
| out << estimate; |
| string out_str = out.str(); |
| StringVal result_str(ctx, out_str.size()); |
| memcpy(result_str.ptr, out_str.c_str(), result_str.len); |
| ctx->Free(src.ptr); |
| return result_str; |
| } |