Merge pull request #428 from apache/tdigest
added get_serialized_size_bytes()
diff --git a/tdigest/include/tdigest.hpp b/tdigest/include/tdigest.hpp
index d0c32d6..38f3b1a 100644
--- a/tdigest/include/tdigest.hpp
+++ b/tdigest/include/tdigest.hpp
@@ -20,8 +20,10 @@
#ifndef _TDIGEST_HPP_
#define _TDIGEST_HPP_
-#include <type_traits>
+#include <cstddef>
#include <limits>
+#include <type_traits>
+#include <vector>
#include "common_defs.hpp"
@@ -84,6 +86,7 @@
T mean_;
W weight_;
};
+ using vector_t = std::vector<T, Allocator>;
using vector_centroid = std::vector<centroid, typename std::allocator_traits<Allocator>::template rebind_alloc<centroid>>;
using vector_bytes = std::vector<uint8_t, typename std::allocator_traits<Allocator>::template rebind_alloc<uint8_t>>;
@@ -166,19 +169,28 @@
string<Allocator> to_string(bool print_centroids = false) const;
/**
+ * Computes size needed to serialize the current state.
+   * @param with_buffer if true, count buffered values as-is; if false, they are compressed into centroids first
+ * @return size in bytes needed to serialize this tdigest
+ */
+ size_t get_serialized_size_bytes(bool with_buffer = false) const;
+
+ /**
* This method serializes t-Digest into a given stream in a binary form
* @param os output stream
+   * @param with_buffer if true, buffered values are written as-is instead of being compressed into centroids first
*/
- void serialize(std::ostream& os) const;
+ void serialize(std::ostream& os, bool with_buffer = false) const;
/**
* This method serializes t-Digest as a vector of bytes.
* An optional header can be reserved in front of the sketch.
* It is an uninitialized space of a given size.
* @param header_size_bytes space to reserve in front of the sketch
+   * @param with_buffer if true, buffered values are written as-is instead of being compressed into centroids first
* @return serialized sketch as a vector of bytes
*/
- vector_bytes serialize(unsigned header_size_bytes = 0) const;
+ vector_bytes serialize(unsigned header_size_bytes = 0, bool with_buffer = false) const;
/**
* This method deserializes t-Digest from a given stream.
@@ -198,7 +210,6 @@
static tdigest deserialize(const void* bytes, size_t size, const Allocator& allocator = Allocator());
private:
- Allocator allocator_;
bool reverse_merge_;
uint16_t k_;
uint16_t internal_k_;
@@ -208,8 +219,9 @@
vector_centroid centroids_;
uint64_t centroids_weight_;
size_t buffer_capacity_;
- vector_centroid buffer_;
- uint64_t buffered_weight_;
+ vector_t buffer_;
+
+ static const size_t BUFFER_MULTIPLIER = 4;
static const uint8_t PREAMBLE_LONGS_EMPTY_OR_SINGLE = 1;
static const uint8_t PREAMBLE_LONGS_MULTIPLE = 2;
@@ -222,11 +234,11 @@
enum flags { IS_EMPTY, IS_SINGLE_VALUE, REVERSE_MERGE };
bool is_single_value() const;
+ uint8_t get_preamble_longs() const;
+ void merge(vector_centroid& buffer, W weight);
// for deserialize
- tdigest(bool reverse_merge, uint16_t k, T min, T max, vector_centroid&& centroids, uint64_t total_weight_, const Allocator& allocator);
-
- void merge_buffered();
+ tdigest(bool reverse_merge, uint16_t k, T min, T max, vector_centroid&& centroids, uint64_t total_weight_, vector_t&& buffer);
static double weighted_average(double x1, double w1, double x2, double w2);
diff --git a/tdigest/include/tdigest_impl.hpp b/tdigest/include/tdigest_impl.hpp
index 1a48f88..f8af532 100644
--- a/tdigest/include/tdigest_impl.hpp
+++ b/tdigest/include/tdigest_impl.hpp
@@ -30,15 +30,14 @@
template<typename T, typename A>
tdigest<T, A>::tdigest(uint16_t k, const A& allocator):
-tdigest(false, k, std::numeric_limits<T>::infinity(), -std::numeric_limits<T>::infinity(), vector_centroid(allocator), 0, allocator)
+tdigest(false, k, std::numeric_limits<T>::infinity(), -std::numeric_limits<T>::infinity(), vector_centroid(allocator), 0, vector_t(allocator)) // empty digest: min=+inf, max=-inf, no centroids, empty buffer
{}
template<typename T, typename A>
void tdigest<T, A>::update(T value) {
if (std::isnan(value)) return;
-  if (buffer_.size() >= buffer_capacity_ - centroids_.size()) merge_buffered();
-  buffer_.push_back(centroid(value, 1));
-  ++buffered_weight_;
+  if (buffer_.size() >= centroids_capacity_ * BUFFER_MULTIPLIER) compress(); // >= (not ==) so a buffer that arrived oversized from deserialize still gets flushed
+  buffer_.push_back(value); // raw value only; its weight of 1 is implied
min_ = std::min(min_, value);
max_ = std::max(max_, value);
}
@@ -46,22 +45,21 @@
template<typename T, typename A>
void tdigest<T, A>::merge(tdigest& other) {
if (other.is_empty()) return;
-  size_t num = buffer_.size() + centroids_.size() + other.buffer_.size() + other.centroids_.size();
-  buffer_.reserve(num);
-  std::copy(other.buffer_.begin(), other.buffer_.end(), std::back_inserter(buffer_));
-  std::copy(other.centroids_.begin(), other.centroids_.end(), std::back_inserter(buffer_));
-  buffered_weight_ += other.get_total_weight();
-  if (num > buffer_capacity_) {
-    merge_buffered();
-  } else {
-    min_ = std::min(min_, other.get_min_value());
-    max_ = std::max(max_, other.get_max_value());
-  }
+  vector_centroid tmp(buffer_.get_allocator()); // scratch list of everything that has to be folded in
+  tmp.reserve(buffer_.size() + centroids_.size() + other.buffer_.size() + other.centroids_.size()); // includes room for this digest's centroids, appended by merge(buffer, weight)
+  for (const T value: buffer_) tmp.push_back(centroid(value, 1)); // own buffered values become weight-1 centroids
+  for (const T value: other.buffer_) tmp.push_back(centroid(value, 1)); // likewise for the other digest's buffered values
+  std::copy(other.centroids_.begin(), other.centroids_.end(), std::back_inserter(tmp));
+  merge(tmp, buffer_.size() + other.get_total_weight()); // second argument: weight not yet counted in centroids_weight_
}
template<typename T, typename A>
void tdigest<T, A>::compress() {
-  merge_buffered();
+  if (buffer_.size() == 0) return; // nothing buffered, centroids are already up to date
+  vector_centroid tmp(buffer_.get_allocator());
+  tmp.reserve(buffer_.size() + centroids_.size()); // merge(buffer, weight) appends the existing centroids to tmp
+  for (const T value: buffer_) tmp.push_back(centroid(value, 1)); // each buffered value becomes a weight-1 centroid
+  merge(tmp, buffer_.size()); // buffered weight equals the number of buffered values
}
template<typename T, typename A>
@@ -83,7 +81,7 @@
template<typename T, typename A>
uint64_t tdigest<T, A>::get_total_weight() const {
-  return centroids_weight_ + buffered_weight_;
+  return centroids_weight_ + buffer_.size(); // every buffered value carries weight 1
}
template<typename T, typename A>
@@ -95,7 +93,7 @@
// one centroid and value == min_ == max_
if ((centroids_.size() + buffer_.size()) == 1) return 0.5;
- const_cast<tdigest*>(this)->merge_buffered(); // side effect
+ const_cast<tdigest*>(this)->compress(); // side effect
// left tail
const T first_mean = centroids_.front().get_mean();
@@ -149,7 +147,7 @@
if ((rank < 0.0) || (rank > 1.0)) {
throw std::invalid_argument("Normalized rank cannot be less than 0 or greater than 1");
}
- const_cast<tdigest*>(this)->merge_buffered(); // side effect
+ const_cast<tdigest*>(this)->compress(); // side effect
if (centroids_.size() == 1) return centroids_.front().get_mean();
// at least 2 centroids
@@ -204,13 +202,11 @@
std::ostringstream os;
os << "### t-Digest summary:" << std::endl;
os << " Nominal k : " << k_ << std::endl;
- os << " Internal k : " << internal_k_ << std::endl;
os << " Centroids : " << centroids_.size() << std::endl;
os << " Buffered : " << buffer_.size() << std::endl;
os << " Centroids capacity : " << centroids_capacity_ << std::endl;
- os << " Buffer capacity : " << buffer_capacity_ << std::endl;
+ os << " Buffer capacity : " << centroids_capacity_ * BUFFER_MULTIPLIER << std::endl;
os << " Centroids Weight : " << centroids_weight_ << std::endl;
- os << " Buffered Weight : " << buffered_weight_ << std::endl;
os << " Total Weight : " << get_total_weight() << std::endl;
os << " Reverse Merge : " << (reverse_merge_ ? "true" : "false") << std::endl;
if (!is_empty()) {
@@ -229,33 +225,33 @@
if (buffer_.size() > 0) {
os << "Buffer:" << std::endl;
int i = 0;
- for (const auto& b: buffer_) {
- os << i++ << ": " << b.get_mean() << ", " << b.get_weight() << std::endl;
+ for (const T value: buffer_) {
+ os << i++ << ": " << value << std::endl;
}
}
}
- return string<A>(os.str().c_str(), allocator_);
+ return string<A>(os.str().c_str(), buffer_.get_allocator());
}
+// assumes that there is enough room in the input buffer to add centroids from this tdigest
template<typename T, typename A>
-void tdigest<T, A>::merge_buffered() {
- if (buffered_weight_ == 0) return;
- std::copy(centroids_.begin(), centroids_.end(), std::back_inserter(buffer_));
+void tdigest<T, A>::merge(vector_centroid& buffer, W weight) {
+ std::copy(centroids_.begin(), centroids_.end(), std::back_inserter(buffer));
centroids_.clear();
- std::stable_sort(buffer_.begin(), buffer_.end(), centroid_cmp());
- if (reverse_merge_) std::reverse(buffer_.begin(), buffer_.end());
- centroids_weight_ += buffered_weight_;
- auto it = buffer_.begin();
+ std::stable_sort(buffer.begin(), buffer.end(), centroid_cmp());
+ if (reverse_merge_) std::reverse(buffer.begin(), buffer.end());
+ centroids_weight_ += weight;
+ auto it = buffer.begin();
centroids_.push_back(*it);
++it;
double weight_so_far = 0;
- while (it != buffer_.end()) {
+ while (it != buffer.end()) {
const double proposed_weight = centroids_.back().get_weight() + it->get_weight();
bool add_this = false;
- if (std::distance(buffer_.begin(), it) != 1 && std::distance(buffer_.end(), it) != 1) {
+ if (std::distance(buffer.begin(), it) != 1 && std::distance(buffer.end(), it) != 1) {
const double q0 = weight_so_far / centroids_weight_;
const double q2 = (weight_so_far + proposed_weight) / centroids_weight_;
- const double normalizer = scale_function().normalizer(internal_k_, centroids_weight_);
+ const double normalizer = scale_function().normalizer(2 * k_, centroids_weight_);
add_this = proposed_weight <= centroids_weight_ * std::min(scale_function().max(q0, normalizer), scale_function().max(q2, normalizer));
}
if (add_this) {
@@ -267,13 +263,10 @@
++it;
}
if (reverse_merge_) std::reverse(centroids_.begin(), centroids_.end());
- if (centroids_weight_ > 0) {
- min_ = std::min(min_, centroids_.front().get_mean());
- max_ = std::max(max_, centroids_.back().get_mean());
- }
+ min_ = std::min(min_, centroids_.front().get_mean());
+ max_ = std::max(max_, centroids_.back().get_mean());
reverse_merge_ = !reverse_merge_;
buffer_.clear();
- buffered_weight_ = 0;
}
template<typename T, typename A>
@@ -282,68 +275,76 @@
}
template<typename T, typename A>
-void tdigest<T, A>::serialize(std::ostream& os) const {
-  const_cast<tdigest*>(this)->merge_buffered(); // side effect
-  write(os, is_empty() || is_single_value() ? PREAMBLE_LONGS_EMPTY_OR_SINGLE : PREAMBLE_LONGS_MULTIPLE);
+void tdigest<T, A>::serialize(std::ostream& os, bool with_buffer) const {
+  if (!with_buffer) const_cast<tdigest*>(this)->compress(); // side effect: mutates a const object by folding the buffer into the centroids
+  write(os, get_preamble_longs());
write(os, SERIAL_VERSION);
write(os, SKETCH_TYPE);
write(os, k_);
const uint8_t flags_byte(
-  (is_empty() ? 1 << flags::IS_EMPTY : 0) |
-  (is_single_value() ? 1 << flags::IS_SINGLE_VALUE : 0) |
-  (reverse_merge_ ? 1 << flags::REVERSE_MERGE : 0)
+  (is_empty() ? 1 << flags::IS_EMPTY : 0)
+  | (is_single_value() ? 1 << flags::IS_SINGLE_VALUE : 0)
+  | (reverse_merge_ ? 1 << flags::REVERSE_MERGE : 0)
);
write(os, flags_byte);
write<uint16_t>(os, 0); // unused
-
if (is_empty()) return;
-
if (is_single_value()) {
write(os, min_);
return;
}
-
write(os, static_cast<uint32_t>(centroids_.size()));
-  write<uint32_t>(os, 0); // unused
-
+  write(os, static_cast<uint32_t>(buffer_.size())); // buffered-value count takes the 4 bytes that were previously unused
write(os, min_);
write(os, max_);
-  write(os, centroids_.data(), centroids_.size() * sizeof(centroid));
+  if (centroids_.size() > 0) write(os, centroids_.data(), centroids_.size() * sizeof(centroid)); // skipped when there are no centroids
+  if (buffer_.size() > 0) write(os, buffer_.data(), buffer_.size() * sizeof(T)); // raw buffered values follow the centroids
}
template<typename T, typename A>
-auto tdigest<T, A>::serialize(unsigned header_size_bytes) const -> vector_bytes {
-  const_cast<tdigest*>(this)->merge_buffered(); // side effect
-  const uint8_t preamble_longs = is_empty() || is_single_value() ? PREAMBLE_LONGS_EMPTY_OR_SINGLE : PREAMBLE_LONGS_MULTIPLE;
-  const size_t size_bytes = preamble_longs * sizeof(uint64_t) +
-      (is_empty() ? 0 : (is_single_value() ? sizeof(T) : sizeof(T) * 2 + sizeof(centroid) * centroids_.size()));
-  vector_bytes bytes(size_bytes, 0, allocator_);
-  uint8_t* ptr = bytes.data() + header_size_bytes;
+uint8_t tdigest<T, A>::get_preamble_longs() const {
+  return is_empty() || is_single_value() ? PREAMBLE_LONGS_EMPTY_OR_SINGLE : PREAMBLE_LONGS_MULTIPLE; // shared by both serialize overloads and get_serialized_size_bytes
+}
- *ptr++ = preamble_longs;
+template<typename T, typename A>
+size_t tdigest<T, A>::get_serialized_size_bytes(bool with_buffer) const {
+  if (!with_buffer) const_cast<tdigest*>(this)->compress(); // side effect: folds buffered values into the centroids
+  const size_t preamble_bytes = get_preamble_longs() * sizeof(uint64_t); // the centroid and buffer counts are part of the preamble
+  if (is_empty()) return preamble_bytes;
+  if (is_single_value()) return preamble_bytes + sizeof(T); // just the one value
+  const size_t min_max_bytes = sizeof(T) * 2;
+  const size_t centroids_bytes = sizeof(centroid) * centroids_.size();
+  const size_t buffer_bytes = with_buffer ? sizeof(T) * buffer_.size() : 0;
+  return preamble_bytes + min_max_bytes + centroids_bytes + buffer_bytes;
+}
+
+template<typename T, typename A>
+auto tdigest<T, A>::serialize(unsigned header_size_bytes, bool with_buffer) const -> vector_bytes {
+  if (!with_buffer) const_cast<tdigest*>(this)->compress(); // side effect: mutates a const object by folding the buffer into the centroids
+  vector_bytes bytes(header_size_bytes + get_serialized_size_bytes(with_buffer), 0, buffer_.get_allocator()); // reserved header space precedes the sketch, so it must be part of the allocation
+  uint8_t* ptr = bytes.data() + header_size_bytes; // sketch bytes start right after the caller's header
+  *ptr++ = get_preamble_longs();
*ptr++ = SERIAL_VERSION;
*ptr++ = SKETCH_TYPE;
ptr += copy_to_mem(k_, ptr);
const uint8_t flags_byte(
-  (is_empty() ? 1 << flags::IS_EMPTY : 0) |
-  (is_single_value() ? 1 << flags::IS_SINGLE_VALUE : 0) |
-  (reverse_merge_ ? 1 << flags::REVERSE_MERGE : 0)
+  (is_empty() ? 1 << flags::IS_EMPTY : 0)
+  | (is_single_value() ? 1 << flags::IS_SINGLE_VALUE : 0)
+  | (reverse_merge_ ? 1 << flags::REVERSE_MERGE : 0)
);
*ptr++ = flags_byte;
ptr += 2; // unused
if (is_empty()) return bytes;
-
if (is_single_value()) {
copy_to_mem(min_, ptr);
return bytes;
}
-
ptr += copy_to_mem(static_cast<uint32_t>(centroids_.size()), ptr);
-  ptr += 4; // unused
-
+  ptr += copy_to_mem(static_cast<uint32_t>(buffer_.size()), ptr); // buffered-value count takes the 4 bytes that were previously unused
ptr += copy_to_mem(min_, ptr);
ptr += copy_to_mem(max_, ptr);
-  copy_to_mem(centroids_.data(), ptr, centroids_.size() * sizeof(centroid));
+  if (centroids_.size() > 0) ptr += copy_to_mem(centroids_.data(), ptr, centroids_.size() * sizeof(centroid)); // skipped when there are no centroids
+  if (buffer_.size() > 0) copy_to_mem(buffer_.data(), ptr, buffer_.size() * sizeof(T)); // raw buffered values follow the centroids
return bytes;
}
@@ -374,19 +375,21 @@
const bool reverse_merge = flags_byte & (1 << flags::REVERSE_MERGE);
if (is_single_value) {
const T value = read<T>(is);
- return tdigest(reverse_merge, k, value, value, vector_centroid(1, centroid(value, 1), allocator), 1, allocator);
+ return tdigest(reverse_merge, k, value, value, vector_centroid(1, centroid(value, 1), allocator), 1, vector_t(allocator));
}
const auto num_centroids = read<uint32_t>(is);
- read<uint32_t>(is); // unused
+ const auto num_buffered = read<uint32_t>(is);
const T min = read<T>(is);
const T max = read<T>(is);
vector_centroid centroids(num_centroids, centroid(0, 0), allocator);
- read(is, centroids.data(), num_centroids * sizeof(centroid));
- uint64_t total_weight = 0;
- for (const auto& c: centroids) total_weight += c.get_weight();
- return tdigest(reverse_merge, k, min, max, std::move(centroids), total_weight, allocator);
+ if (num_centroids > 0) read(is, centroids.data(), num_centroids * sizeof(centroid));
+ vector_t buffer(num_buffered, 0, allocator);
+ if (num_buffered > 0) read(is, buffer.data(), num_buffered * sizeof(T));
+ uint64_t weight = 0;
+ for (const auto& c: centroids) weight += c.get_weight();
+ return tdigest(reverse_merge, k, min, max, std::move(centroids), weight, std::move(buffer));
}
template<typename T, typename A>
@@ -423,24 +426,27 @@
ensure_minimum_memory(end_ptr - ptr, sizeof(T));
T value;
ptr += copy_from_mem(ptr, value);
- return tdigest(reverse_merge, k, value, value, vector_centroid(1, centroid(value, 1), allocator), 1, allocator);
+ return tdigest(reverse_merge, k, value, value, vector_centroid(1, centroid(value, 1), allocator), 1, vector_t(allocator));
}
ensure_minimum_memory(end_ptr - ptr, 8);
uint32_t num_centroids;
ptr += copy_from_mem(ptr, num_centroids);
- ptr += 4; // unused
+ uint32_t num_buffered;
+ ptr += copy_from_mem(ptr, num_buffered);
- ensure_minimum_memory(end_ptr - ptr, sizeof(T) * 2 + sizeof(centroid) * num_centroids);
+ ensure_minimum_memory(end_ptr - ptr, sizeof(T) * 2 + sizeof(centroid) * num_centroids + sizeof(T) * num_buffered);
T min;
ptr += copy_from_mem(ptr, min);
T max;
ptr += copy_from_mem(ptr, max);
vector_centroid centroids(num_centroids, centroid(0, 0), allocator);
- copy_from_mem(ptr, centroids.data(), sizeof(centroid) * num_centroids);
- uint64_t total_weight = 0;
- for (const auto& c: centroids) total_weight += c.get_weight();
- return tdigest(reverse_merge, k, min, max, std::move(centroids), total_weight, allocator);
+ if (num_centroids > 0) ptr += copy_from_mem(ptr, centroids.data(), num_centroids * sizeof(centroid));
+ vector_t buffer(num_buffered, 0, allocator);
+ if (num_buffered > 0) copy_from_mem(ptr, buffer.data(), num_buffered * sizeof(T));
+ uint64_t weight = 0;
+ for (const auto& c: centroids) weight += c.get_weight();
+ return tdigest(reverse_merge, k, min, max, std::move(centroids), weight, std::move(buffer));
}
// compatibility with the format of the reference implementation
@@ -466,7 +472,7 @@
c = centroid(mean, weight);
total_weight += weight;
}
- return tdigest(false, k, min, max, std::move(centroids), total_weight, allocator);
+ return tdigest(false, k, min, max, std::move(centroids), total_weight, vector_t(allocator));
}
// COMPAT_FLOAT: compatibility with asSmallBytes()
const auto min = read_big_endian<double>(is); // reference implementation uses doubles for min and max
@@ -484,7 +490,7 @@
c = centroid(mean, weight);
total_weight += weight;
}
- return tdigest(false, k, min, max, std::move(centroids), total_weight, allocator);
+ return tdigest(false, k, min, max, std::move(centroids), total_weight, vector_t(allocator));
}
// compatibility with the format of the reference implementation
@@ -526,7 +532,7 @@
c = centroid(mean, static_cast<W>(weight));
total_weight += static_cast<uint64_t>(weight);
}
- return tdigest(false, k, min, max, std::move(centroids), total_weight, allocator);
+ return tdigest(false, k, min, max, std::move(centroids), total_weight, vector_t(allocator));
}
// COMPAT_FLOAT: compatibility with asSmallBytes()
ensure_minimum_memory(end_ptr - ptr, sizeof(double) * 2 + sizeof(float) + sizeof(uint16_t) * 3);
@@ -558,7 +564,7 @@
c = centroid(mean, static_cast<W>(weight));
total_weight += static_cast<uint64_t>(weight);
}
- return tdigest(false, k, min, max, std::move(centroids), total_weight, allocator);
+ return tdigest(false, k, min, max, std::move(centroids), total_weight, vector_t(allocator));
}
template<typename T, typename A>
@@ -567,30 +573,21 @@
}
template<typename T, typename A>
-tdigest<T, A>::tdigest(bool reverse_merge, uint16_t k, T min, T max, vector_centroid&& centroids, uint64_t total_weight, const A& allocator):
-allocator_(allocator),
+tdigest<T, A>::tdigest(bool reverse_merge, uint16_t k, T min, T max, vector_centroid&& centroids, uint64_t weight, vector_t&& buffer): // weight covers the centroids only; buffered values count 1 each
reverse_merge_(reverse_merge),
k_(k),
-internal_k_(k),
min_(min),
max_(max),
centroids_capacity_(0),
centroids_(std::move(centroids)),
-centroids_weight_(total_weight),
-buffer_capacity_(0),
-buffer_(allocator),
-buffered_weight_(0)
+centroids_weight_(weight),
+buffer_(std::move(buffer)) // NOTE(review): internal_k_ and buffer_capacity_ still appear as members in tdigest.hpp but are no longer initialized here - confirm they can be removed
{
if (k < 10) throw std::invalid_argument("k must be at least 10");
const size_t fudge = k < 30 ? 30 : 10;
centroids_capacity_ = 2 * k_ + fudge;
-  buffer_capacity_ = 5 * centroids_capacity_;
-  const double scale = std::max(1.0, static_cast<double>(buffer_capacity_) / centroids_capacity_ - 1.0);
-  internal_k_ = std::ceil(std::sqrt(scale) * k_);
-  centroids_capacity_ = std::max(centroids_capacity_, internal_k_ + fudge);
-  buffer_capacity_ = std::max(buffer_capacity_, 2 * centroids_capacity_);
centroids_.reserve(centroids_capacity_);
-  buffer_.reserve(buffer_capacity_);
+  buffer_.reserve(centroids_capacity_ * BUFFER_MULTIPLIER); // same size at which update() triggers compress()
}
} /* namespace datasketches */
diff --git a/tdigest/test/tdigest_serialize_for_java.cpp b/tdigest/test/tdigest_serialize_for_java.cpp
index 1f3c1fb..fe0a9d4 100644
--- a/tdigest/test/tdigest_serialize_for_java.cpp
+++ b/tdigest/test/tdigest_serialize_for_java.cpp
@@ -34,6 +34,16 @@
}
}
+TEST_CASE("tdigest double generate with buffer", "[serialize_for_java]") {
+  const unsigned n_arr[] = {0, 1, 10, 100, 1000, 10000, 100000, 1000000}; // stream lengths, one output file each
+  for (const unsigned n: n_arr) {
+    tdigest_double td(100);
+    for (unsigned i = 1; i <= n; ++i) td.update(i);
+    std::ofstream os("tdigest_double_buf_n" + std::to_string(n) + "_cpp.sk", std::ios::binary);
+    td.serialize(os, true); // with_buffer: buffered values are written without compressing first
+  }
+}
+
TEST_CASE("tdigest float generate", "[serialize_for_java]") {
const unsigned n_arr[] = {0, 1, 10, 100, 1000, 10000, 100000, 1000000};
for (const unsigned n: n_arr) {
@@ -44,4 +54,14 @@
}
}
+TEST_CASE("tdigest float generate with buffer", "[serialize_for_java]") {
+  const unsigned n_arr[] = {0, 1, 10, 100, 1000, 10000, 100000, 1000000}; // stream lengths, one output file each
+  for (const unsigned n: n_arr) {
+    tdigest_float td(100);
+    for (unsigned i = 1; i <= n; ++i) td.update(i);
+    std::ofstream os("tdigest_float_buf_n" + std::to_string(n) + "_cpp.sk", std::ios::binary);
+    td.serialize(os, true); // with_buffer: buffered values are written without compressing first
+  }
+}
+
} /* namespace datasketches */
diff --git a/tdigest/test/tdigest_test.cpp b/tdigest/test/tdigest_test.cpp
index ac52f55..bf64dbb 100644
--- a/tdigest/test/tdigest_test.cpp
+++ b/tdigest/test/tdigest_test.cpp
@@ -54,8 +54,9 @@
TEST_CASE("many values", "[tdigest]") {
const size_t n = 10000;
- tdigest_double td(100);
+ tdigest_double td;
for (size_t i = 0; i < n; ++i) td.update(i);
+// std::cout << td.to_string(true);
// td.compress();
// std::cout << td.to_string(true);
REQUIRE_FALSE(td.is_empty());
@@ -68,7 +69,7 @@
REQUIRE(td.get_rank(n * 3 / 4) == Approx(0.75).margin(0.0001));
REQUIRE(td.get_rank(n) == 1);
REQUIRE(td.get_quantile(0) == 0);
- REQUIRE(td.get_quantile(0.5) == Approx(n / 2).epsilon(0.01));
+ REQUIRE(td.get_quantile(0.5) == Approx(n / 2).epsilon(0.03));
REQUIRE(td.get_quantile(0.9) == Approx(n * 0.9).epsilon(0.01));
REQUIRE(td.get_quantile(0.95) == Approx(n * 0.95).epsilon(0.01));
REQUIRE(td.get_quantile(1) == n - 1);
@@ -137,12 +138,14 @@
TEST_CASE("merge large", "[tdigest]") {
const size_t n = 10000;
- tdigest_double td1(100);
- tdigest_double td2(100);
+ tdigest_double td1;
+ tdigest_double td2;
for (size_t i = 0; i < n / 2; ++i) {
td1.update(i);
td2.update(n / 2 + i);
}
+// std::cout << td1.to_string();
+// std::cout << td2.to_string();
td1.merge(td2);
// td1.compress();
// std::cout << td1.to_string(true);
@@ -179,6 +182,19 @@
REQUIRE(deserialized_td.get_max_value() == 123);
}
+TEST_CASE("serialize deserialize stream single value buffered", "[tdigest]") {
+  tdigest<double> td; // default k; the get_k() check below expects it to be 200
+  td.update(123);
+  std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
+  td.serialize(s, true); // with_buffer: the single value is still in the buffer when written
+  auto deserialized_td = tdigest<double>::deserialize(s);
+  REQUIRE(deserialized_td.get_k() == 200);
+  REQUIRE(deserialized_td.get_total_weight() == 1);
+  REQUIRE_FALSE(deserialized_td.is_empty());
+  REQUIRE(deserialized_td.get_min_value() == 123);
+  REQUIRE(deserialized_td.get_max_value() == 123);
+}
+
TEST_CASE("serialize deserialize stream many values", "[tdigest]") {
tdigest<double> td(100);
for (int i = 0; i < 1000; ++i) td.update(i);
@@ -194,6 +210,21 @@
REQUIRE(td.get_quantile(0.5) == deserialized_td.get_quantile(0.5));
}
+TEST_CASE("serialize deserialize stream many values with buffer", "[tdigest]") {
+  tdigest<double> td(100);
+  for (int i = 0; i < 10000; ++i) td.update(i); // more than the buffer holds, so both centroids and buffered values are present
+  std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
+  td.serialize(s, true); // with_buffer: buffered values are written without compressing first
+  auto deserialized_td = tdigest<double>::deserialize(s);
+  REQUIRE(td.get_k() == deserialized_td.get_k());
+  REQUIRE(td.get_total_weight() == deserialized_td.get_total_weight());
+  REQUIRE(td.is_empty() == deserialized_td.is_empty());
+  REQUIRE(td.get_min_value() == deserialized_td.get_min_value());
+  REQUIRE(td.get_max_value() == deserialized_td.get_max_value());
+  REQUIRE(td.get_rank(500) == deserialized_td.get_rank(500)); // get_rank compresses each digest as a side effect
+  REQUIRE(td.get_quantile(0.5) == deserialized_td.get_quantile(0.5));
+}
+
TEST_CASE("serialize deserialize bytes empty", "[tdigest]") {
tdigest<double> td(100);
auto bytes = td.serialize();
@@ -215,6 +246,18 @@
REQUIRE(deserialized_td.get_max_value() == 123);
}
+TEST_CASE("serialize deserialize bytes single value buffered", "[tdigest]") {
+  tdigest<double> td(200);
+  td.update(123);
+  auto bytes = td.serialize(0, true); // no header; with_buffer: the single value is still in the buffer when written
+  auto deserialized_td = tdigest<double>::deserialize(bytes.data(), bytes.size());
+  REQUIRE(deserialized_td.get_k() == 200);
+  REQUIRE(deserialized_td.get_total_weight() == 1);
+  REQUIRE_FALSE(deserialized_td.is_empty());
+  REQUIRE(deserialized_td.get_min_value() == 123);
+  REQUIRE(deserialized_td.get_max_value() == 123);
+}
+
TEST_CASE("serialize deserialize bytes many values", "[tdigest]") {
tdigest<double> td(100);
for (int i = 0; i < 1000; ++i) td.update(i);
@@ -229,6 +272,20 @@
REQUIRE(td.get_quantile(0.5) == deserialized_td.get_quantile(0.5));
}
+TEST_CASE("serialize deserialize bytes many values with buffer", "[tdigest]") {
+  tdigest<double> td(100);
+  for (int i = 0; i < 10000; ++i) td.update(i); // more than the buffer holds, so both centroids and buffered values are present
+  auto bytes = td.serialize(0, true); // no header; with_buffer must be true or the buffered path is never exercised
+  auto deserialized_td = tdigest<double>::deserialize(bytes.data(), bytes.size());
+  REQUIRE(td.get_k() == deserialized_td.get_k());
+  REQUIRE(td.get_total_weight() == deserialized_td.get_total_weight());
+  REQUIRE(td.is_empty() == deserialized_td.is_empty());
+  REQUIRE(td.get_min_value() == deserialized_td.get_min_value());
+  REQUIRE(td.get_max_value() == deserialized_td.get_max_value());
+  REQUIRE(td.get_rank(500) == deserialized_td.get_rank(500)); // get_rank compresses each digest as a side effect
+  REQUIRE(td.get_quantile(0.5) == deserialized_td.get_quantile(0.5));
+}
+
TEST_CASE("serialize deserialize steam and bytes equivalence empty", "[tdigest]") {
tdigest<double> td(100);
std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
@@ -287,6 +344,40 @@
REQUIRE(deserialized_td1.get_quantile(0.5) == deserialized_td2.get_quantile(0.5));
}
+TEST_CASE("serialize deserialize steam and bytes equivalence with buffer", "[tdigest]") {
+  tdigest<double> td(100);
+  const int n = 10000;
+  for (int i = 0; i < n; ++i) td.update(i);
+  std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
+  td.serialize(s, true); // stream form, buffered values kept
+  auto bytes = td.serialize(0, true); // byte-vector form of the same state, no header
+
+  REQUIRE(bytes.size() == static_cast<size_t>(s.tellp()));
+  for (size_t i = 0; i < bytes.size(); ++i) {
+    REQUIRE(((char*)bytes.data())[i] == (char)s.get()); // both forms must match byte for byte
+  }
+
+  s.seekg(0); // rewind
+  auto deserialized_td1 = tdigest<double>::deserialize(s);
+  auto deserialized_td2 = tdigest<double>::deserialize(bytes.data(), bytes.size());
+  REQUIRE(bytes.size() == static_cast<size_t>(s.tellg())); // stream deserialize consumed exactly the serialized size
+
+  REQUIRE_FALSE(deserialized_td1.is_empty());
+  REQUIRE(deserialized_td1.get_k() == 100);
+  REQUIRE(deserialized_td1.get_total_weight() == n);
+  REQUIRE(deserialized_td1.get_min_value() == 0);
+  REQUIRE(deserialized_td1.get_max_value() == n - 1);
+
+  REQUIRE_FALSE(deserialized_td2.is_empty());
+  REQUIRE(deserialized_td2.get_k() == 100);
+  REQUIRE(deserialized_td2.get_total_weight() == n);
+  REQUIRE(deserialized_td2.get_min_value() == 0);
+  REQUIRE(deserialized_td2.get_max_value() == n - 1);
+
+  REQUIRE(deserialized_td1.get_rank(n / 2) == deserialized_td2.get_rank(n / 2));
+  REQUIRE(deserialized_td1.get_quantile(0.5) == deserialized_td2.get_quantile(0.5));
+}
+
TEST_CASE("deserialize from reference implementation stream double", "[tdigest]") {
std::ifstream is;
is.exceptions(std::ios::failbit | std::ios::badbit);