// blob: d33084ed2ffd43a00e9ae25420a9243cd44b84dc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef _TDIGEST_HPP_
#define _TDIGEST_HPP_
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <iosfwd>
#include <limits>
#include <memory>
#include <type_traits>
#include <vector>

#include "common_defs.hpp"
namespace datasketches {
// Counterpart of the K_2 scale function (the default one) of the Java implementation
// at https://github.com/tdunning/t-digest
// Generates cluster sizes proportional to q*(1-q).
// Because a normalizing function is applied, the number of clusters stays strictly bounded
// no matter how many samples are processed.
struct scale_function {
  // z(compression, n) = 4 * ln(n / compression) + 24
  double z(double compression, double n) const {
    const double ratio = n / compression;
    return 4 * std::log(ratio) + 24;
  }

  // normalizer(compression, n) = compression / z(compression, n)
  double normalizer(double compression, double n) const {
    const double divisor = z(compression, n);
    return compression / divisor;
  }

  // max(q, normalizer) = q * (1 - q) / normalizer
  double max(double q, double normalizer) const {
    const double spread = q * (1 - q);
    return spread / normalizer;
  }
};
// Forward declaration of the class template so that the aliases below can name it
// before its definition; Allocator defaults to std::allocator<T>.
template <typename T, typename Allocator = std::allocator<T>> class tdigest;
/// TDigest sketch of float values (default allocator)
using tdigest_float = tdigest<float>;
/// TDigest sketch of double values (default allocator)
using tdigest_double = tdigest<double>;
/**
 * t-Digest: a sketch for approximate estimation of quantiles and ranks.
 * The algorithm is described in the paper
 * Ted Dunning, Otmar Ertl. Extremely Accurate Quantiles Using t-Digests
 * and this code follows the Java implementation at
 * https://github.com/tdunning/t-digest
 * It is closest to MergingDigest in that Java code base.
 *
 * @tparam T type of the values, float or double only
 * @tparam Allocator allocator type used for the internal containers
 */
template <typename T, typename Allocator>
class tdigest {
  // std::is_floating_point is avoided on purpose: it would also accept long double
  static_assert(std::is_same<T, double>::value || std::is_same<T, float>::value, "Either double or float type expected");
  static_assert(std::numeric_limits<T>::is_iec559, "IEEE 754 compatibility required");

public:
  using value_type = T;
  using allocator_type = Allocator;

  /// Value of k the constructor uses when none is given
  static const uint16_t DEFAULT_K = 200;

  /// Integer type of centroid weights: uint64_t when T is double, uint32_t when T is float
  using W = typename std::conditional<std::is_same<T, double>::value, uint64_t, uint32_t>::type;

  /// A mean value paired with an integer weight
  class centroid {
  public:
    centroid(T value, W weight): mean_(value), weight_(weight) {}

    /**
     * Folds another centroid into this one: the weights are summed and
     * the mean becomes the weighted average of the two means.
     * @param other centroid to absorb
     */
    void add(const centroid& other) {
      // the combined weight must be in place first: it is the divisor of the mean update
      weight_ += other.weight_;
      mean_ += (other.mean_ - mean_) * other.weight_ / weight_;
    }

    T get_mean() const { return mean_; }
    W get_weight() const { return weight_; }

  private:
    T mean_;
    W weight_;
  };

  using vector_t = std::vector<T, Allocator>;
  using vector_centroid = std::vector<centroid, typename std::allocator_traits<Allocator>::template rebind_alloc<centroid>>;
  using vector_bytes = std::vector<uint8_t, typename std::allocator_traits<Allocator>::template rebind_alloc<uint8_t>>;
  using vector_double = std::vector<double, typename std::allocator_traits<Allocator>::template rebind_alloc<double>>;

  /// Comparator that orders centroids by mean (less-than)
  struct centroid_cmp {
    centroid_cmp() {}
    bool operator()(const centroid& lhs, const centroid& rhs) const {
      return lhs.get_mean() < rhs.get_mean();
    }
  };

  /**
   * Creates a t-Digest.
   * @param k parameter that affects the size of the sketch and its estimation error
   * @param allocator instance used to allocate memory
   */
  explicit tdigest(uint16_t k = DEFAULT_K, const Allocator& allocator = Allocator());

  /**
   * Feeds one value into this t-Digest.
   * @param value the value to update the t-Digest with
   */
  void update(T value);

  /**
   * Merges another t-Digest into this one.
   * @param other the t-Digest to merge
   */
  void merge(const tdigest& other);

  /**
   * Processes the buffered values and merges centroids if needed.
   */
  void compress();

  /**
   * @return true if this t-Digest has not seen any data
   */
  bool is_empty() const;

  /**
   * @return the minimum value seen by this t-Digest
   */
  T get_min_value() const;

  /**
   * @return the maximum value seen by this t-Digest
   */
  T get_max_value() const;

  /**
   * @return the total weight
   */
  uint64_t get_total_weight() const;

  /**
   * Provides an instance of the allocator of this t-Digest.
   * @return allocator
   */
  Allocator get_allocator() const;

  /**
   * Computes the approximate normalized rank of a value.
   *
   * <p>Throws std::runtime_error if the sketch is empty.
   *
   * @param value the value to be ranked
   * @return normalized rank (from 0 to 1 inclusive)
   */
  double get_rank(T value) const;

