blob: afbc45ab1104948e46e34b5ddd7db6c21480921f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
/*
This implementation follows apache arrow.
refer to https://github.com/apache/arrow/blob/27bbd593625122a4a25d9471c8aaf5df54a6dcf9/cpp/src/arrow/util/tdigest.cc
*/
#include "tdigest.h"
#include <fmt/format.h>
#include <algorithm>
#include <iterator>
#include <queue>
#include "common/status.h"
namespace {
// scale function K1
struct ScalerK1 {
explicit ScalerK1(uint32_t delta) : delta_norm(delta / (2.0 * M_PI)) {}
double K(double q) const { return delta_norm * std::asin(2 * q - 1); }
double Q(double k) const { return (std::sin(k / delta_norm) + 1) / 2; }
const double delta_norm;
};
} // namespace
template <typename T = ScalerK1>
class TDigestMerger : private T {
public:
using Status = rocksdb::Status;
explicit TDigestMerger(uint32_t delta) : T(delta) { Reset(0, nullptr); }
void Reset(double total_weight, std::vector<Centroid>* tdigest) {
total_weight_ = total_weight;
tdigest_ = tdigest;
if (tdigest_) {
tdigest_->resize(0);
}
weight_so_far_ = 0;
weight_limit_ = -1; // trigger first centroid merge
}
// merge one centroid from a sorted centroid stream
void Add(const Centroid& centroid) {
auto& td = *tdigest_;
const double weight = weight_so_far_ + centroid.weight;
if (weight <= weight_limit_) {
td.back().Merge(centroid);
} else {
const double quantile = weight_so_far_ / total_weight_;
// weight limit should be strictly increasing, until the last centroid
if (const double next_weight_limit = total_weight_ * this->Q(this->K(quantile) + 1);
next_weight_limit <= weight_limit_) {
weight_limit_ = total_weight_;
} else {
weight_limit_ = next_weight_limit;
}
td.push_back(centroid); // should never exceed capacity and trigger reallocation
}
weight_so_far_ = weight;
}
// validate k-size of a tdigest
Status Validate(const std::vector<Centroid>& tdigest, double total_weight) const {
double q_prev = 0;
double k_prev = this->K(0);
for (const auto& i : tdigest) {
const double q = q_prev + i.weight / total_weight;
const double k = this->K(q);
if (i.weight != 1 && (k - k_prev) > 1.001) {
return Status::Corruption(fmt::format("oversized centroid: {}", k - k_prev));
}
k_prev = k;
q_prev = q;
}
return Status::OK();
}
private:
double total_weight_; // total weight of this tdigest
double weight_so_far_; // accumulated weight till current bin
double weight_limit_; // max accumulated weight to move to next bin
std::vector<Centroid>* tdigest_;
};
class TDigestImpl {
public:
using Status = rocksdb::Status;
explicit TDigestImpl(uint32_t delta) : delta_(delta > 10 ? delta : 10), merger_(delta_) {
tdigests_[0].reserve(delta_);
tdigests_[1].reserve(delta_);
Reset();
}
void Reset() {
tdigests_[0].resize(0);
tdigests_[1].resize(0);
current_ = 0;
total_weight_ = 0;
min_ = std::numeric_limits<double>::max();
max_ = std::numeric_limits<double>::lowest();
merger_.Reset(0, nullptr);
}
void Reset(const std::vector<Centroid>& centroids, double min, double max, double total_weight) {
tdigests_[0] = centroids;
tdigests_[1].resize(0);
current_ = 0;
total_weight_ = total_weight;
min_ = min;
max_ = max;
merger_.Reset(0, nullptr);
}
Status Validate() const {
// check weight, centroid order
double total_weight = 0;
double prev_mean = std::numeric_limits<double>::lowest();
for (const auto& centroid : tdigests_[current_]) {
if (std::isnan(centroid.mean) || std::isnan(centroid.weight)) {
return Status::Corruption("NAN found in tdigest");
}
if (centroid.mean < prev_mean) {
return Status::Corruption("centroid mean decreases");
}
if (centroid.weight < 1) {
return Status::Corruption("invalid centroid weight");
}
prev_mean = centroid.mean;
total_weight += centroid.weight;
}
if (total_weight != total_weight_) {
return Status::Corruption("tdigest total weight mismatch");
}
// check if buffer expanded
if (tdigests_[0].capacity() > delta_ || tdigests_[1].capacity() > delta_) {
return Status::Corruption("oversized tdigest buffer");
}
// check k-size
return merger_.Validate(tdigests_[current_], total_weight_);
}
std::vector<Centroid> Centroids() const { return tdigests_[current_]; }
double Min() const { return min_; }
double Max() const { return max_; }
uint32_t Delta() const { return delta_; }
// merge with other tdigests
void Merge(const std::vector<const TDigestImpl*>& tdigest_impls) {
// current and end iterator
using CentroidIter = std::vector<Centroid>::const_iterator;
using CentroidIterPair = std::pair<CentroidIter, CentroidIter>;
// use a min-heap to find next minimal centroid from all tdigests
auto centroid_gt = [](const CentroidIterPair& lhs, const CentroidIterPair& rhs) {
return lhs.first->mean > rhs.first->mean;
};
using CentroidQueue = std::priority_queue<CentroidIterPair, std::vector<CentroidIterPair>, decltype(centroid_gt)>;
// trivial dynamic memory allocated at runtime
std::vector<CentroidIterPair> queue_buffer;
queue_buffer.reserve(tdigest_impls.size() + 1);
CentroidQueue queue(std::move(centroid_gt), std::move(queue_buffer));
if (const auto& this_tdigest = tdigests_[current_]; !this_tdigest.empty()) {
queue.emplace(this_tdigest.cbegin(), this_tdigest.cend());
}
for (const TDigestImpl* td : tdigest_impls) {
const auto& other_tdigest = td->tdigests_[td->current_];
if (other_tdigest.size() > 0) {
queue.emplace(other_tdigest.cbegin(), other_tdigest.cend());
total_weight_ += td->total_weight_;
min_ = std::min(min_, td->min_);
max_ = std::max(max_, td->max_);
}
}
merger_.Reset(total_weight_, &tdigests_[1 - current_]);
CentroidIter current_iter;
CentroidIter end_iter;
// do k-way merge till one buffer left
while (queue.size() > 1) {
std::tie(current_iter, end_iter) = queue.top();
merger_.Add(*current_iter);
queue.pop();
if (++current_iter != end_iter) {
queue.emplace(current_iter, end_iter);
}
}
// merge last buffer
if (!queue.empty()) {
std::tie(current_iter, end_iter) = queue.top();
while (current_iter != end_iter) {
merger_.Add(*current_iter++);
}
}
merger_.Reset(0, nullptr);
current_ = 1 - current_;
}
// merge input data with current tdigest
void MergeInput(std::vector<double> input) {
total_weight_ += static_cast<double>(input.size());
std::sort(input.begin(), input.end());
if (input.empty()) {
return;
}
// pick next minimal centroid from input and tdigest, feed to merger
merger_.Reset(total_weight_, &tdigests_[1 - current_]);
const auto& td = tdigests_[current_];
uint32_t tdigest_index = 0;
uint32_t input_index = 0;
while (tdigest_index < td.size() && input_index < input.size()) {
if (td[tdigest_index].mean < input[input_index]) {
merger_.Add(td[tdigest_index]);
++tdigest_index;
} else {
merger_.Add(Centroid{input[input_index], 1});
++input_index;
}
}
while (tdigest_index < td.size()) {
merger_.Add(td[tdigest_index]);
++tdigest_index;
}
while (input_index < input.size()) {
merger_.Add(Centroid{input[input_index], 1});
++input_index;
}
merger_.Reset(0, nullptr);
current_ = 1 - current_;
}
double Quantile(double q) const {
const auto& td = tdigests_[current_];
if (q < 0 || q > 1 || td.empty()) {
return NAN;
}
const double index = q * total_weight_;
if (index <= 1) {
return min_;
} else if (index >= total_weight_ - 1) {
return max_;
}
// find centroid contains the index
uint32_t ci = 0;
double weight_sum = 0;
for (; ci < td.size(); ++ci) {
weight_sum += td[ci].weight;
if (index <= weight_sum) {
break;
}
}
CHECK(ci < td.size());
// deviation of index from the centroid center
double diff = index + td[ci].weight / 2 - weight_sum;
// index happen to be in a unit weight centroid
if (td[ci].weight == 1 && std::abs(diff) < 0.5) {
return td[ci].mean;
}
// find adjacent centroids for interpolation
uint32_t ci_left = ci;
uint32_t ci_right = ci;
if (diff > 0) {
if (ci_right == td.size() - 1) {
// index larger than center of last bin
CHECK(weight_sum == total_weight_);
const Centroid* c = &td[ci_right];
CHECK(c->weight >= 2);
return Lerp(c->mean, max_, diff / (c->weight / 2));
}
++ci_right;
} else {
if (ci_left == 0) {
// index smaller than center of first bin
const Centroid* c = &td[0];
CHECK(c->weight >= 2);
return Lerp(min_, c->mean, index / (c->weight / 2));
}
--ci_left;
diff += td[ci_left].weight / 2 + td[ci_right].weight / 2;
}
// interpolate from adjacent centroids
diff /= (td[ci_left].weight / 2 + td[ci_right].weight / 2);
return Lerp(td[ci_left].mean, td[ci_right].mean, diff);
}
double Mean() const {
double sum = 0;
for (const auto& centroid : tdigests_[current_]) {
sum += centroid.mean * centroid.weight;
}
return total_weight_ == 0 ? NAN : sum / total_weight_;
}
double TotalWeight() const { return total_weight_; }
private:
// must be declared before merger_, see constructor initialization list
const uint32_t delta_;
TDigestMerger<> merger_;
double total_weight_;
double min_;
double max_;
// ping-pong buffer holds two tdigests, size = 2 * delta * sizeof(Centroid)
std::vector<Centroid> tdigests_[2];
// index of active tdigest buffer, 0 or 1
int current_;
};
class TDigest {
public:
explicit TDigest(uint64_t delta);
TDigest(const TDigest&) = delete;
TDigest& operator=(const TDigest&) = delete;
TDigest(TDigest&& rhs) = default;
~TDigest() = default;
void Merge(const std::vector<TDigest>& others);
void Add(const std::vector<double>& items);
void Reset(const CentroidsWithDelta& centroid_list);
void Reset();
CentroidsWithDelta DumpCentroids() const;
private:
TDigestImpl impl_;
};
TDigest::TDigest(uint64_t delta) : impl_(TDigestImpl(delta)) { Reset({}); }
void TDigest::Merge(const std::vector<TDigest>& others) {
if (others.empty()) {
return;
}
std::vector<const TDigestImpl*> impls;
impls.reserve(others.size());
std::transform(others.cbegin(), others.cend(), std::back_inserter(impls), [](const TDigest& i) { return &i.impl_; });
impl_.Merge(impls);
}
void TDigest::Reset(const CentroidsWithDelta& centroids_list) {
impl_.Reset(centroids_list.centroids, centroids_list.min, centroids_list.max, centroids_list.total_weight);
}
void TDigest::Reset() { impl_.Reset(); }
CentroidsWithDelta TDigest::DumpCentroids() const {
auto centroids = impl_.Centroids();
return {
.centroids = std::move(centroids),
.delta = impl_.Delta(),
.min = impl_.Min(),
.max = impl_.Max(),
.total_weight = impl_.TotalWeight(),
};
}
void TDigest::Add(const std::vector<double>& items) { impl_.MergeInput(items); }
StatusOr<CentroidsWithDelta> TDigestMerge(const std::vector<CentroidsWithDelta>& centroids_list, uint64_t delta) {
if (centroids_list.empty()) {
return CentroidsWithDelta{.delta = delta};
}
if (centroids_list.size() == 1) {
if (centroids_list.front().delta == delta) {
return centroids_list.front();
}
if (centroids_list.front().centroids.empty()) {
return CentroidsWithDelta{.delta = delta};
}
}
TDigest digest{delta};
std::vector<TDigest> others;
others.reserve(centroids_list.size());
for (const auto& centroids : centroids_list) {
if (centroids.centroids.empty()) {
continue; // skip empty centroids
}
TDigest d{centroids.delta};
d.Reset(centroids);
others.emplace_back(std::move(d));
}
digest.Merge(others);
return digest.DumpCentroids();
}
StatusOr<CentroidsWithDelta> TDigestMerge(const std::vector<double>& buffer,
const std::vector<CentroidsWithDelta>& centroids_lists, uint64_t delta) {
TDigest digest{delta};
digest.Reset(CentroidsWithDelta{});
digest.Add(buffer);
std::vector<TDigest> others;
others.reserve(centroids_lists.size());
for (const auto& centroids : centroids_lists) {
if (centroids.centroids.empty()) {
continue; // skip empty centroids
}
TDigest d{centroids.delta};
d.Reset(centroids);
others.emplace_back(std::move(d));
}
digest.Merge(others);
return digest.DumpCentroids();
}
StatusOr<CentroidsWithDelta> TDigestMerge(const std::vector<double>& buffer, const CentroidsWithDelta& centroid_list) {
TDigest digest{centroid_list.delta};
digest.Reset(centroid_list);
digest.Add(buffer);
return digest.DumpCentroids();
}