blob: c7ff7a614e259b52d8e1e6357867b0622ff90645 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef KLL_SKETCH_HPP_
#define KLL_SKETCH_HPP_
#include <memory>
#include <limits>
#include <iostream>
#include <iomanip>
#include <functional>
#include <cstring>
#include "kll_quantile_calculator.hpp"
#include "kll_helper.hpp"
#include "serde.hpp"
namespace datasketches {
/*
* Implementation of a very compact quantiles sketch with lazy compaction scheme
* and nearly optimal accuracy per retained item.
* See <a href="https://arxiv.org/abs/1603.05346v2">Optimal Quantile Approximation in Streams</a>.
*
* <p>This is a stochastic streaming sketch that enables near-real time analysis of the
* approximate distribution of values from a very large stream in a single pass, requiring only
* that the values are comparable.
* The analysis is obtained using <i>get_quantile()</i> or <i>get_quantiles()</i> functions or the
* inverse functions get_rank(), get_PMF() (Probability Mass Function), and get_CDF()
* (Cumulative Distribution Function).
*
* <p>Given an input stream of <i>N</i> numeric values, the <i>absolute rank</i> of any specific
* value is defined as its index <i>(0 to N-1)</i> in the hypothetical sorted stream of all
* <i>N</i> input values.
*
* <p>The <i>normalized rank</i> (<i>rank</i>) of any specific value is defined as its
* <i>absolute rank</i> divided by <i>N</i>.
* Thus, the <i>normalized rank</i> is a value between zero and one.
* In the documentation for this sketch <i>absolute rank</i> is never used so any
* reference to just <i>rank</i> should be interpreted to mean <i>normalized rank</i>.
*
* <p>This sketch is configured with a parameter <i>k</i>, which affects the size of the sketch
* and its estimation error.
*
* <p>The estimation error is commonly called <i>epsilon</i> (or <i>eps</i>) and is a fraction
* between zero and one. Larger values of <i>k</i> result in smaller values of epsilon.
* Epsilon is always with respect to the rank and cannot be applied to the
* corresponding values.
*
* <p>The relationship between the normalized rank and the corresponding values can be viewed
* as a two dimensional monotonic plot with the normalized rank on one axis and the
* corresponding values on the other axis. If the y-axis is specified as the value-axis and
* the x-axis as the normalized rank, then <i>y = get_quantile(x)</i> is a monotonically
* increasing function.
*
* <p>The functions <i>get_quantile(rank)</i> and get_quantiles(...) translate ranks into
* corresponding values. The functions <i>get_rank(value),
* get_CDF(...) (Cumulative Distribution Function), and get_PMF(...)
* (Probability Mass Function)</i> perform the opposite operation and translate values into ranks.
*
* <p>The <i>get_PMF(...)</i> function has about 13 to 47% worse rank error (depending
* on <i>k</i>) than the other queries because the mass of each "bin" of the PMF has
* "double-sided" error from the upper and lower edges of the bin as a result of a subtraction,
* as the errors from the two edges can sometimes add.
*
* <p>The default <i>k</i> of 200 yields a "single-sided" epsilon of about 1.33% and a
* "double-sided" (PMF) epsilon of about 1.65%.
*
* <p>A <i>get_quantile(rank)</i> query has the following guarantees:
* <ul>
* <li>Let <i>v = get_quantile(r)</i> where <i>r</i> is the rank between zero and one.</li>
* <li>The value <i>v</i> will be a value from the input stream.</li>
* <li>Let <i>trueRank</i> be the true rank of <i>v</i> derived from the hypothetical sorted
* stream of all <i>N</i> values.</li>
* <li>Let <i>eps = get_normalized_rank_error(false)</i>.</li>
* <li>Then <i>r - eps &le; trueRank &le; r + eps</i> with a confidence of 99%. Note that the
* error is on the rank, not the value.</li>
* </ul>
*
* <p>A <i>get_rank(value)</i> query has the following guarantees:
* <ul>
* <li>Let <i>r = get_rank(v)</i> where <i>v</i> is a value between the min and max values of
* the input stream.</li>
* <li>Let <i>trueRank</i> be the true rank of <i>v</i> derived from the hypothetical sorted
* stream of all <i>N</i> values.</li>
* <li>Let <i>eps = get_normalized_rank_error(false)</i>.</li>
* <li>Then <i>r - eps &le; trueRank &le; r + eps</i> with a confidence of 99%.</li>
* </ul>
*
* <p>A <i>get_PMF()</i> query has the following guarantees:
* <ul>
* <li>Let <i>{r1, r2, ..., r(m+1)} = get_PMF(v1, v2, ..., vm)</i> where <i>v1, v2</i> are values
* between the min and max values of the input stream.</li>
* <li>Let <i>mass<sub>i</sub> = estimated mass between v<sub>i</sub> and v<sub>i+1</sub></i>.</li>
* <li>Let <i>trueMass</i> be the true mass between the values of <i>v<sub>i</sub>,
* v<sub>i+1</sub></i> derived from the hypothetical sorted stream of all <i>N</i> values.</li>
* <li>Let <i>eps = get_normalized_rank_error(true)</i>.</li>
* <li>then <i>mass - eps &le; trueMass &le; mass + eps</i> with a confidence of 99%.</li>
* <li>r(m+1) includes the mass of all points larger than vm.</li>
* </ul>
*
* <p>A <i>get_CDF(...)</i> query has the following guarantees:
* <ul>
* <li>Let <i>{r1, r2, ..., r(m+1)} = get_CDF(v1, v2, ..., vm)</i> where <i>v1, v2</i> are values
* between the min and max values of the input stream.</li>
* <li>Let <i>mass<sub>i</sub> = r<sub>i+1</sub> - r<sub>i</sub></i>.</li>
* <li>Let <i>trueMass</i> be the true mass between the true ranks of <i>v<sub>i</sub>,
* v<sub>i+1</sub></i> derived from the hypothetical sorted stream of all <i>N</i> values.</li>
* <li>Let <i>eps = get_normalized_rank_error(true)</i>.</li>
* <li>then <i>mass - eps &le; trueMass &le; mass + eps</i> with a confidence of 99%.</li>
* <li>1 - r(m+1) includes the mass of all points larger than vm.</li>
* </ul>
*
* <p>From the above, it might seem like we could make some estimates to bound the
* <em>value</em> returned from a call to <em>get_quantile()</em>. The sketch, however, does not
* let us derive error bounds or confidences around values. Because errors are independent, we
* can approximately bracket a value as shown below, but there are no error estimates available.
* Additionally, the interval may be quite large for certain distributions.
* <ul>
* <li>Let <i>v = get_quantile(r)</i>, the estimated quantile value of rank <i>r</i>.</li>
* <li>Let <i>eps = get_normalized_rank_error(false)</i>.</li>
* <li>Let <i>v<sub>lo</sub></i> = estimated quantile value of rank <i>(r - eps)</i>.</li>
* <li>Let <i>v<sub>hi</sub></i> = estimated quantile value of rank <i>(r + eps)</i>.</li>
* <li>Then <i>v<sub>lo</sub> &le; v &le; v<sub>hi</sub></i>, with 99% confidence.</li>
* </ul>
*
* author Kevin Lang
* author Alexander Saydakov
* author Lee Rhodes
*/
// Owning handle to an untyped memory block: a unique_ptr to void whose deleter is a std::function.
// kll_sketch::serialize(unsigned) returns its buffer through this type.
using void_ptr_with_deleter = std::unique_ptr<void, std::function<void(void*)>>;
/*
 * KLL quantiles sketch.
 * T - type of the items in the stream
 * C - comparator, invoked as C()(a, b) (defaults to std::less<T>)
 * S - serializer / deserializer of items (defaults to serde<T>)
 * A - allocator for items; it is rebound to obtain the allocator of the levels array
 */
template <typename T, typename C = std::less<T>, typename S = serde<T>, typename A = std::allocator<T>>
class kll_sketch {
// allocator for the uint32_t array of level boundaries, derived from A
typedef typename std::allocator_traits<A>::template rebind_alloc<uint32_t> AllocU32;
public:
// value assigned to m_ (the minimum buffer "width") by every constructor
static const uint8_t DEFAULT_M = 8;
// k used when the constructor is called without an argument
static const uint16_t DEFAULT_K = 200;
// inclusive bounds on k, enforced by the constructor
static const uint16_t MIN_K = DEFAULT_M;
static const uint16_t MAX_K = (1 << 16) - 1;
// creates an empty sketch; throws std::invalid_argument if k is outside [MIN_K, MAX_K]
explicit kll_sketch(uint16_t k = DEFAULT_K);
kll_sketch(const kll_sketch& other);
~kll_sketch();
// takes the argument by value and swaps all members with it
kll_sketch& operator=(kll_sketch other);
// adds one item to the sketch (copying or moving it into the items buffer)
void update(const T& value);
void update(T&& value);
// merges the other sketch into this one; throws std::invalid_argument if the M parameters differ
void merge(const kll_sketch& other);
// true if no items have been presented (n == 0)
bool is_empty() const;
// number of items presented to the sketch
uint64_t get_n() const;
// number of items currently held in the items buffer
uint32_t get_num_retained() const;
// true if the sketch has more than one level
bool is_estimation_mode() const;
// min / max item seen; for an empty sketch the result of get_invalid_value()
// (NaN for floating point T, std::runtime_error for any other T)
T get_min_value() const;
T get_max_value() const;
// item at the given normalized rank; 0.0 yields the min value and 1.0 the max value,
// any fraction outside [0, 1] throws std::invalid_argument
T get_quantile(double fraction) const;
// same for an array of normalized ranks; returns a null pointer if the sketch is empty
std::unique_ptr<T[], std::function<void(T*)>> get_quantiles(const double* fractions, uint32_t size) const;
// normalized rank of the value (NaN if the sketch is empty)
double get_rank(const T& value) const;
// both forward to get_PMF_or_CDF() with the corresponding flag
std::unique_ptr<double[], std::function<void(double*)>> get_PMF(const T* split_points, uint32_t size) const;
std::unique_ptr<double[], std::function<void(double*)>> get_CDF(const T* split_points, uint32_t size) const;
// normalized rank error of this sketch, computed from min_k_
double get_normalized_rank_error(bool pmf) const;
// implementation for fixed-size arithmetic types (integral and floating point)
// every item, including the stored min and max, takes sizeof(TT) bytes
template<typename TT = T, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type = 0>
uint32_t get_serialized_size_bytes() const {
if (is_empty()) { return EMPTY_SIZE_BYTES; }
// single-item form: short preamble followed by the item only
if (num_levels_ == 1 and get_num_retained() == 1) {
return DATA_START_SINGLE_ITEM + sizeof(TT);
}
// the last integer in the levels_ array is not serialized because it can be derived
// the "+ 2" accounts for the min and max values
return DATA_START + num_levels_ * sizeof(uint32_t) + (get_num_retained() + 2) * sizeof(TT);
}
// implementation for all other types
// the size of each item is obtained from the serde S
template<typename TT = T, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type = 0>
uint32_t get_serialized_size_bytes() const {
if (is_empty()) { return EMPTY_SIZE_BYTES; }
// single-item form: short preamble followed by the item only
if (num_levels_ == 1 and get_num_retained() == 1) {
return DATA_START_SINGLE_ITEM + S().size_of_item(items_[levels_[0]]);
}
// the last integer in the levels_ array is not serialized because it can be derived
uint32_t size = DATA_START + num_levels_ * sizeof(uint32_t);
size += S().size_of_item(*min_value_);
size += S().size_of_item(*max_value_);
// iterating the sketch visits every retained item (the first element of the pair)
for (auto& it: *this) size += S().size_of_item(it.first);
return size;
}
// this may need to be specialized to return correct size if sizeof(T) does not match the actual serialized size of an item
// this method is for the user's convenience to predict the sketch size before serialization
// and is not used in the serialization and deserialization code
// predicting the size before serialization may not make sense if the item type is not of a fixed size (like string)
static uint32_t get_max_serialized_size_bytes(uint16_t k, uint64_t n);
// writes the serialized image to the stream
void serialize(std::ostream& os) const;
// serializes into a newly allocated buffer; the image starts header_size_bytes into the buffer,
// and the returned size includes those leading bytes
std::pair<void_ptr_with_deleter, const size_t> serialize(unsigned header_size_bytes = 0) const;
// reconstruct a sketch from a stream or from a memory image of the given size
static kll_sketch<T, C, S, A> deserialize(std::istream& is);
static kll_sketch<T, C, S, A> deserialize(const void* bytes, size_t size);
/*
* Gets the normalized rank error given k and pmf.
* k - the configuration parameter
* pmf - if true, returns the "double-sided" normalized rank error for the get_PMF() function.
* Otherwise, it is the "single-sided" normalized rank error for all the other queries.
* Constants were derived as the best fit to 99 percentile empirically measured max error in thousands of trials
*/
static double get_normalized_rank_error(uint16_t k, bool pmf);
// prints the sketch to the stream, optionally with the levels and the items
void to_stream(std::ostream& os, bool print_levels = false, bool print_items = false) const;
// iteration over the retained items; dereferencing yields a pair (item, weight)
class const_iterator;
const_iterator begin() const;
const_iterator end() const;
#ifdef KLL_VALIDATION
// direct access to the internals, compiled in only when KLL_VALIDATION is defined
uint8_t get_num_levels() { return num_levels_; }
uint32_t* get_levels() { return levels_; }
T* get_items() { return items_; }
#endif
private:
/* Serialized sketch layout:
* Adr:
* || 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
* 0 || unused | M |--------K--------| Flags | FamID | SerVer | PreambleInts |
* || 15 | 14 | 13 | 12 | 11 | 10 | 9 | 8 |
* 1 ||-----------------------------------N------------------------------------------|
* || 23 | 22 | 21 | 20 | 19 | 18 | 17 | 16 |
* 2 ||---------------data----------------|-unused-|numLevels|-------min K-----------|
*/
// size of the image of an empty sketch (the first 8-byte row above)
static const size_t EMPTY_SIZE_BYTES = 8;
// offset of the item in the single-item form
static const size_t DATA_START_SINGLE_ITEM = 8;
// offset of the levels array in the full form: 8 (first row) + 8 (N) + 2 (min K) + 1 (numLevels) + 1 (unused)
static const size_t DATA_START = 20;
// serialize() writes version 2 for the single-item form and version 1 otherwise
static const uint8_t SERIAL_VERSION_1 = 1;
static const uint8_t SERIAL_VERSION_2 = 2;
// family id byte written into the preamble
static const uint8_t FAMILY = 15;
// bit positions within the flags byte
enum flags { IS_EMPTY, IS_LEVEL_ZERO_SORTED, IS_SINGLE_ITEM };
static const uint8_t PREAMBLE_INTS_SHORT = 2; // for empty and single item
static const uint8_t PREAMBLE_INTS_FULL = 5;
uint16_t k_;
uint8_t m_; // minimum buffer "width"
uint16_t min_k_; // for error estimation after merging with different k
uint64_t n_; // number of items presented to the sketch
uint8_t num_levels_;
// level i occupies items_[levels_[i]] .. items_[levels_[i + 1] - 1]
uint32_t* levels_;
uint8_t levels_size_; // allocated length of levels_
T* items_;
uint32_t items_size_; // allocated length of items_
// storage for min and max is always allocated, but the objects are constructed
// only once the sketch is not empty
T* min_value_;
T* max_value_;
bool is_level_zero_sorted_;
// for deserialization
// the common part of the preamble was read and compatibility checks were done
kll_sketch(uint16_t k, uint8_t flags_byte, std::istream& is);
// for deserialization
// the common part of the preamble was read and compatibility checks were done
kll_sketch(uint16_t k, uint8_t flags_byte, const void* bytes, size_t size);
// common update code
// returns the index in items_ where the caller must construct the new item
inline uint32_t internal_update(const T& value);
// The following code is only valid in the special case of exactly reaching capacity while updating.
// It cannot be used while merging, while reducing k, or anything else.
void compress_while_updating(void);
uint8_t find_level_to_compact() const;
void add_empty_top_level_to_completely_full_sketch();
void sort_level_zero();
// non-const: the callers note that it has the side effect of sorting level zero if needed
std::unique_ptr<kll_quantile_calculator<T, C, A>, std::function<void(kll_quantile_calculator<T, C, A>*)>> get_quantile_calculator();
// shared implementation of get_PMF() (is_CDF == false) and get_CDF() (is_CDF == true)
std::unique_ptr<double[], std::function<void(double*)>> get_PMF_or_CDF(const T* split_points, uint32_t size, bool is_CDF) const;
void increment_buckets_unsorted_level(uint32_t from_index, uint32_t to_index, uint64_t weight,
const T* split_points, uint32_t size, double* buckets) const;
void increment_buckets_sorted_level(uint32_t from_index, uint32_t to_index, uint64_t weight,
const T* split_points, uint32_t size, double* buckets) const;
// helpers of merge()
void merge_higher_levels(const kll_sketch& other, uint64_t final_n);
void populate_work_arrays(const kll_sketch& other, T* workbuf, uint32_t* worklevels, uint8_t provisional_num_levels);
void assert_correct_total_weight() const;
uint32_t safe_level_size(uint8_t level) const;
uint32_t get_num_retained_above_level_zero() const;
// checks of the preamble fields, called by deserialize() before a sketch is constructed
static void check_m(uint8_t m);
static void check_preamble_ints(uint8_t preamble_ints, uint8_t flags_byte);
static void check_serial_version(uint8_t serial_version);
static void check_family_id(uint8_t family_id);
// implementation for floating point types
// NaN is returned by the queries on an empty sketch
template<typename TT = T, typename std::enable_if<std::is_floating_point<TT>::value, int>::type = 0>
static TT get_invalid_value() {
return std::numeric_limits<TT>::quiet_NaN();
}
// implementation for all other types
// there is no value to return, so the query fails with std::runtime_error
template<typename TT = T, typename std::enable_if<!std::is_floating_point<TT>::value, int>::type = 0>
static TT get_invalid_value() {
throw std::runtime_error("getting quantiles from empty sketch is not supported for this type of values");
}
};
/*
 * Input iterator over the items of a sketch.
 * Dereferencing yields a pair: a reference to the item and a 64-bit weight.
 * Instances are created by the sketch only (the constructor from raw arrays is private
 * and kll_sketch is a friend); user code can only copy them.
 */
template<typename T, typename C, typename S, typename A>
class kll_sketch<T, C, S, A>::const_iterator: public std::iterator<std::input_iterator_tag, T> {
public:
friend class kll_sketch<T, C, S, A>;
const_iterator(const const_iterator& other);
const_iterator& operator++();
// NOTE(review): the post-increment form is declared to return a reference rather than a copy
// of the previous position; its definition is not in this part of the file - confirm the intent
const_iterator& operator++(int);
bool operator==(const const_iterator& other) const;
bool operator!=(const const_iterator& other) const;
// returned by value: (item, weight)
const std::pair<const T&, const uint64_t> operator*() const;
private:
// non-owning views of the arrays passed to the private constructor
const T* items;
const uint32_t* levels;
const uint8_t num_levels;
// current position (presumably index is a position in items and level the level it belongs to;
// the definitions are outside of this part of the file - verify)
uint32_t index;
uint8_t level;
uint64_t weight;
const_iterator(const T* items, const uint32_t* levels, const uint8_t num_levels);
};
// kll_sketch implementation
/*
 * Creates an empty sketch.
 * k - the configuration parameter; std::invalid_argument is thrown unless MIN_K <= k <= MAX_K.
 * Raw storage is allocated for k items (one level) and for the min and max values,
 * but no item object is constructed here.
 */
template<typename T, typename C, typename S, typename A>
kll_sketch<T, C, S, A>::kll_sketch(uint16_t k) {
  const bool k_in_range = (k >= MIN_K) and (k <= MAX_K);
  if (!k_in_range) {
    throw std::invalid_argument("K must be >= " + std::to_string(MIN_K) + " and <= " + std::to_string(MAX_K) + ": " + std::to_string(k));
  }

  // configuration and counters
  k_ = k;
  min_k_ = k;
  m_ = DEFAULT_M;
  n_ = 0;
  is_level_zero_sorted_ = false;

  // a single level whose begin and end both sit at k_: level zero has no items yet
  num_levels_ = 1;
  levels_size_ = 2;
  levels_ = new (AllocU32().allocate(2)) uint32_t[2] {k_, k_};

  // uninitialized storage only
  items_size_ = k_;
  items_ = A().allocate(items_size_);
  min_value_ = A().allocate(1);
  max_value_ = A().allocate(1);
}
/*
 * Copy constructor: makes a deep copy of the other sketch.
 *
 * The items buffer is raw allocator storage in which only the retained items,
 * the slots [levels_[0], levels_[num_levels_]), hold constructed objects (this is the range
 * the destructor destroys). Therefore exactly these slots are copy-constructed in place:
 * assigning into the new raw storage, or reading the never-constructed slots of the source,
 * is undefined behavior for a non-trivial T.
 * Likewise the min and max objects exist only in a non-empty sketch and are copied
 * only in that case.
 */
template<typename T, typename C, typename S, typename A>
kll_sketch<T, C, S, A>::kll_sketch(const kll_sketch& other) {
  k_ = other.k_;
  m_ = other.m_;
  min_k_ = other.min_k_;
  n_ = other.n_;
  num_levels_ = other.num_levels_;
  levels_size_ = other.levels_size_;
  levels_ = AllocU32().allocate(levels_size_);
  std::copy(&other.levels_[0], &other.levels_[levels_size_], levels_);
  items_size_ = other.items_size_;
  items_ = A().allocate(items_size_);
  // levels_ was just copied, so the retained range is the same in both sketches
  const uint32_t retained_begin = levels_[0];
  const uint32_t retained_end = levels_[num_levels_];
  for (uint32_t i = retained_begin; i < retained_end; i++) {
    new (&items_[i]) T(other.items_[i]);
  }
  min_value_ = A().allocate(1);
  max_value_ = A().allocate(1);
  if (!other.is_empty()) {
    new (min_value_) T(*other.min_value_);
    new (max_value_) T(*other.max_value_);
  }
  is_level_zero_sorted_ = other.is_level_zero_sorted_;
}
/*
 * Assignment. The argument is passed by value, so it is already a private copy of the
 * right-hand side. Every member is exchanged with that copy, and the previous state of this
 * sketch is released when the copy is destroyed on return.
 */
template<typename T, typename C, typename S, typename A>
kll_sketch<T, C, S, A>& kll_sketch<T, C, S, A>::operator=(kll_sketch other) {
  // owned buffers together with their allocated sizes
  std::swap(levels_, other.levels_);
  std::swap(levels_size_, other.levels_size_);
  std::swap(items_, other.items_);
  std::swap(items_size_, other.items_size_);
  std::swap(min_value_, other.min_value_);
  std::swap(max_value_, other.max_value_);

  // configuration
  std::swap(k_, other.k_);
  std::swap(m_, other.m_);
  std::swap(min_k_, other.min_k_);

  // state
  std::swap(n_, other.n_);
  std::swap(num_levels_, other.num_levels_);
  std::swap(is_level_zero_sorted_, other.is_level_zero_sorted_);

  return *this;
}
/*
 * Destructor. Destroys the objects that were actually constructed (the retained items,
 * plus min and max if the sketch is not empty) and then returns all four buffers
 * to their allocators.
 */
template<typename T, typename C, typename S, typename A>
kll_sketch<T, C, S, A>::~kll_sketch() {
  // retained items start at levels_[0] and are contiguous
  uint32_t pos = levels_[0];
  const uint32_t limit = pos + get_num_retained();
  while (pos < limit) {
    items_[pos].~T();
    pos++;
  }

  // min and max are constructed by the first update only
  const bool has_min_max = !is_empty();
  if (has_min_max) {
    min_value_->~T();
    max_value_->~T();
  }

  AllocU32().deallocate(levels_, levels_size_);
  A().deallocate(items_, items_size_);
  A().deallocate(min_value_, 1);
  A().deallocate(max_value_, 1);
}
/*
 * Adds a copy of the value to the sketch.
 * internal_update() does the bookkeeping and returns the free slot of level zero;
 * the item is then copy-constructed into that slot.
 */
template<typename T, typename C, typename S, typename A>
void kll_sketch<T, C, S, A>::update(const T& value) {
  // obtain the slot first and read items_ only afterwards
  const uint32_t slot = internal_update(value);
  T* const target = items_ + slot;
  new (target) T(value);
}
/*
 * Adds the value to the sketch, moving it into the items buffer.
 * The bookkeeping in internal_update() only reads the value (it is passed as an lvalue);
 * the move happens afterwards, when the item is constructed in its slot.
 */
template<typename T, typename C, typename S, typename A>
void kll_sketch<T, C, S, A>::update(T&& value) {
  // obtain the slot first and read items_ only afterwards
  const uint32_t slot = internal_update(value);
  T* const target = items_ + slot;
  new (target) T(std::move(value));
}
/*
 * Common part of both update() overloads.
 * Maintains min and max, makes room in level zero if it is full, counts the item and
 * claims the next slot of level zero (level zero grows downwards, so levels_[0] is decremented).
 * Returns the index in items_ where the caller must construct the new item.
 */
template<typename T, typename C, typename S, typename A>
uint32_t kll_sketch<T, C, S, A>::internal_update(const T& value) {
  if (!is_empty()) {
    // min and max already exist: replace them by assignment when the new value is beyond them
    if (C()(value, *min_value_)) *min_value_ = value;
    if (C()(*max_value_, value)) *max_value_ = value;
  } else {
    // very first item: min and max are constructed in their raw storage
    new (min_value_) T(value);
    new (max_value_) T(value);
  }

  // levels_[0] == 0 means there is no free slot left below level zero
  if (levels_[0] == 0) compress_while_updating();

  ++n_;
  is_level_zero_sorted_ = false;
  levels_[0] -= 1;
  return levels_[0];
}
/*
 * Merges the other sketch into this one.
 * Merging an empty sketch is a no-op. Sketches with different M cannot be merged
 * (std::invalid_argument).
 * The level-zero items of the other sketch are added through update(); its higher levels,
 * if any, are handled by merge_higher_levels().
 */
template<typename T, typename C, typename S, typename A>
void kll_sketch<T, C, S, A>::merge(const kll_sketch& other) {
  if (other.is_empty()) return;
  if (m_ != other.m_) {
    throw std::invalid_argument("incompatible M: " + std::to_string(m_) + " and " + std::to_string(other.m_));
  }

  // captured before the updates below change n_
  const uint64_t final_n = n_ + other.n_;

  // level zero of the other sketch goes through the regular update path
  for (uint32_t i = other.levels_[0]; i < other.levels_[1]; i++) update(other.items_[i]);

  if (!is_empty()) {
    // min and max exist already: widen them to cover the other sketch
    if (C()(*other.min_value_, *min_value_)) *min_value_ = *other.min_value_;
    if (C()(*max_value_, *other.max_value_)) *max_value_ = *other.max_value_;
  } else {
    // this sketch is still empty (no item came through update() above),
    // so min and max have to be constructed
    new (min_value_) T(*other.min_value_);
    new (max_value_) T(*other.max_value_);
  }

  if (other.num_levels_ >= 2) merge_higher_levels(other, final_n);
  n_ = final_n;

  // keep the smaller min_k_ if the other sketch is in estimation mode
  if (other.is_estimation_mode() and (other.min_k_ < min_k_)) min_k_ = other.min_k_;

  assert_correct_total_weight();
}
// Tells whether the sketch has seen no items at all (n_ is zero).
template<typename T, typename C, typename S, typename A>
bool kll_sketch<T, C, S, A>::is_empty() const {
  const bool nothing_seen = (n_ == 0);
  return nothing_seen;
}
// Returns n_, the number of items presented to the sketch.
template<typename T, typename C, typename S, typename A>
uint64_t kll_sketch<T, C, S, A>::get_n() const {
  return this->n_;
}
// Returns the number of items held in the items buffer: the distance between the
// beginning of level zero and the end of the top level.
template<typename T, typename C, typename S, typename A>
uint32_t kll_sketch<T, C, S, A>::get_num_retained() const {
  const uint32_t end_of_top_level = levels_[num_levels_];
  const uint32_t start_of_level_zero = levels_[0];
  return end_of_top_level - start_of_level_zero;
}
// Tells whether the sketch has at least two levels.
template<typename T, typename C, typename S, typename A>
bool kll_sketch<T, C, S, A>::is_estimation_mode() const {
  return num_levels_ >= 2;
}
// Returns a copy of the smallest item seen. For an empty sketch the result comes from
// get_invalid_value(): NaN for floating point T, std::runtime_error for any other T.
template<typename T, typename C, typename S, typename A>
T kll_sketch<T, C, S, A>::get_min_value() const {
  if (!is_empty()) {
    return *min_value_;
  }
  return get_invalid_value();
}
// Returns a copy of the largest item seen. For an empty sketch the result comes from
// get_invalid_value(): NaN for floating point T, std::runtime_error for any other T.
template<typename T, typename C, typename S, typename A>
T kll_sketch<T, C, S, A>::get_max_value() const {
  if (!is_empty()) {
    return *max_value_;
  }
  return get_invalid_value();
}
/*
 * Returns the approximate quantile for the given normalized rank.
 * fraction - normalized rank; 0.0 gives the min value and 1.0 the max value directly.
 * Throws std::invalid_argument if fraction is below 0.0 or above 1.0.
 * For an empty sketch the result of get_invalid_value() is returned.
 */
template<typename T, typename C, typename S, typename A>
T kll_sketch<T, C, S, A>::get_quantile(double fraction) const {
  if (is_empty()) return get_invalid_value();

  // the two ends of the range are answered exactly
  if (fraction == 0.0) return *min_value_;
  if (fraction == 1.0) return *max_value_;

  const bool out_of_range = (fraction < 0.0) or (fraction > 1.0);
  if (out_of_range) {
    throw std::invalid_argument("Fraction cannot be less than zero or greater than 1.0");
  }

  // building the calculator is a non-const operation: it has the side effect of
  // sorting level zero if needed
  kll_sketch* const mutable_self = const_cast<kll_sketch*>(this);
  auto calculator(mutable_self->get_quantile_calculator());
  return calculator->get_quantile(fraction);
}
/*
 * Returns the approximate quantiles for an array of normalized ranks.
 * fractions - array of normalized ranks, each within [0.0, 1.0]
 * size - number of elements in fractions
 * Returns an array of size items (element i corresponds to fractions[i]),
 * or a null pointer if the sketch is empty.
 * Throws std::invalid_argument if any fraction is below 0.0 or above 1.0.
 *
 * The deleter of the result destroys all size elements, so nothing that is expected to fail
 * may happen while the array is only partially constructed. For this reason all fractions are
 * validated, and the quantile calculator is built, before the result array is allocated.
 */
template<typename T, typename C, typename S, typename A>
std::unique_ptr<T[], std::function<void(T*)>> kll_sketch<T, C, S, A>::get_quantiles(const double* fractions, uint32_t size) const {
  if (is_empty()) return std::unique_ptr<T[], std::function<void(T*)>>();

  // pass 1: validate the input and find out whether the calculator is needed at all
  bool needs_calculator = false;
  for (uint32_t i = 0; i < size; i++) {
    const double fraction = fractions[i];
    if ((fraction < 0.0) or (fraction > 1.0)) {
      throw std::invalid_argument("Fraction cannot be less than zero or greater than 1.0");
    }
    // 0.0 and 1.0 are answered from the min and max values
    if ((fraction != 0.0) and (fraction != 1.0)) needs_calculator = true;
  }

  std::unique_ptr<kll_quantile_calculator<T, C, A>, std::function<void(kll_quantile_calculator<T, C, A>*)>> quantile_calculator;
  if (needs_calculator) {
    // has side effect of sorting level zero if needed
    quantile_calculator = const_cast<kll_sketch*>(this)->get_quantile_calculator();
  }

  // pass 2: construct the results in raw allocator storage
  std::unique_ptr<T[], std::function<void(T*)>> quantiles(
    A().allocate(size),
    [size](T* ptr){
      for (uint32_t i = 0; i < size; i++) ptr[i].~T();
      A().deallocate(ptr, size);
    }
  );
  for (uint32_t i = 0; i < size; i++) {
    const double fraction = fractions[i];
    if (fraction == 0.0) new (&quantiles[i]) T(*min_value_);
    else if (fraction == 1.0) new (&quantiles[i]) T(*max_value_);
    else new (&quantiles[i]) T(quantile_calculator->get_quantile(fraction));
  }
  // a local returned by name is moved (or elided) implicitly
  return quantiles;
}
/*
 * Returns the approximate normalized rank of the value: the total weight of the retained
 * items that compare less than the value, divided by n.
 * An item of level L carries the weight 2^L.
 * Returns NaN if the sketch is empty.
 */
template<typename T, typename C, typename S, typename A>
double kll_sketch<T, C, S, A>::get_rank(const T& value) const {
  if (is_empty()) return std::numeric_limits<double>::quiet_NaN();

  uint64_t total = 0;
  uint64_t weight = 1;
  for (uint8_t level = 0; level < num_levels_; level++, weight *= 2) {
    // levels above 0 are sorted, level 0 only if the flag says so
    const bool level_is_sorted = (level > 0) or is_level_zero_sorted_;
    const uint32_t limit = levels_[level + 1]; // exclusive
    for (uint32_t i = levels_[level]; i < limit; i++) {
      if (C()(items_[i], value)) {
        total += weight;
      } else if (level_is_sorted) {
        break; // in a sorted level no further item can be less than the value
      }
    }
  }
  return static_cast<double>(total) / n_;
}
// Probability Mass Function for the given split points.
// Forwards to get_PMF_or_CDF() asking for the non-cumulative form.
template<typename T, typename C, typename S, typename A>
std::unique_ptr<double[], std::function<void(double*)>> kll_sketch<T, C, S, A>::get_PMF(const T* split_points, uint32_t size) const {
  const bool cumulative = false;
  return get_PMF_or_CDF(split_points, size, cumulative);
}
// Cumulative Distribution Function for the given split points.
// Forwards to get_PMF_or_CDF() asking for the cumulative form.
template<typename T, typename C, typename S, typename A>
std::unique_ptr<double[], std::function<void(double*)>> kll_sketch<T, C, S, A>::get_CDF(const T* split_points, uint32_t size) const {
  const bool cumulative = true;
  return get_PMF_or_CDF(split_points, size, cumulative);
}
// Normalized rank error of this sketch.
// pmf - true for the "double-sided" error of get_PMF(), false for the "single-sided" error.
// The error is computed from min_k_ rather than k_.
template<typename T, typename C, typename S, typename A>
double kll_sketch<T, C, S, A>::get_normalized_rank_error(bool pmf) const {
  const uint16_t effective_k = min_k_;
  return get_normalized_rank_error(effective_k, pmf);
}
/*
 * Writes the serialized image of the sketch to the stream.
 * The first 8 bytes (preamble ints, serial version, family, flags, k, m, one unused byte)
 * are always written. An empty sketch ends there. A single-item sketch (n == 1) continues
 * with the item only. Any other sketch continues with n, min k, the number of levels,
 * one unused byte, the levels array without its last element, the min and max values and
 * the retained items.
 * Multi-byte fields are written as they are laid out in memory.
 */
template<typename T, typename C, typename S, typename A>
void kll_sketch<T, C, S, A>::serialize(std::ostream& os) const {
  const bool is_single_item = n_ == 1;

  const uint8_t preamble_ints(is_empty() or is_single_item ? PREAMBLE_INTS_SHORT : PREAMBLE_INTS_FULL);
  os.write(reinterpret_cast<const char*>(&preamble_ints), sizeof(preamble_ints));

  const uint8_t serial_version(is_single_item ? SERIAL_VERSION_2 : SERIAL_VERSION_1);
  os.write(reinterpret_cast<const char*>(&serial_version), sizeof(serial_version));

  const uint8_t family(FAMILY);
  os.write(reinterpret_cast<const char*>(&family), sizeof(family));

  // one bit per flag, at the position given by the flags enum
  uint8_t flags_byte = 0;
  if (is_empty()) flags_byte |= static_cast<uint8_t>(1 << flags::IS_EMPTY);
  if (is_level_zero_sorted_) flags_byte |= static_cast<uint8_t>(1 << flags::IS_LEVEL_ZERO_SORTED);
  if (is_single_item) flags_byte |= static_cast<uint8_t>(1 << flags::IS_SINGLE_ITEM);
  os.write(reinterpret_cast<const char*>(&flags_byte), sizeof(flags_byte));

  os.write(reinterpret_cast<const char*>(&k_), sizeof(k_));
  os.write(reinterpret_cast<const char*>(&m_), sizeof(m_));
  const uint8_t unused(0);
  os.write(reinterpret_cast<const char*>(&unused), sizeof(unused));

  if (is_empty()) return;

  if (!is_single_item) {
    os.write(reinterpret_cast<const char*>(&n_), sizeof(n_));
    os.write(reinterpret_cast<const char*>(&min_k_), sizeof(min_k_));
    os.write(reinterpret_cast<const char*>(&num_levels_), sizeof(num_levels_));
    os.write(reinterpret_cast<const char*>(&unused), sizeof(unused));
    // only num_levels_ boundaries are written: the last one is derived on deserialization
    os.write(reinterpret_cast<const char*>(levels_), sizeof(levels_[0]) * num_levels_);
    S().serialize(os, min_value_, 1);
    S().serialize(os, max_value_, 1);
  }
  S().serialize(os, &items_[levels_[0]], get_num_retained());
}
/*
 * Serializes the sketch into a newly allocated buffer.
 * header_size_bytes - number of bytes to leave untouched at the beginning of the buffer;
 * the sketch image starts right after them.
 * Returns the buffer (owned by the returned pointer) and its total size, which is
 * header_size_bytes + get_serialized_size_bytes().
 * Throws std::logic_error if the number of bytes written differs from the predicted size.
 *
 * The image has the same layout as the one produced by serialize(std::ostream&).
 */
template<typename T, typename C, typename S, typename A>
std::pair<void_ptr_with_deleter, const size_t> kll_sketch<T, C, S, A>::serialize(unsigned header_size_bytes) const {
  const bool is_single_item = n_ == 1;
  const size_t size = header_size_bytes + get_serialized_size_bytes();
  // rebind through allocator_traits (as done for AllocU32): the nested A::rebind member
  // is deprecated in C++17 and no longer exists in std::allocator as of C++20
  typedef typename std::allocator_traits<A>::template rebind_alloc<char> AllocChar;
  void_ptr_with_deleter data_ptr(
    static_cast<void*>(AllocChar().allocate(size)),
    [size](void* ptr) { AllocChar().deallocate(static_cast<char*>(ptr), size); }
  );
  char* ptr = static_cast<char*>(data_ptr.get()) + header_size_bytes;
  const uint8_t preamble_ints(is_empty() or is_single_item ? PREAMBLE_INTS_SHORT : PREAMBLE_INTS_FULL);
  copy_to_mem(&preamble_ints, &ptr, sizeof(preamble_ints));
  const uint8_t serial_version(is_single_item ? SERIAL_VERSION_2 : SERIAL_VERSION_1);
  copy_to_mem(&serial_version, &ptr, sizeof(serial_version));
  const uint8_t family(FAMILY);
  copy_to_mem(&family, &ptr, sizeof(family));
  const uint8_t flags_byte(
      (is_empty() ? 1 << flags::IS_EMPTY : 0)
    | (is_level_zero_sorted_ ? 1 << flags::IS_LEVEL_ZERO_SORTED : 0)
    | (is_single_item ? 1 << flags::IS_SINGLE_ITEM : 0)
  );
  copy_to_mem(&flags_byte, &ptr, sizeof(flags_byte));
  copy_to_mem(&k_, &ptr, sizeof(k_));
  copy_to_mem(&m_, &ptr, sizeof(m_));
  const uint8_t unused(0);
  copy_to_mem(&unused, &ptr, sizeof(unused));
  if (!is_empty()) {
    if (!is_single_item) {
      copy_to_mem(&n_, &ptr, sizeof(n_));
      copy_to_mem(&min_k_, &ptr, sizeof(min_k_));
      copy_to_mem(&num_levels_, &ptr, sizeof(num_levels_));
      copy_to_mem(&unused, &ptr, sizeof(unused));
      // only num_levels_ boundaries are written: the last one is derived on deserialization
      copy_to_mem(levels_, &ptr, sizeof(levels_[0]) * num_levels_);
      ptr += S().serialize(ptr, min_value_, 1);
      ptr += S().serialize(ptr, max_value_, 1);
    }
    ptr += S().serialize(ptr, &items_[levels_[0]], get_num_retained());
  }
  // sanity check: the write position must have advanced over the whole buffer
  const size_t delta = ptr - static_cast<const char*>(data_ptr.get());
  if (delta != size) throw std::logic_error("serialized size mismatch: " + std::to_string(delta) + " != " + std::to_string(size));
  return std::make_pair(std::move(data_ptr), size);
}
/*
 * Reconstructs a sketch from a stream.
 * Reads the 8 bytes common to all forms of the image (preamble ints, serial version,
 * family id, flags, k, m, one unused byte) and checks them with the check_*() helpers.
 * An image flagged as empty yields a new empty sketch with the given k; otherwise the rest
 * of the stream is consumed by the deserializing constructor.
 */
template<typename T, typename C, typename S, typename A>
kll_sketch<T, C, S, A> kll_sketch<T, C, S, A>::deserialize(std::istream& is) {
  uint8_t preamble_ints;
  uint8_t serial_version;
  uint8_t family_id;
  uint8_t flags_byte;
  uint16_t k;
  uint8_t m;
  uint8_t unused;

  // the fields are read in the order in which serialize() writes them
  is.read(reinterpret_cast<char*>(&preamble_ints), sizeof(preamble_ints));
  is.read(reinterpret_cast<char*>(&serial_version), sizeof(serial_version));
  is.read(reinterpret_cast<char*>(&family_id), sizeof(family_id));
  is.read(reinterpret_cast<char*>(&flags_byte), sizeof(flags_byte));
  is.read(reinterpret_cast<char*>(&k), sizeof(k));
  is.read(reinterpret_cast<char*>(&m), sizeof(m));
  is.read(reinterpret_cast<char*>(&unused), sizeof(unused));

  check_m(m);
  check_preamble_ints(preamble_ints, flags_byte);
  check_serial_version(serial_version);
  check_family_id(family_id);

  const bool image_is_empty = (flags_byte & (1 << flags::IS_EMPTY)) != 0;
  return image_is_empty ? kll_sketch<T, C, S, A>(k) : kll_sketch<T, C, S, A>(k, flags_byte, is);
}
/*
 * Reconstructs a sketch from a serialized image in memory.
 * bytes - pointer to the beginning of the image
 * size - size of the image in bytes
 * Throws std::invalid_argument if size is smaller than the 8-byte header that every image
 * starts with (reading the header from a shorter buffer would run past its end).
 * The header fields are checked with the check_*() helpers.
 * An image flagged as empty yields a new empty sketch with the given k; otherwise the
 * deserializing constructor consumes the rest of the image.
 */
template<typename T, typename C, typename S, typename A>
kll_sketch<T, C, S, A> kll_sketch<T, C, S, A>::deserialize(const void* bytes, size_t size) {
  // the smallest valid image is that of an empty sketch
  if (size < EMPTY_SIZE_BYTES) {
    throw std::invalid_argument("insufficient buffer size: " + std::to_string(size)
        + " bytes, at least " + std::to_string(EMPTY_SIZE_BYTES) + " required");
  }
  const char* ptr = static_cast<const char*>(bytes);
  uint8_t preamble_ints;
  copy_from_mem(&ptr, &preamble_ints, sizeof(preamble_ints));
  uint8_t serial_version;
  copy_from_mem(&ptr, &serial_version, sizeof(serial_version));
  uint8_t family_id;
  copy_from_mem(&ptr, &family_id, sizeof(family_id));
  uint8_t flags_byte;
  copy_from_mem(&ptr, &flags_byte, sizeof(flags_byte));
  uint16_t k;
  copy_from_mem(&ptr, &k, sizeof(k));
  uint8_t m;
  copy_from_mem(&ptr, &m, sizeof(m));
  // the unused 8th byte is not read: the constructor below starts at a fixed offset
  check_m(m);
  check_preamble_ints(preamble_ints, flags_byte);
  check_serial_version(serial_version);
  check_family_id(family_id);
  const bool is_empty(flags_byte & (1 << flags::IS_EMPTY));
  return is_empty ? kll_sketch<T, C, S, A>(k) : kll_sketch<T, C, S, A>(k, flags_byte, bytes, size);
}
/*
 * Computes the normalized rank error for a given k.
 * k - the configuration parameter
 * pmf - selects the "double-sided" error that applies to get_PMF() when true,
 * and the "single-sided" error that applies to all the other queries when false.
 * The error has the form a / k^b, where the constants were derived as the best fit to
 * 99 percentile empirically measured max error in thousands of trials.
 */
template<typename T, typename C, typename S, typename A>
double kll_sketch<T, C, S, A>::get_normalized_rank_error(uint16_t k, bool pmf) {
  if (pmf) {
    return 2.446 / pow(k, 0.9433);
  }
  return 2.296 / pow(k, 0.9723);
}
/*
 * Deserializing constructor (stream version) for a non-empty image.
 * The caller has already consumed the first 8 bytes of the image and done the
 * compatibility checks; the stream is positioned right after them.
 * k - the configuration parameter read from the image
 * flags_byte - the flags read from the image
 * is - the stream to read the rest of the image from
 */
template<typename T, typename C, typename S, typename A>
kll_sketch<T, C, S, A>::kll_sketch(uint16_t k, uint8_t flags_byte, std::istream& is) {
  k_ = k;
  m_ = DEFAULT_M;
  const bool single_item = (flags_byte & (1 << flags::IS_SINGLE_ITEM)) != 0; // used in serial version 2

  if (!single_item) {
    is.read(reinterpret_cast<char*>(&n_), sizeof(n_));
    is.read(reinterpret_cast<char*>(&min_k_), sizeof(min_k_));
    is.read(reinterpret_cast<char*>(&num_levels_), sizeof(num_levels_));
    uint8_t padding;
    is.read(reinterpret_cast<char*>(&padding), sizeof(padding));
  } else {
    // nothing of this is stored in the single-item form
    n_ = 1;
    min_k_ = k_;
    num_levels_ = 1;
  }

  levels_ = AllocU32().allocate(num_levels_ + 1);
  levels_size_ = num_levels_ + 1;
  const uint32_t capacity = kll_helper::compute_total_capacity(k_, m_, num_levels_);
  if (!single_item) {
    // the last integer in levels_ is not serialized because it can be derived
    is.read(reinterpret_cast<char*>(levels_), sizeof(levels_[0]) * num_levels_);
  } else {
    // the only item occupies the last slot of the buffer
    levels_[0] = capacity - 1;
  }
  levels_[num_levels_] = capacity;

  min_value_ = A().allocate(1);
  max_value_ = A().allocate(1);
  if (!single_item) {
    S().deserialize(is, min_value_, 1);
    S().deserialize(is, max_value_, 1);
  }

  items_ = A().allocate(capacity);
  items_size_ = capacity;
  const uint32_t num_items = levels_[num_levels_] - levels_[0];
  S().deserialize(is, &items_[levels_[0]], num_items);

  if (single_item) {
    // min and max are not stored in the single-item form: both equal the item
    new (min_value_) T(items_[levels_[0]]);
    new (max_value_) T(items_[levels_[0]]);
  }
  is_level_zero_sorted_ = (flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED)) > 0;
}
/*
 * Deserializing constructor (memory version) for a non-empty image.
 * The caller has already read the common part of the preamble and done the
 * compatibility checks.
 * k - the configuration parameter read from the image
 * flags_byte - the flags read from the image
 * bytes - pointer to the beginning of the image (not to the position after the preamble)
 * size - size of the image in bytes
 * Throws std::logic_error if the number of bytes consumed differs from size. Since the
 * destructor does not run for an object whose constructor throws, everything acquired here
 * is released before throwing.
 */
