blob: bbca76f716694bcf1afe5c658e6a11c1e0e8aaca [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef KLL_SKETCH_HPP_
#define KLL_SKETCH_HPP_
#include <functional>
#include <memory>
#include <vector>
#include "kll_quantile_calculator.hpp"
#include "common_defs.hpp"
#include "serde.hpp"
namespace datasketches {
/*
* Implementation of a very compact quantiles sketch with lazy compaction scheme
* and nearly optimal accuracy per retained item.
* See <a href="https://arxiv.org/abs/1603.05346v2">Optimal Quantile Approximation in Streams</a>.
*
* <p>This is a stochastic streaming sketch that enables near-real time analysis of the
* approximate distribution of values from a very large stream in a single pass, requiring only
* that the values are comparable.
* The analysis is obtained using <i>get_quantile()</i> or <i>get_quantiles()</i> functions or the
* inverse functions get_rank(), get_PMF() (Probability Mass Function), and get_CDF()
* (Cumulative Distribution Function).
*
* <p>As of May 2020, this implementation produces serialized sketches which are binary-compatible
* with the equivalent Java implementation only when template parameter T = float
* (32-bit single precision values).
*
* <p>Given an input stream of <i>N</i> numeric values, the <i>absolute rank</i> of any specific
* value is defined as its index <i>(0 to N-1)</i> in the hypothetical sorted stream of all
* <i>N</i> input values.
*
* <p>The <i>normalized rank</i> (<i>rank</i>) of any specific value is defined as its
* <i>absolute rank</i> divided by <i>N</i>.
* Thus, the <i>normalized rank</i> is a value between zero and one.
* In the documentation for this sketch <i>absolute rank</i> is never used so any
* reference to just <i>rank</i> should be interpreted to mean <i>normalized rank</i>.
*
* <p>This sketch is configured with a parameter <i>k</i>, which affects the size of the sketch
* and its estimation error.
*
* <p>The estimation error is commonly called <i>epsilon</i> (or <i>eps</i>) and is a fraction
* between zero and one. Larger values of <i>k</i> result in smaller values of epsilon.
* Epsilon is always with respect to the rank and cannot be applied to the
* corresponding values.
*
* <p>The relationship between the normalized rank and the corresponding values can be viewed
* as a two dimensional monotonic plot with the normalized rank on one axis and the
* corresponding values on the other axis. If the y-axis is specified as the value-axis and
* the x-axis as the normalized rank, then <i>y = get_quantile(x)</i> is a monotonically
* increasing function.
*
* <p>The functions <i>get_quantile(rank)</i> and get_quantiles(...) translate ranks into
* corresponding values. The functions <i>get_rank(value),
* get_CDF(...) (Cumulative Distribution Function), and get_PMF(...)
* (Probability Mass Function)</i> perform the opposite operation and translate values into ranks.
*
* <p>The <i>getPMF(...)</i> function has about 13 to 47% worse rank error (depending
* on <i>k</i>) than the other queries because the mass of each "bin" of the PMF has
* "double-sided" error from the upper and lower edges of the bin as a result of a subtraction,
* as the errors from the two edges can sometimes add.
*
* <p>The default <i>k</i> of 200 yields a "single-sided" epsilon of about 1.33% and a
* "double-sided" (PMF) epsilon of about 1.65%.
*
* <p>A <i>get_quantile(rank)</i> query has the following guarantees:
* <ul>
* <li>Let <i>v = get_quantile(r)</i> where <i>r</i> is the rank between zero and one.</li>
* <li>The value <i>v</i> will be a value from the input stream.</li>
* <li>Let <i>trueRank</i> be the true rank of <i>v</i> derived from the hypothetical sorted
* stream of all <i>N</i> values.</li>
* <li>Let <i>eps = get_normalized_rank_error(false)</i>.</li>
* <li>Then <i>r - eps &le; trueRank &le; r + eps</i> with a confidence of 99%. Note that the
* error is on the rank, not the value.</li>
* </ul>
*
* <p>A <i>get_rank(value)</i> query has the following guarantees:
* <ul>
* <li>Let <i>r = get_rank(v)</i> where <i>v</i> is a value between the min and max values of
* the input stream.</li>
* <li>Let <i>true_rank</i> be the true rank of <i>v</i> derived from the hypothetical sorted
* stream of all <i>N</i> values.</li>
* <li>Let <i>eps = get_normalized_rank_error(false)</i>.</li>
* <li>Then <i>r - eps &le; trueRank &le; r + eps</i> with a confidence of 99%.</li>
* </ul>
*
* <p>A <i>get_PMF()</i> query has the following guarantees:
* <ul>
* <li>Let <i>{r1, r2, ..., r(m+1)} = get_PMF(v1, v2, ..., vm)</i> where <i>v1, v2</i> are values
* between the min and max values of the input stream.
* <li>Let <i>mass<sub>i</sub> = estimated mass between v<sub>i</sub> and v<sub>i+1</sub></i>.</li>
* <li>Let <i>trueMass</i> be the true mass between the values of <i>v<sub>i</sub>,
* v<sub>i+1</sub></i> derived from the hypothetical sorted stream of all <i>N</i> values.</li>
* <li>Let <i>eps = get_normalized_rank_error(true)</i>.</li>
* <li>then <i>mass - eps &le; trueMass &le; mass + eps</i> with a confidence of 99%.</li>
* <li>r(m+1) includes the mass of all points larger than vm.</li>
* </ul>
*
* <p>A <i>get_CDF(...)</i> query has the following guarantees;
* <ul>
* <li>Let <i>{r1, r2, ..., r(m+1)} = get_CDF(v1, v2, ..., vm)</i> where <i>v1, v2</i> are values
* between the min and max values of the input stream.
* <li>Let <i>mass<sub>i</sub> = r<sub>i+1</sub> - r<sub>i</sub></i>.</li>
* <li>Let <i>trueMass</i> be the true mass between the true ranks of <i>v<sub>i</sub>,
* v<sub>i+1</sub></i> derived from the hypothetical sorted stream of all <i>N</i> values.</li>
* <li>Let <i>eps = get_normalized_rank_error(true)</i>.</li>
* <li>then <i>mass - eps &le; trueMass &le; mass + eps</i> with a confidence of 99%.</li>
* <li>1 - r(m+1) includes the mass of all points larger than vm.</li>
* </ul>
*
* <p>From the above, it might seem like we could make some estimates to bound the
* <em>value</em> returned from a call to <em>get_quantile()</em>. The sketch, however, does not
* let us derive error bounds or confidences around values. Because errors are independent, we
* can approximately bracket a value as shown below, but there are no error estimates available.
* Additionally, the interval may be quite large for certain distributions.
* <ul>
* <li>Let <i>v = get_quantile(r)</i>, the estimated quantile value of rank <i>r</i>.</li>
* <li>Let <i>eps = get_normalized_rank_error(false)</i>.</li>
* <li>Let <i>v<sub>lo</sub></i> = estimated quantile value of rank <i>(r - eps)</i>.</li>
* <li>Let <i>v<sub>hi</sub></i> = estimated quantile value of rank <i>(r + eps)</i>.</li>
* <li>Then <i>v<sub>lo</sub> &le; v &le; v<sub>hi</sub></i>, with 99% confidence.</li>
* </ul>
*
* author Kevin Lang
* author Alexander Saydakov
* author Lee Rhodes
*/
template<typename A> using AllocU8 = typename std::allocator_traits<A>::template rebind_alloc<uint8_t>;
template<typename A> using vector_u8 = std::vector<uint8_t, AllocU8<A>>;
template<typename A> using AllocU32 = typename std::allocator_traits<A>::template rebind_alloc<uint32_t>;
template<typename A> using vector_u32 = std::vector<uint32_t, AllocU32<A>>;
template<typename A> using AllocD = typename std::allocator_traits<A>::template rebind_alloc<double>;
template<typename A> using vector_d = std::vector<double, AllocD<A>>;
template <typename T, typename C = std::less<T>, typename S = serde<T>, typename A = std::allocator<T>>
class kll_sketch {
public:
static const uint8_t DEFAULT_M = 8;
static const uint16_t DEFAULT_K = 200;
static const uint16_t MIN_K = DEFAULT_M;
static const uint16_t MAX_K = (1 << 16) - 1;
explicit kll_sketch(uint16_t k = DEFAULT_K, const A& allocator = A());
kll_sketch(const kll_sketch& other);
kll_sketch(kll_sketch&& other) noexcept;
~kll_sketch();
kll_sketch& operator=(const kll_sketch& other);
kll_sketch& operator=(kll_sketch&& other);
/**
* Updates this sketch with the given data item.
* This method takes lvalue.
* @param value an item from a stream of items
*/
void update(const T& value);
/**
* Updates this sketch with the given data item.
* This method takes rvalue.
* @param value an item from a stream of items
*/
void update(T&& value);
/**
* Merges another sketch into this one.
* This method takes lvalue.
* @param other sketch to merge into this one
*/
void merge(const kll_sketch& other);
/**
* Merges another sketch into this one.
* This method takes rvalue.
* @param other sketch to merge into this one
*/
void merge(kll_sketch&& other);
/**
* Returns true if this sketch is empty.
* @return empty flag
*/
bool is_empty() const;
/**
* Returns configured parameter k
* @return parameter k
*/
uint16_t get_k() const;
/**
* Returns the length of the input stream.
* @return stream length
*/
uint64_t get_n() const;
/**
* Returns the number of retained items (samples) in the sketch.
* @return the number of retained items
*/
uint32_t get_num_retained() const;
/**
* Returns true if this sketch is in estimation mode.
* @return estimation mode flag
*/
bool is_estimation_mode() const;
/**
* Returns the min value of the stream.
* For floating point types: if the sketch is empty this returns NaN.
* For other types: if the sketch is empty this throws runtime_error.
* @return the min value of the stream
*/
T get_min_value() const;
/**
* Returns the max value of the stream.
* For floating point types: if the sketch is empty this returns NaN.
* For other types: if the sketch is empty this throws runtime_error.
* @return the max value of the stream
*/
T get_max_value() const;
/**
* Returns an approximation to the value of the data item
* that would be preceded by the given fraction of a hypothetical sorted
* version of the input stream so far.
* <p>
* Note that this method has a fairly large overhead (microseconds instead of nanoseconds)
* so it should not be called multiple times to get different quantiles from the same
* sketch. Instead use get_quantiles(), which pays the overhead only once.
* <p>
* For floating point types: if the sketch is empty this returns NaN.
* For other types: if the sketch is empty this throws runtime_error.
*
* @param fraction the specified fractional position in the hypothetical sorted stream.
* These are also called normalized ranks or fractional ranks.
* If fraction = 0.0, the true minimum value of the stream is returned.
* If fraction = 1.0, the true maximum value of the stream is returned.
*
* @return the approximation to the value at the given fraction
*/
T get_quantile(double fraction) const;
/**
* This is a more efficient multiple-query version of get_quantile().
* <p>
* This returns an array that could have been generated by using get_quantile() for each
* fractional rank separately, but would be very inefficient.
* This method incurs the internal set-up overhead once and obtains multiple quantile values in
* a single query. It is strongly recommend that this method be used instead of multiple calls
* to get_quantile().
*
* <p>If the sketch is empty this returns an empty vector.
*
* @param fractions given array of fractional positions in the hypothetical sorted stream.
* These are also called normalized ranks or fractional ranks.
* These fractions must be in the interval [0.0, 1.0], inclusive.
*
* @return array of approximations to the given fractions in the same order as given fractions
* in the input array.
*/
std::vector<T, A> get_quantiles(const double* fractions, uint32_t size) const;
/**
* This is a multiple-query version of get_quantile() that allows the caller to
* specify the number of evenly-spaced fractional ranks.
*
* <p>If the sketch is empty this returns an empty vector.
*
* @param num an integer that specifies the number of evenly-spaced fractional ranks.
* This must be an integer greater than 0. A value of 1 will return the min value.
* A value of 2 will return the min and the max value. A value of 3 will return the min,
* the median and the max value, etc.
*
* @return array of approximations to the given number of evenly-spaced fractional ranks.
*/
std::vector<T, A> get_quantiles(size_t num) const;
/**
* Returns an approximation to the normalized (fractional) rank of the given value from 0 to 1,
* inclusive.
*
* <p>The resulting approximation has a probabilistic guarantee that can be obtained from the
* get_normalized_rank_error(false) function.
*
* <p>If the sketch is empty this returns NaN.
*
* @param value to be ranked
* @return an approximate rank of the given value
*/
double get_rank(const T& value) const;
/**
* Returns an approximation to the Probability Mass Function (PMF) of the input stream
* given a set of split points (values).
*
* <p>The resulting approximations have a probabilistic guarantee that can be obtained from the
* get_normalized_rank_error(true) function.
*
* <p>If the sketch is empty this returns an empty vector.
*
* @param split_points an array of <i>m</i> unique, monotonically increasing values
* that divide the input domain into <i>m+1</i> consecutive disjoint intervals.
* The definition of an "interval" is inclusive of the left split point (or minimum value) and
* exclusive of the right split point, with the exception that the last interval will include
* the maximum value.
* It is not necessary to include either the min or max values in these split points.
*
* @return an array of m+1 doubles each of which is an approximation
* to the fraction of the input stream values (the mass) that fall into one of those intervals.
* The definition of an "interval" is inclusive of the left split point and exclusive of the right
* split point, with the exception that the last interval will include maximum value.
*/
vector_d<A> get_PMF(const T* split_points, uint32_t size) const;
/**
* Returns an approximation to the Cumulative Distribution Function (CDF), which is the
* cumulative analog of the PMF, of the input stream given a set of split points (values).
*
* <p>The resulting approximations have a probabilistic guarantee that can be obtained from the
* get_normalized_rank_error(false) function.
*
* <p>If the sketch is empty this returns an empty vector.
*
* @param split_points an array of <i>m</i> unique, monotonically increasing values
* that divide the input domain into <i>m+1</i> consecutive disjoint intervals.
* The definition of an "interval" is inclusive of the left split point (or minimum value) and
* exclusive of the right split point, with the exception that the last interval will include
* the maximum value.
* It is not necessary to include either the min or max values in these split points.
*
* @return an array of m+1 double values, which are a consecutive approximation to the CDF
* of the input stream given the split_points. The value at array position j of the returned
* CDF array is the sum of the returned values in positions 0 through j of the returned PMF
* array.
*/
vector_d<A> get_CDF(const T* split_points, uint32_t size) const;
/**
* Gets the approximate rank error of this sketch normalized as a fraction between zero and one.
* @param pmf if true, returns the "double-sided" normalized rank error for the get_PMF() function.
* Otherwise, it is the "single-sided" normalized rank error for all the other queries.
* @return if pmf is true, returns the normalized rank error for the get_PMF() function.
* Otherwise, it is the "single-sided" normalized rank error for all the other queries.
*/
double get_normalized_rank_error(bool pmf) const;
/**
* Computes size needed to serialize the current state of the sketch.
* This version is for fixed-size arithmetic types (integral and floating point).
* @return size in bytes needed to serialize this sketch
*/
template<typename TT = T, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type = 0>
size_t get_serialized_size_bytes() const;
/**
* Computes size needed to serialize the current state of the sketch.
* This version is for all other types and can be expensive since every item needs to be looked at.
* @return size in bytes needed to serialize this sketch
*/
template<typename TT = T, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type = 0>
size_t get_serialized_size_bytes() const;
/**
* This method serializes the sketch into a given stream in a binary form
* @param os output stream
*/
void serialize(std::ostream& os) const;
// This is a convenience alias for users
// The type returned by the following serialize method
typedef vector_u8<A> vector_bytes;
/**
* This method serializes the sketch as a vector of bytes.
* An optional header can be reserved in front of the sketch.
* It is a blank space of a given size.
* This header is used in Datasketches PostgreSQL extension.
* @param header_size_bytes space to reserve in front of the sketch
*/
vector_bytes serialize(unsigned header_size_bytes = 0) const;
/**
* This method deserializes a sketch from a given stream.
* @param is input stream
* @return an instance of a sketch
*/
static kll_sketch<T, C, S, A> deserialize(std::istream& is, const A& allocator = A());
/**
* This method deserializes a sketch from a given array of bytes.
* @param bytes pointer to the array of bytes
* @param size the size of the array
* @return an instance of a sketch
*/
static kll_sketch<T, C, S, A> deserialize(const void* bytes, size_t size, const A& allocator = A());
/*
* Gets the normalized rank error given k and pmf.
* k - the configuration parameter
* pmf - if true, returns the "double-sided" normalized rank error for the get_PMF() function.
* Otherwise, it is the "single-sided" normalized rank error for all the other queries.
* Constants were derived as the best fit to 99 percentile empirically measured max error in thousands of trials
*/
static double get_normalized_rank_error(uint16_t k, bool pmf);
/**
* Prints a summary of the sketch.
* @param print_levels if true include information about levels
* @param print_items if true include sketch data
*/
string<A> to_string(bool print_levels = false, bool print_items = false) const;
class const_iterator;
const_iterator begin() const;
const_iterator end() const;
#ifdef KLL_VALIDATION
uint8_t get_num_levels() { return num_levels_; }
uint32_t* get_levels() { return levels_; }
T* get_items() { return items_; }
#endif
private:
/* Serialized sketch layout:
* Adr:
* || 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
* 0 || unused | M |--------K--------| Flags | FamID | SerVer | PreambleInts |
* || 15 | 14 | 13 | 12 | 11 | 10 | 9 | 8 |
* 1 ||-----------------------------------N------------------------------------------|
* || 23 | 22 | 21 | 20 | 19 | 18 | 17 | 16 |
* 2 ||---------------data----------------|-unused-|numLevels|-------min K-----------|
*/
static const size_t EMPTY_SIZE_BYTES = 8;
static const size_t DATA_START_SINGLE_ITEM = 8;
static const size_t DATA_START = 20;
static const uint8_t SERIAL_VERSION_1 = 1;
static const uint8_t SERIAL_VERSION_2 = 2;
static const uint8_t FAMILY = 15;
enum flags { IS_EMPTY, IS_LEVEL_ZERO_SORTED, IS_SINGLE_ITEM };
static const uint8_t PREAMBLE_INTS_SHORT = 2; // for empty and single item
static const uint8_t PREAMBLE_INTS_FULL = 5;
A allocator_;
uint16_t k_;
uint8_t m_; // minimum buffer "width"
uint16_t min_k_; // for error estimation after merging with different k
uint64_t n_;
uint8_t num_levels_;
vector_u32<A> levels_;
T* items_;
uint32_t items_size_;
T* min_value_;
T* max_value_;
bool is_level_zero_sorted_;
// for deserialization
class item_deleter;
class items_deleter;
kll_sketch(uint16_t k, uint16_t min_k, uint64_t n, uint8_t num_levels, vector_u32<A>&& levels,
std::unique_ptr<T, items_deleter> items, uint32_t items_size, std::unique_ptr<T, item_deleter> min_value,
std::unique_ptr<T, item_deleter> max_value, bool is_level_zero_sorted);
// common update code
inline void update_min_max(const T& value);
inline uint32_t internal_update();
// The following code is only valid in the special case of exactly reaching capacity while updating.
// It cannot be used while merging, while reducing k, or anything else.
void compress_while_updating(void);
uint8_t find_level_to_compact() const;
void add_empty_top_level_to_completely_full_sketch();
void sort_level_zero();
std::unique_ptr<kll_quantile_calculator<T, C, A>, std::function<void(kll_quantile_calculator<T, C, A>*)>> get_quantile_calculator();
vector_d<A> get_PMF_or_CDF(const T* split_points, uint32_t size, bool is_CDF) const;
void increment_buckets_unsorted_level(uint32_t from_index, uint32_t to_index, uint64_t weight,
const T* split_points, uint32_t size, double* buckets) const;
void increment_buckets_sorted_level(uint32_t from_index, uint32_t to_index, uint64_t weight,
const T* split_points, uint32_t size, double* buckets) const;
template<typename O> void merge_higher_levels(O&& other, uint64_t final_n);
void populate_work_arrays(const kll_sketch& other, T* workbuf, uint32_t* worklevels, uint8_t provisional_num_levels);
void populate_work_arrays(kll_sketch&& other, T* workbuf, uint32_t* worklevels, uint8_t provisional_num_levels);
void assert_correct_total_weight() const;
uint32_t safe_level_size(uint8_t level) const;
uint32_t get_num_retained_above_level_zero() const;
static void check_m(uint8_t m);
static void check_preamble_ints(uint8_t preamble_ints, uint8_t flags_byte);
static void check_serial_version(uint8_t serial_version);
static void check_family_id(uint8_t family_id);
// implementations for floating point types
template<typename TT = T, typename std::enable_if<std::is_floating_point<TT>::value, int>::type = 0>
static TT get_invalid_value() {
return std::numeric_limits<TT>::quiet_NaN();
}
template<typename TT = T, typename std::enable_if<std::is_floating_point<TT>::value, int>::type = 0>
static inline bool check_update_value(TT value) {
return !std::isnan(value);
}
// implementations for all other types
template<typename TT = T, typename std::enable_if<!std::is_floating_point<TT>::value, int>::type = 0>
static TT get_invalid_value() {
throw std::runtime_error("getting quantiles from empty sketch is not supported for this type of values");
}
template<typename TT = T, typename std::enable_if<!std::is_floating_point<TT>::value, int>::type = 0>
static inline bool check_update_value(TT) {
return true;
}
};
template<typename T, typename C, typename S, typename A>
class kll_sketch<T, C, S, A>::const_iterator: public std::iterator<std::input_iterator_tag, T> {
public:
friend class kll_sketch<T, C, S, A>;
const_iterator& operator++();
const_iterator& operator++(int);
bool operator==(const const_iterator& other) const;
bool operator!=(const const_iterator& other) const;
const std::pair<const T&, const uint64_t> operator*() const;
private:
const T* items;
const uint32_t* levels;
const uint8_t num_levels;
uint32_t index;
uint8_t level;
uint64_t weight;
const_iterator(const T* items, const uint32_t* levels, const uint8_t num_levels);
};
} /* namespace datasketches */
#include "kll_sketch_impl.hpp"
#endif