  /**
   * Computes the approximate quantile value that corresponds to a normalized rank.
   *
   * <p>Throws std::runtime_error if the sketch is empty.
   *
   * @param rank normalized rank (from 0 to 1 inclusive)
   * @return quantile value corresponding to the given rank
   */
  T get_quantile(double rank) const;

  /**
   * Approximates the Probability Mass Function (PMF) of the input stream
   * for a given set of split points.
   *
   * <p>Throws std::runtime_error if the sketch is empty.
   *
   * @param split_points an array of <i>m</i> unique, monotonically increasing values
   * that divide the input domain into <i>m+1</i> consecutive disjoint intervals (bins).
   *
   * @param size the number of split points in the array
   *
   * @return an array of m+1 doubles, each an approximation of the fraction
   * of the input stream values (the mass) that fall into one of those intervals.
   */
  vector_double get_PMF(const T* split_points, uint32_t size) const;

  /**
   * Approximates the Cumulative Distribution Function (CDF) of the input stream
   * for a given set of split points. The CDF is the cumulative analog of the PMF.
   *
   * <p>Throws std::runtime_error if the sketch is empty.
   *
   * @param split_points an array of <i>m</i> unique, monotonically increasing values
   * that divide the input domain into <i>m+1</i> consecutive disjoint intervals.
   *
   * @param size the number of split points in the array
   *
   * @return an array of m+1 doubles that consecutively approximate the CDF
   * of the input stream for the given split_points. The value at position j of the returned
   * CDF array is the sum of the values at positions 0 through j of the returned PMF
   * array. Equivalently, it is the array of ranks of the given split points followed by
   * one more value that is always 1.
   */
  vector_double get_CDF(const T* split_points, uint32_t size) const;

  /**
   * @return the parameter k (compression) this t-Digest was configured with
   */
  uint16_t get_k() const;

  /**
   * Produces a human-readable summary of this t-Digest.
   * @param print_centroids if true, the list of centroids with weights is appended
   * @return summary of this t-Digest as a string
   */
  string<Allocator> to_string(bool print_centroids = false) const;

  /**
   * Computes the size needed to serialize the current state.
   * @param with_buffer optionally serialize buffered values avoiding compression
   * @return size in bytes needed to serialize this tdigest
   */
  size_t get_serialized_size_bytes(bool with_buffer = false) const;

  /**
   * Serializes this t-Digest into a stream in a binary form.
   * @param os output stream
   * @param with_buffer optionally serialize buffered values avoiding compression
   */
  void serialize(std::ostream& os, bool with_buffer = false) const;

  /**
   * Serializes this t-Digest as a vector of bytes.
   * Space for an optional header can be reserved in front of the sketch;
   * that space of the given size is left uninitialized.
   * @param header_size_bytes space to reserve in front of the sketch
   * @param with_buffer optionally serialize buffered values avoiding compression
   * @return serialized sketch as a vector of bytes
   */
  vector_bytes serialize(unsigned header_size_bytes = 0, bool with_buffer = false) const;

  /**
   * Deserializes a t-Digest from a stream.
   * @param is input stream
   * @param allocator instance of an Allocator
   * @return an instance of t-Digest
   */
  static tdigest deserialize(std::istream& is, const Allocator& allocator = Allocator());

  /**
   * Deserializes a t-Digest from an array of bytes.
   * @param bytes pointer to the array of bytes
   * @param size the size of the array
   * @param allocator instance of an Allocator
   * @return an instance of t-Digest
   */
  static tdigest deserialize(const void* bytes, size_t size, const Allocator& allocator = Allocator());

private:
  // state of the sketch; the declaration order is kept as is
  bool reverse_merge_;
  uint16_t k_;
  uint16_t internal_k_;
  T min_;
  T max_;
  size_t centroids_capacity_;
  vector_centroid centroids_;
  uint64_t centroids_weight_;
  size_t buffer_capacity_;
  vector_t buffer_;

  static const size_t BUFFER_MULTIPLIER = 4;

  // constants of the serialized form
  static const uint8_t PREAMBLE_LONGS_EMPTY_OR_SINGLE = 1;
  static const uint8_t PREAMBLE_LONGS_MULTIPLE = 2;
  static const uint8_t SERIAL_VERSION = 1;
  static const uint8_t SKETCH_TYPE = 20;

  // NOTE(review): presumably type markers of the reference implementation's format - confirm in tdigest_impl.hpp
  static const uint8_t COMPAT_DOUBLE = 1;
  static const uint8_t COMPAT_FLOAT = 2;

  // NOTE(review): presumably bit positions within a serialized flags byte - confirm in tdigest_impl.hpp
  enum flags { IS_EMPTY, IS_SINGLE_VALUE, REVERSE_MERGE };

  bool is_single_value() const;
  uint8_t get_preamble_longs() const;
  void merge(vector_centroid& buffer, W weight);

  // used by deserialize
  tdigest(bool reverse_merge, uint16_t k, T min, T max, vector_centroid&& centroids, uint64_t total_weight_, vector_t&& buffer);

  static double weighted_average(double x1, double w1, double x2, double w2);

  // reading of the format of the reference implementation, for compatibility
  static tdigest deserialize_compat(std::istream& is, const Allocator& allocator = Allocator());
  static tdigest deserialize_compat(const void* bytes, size_t size, const Allocator& allocator = Allocator());

  static inline void check_split_points(const T* values, uint32_t size);
};
} /* namespace datasketches */
#include "tdigest_impl.hpp"
#endif // _TDIGEST_HPP_