template<typename T, typename C, typename S, typename A>
kll_sketch<T, C, S, A>::kll_sketch(uint16_t k, uint8_t flags_byte, const void* bytes, size_t size) {
  k_ = k;
  m_ = DEFAULT_M;
  const bool is_single_item(flags_byte & (1 << flags::IS_SINGLE_ITEM)); // used in serial version 2
  // skip the 8 bytes that the caller has already interpreted
  const char* ptr = static_cast<const char*>(bytes) + DATA_START_SINGLE_ITEM;
  if (is_single_item) {
    n_ = 1;
    min_k_ = k_;
    num_levels_ = 1;
  } else {
    copy_from_mem(&ptr, &n_, sizeof(n_));
    copy_from_mem(&ptr, &min_k_, sizeof(min_k_));
    copy_from_mem(&ptr, &num_levels_, sizeof(num_levels_));
    ptr++; // skip unused byte
  }
  levels_ = AllocU32().allocate(num_levels_ + 1);
  levels_size_ = num_levels_ + 1;
  const uint32_t capacity(kll_helper::compute_total_capacity(k_, m_, num_levels_));
  if (is_single_item) {
    // the only item occupies the last slot of the buffer
    levels_[0] = capacity - 1;
  } else {
    // the last integer in levels_ is not serialized because it can be derived
    copy_from_mem(&ptr, levels_, sizeof(levels_[0]) * num_levels_);
  }
  levels_[num_levels_] = capacity;
  min_value_ = A().allocate(1);
  max_value_ = A().allocate(1);
  if (!is_single_item) {
    ptr += S().deserialize(ptr, min_value_, 1);
    ptr += S().deserialize(ptr, max_value_, 1);
  }
  items_ = A().allocate(capacity);
  items_size_ = capacity;
  const auto num_items(levels_[num_levels_] - levels_[0]);
  ptr += S().deserialize(ptr, &items_[levels_[0]], num_items);
  if (is_single_item) {
    // min and max are not stored in the single-item form: both equal the item
    new (min_value_) T(items_[levels_[0]]);
    new (max_value_) T(items_[levels_[0]]);
  }
  is_level_zero_sorted_ = (flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED)) > 0;
  const size_t delta = ptr - static_cast<const char*>(bytes);
  if (delta != size) {
    // undo what this constructor did, mirroring the destructor: at this point the retained
    // items as well as min and max hold objects, and all four buffers are allocated
    for (uint32_t i = levels_[0]; i < levels_[num_levels_]; i++) items_[i].~T();
    min_value_->~T();
    max_value_->~T();
    AllocU32().deallocate(levels_, levels_size_);
    A().deallocate(items_, items_size_);
    A().deallocate(min_value_, 1);
    A().deallocate(max_value_, 1);
    throw std::logic_error("deserialized size mismatch: " + std::to_string(delta) + " != " + std::to_string(size));
  }
}
// Makes room in the items buffer by compacting the lowest level that has reached its capacity
// (as chosen by find_level_to_compact()), and then shifts the levels below it up so that the
// freed slots end up just below level zero.
// The following code is only valid in the special case of exactly reaching capacity while updating.
// It cannot be used while merging, while reducing k, or anything else.
template<typename T, typename C, typename S, typename A>
void kll_sketch<T, C, S, A>::compress_while_updating(void) {
const uint8_t level = find_level_to_compact();
// It is important to add the new top level right here. Be aware that this operation
// grows the buffer and shifts the data and also the boundaries of the data and grows the
// levels array and increments num_levels_
if (level == (num_levels_ - 1)) {
add_empty_top_level_to_completely_full_sketch();
}
// [raw_beg, raw_lim) is the index range currently occupied by the level being compacted
const uint32_t raw_beg = levels_[level];
const uint32_t raw_lim = levels_[level + 1];
// +2 is OK because we already added a new top level if necessary
const uint32_t pop_above = levels_[level + 2] - raw_lim;
const uint32_t raw_pop = raw_lim - raw_beg;
// an odd population is made even by leaving the first item (at raw_beg) out of the compaction
const bool odd_pop = kll_helper::is_odd(raw_pop);
const uint32_t adj_beg = odd_pop ? raw_beg + 1 : raw_beg;
const uint32_t adj_pop = odd_pop ? raw_pop - 1 : raw_pop;
const uint32_t half_adj_pop = adj_pop / 2;
// remember where level zero starts now: half_adj_pop slots starting here are destroyed at the end
const uint32_t destroy_beg = levels_[0];
// level zero might not be sorted, so we must sort it if we wish to compact it
// sort_level_zero() is not used here because of the adjustment for odd number of items
if ((level == 0) and !is_level_zero_sorted_) {
std::sort(&items_[adj_beg], &items_[adj_beg + adj_pop], C());
}
if (pop_above == 0) {
// the level above is empty
// NOTE(review): presumably randomly_halve_up leaves the surviving half at the top of the range,
// where it directly becomes the content of the level above - confirm in kll_helper
kll_helper::randomly_halve_up(items_, adj_beg, adj_pop);
} else {
// NOTE(review): presumably randomly_halve_down leaves the surviving half at the bottom of the range,
// and it is then merged with the items of the level above (starting at raw_lim), with the
// output written starting at adj_beg + half_adj_pop - confirm in kll_helper
kll_helper::randomly_halve_down(items_, adj_beg, adj_pop);
kll_helper::merge_sorted_arrays<T, C>(items_, adj_beg, half_adj_pop, raw_lim, pop_above, adj_beg + half_adj_pop);
}
levels_[level + 1] -= half_adj_pop; // adjust boundaries of the level above
if (odd_pop) {
levels_[level] = levels_[level + 1] - 1; // the current level now contains one item
// the move is skipped when the leftover item is already in its final slot
if (levels_[level] != raw_beg) items_[levels_[level]] = std::move(items_[raw_beg]); // namely this leftover guy
} else {
levels_[level] = levels_[level + 1]; // the current level is now empty
}
// verify that we freed up half_adj_pop array slots just below the current level
assert (levels_[level] == (raw_beg + half_adj_pop));
// finally, we need to shift up the data in the levels below
// so that the freed-up space can be used by level zero
if (level > 0) {
// amount is the number of items held in all the levels below the compacted one
const uint32_t amount = raw_beg - levels_[0];
// move_backward is used because the source and destination ranges may overlap
std::move_backward(&items_[levels_[0]], &items_[levels_[0] + amount], &items_[levels_[0] + half_adj_pop + amount]);
for (uint8_t lvl = 0; lvl < level; lvl++) levels_[lvl] += half_adj_pop;
}
// destroy the objects left in the half_adj_pop slots starting at the old beginning of level zero;
// after the boundary updates above these slots lie below levels_[0]
for (uint32_t i = 0; i < half_adj_pop; i++) items_[i + destroy_beg].~T();
}
// Scans the levels from the bottom up and returns the index of the first level
// whose population has reached its nominal capacity.
// The existence of such a level is only checked by an assert.
template<typename T, typename C, typename S, typename A>
uint8_t kll_sketch<T, C, S, A>::find_level_to_compact() const {
  for (uint8_t lvl = 0; ; ++lvl) {
    assert (lvl < num_levels_);
    const uint32_t population = levels_[lvl + 1] - levels_[lvl];
    const uint32_t capacity = kll_helper::level_capacity(k_, num_levels_, lvl, m_);
    if (population >= capacity) return lvl;
  }
}
// Adds one empty level at the top of a sketch whose items buffer is completely full.
// The items buffer is reallocated with delta_cap additional slots, the existing data is moved
// to start at offset delta_cap (so the new free space is at the bottom), all the level
// boundaries are shifted accordingly and num_levels_ is incremented.
template<typename T, typename C, typename S, typename A>
void kll_sketch<T, C, S, A>::add_empty_top_level_to_completely_full_sketch() {
  const uint32_t old_total_cap = levels_[num_levels_];

  // make sure that we are following a certain growth scheme
  assert (levels_[0] == 0);
  assert (items_size_ == old_total_cap);

  // note that merging MIGHT over-grow levels_, in which case we might not have to grow it here
  const uint8_t required_levels_size = num_levels_ + 2;
  if (required_levels_size > levels_size_) {
    uint32_t* grown_levels = AllocU32().allocate(required_levels_size);
    std::copy(levels_, levels_ + levels_size_, grown_levels);
    AllocU32().deallocate(levels_, levels_size_);
    levels_size_ = required_levels_size;
    levels_ = grown_levels;
  }

  // the buffer grows by the capacity of level zero in a sketch with one more level
  const uint32_t extra_cap = kll_helper::level_capacity(k_, num_levels_ + 1, 0, m_);
  const uint32_t grown_total_cap = old_total_cap + extra_cap;

  // move (and shift) the current data into the new buffer
  T* grown_items = A().allocate(grown_total_cap);
  kll_helper::move_construct<T>(items_, 0, old_total_cap, grown_items, extra_cap, true);
  A().deallocate(items_, items_size_);
  items_size_ = grown_total_cap;
  items_ = grown_items;

  // shift every boundary, including the old "extra" index at the top
  for (uint8_t lvl = 0; lvl <= num_levels_; lvl++) {
    levels_[lvl] += extra_cap;
  }
  assert (levels_[num_levels_] == grown_total_cap);

  // the new top level is empty: its begin and end both equal the new total capacity
  ++num_levels_;
  levels_[num_levels_] = grown_total_cap; // initialize the new "extra" index at the top
}
// Sorts the items of level zero using the comparator C, unless they are already known to be sorted,
// and records that level zero is now sorted.
template<typename T, typename C, typename S, typename A>
void kll_sketch<T, C, S, A>::sort_level_zero() {
  if (is_level_zero_sorted_) return; // nothing to do
  T* const level_zero_begin = &items_[levels_[0]];
  T* const level_zero_end = &items_[levels_[1]];
  std::sort(level_zero_begin, level_zero_end, C());
  is_level_zero_sorted_ = true;
}
// Builds a quantile calculator over the current content of the sketch.
// Level zero is sorted first, then the calculator is constructed in memory obtained from
// the allocator A rebound to the calculator type.
// Returns an owning pointer whose deleter destroys the calculator and returns its memory
// to the same kind of allocator.
template<typename T, typename C, typename S, typename A>
std::unique_ptr<kll_quantile_calculator<T, C, A>, std::function<void(kll_quantile_calculator<T, C, A>*)>> kll_sketch<T, C, S, A>::get_quantile_calculator() {
  sort_level_zero();
  typedef typename std::allocator_traits<A>::template rebind_alloc<kll_quantile_calculator<T, C, A>> AllocCalc;
  std::unique_ptr<kll_quantile_calculator<T, C, A>, std::function<void(kll_quantile_calculator<T, C, A>*)>> quantile_calculator(
    new (AllocCalc().allocate(1)) kll_quantile_calculator<T, C, A>(items_, levels_, num_levels_, n_),
    [](kll_quantile_calculator<T, C, A>* ptr){ ptr->~kll_quantile_calculator<T, C, A>(); AllocCalc().deallocate(ptr, 1); }
  );
  // return the local by name: wrapping it in std::move() would only inhibit copy elision
  return quantile_calculator;
}
// Computes either the PMF or the CDF of the retained items with respect to the given split points.
// split_points: array of size values, checked by kll_helper::validate_values
// size:         number of split points; the result has size + 1 entries
// is_CDF:       true for cumulative values, false for per-bucket masses
// Returns a null pointer if the sketch is empty, otherwise an array of size + 1 doubles,
// each one divided by n_.
template<typename T, typename C, typename S, typename A>
std::unique_ptr<double[], std::function<void(double*)>> kll_sketch<T, C, S, A>::get_PMF_or_CDF(const T* split_points, uint32_t size, bool is_CDF) const {
  if (is_empty()) return nullptr;
  kll_helper::validate_values<T, C>(split_points, size);

  typedef typename std::allocator_traits<A>::template rebind_alloc<double> AllocD;
  const size_t array_size = size + 1;
  std::unique_ptr<double[], std::function<void(double*)>> buckets(
    AllocD().allocate(size + 1),
    [array_size](double* ptr){ AllocD().deallocate(ptr, array_size); }
  );
  double* const masses = buckets.get();
  std::fill(masses, masses + array_size, 0);

  // accumulate the weight of every retained item into its bucket;
  // the weight of an item doubles with each level
  uint64_t weight = 1;
  for (uint8_t level = 0; level < num_levels_; ++level, weight *= 2) {
    const auto from_index = levels_[level];
    const auto to_index = levels_[level + 1]; // exclusive
    const bool level_is_sorted = (level != 0) || is_level_zero_sorted_;
    if (level_is_sorted) {
      increment_buckets_sorted_level(from_index, to_index, weight, split_points, size, masses);
    } else {
      increment_buckets_unsorted_level(from_index, to_index, weight, split_points, size, masses);
    }
  }

  // normalize and, if CDF, convert to cumulative
  if (is_CDF) {
    double running_total = 0;
    for (uint32_t i = 0; i <= size; i++) {
      running_total += masses[i];
      masses[i] = running_total / n_;
    }
  } else {
    for (uint32_t i = 0; i <= size; i++) {
      masses[i] /= n_;
    }
  }
  return buckets;
}
// Adds weight to the bucket of every item in items_[from_index, to_index).
// No ordering of the items is assumed: for each item the split points are scanned from the start,
// and the item goes into the bucket of the first split point it compares less than
// (or into the last bucket, index size, if there is none).
template<typename T, typename C, typename S, typename A>
void kll_sketch<T, C, S, A>::increment_buckets_unsorted_level(uint32_t from_index, uint32_t to_index, uint64_t weight,
    const T* split_points, uint32_t size, double* buckets) const
{
  for (uint32_t item_idx = from_index; item_idx < to_index; item_idx++) {
    uint32_t bucket = 0;
    while (bucket < size && !C()(items_[item_idx], split_points[bucket])) {
      ++bucket;
    }
    buckets[bucket] += weight;
  }
}
// Adds weight to the bucket of every item in items_[from_index, to_index).
// A single forward pass walks the items and the split points together, so it relies on both
// being in ascending order with respect to C.
template<typename T, typename C, typename S, typename A>
void kll_sketch<T, C, S, A>::increment_buckets_sorted_level(uint32_t from_index, uint32_t to_index, uint64_t weight,
    const T* split_points, uint32_t size, double* buckets) const
{
  uint32_t item_idx = from_index;
  uint32_t bucket = 0;
  while (item_idx < to_index && bucket < size) {
    if (!C()(items_[item_idx], split_points[bucket])) {
      ++bucket; // no more samples for this bucket
      continue;
    }
    buckets[bucket] += weight; // this sample goes into this bucket
    ++item_idx; // move on to next sample and see whether it also goes into this bucket
  }
  // the loop ends when we run out of samples or out of split points;
  // in the latter case all the remaining samples belong to the last bucket
  if (bucket == size) {
    buckets[bucket] += weight * (to_index - item_idx);
  }
}
// Merges the items of "other" that are above level zero into this sketch.
// All retained items of this sketch and the higher-level items of "other" are gathered into a
// temporary work buffer (populate_work_arrays), compressed (kll_helper::general_compress), and the
// result is transferred back into items_ and levels_, reallocating them when needed.
// final_n is used only to compute an upper bound on the number of levels.
// Throws std::logic_error if the compressed result has more levels than that upper bound.
template<typename T, typename C, typename S, typename A>
void kll_sketch<T, C, S, A>::merge_higher_levels(const kll_sketch& other, uint64_t final_n) {
// room for every retained item of this sketch plus the items of "other" above level zero
const uint32_t tmp_num_items = get_num_retained() + other.get_num_retained_above_level_zero();
auto tmp_items_deleter = [tmp_num_items](T* ptr) { A().deallocate(ptr, tmp_num_items); }; // no destructor needed
const std::unique_ptr<T, decltype(tmp_items_deleter)> workbuf(A().allocate(tmp_num_items), tmp_items_deleter);
const uint8_t ub = kll_helper::ub_on_num_levels(final_n);
const size_t work_levels_size = ub + 2; // ub+1 does not work
// both temporary level arrays have the same size, so they share one deleter
auto tmp_levels_deleter = [work_levels_size](uint32_t* ptr) { AllocU32().deallocate(ptr, work_levels_size); };
const std::unique_ptr<uint32_t[], decltype(tmp_levels_deleter)> worklevels(AllocU32().allocate(work_levels_size), tmp_levels_deleter);
const std::unique_ptr<uint32_t[], decltype(tmp_levels_deleter)> outlevels(AllocU32().allocate(work_levels_size), tmp_levels_deleter);
const uint8_t provisional_num_levels = std::max(num_levels_, other.num_levels_);
populate_work_arrays(other, workbuf.get(), worklevels.get(), provisional_num_levels);
const kll_helper::compress_result result = kll_helper::general_compress<T, C>(k_, m_, provisional_num_levels, workbuf.get(),
worklevels.get(), outlevels.get(), is_level_zero_sorted_);
// ub can sometimes be much bigger
if (result.final_num_levels > ub) throw std::logic_error("merge error");
// now we need to transfer the results back into "this" sketch
// items_ is deallocated without running destructors
// NOTE(review): this relies on populate_work_arrays having moved out and destroyed every object in items_ - confirm
if (result.final_capacity != items_size_) {
A().deallocate(items_, items_size_);
items_size_ = result.final_capacity;
items_ = A().allocate(items_size_);
}
// the compressed items are placed at the top of items_, leaving all the free space at the bottom
const uint32_t free_space_at_bottom = result.final_capacity - result.final_num_items;
kll_helper::move_construct<T>(workbuf.get(), outlevels[0], outlevels[0] + result.final_num_items, items_, free_space_at_bottom, true);
// levels_ is only ever grown here, never shrunk
if (levels_size_ < (result.final_num_levels + 1)) {
AllocU32().deallocate(levels_, levels_size_);
levels_size_ = result.final_num_levels + 1;
levels_ = AllocU32().allocate(levels_size_);
}
// translate the boundaries from work buffer positions to positions in items_
const uint32_t offset = free_space_at_bottom - outlevels[0];
// NOTE(review): when levels_ was not reallocated, levels_size_ may exceed final_num_levels + 1,
// so this loop also copies outlevels entries above the final "extra" index - confirm that
// general_compress leaves them initialized or that they are never read
for (uint8_t lvl = 0; lvl < levels_size_; lvl++) { // includes the "extra" index
levels_[lvl] = outlevels[lvl] + offset;
}
num_levels_ = result.final_num_levels;
}
// Gathers, level by level, the items of this sketch and the items above level zero of "other"
// into workbuf, and records the resulting level boundaries in worklevels
// (provisional_num_levels + 1 entries are written).
// this leaves items_ uninitialized (all objects moved out and destroyed)
template<typename T, typename C, typename S, typename A>
void kll_sketch<T, C, S, A>::populate_work_arrays(const kll_sketch& other, T* workbuf, uint32_t* worklevels, uint8_t provisional_num_levels) {
  // level zero comes from "this" only:
  // the level zero data from "other" was already inserted into "this"
  worklevels[0] = 0;
  kll_helper::move_construct<T>(items_, levels_[0], levels_[1], workbuf, 0, true);
  worklevels[1] = safe_level_size(0);

  for (uint8_t lvl = 1; lvl < provisional_num_levels; lvl++) {
    const uint32_t self_pop = safe_level_size(lvl);
    const uint32_t other_pop = other.safe_level_size(lvl);
    const uint32_t dst_begin = worklevels[lvl];
    worklevels[lvl + 1] = dst_begin + self_pop + other_pop;

    if (self_pop == 0) {
      // only "other" may contribute: its items are copied because "other" must stay intact
      if (other_pop > 0) {
        kll_helper::copy_construct<T>(other.items_, other.levels_[lvl], other.levels_[lvl] + other_pop, workbuf, dst_begin);
      }
    } else if (other_pop == 0) {
      // only "this" contributes: its items are moved out
      kll_helper::move_construct<T>(items_, levels_[lvl], levels_[lvl] + self_pop, workbuf, dst_begin, true);
    } else {
      // both contribute: the two runs are merged into the work buffer
      kll_helper::merge_sorted_arrays<T, C>(items_, levels_[lvl], self_pop, other.items_, other.levels_[lvl], other_pop, workbuf, dst_begin);
    }
  }
}
// Consistency check: the sum of the sample weights over all levels must be equal to n_.
// Throws std::logic_error otherwise.
template<typename T, typename C, typename S, typename A>
void kll_sketch<T, C, S, A>::assert_correct_total_weight() const {
  const uint64_t observed_weight = kll_helper::sum_the_sample_weights(num_levels_, levels_);
  if (observed_weight == n_) return;
  throw std::logic_error("Total weight does not match N");
}
// Returns the number of items in the given level,
// or zero if the level does not exist in this sketch.
template<typename T, typename C, typename S, typename A>
uint32_t kll_sketch<T, C, S, A>::safe_level_size(uint8_t level) const {
  uint32_t population = 0;
  if (level < num_levels_) {
    population = levels_[level + 1] - levels_[level];
  }
  return population;
}
// Returns the number of items retained in all the levels above level zero.
template<typename T, typename C, typename S, typename A>
uint32_t kll_sketch<T, C, S, A>::get_num_retained_above_level_zero() const {
  uint32_t retained = 0;
  if (num_levels_ != 1) {
    // everything between the start of level one and the end of the top level
    retained = levels_[num_levels_] - levels_[1];
  }
  return retained;
}
// Validates the parameter M: only DEFAULT_M is accepted.
// Throws std::invalid_argument for any other value.
template<typename T, typename C, typename S, typename A>
void kll_sketch<T, C, S, A>::check_m(uint8_t m) {
  if (m == DEFAULT_M) return;
  throw std::invalid_argument("Possible corruption: M must be " + std::to_string(DEFAULT_M)
      + ": " + std::to_string(m));
}
// Validates the preamble size against the flags: an empty or single-item sketch must have
// PREAMBLE_INTS_SHORT preamble ints, any other sketch must have PREAMBLE_INTS_FULL.
// Throws std::invalid_argument on mismatch.
template<typename T, typename C, typename S, typename A>
void kll_sketch<T, C, S, A>::check_preamble_ints(uint8_t preamble_ints, uint8_t flags_byte) {
  const bool empty_flag = (flags_byte & (1 << flags::IS_EMPTY)) != 0;
  const bool single_item_flag = (flags_byte & (1 << flags::IS_SINGLE_ITEM)) != 0;
  const bool short_preamble_expected = empty_flag || single_item_flag;

  if (short_preamble_expected && preamble_ints != PREAMBLE_INTS_SHORT) {
    throw std::invalid_argument("Possible corruption: preamble ints must be "
        + std::to_string(PREAMBLE_INTS_SHORT) + " for an empty or single item sketch: " + std::to_string(preamble_ints));
  }
  if (!short_preamble_expected && preamble_ints != PREAMBLE_INTS_FULL) {
    throw std::invalid_argument("Possible corruption: preamble ints must be "
        + std::to_string(PREAMBLE_INTS_FULL) + " for a sketch with more than one item: " + std::to_string(preamble_ints));
  }
}
// Validates the serial version: SERIAL_VERSION_1 and SERIAL_VERSION_2 are accepted.
// Throws std::invalid_argument for any other value.
template<typename T, typename C, typename S, typename A>
void kll_sketch<T, C, S, A>::check_serial_version(uint8_t serial_version) {
  const bool is_known_version = (serial_version == SERIAL_VERSION_1) || (serial_version == SERIAL_VERSION_2);
  if (is_known_version) return;
  throw std::invalid_argument("Possible corruption: serial version mismatch: expected "
      + std::to_string(SERIAL_VERSION_1) + " or " + std::to_string(SERIAL_VERSION_2)
      + ", got " + std::to_string(serial_version));
}
// Validates the family id: it must be equal to FAMILY.
// Throws std::invalid_argument otherwise.
template<typename T, typename C, typename S, typename A>
void kll_sketch<T, C, S, A>::check_family_id(uint8_t family_id) {
  if (family_id == FAMILY) return;
  throw std::invalid_argument("Possible corruption: family mismatch: expected "
      + std::to_string(FAMILY) + ", got " + std::to_string(family_id));
}
// Writes a human-readable description of the sketch to the given stream.
// os:           destination stream; note that std::setprecision(3) is applied to it and not restored
// print_levels: if true, the summary is followed by the nominal capacity and actual size of every level
// print_items:  if true, the retained items are printed as well, grouped by level
// The summary section is always printed.
template <typename T, typename C, typename S, typename A>
void kll_sketch<T, C, S, A>::to_stream(std::ostream& os, bool print_levels, bool print_items) const {
  os << "### KLL sketch summary:" << std::endl;
  os << " K : " << k_ << std::endl;
  os << " min K : " << min_k_ << std::endl;
  os << " M : " << static_cast<unsigned int>(m_) << std::endl; // cast so the byte is printed as a number
  os << " N : " << n_ << std::endl;
  os << " Epsilon : " << std::setprecision(3) << get_normalized_rank_error(false) * 100 << "%" << std::endl;
  os << " Epsilon PMF : " << get_normalized_rank_error(true) * 100 << "%" << std::endl;
  os << " Empty : " << (is_empty() ? "true" : "false") << std::endl;
  os << " Estimation mode: " << (is_estimation_mode() ? "true" : "false") << std::endl;
  os << " Levels : " << static_cast<unsigned int>(num_levels_) << std::endl;
  os << " Sorted : " << (is_level_zero_sorted_ ? "true" : "false") << std::endl;
  os << " Capacity items : " << items_size_ << std::endl;
  os << " Retained items : " << get_num_retained() << std::endl;
  os << " Storage bytes : " << get_serialized_size_bytes() << std::endl;
  if (!is_empty()) {
    // min and max are only dereferenced for a non-empty sketch
    os << " Min value : " << *min_value_ << std::endl;
    os << " Max value : " << *max_value_ << std::endl;
  }
  os << "### End sketch summary" << std::endl;

  // the optional sections are controlled by the caller
  // (previously the parameters were ignored in favor of hard-coded local flags)
  if (print_levels) {
    os << "### KLL sketch levels:" << std::endl;
    os << " index: nominal capacity, actual size" << std::endl;
    for (uint8_t i = 0; i < num_levels_; i++) {
      os << " " << static_cast<unsigned int>(i) << ": " << kll_helper::level_capacity(k_, num_levels_, i, m_) << ", " << safe_level_size(i) << std::endl;
    }
    os << "### End sketch levels" << std::endl;
  }

  if (print_items) {
    os << "### KLL sketch data:" << std::endl;
    for (uint8_t level = 0; level < num_levels_; level++) {
      const uint32_t from_index = levels_[level];
      const uint32_t to_index = levels_[level + 1]; // exclusive
      if (from_index < to_index) {
        // the header is printed for non-empty levels only
        os << " level " << static_cast<unsigned int>(level) << ":" << std::endl;
      }
      for (uint32_t i = from_index; i < to_index; i++) {
        os << " " << items_[i] << std::endl;
      }
    }
    os << "### End sketch data" << std::endl;
  }
}
// Returns an iterator positioned at the start of the retained items of this sketch.
template <typename T, typename C, typename S, typename A>
typename kll_sketch<T, C, S, A>::const_iterator kll_sketch<T, C, S, A>::begin() const {
  return const_iterator(items_, levels_, num_levels_);
}
// Returns the past-the-end iterator, which is represented by null items and levels pointers.
template <typename T, typename C, typename S, typename A>
typename kll_sketch<T, C, S, A>::const_iterator kll_sketch<T, C, S, A>::end() const {
  return const_iterator(nullptr, nullptr, num_levels_);
}
// kll_sketch::const_iterator implementation
// Creates an iterator over the retained items of a sketch.
// items:      buffer holding the retained items
// levels:     level boundaries (num_levels + 1 entries), or nullptr to create the end iterator
// num_levels: number of levels
// A non-end iterator starts at the first non-empty level, with the weight doubled for every
// empty level skipped. If all the levels are empty, it compares equal to the end iterator.
template<typename T, typename C, typename S, typename A>
kll_sketch<T, C, S, A>::const_iterator::const_iterator(const T* items, const uint32_t* levels, const uint8_t num_levels):
  items(items), levels(levels), num_levels(num_levels), index(levels == nullptr ? 0 : levels[0]), level(levels == nullptr ? num_levels : 0), weight(1)
{
  // Skip leading empty levels. Without this an empty sketch would yield begin() != end(),
  // and a sketch with an empty level zero would report a wrong weight for its first items and
  // would never detect a level boundary in operator++.
  // The index stays valid: all the skipped levels are empty, so they start at the same position.
  if (levels != nullptr) {
    while (this->level < num_levels && levels[this->level] == levels[this->level + 1]) {
      ++this->level;
      this->weight *= 2;
    }
  }
}
// Copy constructor: the new iterator refers to the same buffers and
// is at the same position (index, level and weight) as the other one.
template<typename T, typename C, typename S, typename A>
kll_sketch<T, C, S, A>::const_iterator::const_iterator(const const_iterator& other):
  items(other.items),
  levels(other.levels),
  num_levels(other.num_levels),
  index(other.index),
  level(other.level),
  weight(other.weight)
{}
// Pre-increment: advances to the next retained item.
// When the end of the current level is reached, moves on to the next non-empty level
// (or to the end position), doubling the weight for every level passed.
template<typename T, typename C, typename S, typename A>
typename kll_sketch<T, C, S, A>::const_iterator& kll_sketch<T, C, S, A>::const_iterator::operator++() {
  ++index;
  if (index != levels[level + 1]) return *this; // still inside the current level

  // go to the next non-empty level
  do {
    ++level;
    weight *= 2;
  } while (level < num_levels && levels[level] == levels[level + 1]);
  return *this;
}
// Post-increment: copies the current position, advances this iterator, and returns the copy.
// NOTE(review): the return type is a reference, but the returned object is the local tmp, which is
// destroyed when the function returns, so a caller using the result reads a dangling reference.
// Returning const_iterator by value would fix this, but requires changing the in-class
// declaration at the same time - confirm and fix both together.
template<typename T, typename C, typename S, typename A>
typename kll_sketch<T, C, S, A>::const_iterator& kll_sketch<T, C, S, A>::const_iterator::operator++(int) {
// snapshot of the position before the increment
const_iterator tmp(*this);
operator++();
return tmp;
}
// Two iterators are equal if they are at the same level and either that level is the
// end position (where the index is not compared) or they are at the same index.
template<typename T, typename C, typename S, typename A>
bool kll_sketch<T, C, S, A>::const_iterator::operator==(const const_iterator& other) const {
  if (level != other.level) return false;
  const bool at_end = (level == num_levels);
  return at_end || (index == other.index);
}
// Negation of operator==.
template<typename T, typename C, typename S, typename A>
bool kll_sketch<T, C, S, A>::const_iterator::operator!=(const const_iterator& other) const {
  return !(*this == other);
}
// Returns the current item (by reference) paired with its weight.
template<typename T, typename C, typename S, typename A>
const std::pair<const T&, const uint64_t> kll_sketch<T, C, S, A>::const_iterator::operator*() const {
  typedef std::pair<const T&, const uint64_t> item_and_weight;
  return item_and_weight(items[index], weight);
}
} /* namespace datasketches */
#endif