Merge pull request #181 from apache/fi_allocator
Support passing an allocator instance
diff --git a/CMakeLists.txt b/CMakeLists.txt
index ff1d4a5..d3c1fef 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -96,6 +96,7 @@
add_subdirectory(theta)
add_subdirectory(sampling)
add_subdirectory(tuple)
+add_subdirectory(req)
if (WITH_PYTHON)
add_subdirectory(python)
diff --git a/common/include/memory_operations.hpp b/common/include/memory_operations.hpp
index 80dc3a3..986b2b0 100644
--- a/common/include/memory_operations.hpp
+++ b/common/include/memory_operations.hpp
@@ -52,6 +52,18 @@
return size;
}
+template<typename T>
+static inline size_t copy_to_mem(const T& item, void* dst) {
+ memcpy(dst, &item, sizeof(T));
+ return sizeof(T);
+}
+
+template<typename T>
+static inline size_t copy_from_mem(const void* src, T& item) {
+ memcpy(&item, src, sizeof(T));
+ return sizeof(T);
+}
+
} // namespace
#endif // _MEMORY_OPERATIONS_HPP_
diff --git a/kll/include/kll_quantile_calculator.hpp b/kll/include/kll_quantile_calculator.hpp
index bc60f26..5114399 100644
--- a/kll/include/kll_quantile_calculator.hpp
+++ b/kll/include/kll_quantile_calculator.hpp
@@ -28,7 +28,7 @@
class kll_quantile_calculator {
public:
// assumes that all levels are sorted including level 0
- kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n);
+ kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n, const A& allocator);
T get_quantile(double fraction) const;
private:
diff --git a/kll/include/kll_quantile_calculator_impl.hpp b/kll/include/kll_quantile_calculator_impl.hpp
index f580819..23efa4d 100644
--- a/kll/include/kll_quantile_calculator_impl.hpp
+++ b/kll/include/kll_quantile_calculator_impl.hpp
@@ -29,8 +29,8 @@
namespace datasketches {
template <typename T, typename C, typename A>
-kll_quantile_calculator<T, C, A>::kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n):
-n_(n), levels_(num_levels + 1)
+kll_quantile_calculator<T, C, A>::kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n, const A& allocator):
+n_(n), levels_(num_levels + 1, 0, allocator), entries_(allocator)
{
const uint32_t num_items = levels[num_levels] - levels[0];
entries_.reserve(num_items);
@@ -116,7 +116,7 @@
template <typename T, typename C, typename A>
void kll_quantile_calculator<T, C, A>::merge_sorted_blocks(Container& entries, const uint32_t* levels, uint8_t num_levels, uint32_t num_items) {
if (num_levels == 1) return;
- Container temporary;
+ Container temporary(entries.get_allocator());
temporary.reserve(num_items);
merge_sorted_blocks_direct(entries, temporary, levels, 0, num_levels);
}
diff --git a/kll/include/kll_sketch.hpp b/kll/include/kll_sketch.hpp
index a4530c9..af36ceb 100644
--- a/kll/include/kll_sketch.hpp
+++ b/kll/include/kll_sketch.hpp
@@ -161,7 +161,7 @@
static const uint16_t MIN_K = DEFAULT_M;
static const uint16_t MAX_K = (1 << 16) - 1;
- explicit kll_sketch(uint16_t k = DEFAULT_K);
+ explicit kll_sketch(uint16_t k = DEFAULT_K, const A& allocator = A());
kll_sketch(const kll_sketch& other);
kll_sketch(kll_sketch&& other) noexcept;
~kll_sketch();
@@ -401,7 +401,7 @@
* @param is input stream
* @return an instance of a sketch
*/
- static kll_sketch deserialize(std::istream& is);
+ static kll_sketch<T, C, S, A> deserialize(std::istream& is, const A& allocator = A());
/**
* This method deserializes a sketch from a given array of bytes.
@@ -409,7 +409,7 @@
* @param size the size of the array
* @return an instance of a sketch
*/
- static kll_sketch deserialize(const void* bytes, size_t size);
+ static kll_sketch<T, C, S, A> deserialize(const void* bytes, size_t size, const A& allocator = A());
/*
* Gets the normalized rank error given k and pmf.
@@ -461,6 +461,7 @@
static const uint8_t PREAMBLE_INTS_SHORT = 2; // for empty and single item
static const uint8_t PREAMBLE_INTS_FULL = 5;
+ A allocator_;
uint16_t k_;
uint8_t m_; // minimum buffer "width"
uint16_t min_k_; // for error estimation after merging with different k
diff --git a/kll/include/kll_sketch_impl.hpp b/kll/include/kll_sketch_impl.hpp
index f0c5ff3..90d3be7 100644
--- a/kll/include/kll_sketch_impl.hpp
+++ b/kll/include/kll_sketch_impl.hpp
@@ -30,13 +30,14 @@
namespace datasketches {
template<typename T, typename C, typename S, typename A>
-kll_sketch<T, C, S, A>::kll_sketch(uint16_t k):
+kll_sketch<T, C, S, A>::kll_sketch(uint16_t k, const A& allocator):
+allocator_(allocator),
k_(k),
m_(DEFAULT_M),
min_k_(k),
n_(0),
num_levels_(1),
-levels_(2),
+levels_(2, 0, allocator),
items_(nullptr),
items_size_(k_),
min_value_(nullptr),
@@ -47,11 +48,12 @@
throw std::invalid_argument("K must be >= " + std::to_string(MIN_K) + " and <= " + std::to_string(MAX_K) + ": " + std::to_string(k));
}
levels_[0] = levels_[1] = k;
- items_ = A().allocate(items_size_);
+ items_ = allocator_.allocate(items_size_);
}
template<typename T, typename C, typename S, typename A>
kll_sketch<T, C, S, A>::kll_sketch(const kll_sketch& other):
+allocator_(other.allocator_),
k_(other.k_),
m_(other.m_),
min_k_(other.min_k_),
@@ -64,14 +66,15 @@
max_value_(nullptr),
is_level_zero_sorted_(other.is_level_zero_sorted_)
{
- items_ = A().allocate(items_size_);
+ items_ = allocator_.allocate(items_size_);
std::copy(&other.items_[levels_[0]], &other.items_[levels_[num_levels_]], &items_[levels_[0]]);
- if (other.min_value_ != nullptr) min_value_ = new (A().allocate(1)) T(*other.min_value_);
- if (other.max_value_ != nullptr) max_value_ = new (A().allocate(1)) T(*other.max_value_);
+ if (other.min_value_ != nullptr) min_value_ = new (allocator_.allocate(1)) T(*other.min_value_);
+ if (other.max_value_ != nullptr) max_value_ = new (allocator_.allocate(1)) T(*other.max_value_);
}
template<typename T, typename C, typename S, typename A>
kll_sketch<T, C, S, A>::kll_sketch(kll_sketch&& other) noexcept:
+allocator_(std::move(other.allocator_)),
k_(other.k_),
m_(other.m_),
min_k_(other.min_k_),
@@ -91,7 +94,8 @@
template<typename T, typename C, typename S, typename A>
kll_sketch<T, C, S, A>& kll_sketch<T, C, S, A>::operator=(const kll_sketch& other) {
- kll_sketch copy(other);
+ kll_sketch<T, C, S, A> copy(other);
+ std::swap(allocator_, copy.allocator_);
std::swap(k_, copy.k_);
std::swap(m_, copy.m_);
std::swap(min_k_, copy.min_k_);
@@ -108,6 +112,7 @@
template<typename T, typename C, typename S, typename A>
kll_sketch<T, C, S, A>& kll_sketch<T, C, S, A>::operator=(kll_sketch&& other) {
+ std::swap(allocator_, other.allocator_);
std::swap(k_, other.k_);
std::swap(m_, other.m_);
std::swap(min_k_, other.min_k_);
@@ -128,15 +133,15 @@
const uint32_t begin = levels_[0];
const uint32_t end = levels_[num_levels_];
for (uint32_t i = begin; i < end; i++) items_[i].~T();
- A().deallocate(items_, items_size_);
+ allocator_.deallocate(items_, items_size_);
}
if (min_value_ != nullptr) {
min_value_->~T();
- A().deallocate(min_value_, 1);
+ allocator_.deallocate(min_value_, 1);
}
if (max_value_ != nullptr) {
max_value_->~T();
- A().deallocate(max_value_, 1);
+ allocator_.deallocate(max_value_, 1);
}
}
@@ -159,8 +164,8 @@
template<typename T, typename C, typename S, typename A>
void kll_sketch<T, C, S, A>::update_min_max(const T& value) {
if (is_empty()) {
- min_value_ = new (A().allocate(1)) T(value);
- max_value_ = new (A().allocate(1)) T(value);
+ min_value_ = new (allocator_.allocate(1)) T(value);
+ max_value_ = new (allocator_.allocate(1)) T(value);
} else {
if (C()(value, *min_value_)) *min_value_ = value;
if (C()(*max_value_, value)) *max_value_ = value;
@@ -182,8 +187,8 @@
throw std::invalid_argument("incompatible M: " + std::to_string(m_) + " and " + std::to_string(other.m_));
}
if (is_empty()) {
- min_value_ = new (A().allocate(1)) T(*other.min_value_);
- max_value_ = new (A().allocate(1)) T(*other.max_value_);
+ min_value_ = new (allocator_.allocate(1)) T(*other.min_value_);
+ max_value_ = new (allocator_.allocate(1)) T(*other.max_value_);
} else {
if (C()(*other.min_value_, *min_value_)) *min_value_ = *other.min_value_;
if (C()(*max_value_, *other.max_value_)) *max_value_ = *other.max_value_;
@@ -206,8 +211,8 @@
throw std::invalid_argument("incompatible M: " + std::to_string(m_) + " and " + std::to_string(other.m_));
}
if (is_empty()) {
- min_value_ = new (A().allocate(1)) T(std::move(*other.min_value_));
- max_value_ = new (A().allocate(1)) T(std::move(*other.max_value_));
+ min_value_ = new (allocator_.allocate(1)) T(std::move(*other.min_value_));
+ max_value_ = new (allocator_.allocate(1)) T(std::move(*other.max_value_));
} else {
if (C()(*other.min_value_, *min_value_)) *min_value_ = std::move(*other.min_value_);
if (C()(*max_value_, *other.max_value_)) *max_value_ = std::move(*other.max_value_);
@@ -270,8 +275,7 @@
template<typename T, typename C, typename S, typename A>
std::vector<T, A> kll_sketch<T, C, S, A>::get_quantiles(const double* fractions, uint32_t size) const {
- std::vector<T, A> quantiles;
- quantiles.reserve(size);
+ std::vector<T, A> quantiles(allocator_);
if (is_empty()) return quantiles;
std::unique_ptr<kll_quantile_calculator<T, C, A>, std::function<void(kll_quantile_calculator<T, C, A>*)>> quantile_calculator;
quantiles.reserve(size);
@@ -295,11 +299,11 @@
template<typename T, typename C, typename S, typename A>
std::vector<T, A> kll_sketch<T, C, S, A>::get_quantiles(size_t num) const {
- if (is_empty()) return std::vector<T, A>();
+ if (is_empty()) return std::vector<T, A>(allocator_);
if (num == 0) {
throw std::invalid_argument("num must be > 0");
}
- std::vector<double> fractions(num);
+ vector_d<A> fractions(num, 0, allocator_);
fractions[0] = 0.0;
for (size_t i = 1; i < num; i++) {
fractions[i] = static_cast<double>(i) / (num - 1);
@@ -411,7 +415,7 @@
vector_u8<A> kll_sketch<T, C, S, A>::serialize(unsigned header_size_bytes) const {
const bool is_single_item = n_ == 1;
const size_t size = header_size_bytes + get_serialized_size_bytes();
- vector_u8<A> bytes(size);
+ vector_u8<A> bytes(size, 0, allocator_);
uint8_t* ptr = bytes.data() + header_size_bytes;
const uint8_t* end_ptr = ptr + size;
const uint8_t preamble_ints(is_empty() || is_single_item ? PREAMBLE_INTS_SHORT : PREAMBLE_INTS_FULL);
@@ -449,7 +453,7 @@
}
template<typename T, typename C, typename S, typename A>
-kll_sketch<T, C, S, A> kll_sketch<T, C, S, A>::deserialize(std::istream& is) {
+kll_sketch<T, C, S, A> kll_sketch<T, C, S, A>::deserialize(std::istream& is, const A& allocator) {
uint8_t preamble_ints;
is.read((char*)&preamble_ints, sizeof(preamble_ints));
uint8_t serial_version;
@@ -472,7 +476,7 @@
if (!is.good()) throw std::runtime_error("error reading from std::istream");
const bool is_empty(flags_byte & (1 << flags::IS_EMPTY));
- if (is_empty) return kll_sketch(k);
+ if (is_empty) return kll_sketch(k, allocator);
uint64_t n;
uint16_t min_k;
@@ -488,7 +492,7 @@
is.read((char*)&num_levels, sizeof(num_levels));
is.read((char*)&unused, sizeof(unused));
}
- vector_u32<A> levels(num_levels + 1);
+ vector_u32<A> levels(num_levels + 1, 0, allocator);
const uint32_t capacity(kll_helper::compute_total_capacity(k, m, num_levels));
if (is_single_item) {
levels[0] = capacity - 1;
@@ -497,41 +501,43 @@
is.read((char*)levels.data(), sizeof(levels[0]) * num_levels);
}
levels[num_levels] = capacity;
- auto item_buffer_deleter = [](T* ptr) { A().deallocate(ptr, 1); };
- std::unique_ptr<T, decltype(item_buffer_deleter)> min_value_buffer(A().allocate(1), item_buffer_deleter);
- std::unique_ptr<T, decltype(item_buffer_deleter)> max_value_buffer(A().allocate(1), item_buffer_deleter);
- std::unique_ptr<T, item_deleter> min_value;
- std::unique_ptr<T, item_deleter> max_value;
+ A alloc(allocator);
+ auto item_buffer_deleter = [&alloc](T* ptr) { alloc.deallocate(ptr, 1); };
+ std::unique_ptr<T, decltype(item_buffer_deleter)> min_value_buffer(alloc.allocate(1), item_buffer_deleter);
+ std::unique_ptr<T, decltype(item_buffer_deleter)> max_value_buffer(alloc.allocate(1), item_buffer_deleter);
+ std::unique_ptr<T, item_deleter> min_value(nullptr, item_deleter(allocator));
+ std::unique_ptr<T, item_deleter> max_value(nullptr, item_deleter(allocator));
if (!is_single_item) {
S().deserialize(is, min_value_buffer.get(), 1);
// serde call did not throw, repackage with destrtuctor
- min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter());
+ min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
S().deserialize(is, max_value_buffer.get(), 1);
// serde call did not throw, repackage with destrtuctor
- max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter());
+ max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
}
- auto items_buffer_deleter = [capacity](T* ptr) { A().deallocate(ptr, capacity); };
- std::unique_ptr<T, decltype(items_buffer_deleter)> items_buffer(A().allocate(capacity), items_buffer_deleter);
+ auto items_buffer_deleter = [capacity, &alloc](T* ptr) { alloc.deallocate(ptr, capacity); };
+ std::unique_ptr<T, decltype(items_buffer_deleter)> items_buffer(alloc.allocate(capacity), items_buffer_deleter);
const auto num_items = levels[num_levels] - levels[0];
S().deserialize(is, &items_buffer.get()[levels[0]], num_items);
// serde call did not throw, repackage with destrtuctors
- std::unique_ptr<T, items_deleter> items(items_buffer.release(), items_deleter(levels[0], capacity));
+ std::unique_ptr<T, items_deleter> items(items_buffer.release(), items_deleter(levels[0], capacity, allocator));
const bool is_level_zero_sorted = (flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED)) > 0;
if (is_single_item) {
new (min_value_buffer.get()) T(items.get()[levels[0]]);
// copy did not throw, repackage with destrtuctor
- min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter());
+ min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
new (max_value_buffer.get()) T(items.get()[levels[0]]);
// copy did not throw, repackage with destrtuctor
- max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter());
+ max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
}
- if (!is.good()) throw std::runtime_error("error reading from std::istream");
+ if (!is.good())
+ throw std::runtime_error("error reading from std::istream");
return kll_sketch(k, min_k, n, num_levels, std::move(levels), std::move(items), capacity,
std::move(min_value), std::move(max_value), is_level_zero_sorted);
}
template<typename T, typename C, typename S, typename A>
-kll_sketch<T, C, S, A> kll_sketch<T, C, S, A>::deserialize(const void* bytes, size_t size) {
+kll_sketch<T, C, S, A> kll_sketch<T, C, S, A>::deserialize(const void* bytes, size_t size, const A& allocator) {
ensure_minimum_memory(size, 8);
const char* ptr = static_cast<const char*>(bytes);
uint8_t preamble_ints;
@@ -555,7 +561,7 @@
ensure_minimum_memory(size, 1 << preamble_ints);
const bool is_empty(flags_byte & (1 << flags::IS_EMPTY));
- if (is_empty) return kll_sketch<T, C, S, A>(k);
+ if (is_empty) return kll_sketch<T, C, S, A>(k, allocator);
uint64_t n;
uint16_t min_k;
@@ -572,7 +578,7 @@
ptr += copy_from_mem(ptr, &num_levels, sizeof(num_levels));
ptr++; // skip unused byte
}
- vector_u32<A> levels(num_levels + 1);
+ vector_u32<A> levels(num_levels + 1, 0, allocator);
const uint32_t capacity(kll_helper::compute_total_capacity(k, m, num_levels));
if (is_single_item) {
levels[0] = capacity - 1;
@@ -581,35 +587,36 @@
ptr += copy_from_mem(ptr, levels.data(), sizeof(levels[0]) * num_levels);
}
levels[num_levels] = capacity;
- auto item_buffer_deleter = [](T* ptr) { A().deallocate(ptr, 1); };
- std::unique_ptr<T, decltype(item_buffer_deleter)> min_value_buffer(A().allocate(1), item_buffer_deleter);
- std::unique_ptr<T, decltype(item_buffer_deleter)> max_value_buffer(A().allocate(1), item_buffer_deleter);
- std::unique_ptr<T, item_deleter> min_value;
- std::unique_ptr<T, item_deleter> max_value;
+ A alloc(allocator);
+ auto item_buffer_deleter = [&alloc](T* ptr) { alloc.deallocate(ptr, 1); };
+ std::unique_ptr<T, decltype(item_buffer_deleter)> min_value_buffer(alloc.allocate(1), item_buffer_deleter);
+ std::unique_ptr<T, decltype(item_buffer_deleter)> max_value_buffer(alloc.allocate(1), item_buffer_deleter);
+ std::unique_ptr<T, item_deleter> min_value(nullptr, item_deleter(allocator));
+ std::unique_ptr<T, item_deleter> max_value(nullptr, item_deleter(allocator));
if (!is_single_item) {
ptr += S().deserialize(ptr, end_ptr - ptr, min_value_buffer.get(), 1);
// serde call did not throw, repackage with destrtuctor
- min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter());
+ min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
ptr += S().deserialize(ptr, end_ptr - ptr, max_value_buffer.get(), 1);
// serde call did not throw, repackage with destrtuctor
- max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter());
+ max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
}
- auto items_buffer_deleter = [capacity](T* ptr) { A().deallocate(ptr, capacity); };
- std::unique_ptr<T, decltype(items_buffer_deleter)> items_buffer(A().allocate(capacity), items_buffer_deleter);
+ auto items_buffer_deleter = [capacity, &alloc](T* ptr) { alloc.deallocate(ptr, capacity); };
+ std::unique_ptr<T, decltype(items_buffer_deleter)> items_buffer(alloc.allocate(capacity), items_buffer_deleter);
const auto num_items = levels[num_levels] - levels[0];
ptr += S().deserialize(ptr, end_ptr - ptr, &items_buffer.get()[levels[0]], num_items);
// serde call did not throw, repackage with destrtuctors
- std::unique_ptr<T, items_deleter> items(items_buffer.release(), items_deleter(levels[0], capacity));
+ std::unique_ptr<T, items_deleter> items(items_buffer.release(), items_deleter(levels[0], capacity, allocator));
const size_t delta = ptr - static_cast<const char*>(bytes);
if (delta != size) throw std::logic_error("deserialized size mismatch: " + std::to_string(delta) + " != " + std::to_string(size));
const bool is_level_zero_sorted = (flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED)) > 0;
if (is_single_item) {
new (min_value_buffer.get()) T(items.get()[levels[0]]);
// copy did not throw, repackage with destrtuctor
- min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter());
+ min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
new (max_value_buffer.get()) T(items.get()[levels[0]]);
// copy did not throw, repackage with destrtuctor
- max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter());
+ max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
}
return kll_sketch(k, min_k, n, num_levels, std::move(levels), std::move(items), capacity,
std::move(min_value), std::move(max_value), is_level_zero_sorted);
@@ -634,6 +641,7 @@
kll_sketch<T, C, S, A>::kll_sketch(uint16_t k, uint16_t min_k, uint64_t n, uint8_t num_levels, vector_u32<A>&& levels,
std::unique_ptr<T, items_deleter> items, uint32_t items_size, std::unique_ptr<T, item_deleter> min_value,
std::unique_ptr<T, item_deleter> max_value, bool is_level_zero_sorted):
+allocator_(levels.get_allocator()),
k_(k),
m_(DEFAULT_M),
min_k_(min_k),
@@ -735,9 +743,9 @@
const uint32_t new_total_cap = cur_total_cap + delta_cap;
// move (and shift) the current data into the new buffer
- T* new_buf = A().allocate(new_total_cap);
+ T* new_buf = allocator_.allocate(new_total_cap);
kll_helper::move_construct<T>(items_, 0, cur_total_cap, new_buf, delta_cap, true);
- A().deallocate(items_, items_size_);
+ allocator_.deallocate(items_, items_size_);
items_ = new_buf;
items_size_ = new_total_cap;
@@ -763,19 +771,20 @@
template<typename T, typename C, typename S, typename A>
std::unique_ptr<kll_quantile_calculator<T, C, A>, std::function<void(kll_quantile_calculator<T, C, A>*)>> kll_sketch<T, C, S, A>::get_quantile_calculator() {
sort_level_zero();
- typedef typename std::allocator_traits<A>::template rebind_alloc<kll_quantile_calculator<T, C, A>> AllocCalc;
+ using AllocCalc = typename std::allocator_traits<A>::template rebind_alloc<kll_quantile_calculator<T, C, A>>;
+ AllocCalc alloc(allocator_);
std::unique_ptr<kll_quantile_calculator<T, C, A>, std::function<void(kll_quantile_calculator<T, C, A>*)>> quantile_calculator(
- new (AllocCalc().allocate(1)) kll_quantile_calculator<T, C, A>(items_, levels_.data(), num_levels_, n_),
- [](kll_quantile_calculator<T, C, A>* ptr){ ptr->~kll_quantile_calculator<T, C, A>(); AllocCalc().deallocate(ptr, 1); }
+ new (alloc.allocate(1)) kll_quantile_calculator<T, C, A>(items_, levels_.data(), num_levels_, n_, allocator_),
+ [&alloc](kll_quantile_calculator<T, C, A>* ptr){ ptr->~kll_quantile_calculator<T, C, A>(); alloc.deallocate(ptr, 1); }
);
return quantile_calculator;
}
template<typename T, typename C, typename S, typename A>
vector_d<A> kll_sketch<T, C, S, A>::get_PMF_or_CDF(const T* split_points, uint32_t size, bool is_CDF) const {
- if (is_empty()) return vector_d<A>();
+ if (is_empty()) return vector_d<A>(allocator_);
kll_helper::validate_values<T, C>(split_points, size);
- vector_d<A> buckets(size + 1, 0);
+ vector_d<A> buckets(size + 1, 0, allocator_);
uint8_t level = 0;
uint64_t weight = 1;
while (level < num_levels_) {
@@ -845,12 +854,13 @@
template<typename O>
void kll_sketch<T, C, S, A>::merge_higher_levels(O&& other, uint64_t final_n) {
const uint32_t tmp_num_items = get_num_retained() + other.get_num_retained_above_level_zero();
- auto tmp_items_deleter = [tmp_num_items](T* ptr) { A().deallocate(ptr, tmp_num_items); }; // no destructor needed
- const std::unique_ptr<T, decltype(tmp_items_deleter)> workbuf(A().allocate(tmp_num_items), tmp_items_deleter);
+ A alloc(allocator_);
+ auto tmp_items_deleter = [tmp_num_items, &alloc](T* ptr) { alloc.deallocate(ptr, tmp_num_items); }; // no destructor needed
+ const std::unique_ptr<T, decltype(tmp_items_deleter)> workbuf(allocator_.allocate(tmp_num_items), tmp_items_deleter);
const uint8_t ub = kll_helper::ub_on_num_levels(final_n);
const size_t work_levels_size = ub + 2; // ub+1 does not work
- vector_u32<A> worklevels(work_levels_size);
- vector_u32<A> outlevels(work_levels_size);
+ vector_u32<A> worklevels(work_levels_size, 0, allocator_);
+ vector_u32<A> outlevels(work_levels_size, 0, allocator_);
const uint8_t provisional_num_levels = std::max(num_levels_, other.num_levels_);
@@ -864,9 +874,9 @@
// now we need to transfer the results back into "this" sketch
if (result.final_capacity != items_size_) {
- A().deallocate(items_, items_size_);
+ allocator_.deallocate(items_, items_size_);
items_size_ = result.final_capacity;
- items_ = A().allocate(items_size_);
+ items_ = allocator_.allocate(items_size_);
}
const uint32_t free_space_at_bottom = result.final_capacity - result.final_num_items;
kll_helper::move_construct<T>(workbuf.get(), outlevels[0], outlevels[0] + result.final_num_items, items_, free_space_at_bottom, true);
@@ -1101,29 +1111,32 @@
template<typename T, typename C, typename S, typename A>
class kll_sketch<T, C, S, A>::item_deleter {
public:
- void operator() (T* ptr) const {
+ item_deleter(const A& allocator): allocator_(allocator) {}
+ void operator() (T* ptr) {
if (ptr != nullptr) {
ptr->~T();
- A().deallocate(ptr, 1);
+ allocator_.deallocate(ptr, 1);
}
}
+ private:
+ A allocator_;
};
template<typename T, typename C, typename S, typename A>
class kll_sketch<T, C, S, A>::items_deleter {
public:
- items_deleter(uint32_t start, uint32_t num): start(start), num(num) {}
- void operator() (T* ptr) const {
+ items_deleter(uint32_t start, uint32_t num, const A& allocator):
+ allocator_(allocator), start_(start), num_(num) {}
+ void operator() (T* ptr) {
if (ptr != nullptr) {
- for (uint32_t i = start; i < num; ++i) {
- ptr[i].~T();
- }
- A().deallocate(ptr, num);
+ for (uint32_t i = start_; i < num_; ++i) ptr[i].~T();
+ allocator_.deallocate(ptr, num_);
}
}
private:
- uint32_t start;
- uint32_t num;
+ A allocator_;
+ uint32_t start_;
+ uint32_t num_;
};
} /* namespace datasketches */
diff --git a/req/CMakeLists.txt b/req/CMakeLists.txt
new file mode 100755
index 0000000..7ebefca
--- /dev/null
+++ b/req/CMakeLists.txt
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+add_library(req INTERFACE)
+
+add_library(${PROJECT_NAME}::REQ ALIAS req)
+
+if (BUILD_TESTS)
+ add_subdirectory(test)
+endif()
+
+target_include_directories(req
+ INTERFACE
+ $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
+ $<INSTALL_INTERFACE:$<INSTALL_PREFIX>/include>
+)
+
+target_link_libraries(req INTERFACE common)
+target_compile_features(req INTERFACE cxx_std_11)
+
+set(req_HEADERS "")
+list(APPEND req_HEADERS "include/req_common.hpp")
+list(APPEND req_HEADERS "include/req_sketch.hpp")
+list(APPEND req_HEADERS "include/req_sketch_impl.hpp")
+list(APPEND req_HEADERS "include/req_compactor.hpp")
+list(APPEND req_HEADERS "include/req_compactor_impl.hpp")
+list(APPEND req_HEADERS "include/req_quantile_calculator.hpp")
+list(APPEND req_HEADERS "include/req_quantile_calculator_impl.hpp")
+
+install(TARGETS req
+ EXPORT ${PROJECT_NAME}
+)
+
+install(FILES ${req_HEADERS}
+ DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/DataSketches")
+
+target_sources(req
+ INTERFACE
+ ${CMAKE_CURRENT_SOURCE_DIR}/include/req_common.hpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/include/req_sketch.hpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/include/req_sketch_impl.hpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/include/req_compactor.hpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/include/req_compactor_impl.hpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/include/req_quantile_calculator.hpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/include/req_quantile_calculator_impl.hpp
+)
diff --git a/req/include/req_common.hpp b/req/include/req_common.hpp
new file mode 100755
index 0000000..d2dc518
--- /dev/null
+++ b/req/include/req_common.hpp
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef REQ_COMMON_HPP_
+#define REQ_COMMON_HPP_
+
+#include <random>
+#include <chrono>
+
+#include "serde.hpp"
+#include "common_defs.hpp"
+
+namespace datasketches {
+
+// TODO: have a common random bit with KLL
+static std::independent_bits_engine<std::mt19937, 1, unsigned> req_random_bit(std::chrono::system_clock::now().time_since_epoch().count());
+
+namespace req_constants {
+ static const uint16_t MIN_K = 4;
+ static const uint8_t INIT_NUM_SECTIONS = 3;
+ static const unsigned MULTIPLIER = 2;
+}
+
+} /* namespace datasketches */
+
+#endif
diff --git a/req/include/req_compactor.hpp b/req/include/req_compactor.hpp
new file mode 100755
index 0000000..2ca768b
--- /dev/null
+++ b/req/include/req_compactor.hpp
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef REQ_COMPACTOR_HPP_
+#define REQ_COMPACTOR_HPP_
+
+#include <memory>
+
+namespace datasketches {
+
+template<
+typename T,
+typename Comparator,
+typename Allocator
+>
+class req_compactor {
+public:
+ req_compactor(bool hra, uint8_t lg_weight, uint32_t section_size, const Allocator& allocator, bool sorted = true);
+ ~req_compactor();
+ req_compactor(const req_compactor& other);
+ req_compactor(req_compactor&& other) noexcept;
+ req_compactor& operator=(const req_compactor& other);
+ req_compactor& operator=(req_compactor&& other);
+
+ bool is_sorted() const;
+ uint32_t get_num_items() const;
+ uint32_t get_nom_capacity() const;
+ uint8_t get_lg_weight() const;
+ const T* begin() const;
+ const T* end() const;
+ T* begin();
+ T* end();
+
+ template<bool inclusive>
+ uint64_t compute_weight(const T& item) const;
+
+ template<typename FwdT>
+ void append(FwdT&& item);
+
+ template<typename FwdC>
+ void merge(FwdC&& other);
+
+ void sort();
+
+ std::pair<uint32_t, uint32_t> compact(req_compactor& next);
+
+ /**
+ * Computes size needed to serialize the current state of the compactor.
+ * This version is for fixed-size arithmetic types (integral and floating point).
+ * @return size in bytes needed to serialize this compactor
+ */
+ template<typename S, typename TT = T, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type = 0>
+ size_t get_serialized_size_bytes(const S& serde) const;
+
+ /**
+ * Computes size needed to serialize the current state of the compactor.
+ * This version is for all other types and can be expensive since every item needs to be looked at.
+ * @return size in bytes needed to serialize this compactor
+ */
+ template<typename S, typename TT = T, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type = 0>
+ size_t get_serialized_size_bytes(const S& serde) const;
+
+ template<typename S>
+ void serialize(std::ostream& os, const S& serde) const;
+
+ template<typename S>
+ size_t serialize(void* dst, size_t capacity, const S& serde) const;
+
+ template<typename S>
+ static req_compactor deserialize(std::istream& is, const S& serde, const Allocator& allocator, bool sorted, bool hra);
+
+ template<typename S>
+ static std::pair<req_compactor, size_t> deserialize(const void* bytes, size_t size, const S& serde, const Allocator& allocator, bool sorted, bool hra);
+
+ template<typename S>
+ static req_compactor deserialize(std::istream& is, const S& serde, const Allocator& allocator, bool sorted, uint16_t k, uint8_t num_items, bool hra);
+
+ template<typename S>
+ static std::pair<req_compactor, size_t> deserialize(const void* bytes, size_t size, const S& serde, const Allocator& allocator, bool sorted, uint16_t k, uint8_t num_items, bool hra);
+
+private:
+ Allocator allocator_;
+ uint8_t lg_weight_;
+ bool hra_;
+ bool coin_; // random bit for compaction
+ bool sorted_;
+ float section_size_raw_;
+ uint32_t section_size_;
+ uint8_t num_sections_;
+ uint64_t state_; // state of the deterministic compaction schedule
+ uint32_t num_items_;
+ uint32_t capacity_;
+ T* items_;
+
+ bool ensure_enough_sections();
+ std::pair<uint32_t, uint32_t> compute_compaction_range(uint32_t secs_to_compact) const;
+ void grow(size_t new_capacity);
+ void ensure_space(size_t num);
+
+ static uint32_t nearest_even(float value);
+
+ template<typename InIter, typename OutIter>
+ static void promote_evens_or_odds(InIter from, InIter to, bool flag, OutIter dst);
+
+ // for deserialization
+ class items_deleter;
+ req_compactor(bool hra, uint8_t lg_weight, bool sorted, float section_size_raw, uint8_t num_sections, uint64_t state, std::unique_ptr<T, items_deleter> items, uint32_t num_items, const Allocator& allocator);
+
+ template<typename S>
+ static std::unique_ptr<T, items_deleter> deserialize_items(std::istream& is, const S& serde, const Allocator& allocator, size_t num);
+
+ template<typename S>
+ static std::pair<std::unique_ptr<T, items_deleter>, size_t> deserialize_items(const void* bytes, size_t size, const S& serde, const Allocator& allocator, size_t num);
+
+};
+
+} /* namespace datasketches */
+
+#include "req_compactor_impl.hpp"
+
+#endif
diff --git a/req/include/req_compactor_impl.hpp b/req/include/req_compactor_impl.hpp
new file mode 100755
index 0000000..63a7dcf
--- /dev/null
+++ b/req/include/req_compactor_impl.hpp
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef REQ_COMPACTOR_IMPL_HPP_
+#define REQ_COMPACTOR_IMPL_HPP_
+
+#include <stdexcept>
+#include <cmath>
+#include <algorithm>
+
+#include "count_zeros.hpp"
+#include "conditional_forward.hpp"
+
+#include <iomanip>
+
+namespace datasketches {
+
+template<typename T, typename C, typename A>
+req_compactor<T, C, A>::req_compactor(bool hra, uint8_t lg_weight, uint32_t section_size, const A& allocator, bool sorted):
+allocator_(allocator),
+lg_weight_(lg_weight),
+hra_(hra),
+coin_(false),
+sorted_(sorted),
+section_size_raw_(section_size),
+section_size_(section_size),
+num_sections_(req_constants::INIT_NUM_SECTIONS),
+state_(0),
+num_items_(0),
+capacity_(2 * get_nom_capacity()),
+items_(allocator_.allocate(capacity_))
+{}
+
+template<typename T, typename C, typename A>
+req_compactor<T, C, A>::~req_compactor() {
+ if (items_ != nullptr) {
+ for (auto it = begin(); it != end(); ++it) (*it).~T();
+ allocator_.deallocate(items_, capacity_);
+ }
+}
+
+template<typename T, typename C, typename A>
+req_compactor<T, C, A>::req_compactor(const req_compactor& other):
+allocator_(other.allocator_),
+lg_weight_(other.lg_weight_),
+hra_(other.hra_),
+coin_(other.coin_),
+sorted_(other.sorted_),
+section_size_raw_(other.section_size_raw_),
+section_size_(other.section_size_),
+num_sections_(other.num_sections_),
+state_(other.state_),
+num_items_(other.num_items_),
+capacity_(other.capacity_),
+items_(nullptr)
+{
+ if (other.items_ != nullptr) {
+ items_ = allocator_.allocate(capacity_);
+ const size_t from = hra_ ? capacity_ - num_items_ : 0;
+ const size_t to = hra_ ? capacity_ : num_items_;
+ for (size_t i = from; i < to; ++i) new (items_ + i) T(other.items_[i]);
+ }
+}
+
+template<typename T, typename C, typename A>
+req_compactor<T, C, A>::req_compactor(req_compactor&& other) noexcept :
+allocator_(std::move(other.allocator_)),
+lg_weight_(other.lg_weight_),
+hra_(other.hra_),
+coin_(other.coin_),
+sorted_(other.sorted_),
+section_size_raw_(other.section_size_raw_),
+section_size_(other.section_size_),
+num_sections_(other.num_sections_),
+state_(other.state_),
+num_items_(other.num_items_),
+capacity_(other.capacity_),
+items_(other.items_)
+{
+ other.items_ = nullptr;
+}
+
+template<typename T, typename C, typename A>
+req_compactor<T, C, A>& req_compactor<T, C, A>::operator=(const req_compactor& other) {
+ req_compactor copy(other);
+ std::swap(allocator_, copy.allocator_);
+ std::swap(lg_weight_, copy.lg_weight_);
+ std::swap(hra_, copy.hra_);
+ std::swap(coin_, copy.coin_);
+ std::swap(sorted_, copy.sorted_);
+ std::swap(section_size_raw_, copy.section_size_raw_);
+ std::swap(section_size_, copy.section_size_);
+ std::swap(num_sections_, copy.num_sections_);
+ std::swap(state_, copy.state_);
+ std::swap(num_items_, copy.num_items_);
+ std::swap(capacity_, copy.capacity_);
+ std::swap(items_, copy.items_);
+ return *this;
+}
+
+template<typename T, typename C, typename A>
+req_compactor<T, C, A>& req_compactor<T, C, A>::operator=(req_compactor&& other) {
+ std::swap(allocator_, other.allocator_);
+ std::swap(lg_weight_, other.lg_weight_);
+ std::swap(hra_, other.hra_);
+ std::swap(coin_, other.coin_);
+ std::swap(sorted_, other.sorted_);
+ std::swap(section_size_raw_, other.section_size_raw_);
+ std::swap(section_size_, other.section_size_);
+ std::swap(num_sections_, other.num_sections_);
+ std::swap(state_, other.state_);
+ std::swap(num_items_, other.num_items_);
+ std::swap(capacity_, other.capacity_);
+ std::swap(items_, other.items_);
+ return *this;
+}
+
+template<typename T, typename C, typename A>
+bool req_compactor<T, C, A>::is_sorted() const {
+ return sorted_;
+}
+
+template<typename T, typename C, typename A>
+uint32_t req_compactor<T, C, A>::get_num_items() const {
+ return num_items_;
+}
+
+template<typename T, typename C, typename A>
+uint32_t req_compactor<T, C, A>::get_nom_capacity() const {
+ return req_constants::MULTIPLIER * num_sections_ * section_size_;
+}
+
+template<typename T, typename C, typename A>
+uint8_t req_compactor<T, C, A>::get_lg_weight() const {
+ return lg_weight_;
+}
+
+template<typename T, typename C, typename A>
+template<bool inclusive>
+uint64_t req_compactor<T, C, A>::compute_weight(const T& item) const {
+ if (!sorted_) const_cast<req_compactor*>(this)->sort(); // allow sorting as a side effect
+ auto it = inclusive ?
+ std::upper_bound(begin(), end(), item, C()) :
+ std::lower_bound(begin(), end(), item, C());
+ return std::distance(begin(), it) << lg_weight_;
+}
+
+template<typename T, typename C, typename A>
+template<typename FwdT>
+void req_compactor<T, C, A>::append(FwdT&& item) {
+ if (num_items_ == capacity_) grow(capacity_ + get_nom_capacity());
+ const size_t i = hra_ ? capacity_ - num_items_ - 1 : num_items_;
+ new (items_ + i) T(std::forward<FwdT>(item));
+ ++num_items_;
+ if (num_items_ > 1) sorted_ = false;
+}
+
+template<typename T, typename C, typename A>
+void req_compactor<T, C, A>::grow(size_t new_capacity) {
+ T* new_items = allocator_.allocate(new_capacity);
+ size_t new_i = hra_ ? new_capacity - num_items_ : 0;
+ for (auto it = begin(); it != end(); ++it, ++new_i) {
+ new (new_items + new_i) T(std::move(*it));
+ (*it).~T();
+ }
+ allocator_.deallocate(items_, capacity_);
+ items_ = new_items;
+ capacity_ = new_capacity;
+}
+
+template<typename T, typename C, typename A>
+void req_compactor<T, C, A>::ensure_space(size_t num) {
+ if (num_items_ + num > capacity_) grow(num_items_ + num + get_nom_capacity());
+}
+
+template<typename T, typename C, typename A>
+const T* req_compactor<T, C, A>::begin() const {
+ return items_ + (hra_ ? capacity_ - num_items_ : 0);
+}
+
+template<typename T, typename C, typename A>
+const T* req_compactor<T, C, A>::end() const {
+ return items_ + (hra_ ? capacity_ : num_items_);
+}
+
+template<typename T, typename C, typename A>
+T* req_compactor<T, C, A>::begin() {
+ return items_ + (hra_ ? capacity_ - num_items_ : 0);
+}
+
+template<typename T, typename C, typename A>
+T* req_compactor<T, C, A>::end() {
+ return items_ + (hra_ ? capacity_ : num_items_);
+}
+
+template<typename T, typename C, typename A>
+template<typename FwdC>
+void req_compactor<T, C, A>::merge(FwdC&& other) {
+ // TODO: swap if other is larger?
+ if (lg_weight_ != other.lg_weight_) throw std::logic_error("weight mismatch");
+ state_ |= other.state_;
+ while (ensure_enough_sections()) {}
+ ensure_space(other.get_num_items());
+ sort();
+ auto middle = hra_ ? begin() : end();
+ auto from = hra_ ? begin() - other.get_num_items() : end();
+ auto to = from + other.get_num_items();
+ auto other_it = other.begin();
+ for (auto it = from; it != to; ++it, ++other_it) new (it) T(conditional_forward<FwdC>(*other_it));
+ if (!other.sorted_) std::sort(from, to, C());
+ if (num_items_ > 0) std::inplace_merge(hra_ ? from : begin(), middle, hra_ ? end() : to, C());
+ num_items_ += other.get_num_items();
+}
+
+template<typename T, typename C, typename A>
+void req_compactor<T, C, A>::sort() {
+ if (!sorted_) {
+ std::sort(begin(), end(), C());
+ sorted_ = true;
+ }
+}
+
+template<typename T, typename C, typename A>
+std::pair<uint32_t, uint32_t> req_compactor<T, C, A>::compact(req_compactor& next) {
+ const uint32_t starting_nom_capacity = get_nom_capacity();
+ // choose a part of the buffer to compact
+ const uint32_t secs_to_compact = std::min(static_cast<uint32_t>(count_trailing_zeros_in_u32(~state_) + 1), static_cast<uint32_t>(num_sections_));
+ auto compaction_range = compute_compaction_range(secs_to_compact);
+ if (compaction_range.second - compaction_range.first < 2) throw std::logic_error("compaction range error");
+
+  if ((state_ & 1) == 1) { coin_ = !coin_; } // for odd state, flip the coin
+ else { coin_ = req_random_bit(); } // random coin flip
+
+ const auto num = (compaction_range.second - compaction_range.first) / 2;
+ next.ensure_space(num);
+ auto next_middle = hra_ ? next.begin() : next.end();
+ auto next_empty = hra_ ? next.begin() - num : next.end();
+ promote_evens_or_odds(begin() + compaction_range.first, begin() + compaction_range.second, coin_, next_empty);
+ next.num_items_ += num;
+ std::inplace_merge(next.begin(), next_middle, next.end(), C());
+ for (size_t i = compaction_range.first; i < compaction_range.second; ++i) (*(begin() + i)).~T();
+ num_items_ -= compaction_range.second - compaction_range.first;
+
+ ++state_;
+ ensure_enough_sections();
+ return std::pair<uint32_t, uint32_t>(
+ num,
+ get_nom_capacity() - starting_nom_capacity
+ );
+}
+
+template<typename T, typename C, typename A>
+bool req_compactor<T, C, A>::ensure_enough_sections() {
+ const float ssr = section_size_raw_ / sqrt(2);
+ const uint32_t ne = nearest_even(ssr);
+ if (state_ >= static_cast<uint64_t>(1 << (num_sections_ - 1)) && ne >= req_constants::MIN_K) {
+ section_size_raw_ = ssr;
+ section_size_ = ne;
+ num_sections_ <<= 1;
+ if (capacity_ < 2 * get_nom_capacity()) grow(2 * get_nom_capacity());
+ return true;
+ }
+ return false;
+}
+
+template<typename T, typename C, typename A>
+std::pair<uint32_t, uint32_t> req_compactor<T, C, A>::compute_compaction_range(uint32_t secs_to_compact) const {
+ uint32_t non_compact = get_nom_capacity() / 2 + (num_sections_ - secs_to_compact) * section_size_;
+ // make compacted region even
+ if (((num_items_ - non_compact) & 1) == 1) ++non_compact;
+ const size_t low = hra_ ? 0 : non_compact;
+ const size_t high = hra_ ? num_items_ - non_compact : num_items_;
+ return std::pair<uint32_t, uint32_t>(low, high);
+}
+
+template<typename T, typename C, typename A>
+uint32_t req_compactor<T, C, A>::nearest_even(float value) {
+ return static_cast<uint32_t>(round(value / 2)) << 1;
+}
+
+template<typename T, typename C, typename A>
+template<typename InIter, typename OutIter>
+void req_compactor<T, C, A>::promote_evens_or_odds(InIter from, InIter to, bool odds, OutIter dst) {
+ if (from == to) return;
+ InIter i = from;
+ if (odds) ++i;
+ while (i != to) {
+ new (dst) T(std::move(*i));
+ ++dst;
+ ++i;
+ if (i == to) break;
+ ++i;
+ }
+}
+
+// helpers for integral types
+template<typename T>
+static inline T read(std::istream& is) {
+ T value;
+ is.read(reinterpret_cast<char*>(&value), sizeof(T));
+ return value;
+}
+
+template<typename T>
+static inline void write(std::ostream& os, T value) {
+ os.write(reinterpret_cast<const char*>(&value), sizeof(T));
+}
+
+// implementation for fixed-size arithmetic types (integral and floating point)
+template<typename T, typename C, typename A>
+template<typename S, typename TT, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type>
+size_t req_compactor<T, C, A>::get_serialized_size_bytes(const S&) const {
+ return sizeof(state_) + sizeof(section_size_raw_) + sizeof(lg_weight_) + sizeof(num_sections_) +
+ sizeof(uint16_t) + // padding
+ sizeof(uint32_t) + // num_items
+ sizeof(TT) * num_items_;
+}
+
+// implementation for all other types
+template<typename T, typename C, typename A>
+template<typename S, typename TT, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type>
+size_t req_compactor<T, C, A>::get_serialized_size_bytes(const S& serde) const {
+ size_t size = sizeof(state_) + sizeof(section_size_raw_) + sizeof(lg_weight_) + sizeof(num_sections_) +
+ sizeof(uint16_t) + // padding
+ sizeof(uint32_t); // num_items
+ for (auto it = begin(); it != end(); ++it) size += serde.size_of_item(*it);
+ return size;
+}
+
+template<typename T, typename C, typename A>
+template<typename S>
+void req_compactor<T, C, A>::serialize(std::ostream& os, const S& serde) const {
+ write(os, state_);
+ write(os, section_size_raw_);
+ write(os, lg_weight_);
+ write(os, num_sections_);
+ const uint16_t padding = 0;
+ write(os, padding);
+ write(os, num_items_);
+ serde.serialize(os, begin(), num_items_);
+}
+
+template<typename T, typename C, typename A>
+template<typename S>
+size_t req_compactor<T, C, A>::serialize(void* dst, size_t capacity, const S& serde) const {
+ uint8_t* ptr = static_cast<uint8_t*>(dst);
+ const uint8_t* end_ptr = ptr + capacity;
+ ptr += copy_to_mem(state_, ptr);
+ ptr += copy_to_mem(section_size_raw_, ptr);
+ ptr += copy_to_mem(lg_weight_, ptr);
+ ptr += copy_to_mem(num_sections_, ptr);
+ const uint16_t padding = 0;
+ ptr += copy_to_mem(padding, ptr);
+ ptr += copy_to_mem(num_items_, ptr);
+ ptr += serde.serialize(ptr, end_ptr - ptr, begin(), num_items_);
+ return ptr - static_cast<uint8_t*>(dst);
+}
+
+template<typename T, typename C, typename A>
+template<typename S>
+req_compactor<T, C, A> req_compactor<T, C, A>::deserialize(std::istream& is, const S& serde, const A& allocator, bool sorted, bool hra) {
+ auto state = read<decltype(state_)>(is);
+ auto section_size_raw = read<decltype(section_size_raw_)>(is);
+ auto lg_weight = read<decltype(lg_weight_)>(is);
+ auto num_sections = read<decltype(num_sections_)>(is);
+ read<uint16_t>(is); // padding
+ auto num_items = read<uint32_t>(is);
+ auto items = deserialize_items(is, serde, allocator, num_items);
+ return req_compactor(hra, lg_weight, sorted, section_size_raw, num_sections, state, std::move(items), num_items, allocator);
+}
+
+template<typename T, typename C, typename A>
+template<typename S>
+req_compactor<T, C, A> req_compactor<T, C, A>::deserialize(std::istream& is, const S& serde, const A& allocator, bool sorted, uint16_t k, uint8_t num_items, bool hra) {
+ auto items = deserialize_items(is, serde, allocator, num_items);
+ return req_compactor(hra, 0, sorted, k, req_constants::INIT_NUM_SECTIONS, 0, std::move(items), num_items, allocator);
+}
+
+template<typename T, typename C, typename A>
+template<typename S>
+auto req_compactor<T, C, A>::deserialize_items(std::istream& is, const S& serde, const A& allocator, size_t num)
+-> std::unique_ptr<T, items_deleter> {
+ A alloc(allocator);
+ std::unique_ptr<T, items_deleter> items(alloc.allocate(num), items_deleter(allocator, false, num));
+ serde.deserialize(is, items.get(), num);
+ // serde did not throw, enable destructors
+ items.get_deleter().set_destroy(true);
+ if (!is.good()) throw std::runtime_error("error reading from std::istream");
+ return std::move(items);
+}
+
+template<typename T, typename C, typename A>
+template<typename S>
+std::pair<req_compactor<T, C, A>, size_t> req_compactor<T, C, A>::deserialize(const void* bytes, size_t size, const S& serde, const A& allocator, bool sorted, bool hra) {
+ ensure_minimum_memory(size, 8);
+ const char* ptr = static_cast<const char*>(bytes);
+ const char* end_ptr = static_cast<const char*>(bytes) + size;
+
+ uint64_t state;
+ ptr += copy_from_mem(ptr, state);
+ float section_size_raw;
+ ptr += copy_from_mem(ptr, section_size_raw);
+ uint8_t lg_weight;
+ ptr += copy_from_mem(ptr, lg_weight);
+ uint8_t num_sections;
+ ptr += copy_from_mem(ptr, num_sections);
+ ptr += 2; // padding
+ uint32_t num_items;
+ ptr += copy_from_mem(ptr, num_items);
+ auto pair = deserialize_items(ptr, end_ptr - ptr, serde, allocator, num_items);
+ ptr += pair.second;
+ return std::pair<req_compactor, size_t>(
+ req_compactor(hra, lg_weight, sorted, section_size_raw, num_sections, state, std::move(pair.first), num_items, allocator),
+ ptr - static_cast<const char*>(bytes)
+ );
+}
+
+template<typename T, typename C, typename A>
+template<typename S>
+std::pair<req_compactor<T, C, A>, size_t> req_compactor<T, C, A>::deserialize(const void* bytes, size_t size, const S& serde, const A& allocator, bool sorted, uint16_t k, uint8_t num_items, bool hra) {
+ auto pair = deserialize_items(bytes, size, serde, allocator, num_items);
+ return std::pair<req_compactor, size_t>(
+ req_compactor(hra, 0, sorted, k, req_constants::INIT_NUM_SECTIONS, 0, std::move(pair.first), num_items, allocator),
+ pair.second
+ );
+}
+
+template<typename T, typename C, typename A>
+template<typename S>
+auto req_compactor<T, C, A>::deserialize_items(const void* bytes, size_t size, const S& serde, const A& allocator, size_t num)
+-> std::pair<std::unique_ptr<T, items_deleter>, size_t> {
+ const char* ptr = static_cast<const char*>(bytes);
+ const char* end_ptr = static_cast<const char*>(bytes) + size;
+ A alloc(allocator);
+ std::unique_ptr<T, items_deleter> items(alloc.allocate(num), items_deleter(allocator, false, num));
+ ptr += serde.deserialize(ptr, end_ptr - ptr, items.get(), num);
+ // serde did not throw, enable destructors
+ items.get_deleter().set_destroy(true);
+ return std::pair<std::unique_ptr<T, items_deleter>, size_t>(
+ std::move(items),
+ ptr - static_cast<const char*>(bytes)
+ );
+}
+
+
+template<typename T, typename C, typename A>
+req_compactor<T, C, A>::req_compactor(bool hra, uint8_t lg_weight, bool sorted, float section_size_raw, uint8_t num_sections, uint64_t state, std::unique_ptr<T, items_deleter> items, uint32_t num_items, const A& allocator):
+allocator_(allocator),
+lg_weight_(lg_weight),
+hra_(hra),
+coin_(req_random_bit()),
+sorted_(sorted),
+section_size_raw_(section_size_raw),
+section_size_(nearest_even(section_size_raw)),
+num_sections_(num_sections),
+state_(state),
+num_items_(num_items),
+capacity_(num_items),
+items_(items.release())
+{}
+
+template<typename T, typename C, typename A>
+class req_compactor<T, C, A>::items_deleter {
+ public:
+ items_deleter(const A& allocator, bool destroy, uint32_t num): allocator(allocator), destroy(destroy), num(num) {}
+ void operator() (T* ptr) {
+ if (ptr != nullptr) {
+ if (destroy) {
+ for (uint32_t i = 0; i < num; ++i) {
+ ptr[i].~T();
+ }
+ }
+ allocator.deallocate(ptr, num);
+ }
+ }
+ void set_destroy(bool destroy) { this->destroy = destroy; }
+ private:
+ A allocator;
+ bool destroy;
+ uint32_t num;
+};
+
+} /* namespace datasketches */
+
+#endif
diff --git a/req/include/req_quantile_calculator.hpp b/req/include/req_quantile_calculator.hpp
new file mode 100755
index 0000000..2a6e245
--- /dev/null
+++ b/req/include/req_quantile_calculator.hpp
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef REQ_QUANTILE_CALCULATOR_HPP_
+#define REQ_QUANTILE_CALCULATOR_HPP_
+
+#include <functional>
+
+namespace datasketches {
+
+template<
+ typename T,
+ typename Comparator,
+ typename Allocator
+>
+class req_quantile_calculator {
+public:
+ req_quantile_calculator(uint64_t n, const Allocator& allocator);
+
+ void add(const T* begin, const T* end, uint8_t lg_weight);
+
+ template<bool inclusive>
+ void convert_to_cummulative();
+
+ const T* get_quantile(double rank) const;
+
+private:
+ using Entry = std::pair<const T*, uint64_t>;
+ using AllocEntry = typename std::allocator_traits<Allocator>::template rebind_alloc<Entry>;
+ using Container = std::vector<Entry, AllocEntry>;
+
+ template<typename C>
+ struct compare_pairs_by_first_ptr {
+ bool operator()(const Entry& a, const Entry& b) {
+ return C()(*a.first, *b.first);
+ }
+ };
+
+ struct compare_pairs_by_second {
+ bool operator()(const Entry& a, const Entry& b) {
+ return a.second < b.second;
+ }
+ };
+
+ uint64_t n_;
+ Container entries_;
+};
+
+} /* namespace datasketches */
+
+#include "req_quantile_calculator_impl.hpp"
+
+#endif
diff --git a/req/include/req_quantile_calculator_impl.hpp b/req/include/req_quantile_calculator_impl.hpp
new file mode 100755
index 0000000..63f0e92
--- /dev/null
+++ b/req/include/req_quantile_calculator_impl.hpp
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef REQ_QUANTILE_CALCULATOR_IMPL_HPP_
+#define REQ_QUANTILE_CALCULATOR_IMPL_HPP_
+
+namespace datasketches {
+
+template<typename T, typename C, typename A>
+req_quantile_calculator<T, C, A>::req_quantile_calculator(uint64_t n, const A& allocator):
+n_(n),
+entries_(allocator)
+{}
+
+template<typename T, typename C, typename A>
+void req_quantile_calculator<T, C, A>::add(const T* begin, const T* end, uint8_t lg_weight) {
+ if (entries_.capacity() < entries_.size() + std::distance(begin, end)) entries_.reserve(entries_.size() + std::distance(begin, end));
+ const size_t size_before = entries_.size();
+ for (auto it = begin; it != end; ++it) entries_.push_back(Entry(it, 1 << lg_weight));
+ if (size_before > 0) std::inplace_merge(entries_.begin(), entries_.begin() + size_before, entries_.end(), compare_pairs_by_first_ptr<C>());
+}
+
+template<typename T, typename C, typename A>
+template<bool inclusive>
+void req_quantile_calculator<T, C, A>::convert_to_cummulative() {
+ uint64_t subtotal = 0;
+ for (auto& entry: entries_) {
+ const uint64_t new_subtotal = subtotal + entry.second;
+ entry.second = inclusive ? new_subtotal : subtotal;
+ subtotal = new_subtotal;
+ }
+}
+
+template<typename T, typename C, typename A>
+const T* req_quantile_calculator<T, C, A>::get_quantile(double rank) const {
+ uint64_t weight = static_cast<uint64_t>(rank * n_);
+ auto it = std::lower_bound(entries_.begin(), entries_.end(), Entry(nullptr, weight), compare_pairs_by_second());
+ if (it == entries_.end()) return entries_[entries_.size() - 1].first;
+ return it->first;
+}
+
+} /* namespace datasketches */
+
+#endif
diff --git a/req/include/req_sketch.hpp b/req/include/req_sketch.hpp
new file mode 100755
index 0000000..ca806cc
--- /dev/null
+++ b/req/include/req_sketch.hpp
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef REQ_SKETCH_HPP_
+#define REQ_SKETCH_HPP_
+
+#include "req_common.hpp"
+#include "req_compactor.hpp"
+#include "req_quantile_calculator.hpp"
+
+namespace datasketches {
+
+template<
+ typename T,
+ typename Comparator = std::less<T>,
+ typename SerDe = serde<T>,
+ typename Allocator = std::allocator<T>
+>
+class req_sketch {
+public:
+ using Compactor = req_compactor<T, Comparator, Allocator>;
+ using AllocCompactor = typename std::allocator_traits<Allocator>::template rebind_alloc<Compactor>;
+ using AllocDouble = typename std::allocator_traits<Allocator>::template rebind_alloc<double>;
+ using vector_double = std::vector<double, AllocDouble>;
+
+ /**
+ * Constructor
+ * @param k Controls the size and error of the sketch. It must be even and in the range [4, 1024], inclusive.
+ * Value of 12 roughly corresponds to 1% relative error guarantee at 95% confidence.
+ * @param hra if true, the default, the high ranks are prioritized for better
+ * accuracy. Otherwise the low ranks are prioritized for better accuracy.
+ * @param allocator to use by this instance
+ */
+ explicit req_sketch(uint16_t k, bool hra = true, const Allocator& allocator = Allocator());
+
+ ~req_sketch();
+ req_sketch(const req_sketch& other);
+ req_sketch(req_sketch&& other) noexcept;
+ req_sketch& operator=(const req_sketch& other);
+ req_sketch& operator=(req_sketch&& other);
+
+ /**
+ * Returns configured parameter K
+ * @return parameter K
+ */
+ uint16_t get_k() const;
+
+ /**
+ * Returns configured parameter High Rank Accuracy
+ * @return parameter HRA
+ */
+ bool is_HRA() const;
+
+ /**
+ * Returns true if this sketch is empty.
+ * @return empty flag
+ */
+ bool is_empty() const;
+
+ /**
+ * Returns the length of the input stream.
+ * @return stream length
+ */
+ uint64_t get_n() const;
+
+ /**
+ * Returns the number of retained items in the sketch.
+ * @return number of retained items
+ */
+ uint32_t get_num_retained() const;
+
+ /**
+ * Returns true if this sketch is in estimation mode.
+ * @return estimation mode flag
+ */
+ bool is_estimation_mode() const;
+
+ template<typename FwdT>
+ void update(FwdT&& item);
+
+ template<typename FwdSk>
+ void merge(FwdSk&& other);
+
+ /**
+ * Returns the min value of the stream.
+ * For floating point types: if the sketch is empty this returns NaN.
+ * For other types: if the sketch is empty this throws runtime_error.
+ * @return the min value of the stream
+ */
+ const T& get_min_value() const;
+
+ /**
+ * Returns the max value of the stream.
+ * For floating point types: if the sketch is empty this returns NaN.
+ * For other types: if the sketch is empty this throws runtime_error.
+ * @return the max value of the stream
+ */
+ const T& get_max_value() const;
+
+ /**
+ * Returns an approximation to the normalized (fractional) rank of the given item from 0 to 1 inclusive.
+ * With the template parameter inclusive=true the weight of the given item is included into the rank.
+ * Otherwise the rank equals the sum of the weights of items less than the given item according to the Comparator.
+ *
+ * <p>If the sketch is empty this returns NaN.
+ *
+ * @param item to be ranked
+ * @return an approximate rank of the given item
+ */
+
+ template<bool inclusive = false>
+ double get_rank(const T& item) const;
+
+ /**
+ * Returns an approximation to the Probability Mass Function (PMF) of the input stream
+ * given a set of split points (values).
+ *
+ * <p>If the sketch is empty this returns an empty vector.
+ *
+ * @param split_points an array of <i>m</i> unique, monotonically increasing values
+ * that divide the input domain into <i>m+1</i> consecutive disjoint intervals.
+ * The definition of an "interval" is inclusive of the left split point (or minimum value) and
+ * exclusive of the right split point, with the exception that the last interval will include
+ * the maximum value.
+ * It is not necessary to include either the min or max values in these split points.
+ *
+ * @return an array of m+1 doubles each of which is an approximation
+ * to the fraction of the input stream values (the mass) that fall into one of those intervals.
+ * If the template parameter inclusive=false, the definition of an "interval" is inclusive of the left split point and exclusive of the right
+ * split point, with the exception that the last interval will include the maximum value.
+ * If the template parameter inclusive=true, the definition of an "interval" is exclusive of the left split point and inclusive of the right
+ * split point.
+ */
+ template<bool inclusive = false>
+ vector_double get_PMF(const T* split_points, uint32_t size) const;
+
+ /**
+ * Returns an approximation to the Cumulative Distribution Function (CDF), which is the
+ * cumulative analog of the PMF, of the input stream given a set of split points (values).
+ *
+ * <p>If the sketch is empty this returns an empty vector.
+ *
+ * @param split_points an array of <i>m</i> unique, monotonically increasing float values
+ * that divide the input domain into <i>m+1</i> consecutive disjoint intervals.
+ * If the template parameter inclusive=false, the definition of an "interval" is inclusive of the left split point and exclusive of the right
+ * split point, with the exception that the last interval will include the maximum value.
+ * If the template parameter inclusive=true, the definition of an "interval" is exclusive of the left split point and inclusive of the right
+ * split point.
+ * It is not necessary to include either the min or max values in these split points.
+ *
+ * @return an array of m+1 double values, which are a consecutive approximation to the CDF
+ * of the input stream given the split_points. The value at array position j of the returned
+ * CDF array is the sum of the returned values in positions 0 through j of the returned PMF
+ * array.
+ */
+ template<bool inclusive = false>
+ vector_double get_CDF(const T* split_points, uint32_t size) const;
+
+ /**
+ * Returns an approximate quantile of the given normalized rank.
+ * The normalized rank must be in the range [0.0, 1.0] (both inclusive).
+ * @param rank the given normalized rank
+ * @return approximate quantile given the normalized rank
+ */
+ template<bool inclusive = false>
+ const T& get_quantile(double rank) const;
+
+ /**
+ * Returns an array of quantiles that correspond to the given array of normalized ranks.
+ * @param ranks given array of normalized ranks.
+ * @return array of quantiles that correspond to the given array of normalized ranks
+ */
+ template<bool inclusive = false>
+ std::vector<T, Allocator> get_quantiles(const double* ranks, uint32_t size) const;
+
+ /**
+   * Returns an approximate lower bound of the given normalized rank.
+ * @param rank the given rank, a value between 0 and 1.0.
+ * @param num_std_dev the number of standard deviations. Must be 1, 2, or 3.
+ * @return an approximate lower bound rank.
+ */
+ double get_rank_lower_bound(double rank, uint8_t num_std_dev) const;
+
+ /**
+   * Returns an approximate upper bound of the given normalized rank.
+ * @param rank the given rank, a value between 0 and 1.0.
+ * @param num_std_dev the number of standard deviations. Must be 1, 2, or 3.
+ * @return an approximate upper bound rank.
+ */
+ double get_rank_upper_bound(double rank, uint8_t num_std_dev) const;
+
+ /**
+ * Returns an a priori estimate of relative standard error (RSE, expressed as a number in [0,1]).
+ * Derived from Lemma 12 in https://arxiv.org/abs/2004.01668v2, but the constant factors were
+ * modified based on empirical measurements.
+ *
+ * @param k the given value of k
+ * @param rank the given normalized rank, a number in [0,1].
+ * @param hra if true High Rank Accuracy mode is being selected, otherwise, Low Rank Accuracy.
+ * @param n an estimate of the total number of items submitted to the sketch.
+ * @return an a priori estimate of relative standard error (RSE, expressed as a number in [0,1]).
+ */
+ static double get_RSE(uint16_t k, double rank, bool hra, uint64_t n);
+
+ /**
+ * Computes size needed to serialize the current state of the sketch.
+ * This version is for fixed-size arithmetic types (integral and floating point).
+ * @return size in bytes needed to serialize this sketch
+ */
+ template<typename TT = T, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type = 0>
+ size_t get_serialized_size_bytes() const;
+
+ /**
+ * Computes size needed to serialize the current state of the sketch.
+ * This version is for all other types and can be expensive since every item needs to be looked at.
+ * @return size in bytes needed to serialize this sketch
+ */
+ template<typename TT = T, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type = 0>
+ size_t get_serialized_size_bytes() const;
+
+ /**
+ * This method serializes the sketch into a given stream in a binary form
+ * @param os output stream
+ */
+ void serialize(std::ostream& os) const;
+
+ // This is a convenience alias for users
+ // The type returned by the following serialize method
+ using vector_bytes = std::vector<uint8_t, typename std::allocator_traits<Allocator>::template rebind_alloc<uint8_t>>;
+
+ /**
+ * This method serializes the sketch as a vector of bytes.
+ * An optional header can be reserved in front of the sketch.
+ * It is a blank space of a given size.
+ * This header is used in Datasketches PostgreSQL extension.
+ * @param header_size_bytes space to reserve in front of the sketch
+ */
+ vector_bytes serialize(unsigned header_size_bytes = 0) const;
+
+ /**
+ * This method deserializes a sketch from a given stream.
+ * @param is input stream
+ * @return an instance of a sketch
+ */
+ static req_sketch deserialize(std::istream& is, const Allocator& allocator = Allocator());
+
+ /**
+ * This method deserializes a sketch from a given array of bytes.
+ * @param bytes pointer to the array of bytes
+ * @param size the size of the array
+ * @return an instance of a sketch
+ */
+ static req_sketch deserialize(const void* bytes, size_t size, const Allocator& allocator = Allocator());
+
+ /**
+ * Prints a summary of the sketch.
+ * @param print_levels if true include information about levels
+ * @param print_items if true include sketch data
+ */
+ string<Allocator> to_string(bool print_levels = false, bool print_items = false) const;
+
+ class const_iterator;
+ const_iterator begin() const;
+ const_iterator end() const;
+
+private:
+ Allocator allocator_;
+ uint16_t k_;
+ bool hra_;
+ uint32_t max_nom_size_;
+ uint32_t num_retained_;
+ uint64_t n_;
+ std::vector<Compactor, AllocCompactor> compactors_;
+ T* min_value_;
+ T* max_value_;
+
+ static const bool LAZY_COMPRESSION = false;
+
+ static const uint8_t SERIAL_VERSION = 1;
+ static const uint8_t FAMILY = 17;
+ static const size_t PREAMBLE_SIZE_BYTES = 8;
+ enum flags { RESERVED1, RESERVED2, IS_EMPTY, IS_HIGH_RANK, RAW_ITEMS, IS_LEVEL_ZERO_SORTED };
+
+ static constexpr double FIXED_RSE_FACTOR = 0.084;
+ static double relative_rse_factor();
+
+ uint8_t get_num_levels() const;
+ void grow();
+ void update_max_nom_size();
+ void update_num_retained();
+ void compress();
+
+ static double get_rank_lb(uint16_t k, uint8_t num_levels, double rank, uint8_t num_std_dev, uint64_t n, bool hra);
+ static double get_rank_ub(uint16_t k, uint8_t num_levels, double rank, uint8_t num_std_dev, uint64_t n, bool hra);
+ static bool is_exact_rank(uint16_t k, uint8_t num_levels, double rank, uint64_t n, bool hra);
+
+ using QuantileCalculator = req_quantile_calculator<T, Comparator, Allocator>;
+ using AllocCalc = typename std::allocator_traits<Allocator>::template rebind_alloc<QuantileCalculator>;
+ class calculator_deleter;
+ using QuantileCalculatorPtr = typename std::unique_ptr<QuantileCalculator, calculator_deleter>;
+ template<bool inclusive>
+ QuantileCalculatorPtr get_quantile_calculator() const;
+
+ // for deserialization
+ class item_deleter;
+ req_sketch(uint32_t k, bool hra, uint64_t n, std::unique_ptr<T, item_deleter> min_value, std::unique_ptr<T, item_deleter> max_value, std::vector<Compactor, AllocCompactor>&& compactors);
+
+ static void check_preamble_ints(uint8_t preamble_ints, uint8_t num_levels);
+ static void check_serial_version(uint8_t serial_version);
+ static void check_family_id(uint8_t family_id);
+
+ // implementations for floating point types
+ template<typename TT = T, typename std::enable_if<std::is_floating_point<TT>::value, int>::type = 0>
+ static const TT& get_invalid_value() {
+ static TT value = std::numeric_limits<TT>::quiet_NaN();
+ return value;
+ }
+
+ template<typename TT = T, typename std::enable_if<std::is_floating_point<TT>::value, int>::type = 0>
+ static inline bool check_update_value(const TT& value) {
+ return !std::isnan(value);
+ }
+
+ template<typename TT = T, typename std::enable_if<std::is_floating_point<TT>::value, int>::type = 0>
+ static inline void check_split_points(const T* values, uint32_t size) {
+ for (uint32_t i = 0; i < size ; i++) {
+ if (std::isnan(values[i])) {
+ throw std::invalid_argument("Values must not be NaN");
+ }
+ if ((i < (size - 1)) && !(Comparator()(values[i], values[i + 1]))) {
+ throw std::invalid_argument("Values must be unique and monotonically increasing");
+ }
+ }
+ }
+
+ // implementations for all other types
+ template<typename TT = T, typename std::enable_if<!std::is_floating_point<TT>::value, int>::type = 0>
+ static const TT& get_invalid_value() {
+ throw std::runtime_error("getting quantiles from empty sketch is not supported for this type of values");
+ }
+
+ template<typename TT = T, typename std::enable_if<!std::is_floating_point<TT>::value, int>::type = 0>
+ static inline bool check_update_value(const TT&) {
+ return true;
+ }
+
+ template<typename TT = T, typename std::enable_if<!std::is_floating_point<TT>::value, int>::type = 0>
+ static inline void check_split_points(const T* values, uint32_t size) {
+ for (uint32_t i = 0; i < size ; i++) {
+ if ((i < (size - 1)) && !(Comparator()(values[i], values[i + 1]))) {
+ throw std::invalid_argument("Values must be unique and monotonically increasing");
+ }
+ }
+ }
+
+};
+
+// Input iterator over all retained items in the sketch, walking the
+// compactors (levels) in order and the items within each compactor.
+// Dereferencing yields a pair of (item, weight); weight is presumably
+// 2^lg_weight of the owning compactor -- confirm in the iterator impl.
+// NOTE(review): std::iterator is deprecated in C++17; consider declaring
+// the five iterator member typedefs directly instead.
+template<typename T, typename C, typename S, typename A>
+class req_sketch<T, C, S, A>::const_iterator: public std::iterator<std::input_iterator_tag, T> {
+public:
+ const_iterator& operator++();
+ const_iterator& operator++(int);
+ bool operator==(const const_iterator& other) const;
+ bool operator!=(const const_iterator& other) const;
+ std::pair<const T&, const uint64_t> operator*() const;
+private:
+ using LevelsIterator = typename std::vector<Compactor, AllocCompactor>::const_iterator;
+ LevelsIterator levels_it_;  // current compactor
+ LevelsIterator levels_end_; // end of the compactors vector
+ const T* compactor_it_;     // current item within the current compactor
+ friend class req_sketch<T, C, S, A>;
+ const_iterator(LevelsIterator begin, LevelsIterator end);
+};
+
+} /* namespace datasketches */
+
+#include "req_sketch_impl.hpp"
+
+#endif
diff --git a/req/include/req_sketch_impl.hpp b/req/include/req_sketch_impl.hpp
new file mode 100755
index 0000000..3c90908
--- /dev/null
+++ b/req/include/req_sketch_impl.hpp
@@ -0,0 +1,810 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef REQ_SKETCH_IMPL_HPP_
+#define REQ_SKETCH_IMPL_HPP_
+
+#include <cmath>
+#include <sstream>
+#include <stdexcept>
+
+namespace datasketches {
+
+// Constructs an empty sketch.
+// @param k size/accuracy tradeoff parameter; forced even (rounded down)
+//          and clamped below by req_constants::MIN_K
+// @param hra true selects High Rank Accuracy mode, false Low Rank Accuracy
+// @param allocator allocator instance used for all internal allocations
+template<typename T, typename C, typename S, typename A>
+req_sketch<T, C, S, A>::req_sketch(uint16_t k, bool hra, const A& allocator):
+allocator_(allocator),
+k_(std::max(static_cast<int>(k) & -2, static_cast<int>(req_constants::MIN_K))), //rounds down one if odd
+hra_(hra),
+max_nom_size_(0),
+num_retained_(0),
+n_(0),
+compactors_(allocator),
+min_value_(nullptr),
+max_value_(nullptr)
+{
+ grow(); // adds the level-0 compactor and initializes max_nom_size_
+}
+
+// Destroys the min/max items (if any) and returns their memory to the
+// allocator. Compactors clean up via their own destructors.
+template<typename T, typename C, typename S, typename A>
+req_sketch<T, C, S, A>::~req_sketch() {
+ if (min_value_ != nullptr) {
+ min_value_->~T();
+ allocator_.deallocate(min_value_, 1);
+ }
+ if (max_value_ != nullptr) {
+ max_value_->~T();
+ allocator_.deallocate(max_value_, 1);
+ }
+}
+
+// Copy constructor: deep-copies compactors and the min/max items.
+// Uses this sketch's own allocator instance for the min/max allocations
+// rather than a default-constructed A(), so stateful allocator instances
+// are honored (the point of passing an allocator) and A is not required
+// to be default-constructible.
+template<typename T, typename C, typename S, typename A>
+req_sketch<T, C, S, A>::req_sketch(const req_sketch& other):
+allocator_(other.allocator_),
+k_(other.k_),
+hra_(other.hra_),
+max_nom_size_(other.max_nom_size_),
+num_retained_(other.num_retained_),
+n_(other.n_),
+compactors_(other.compactors_),
+min_value_(nullptr),
+max_value_(nullptr)
+{
+  if (other.min_value_ != nullptr) min_value_ = new (allocator_.allocate(1)) T(*other.min_value_);
+  if (other.max_value_ != nullptr) max_value_ = new (allocator_.allocate(1)) T(*other.max_value_);
+}
+
+// Move constructor: steals the other sketch's state; nulls the source's
+// min/max pointers so its destructor is a no-op on them.
+template<typename T, typename C, typename S, typename A>
+req_sketch<T, C, S, A>::req_sketch(req_sketch&& other) noexcept :
+allocator_(std::move(other.allocator_)),
+k_(other.k_),
+hra_(other.hra_),
+max_nom_size_(other.max_nom_size_),
+num_retained_(other.num_retained_),
+n_(other.n_),
+compactors_(std::move(other.compactors_)),
+min_value_(other.min_value_),
+max_value_(other.max_value_)
+{
+ other.min_value_ = nullptr;
+ other.max_value_ = nullptr;
+}
+
+// Copy assignment with the strong exception guarantee: build a copy first,
+// then adopt its state via move assignment (which swaps every member).
+// The previous state is released when the temporary is destroyed.
+template<typename T, typename C, typename S, typename A>
+req_sketch<T, C, S, A>& req_sketch<T, C, S, A>::operator=(const req_sketch& other) {
+  req_sketch tmp(other);
+  *this = std::move(tmp);
+  return *this;
+}
+
+// Move assignment implemented as a full member-wise swap; the moved-from
+// sketch ends up holding this sketch's previous state and releases it on
+// destruction.
+template<typename T, typename C, typename S, typename A>
+req_sketch<T, C, S, A>& req_sketch<T, C, S, A>::operator=(req_sketch&& other) {
+ std::swap(allocator_, other.allocator_);
+ std::swap(k_, other.k_);
+ std::swap(hra_, other.hra_);
+ std::swap(max_nom_size_, other.max_nom_size_);
+ std::swap(num_retained_, other.num_retained_);
+ std::swap(n_, other.n_);
+ std::swap(compactors_, other.compactors_);
+ std::swap(min_value_, other.min_value_);
+ std::swap(max_value_, other.max_value_);
+ return *this;
+}
+
+// Returns the configured parameter K (size/accuracy tradeoff).
+template<typename T, typename C, typename S, typename A>
+uint16_t req_sketch<T, C, S, A>::get_k() const { return k_; }
+
+// True if the sketch was configured for High Rank Accuracy.
+template<typename T, typename C, typename S, typename A>
+bool req_sketch<T, C, S, A>::is_HRA() const { return hra_; }
+
+// True if no items have been accepted yet.
+template<typename T, typename C, typename S, typename A>
+bool req_sketch<T, C, S, A>::is_empty() const { return 0 == n_; }
+
+// Returns the total number of items processed (the stream length).
+template<typename T, typename C, typename S, typename A>
+uint64_t req_sketch<T, C, S, A>::get_n() const { return n_; }
+
+// Returns the number of items currently held across all compactors.
+template<typename T, typename C, typename S, typename A>
+uint32_t req_sketch<T, C, S, A>::get_num_retained() const { return num_retained_; }
+
+// The sketch enters estimation mode once compaction has created a second
+// level; before that, all ranks are exact.
+template<typename T, typename C, typename S, typename A>
+bool req_sketch<T, C, S, A>::is_estimation_mode() const { return compactors_.size() > 1; }
+
+// Updates this sketch with the given item; the forwarding reference lets
+// the item be copied or moved depending on the caller's value category.
+// NaN is rejected for floating-point T via check_update_value.
+template<typename T, typename C, typename S, typename A>
+template<typename FwdT>
+void req_sketch<T, C, S, A>::update(FwdT&& item) {
+ if (!check_update_value(item)) { return; }
+ if (is_empty()) {
+ // first item becomes both min and max
+ // NOTE(review): if the second allocation/construction throws, the first
+ // allocation is not released -- confirm intended exception behavior
+ min_value_ = new (allocator_.allocate(1)) T(item);
+ max_value_ = new (allocator_.allocate(1)) T(item);
+ } else {
+ if (C()(item, *min_value_)) *min_value_ = item;
+ if (C()(*max_value_, item)) *max_value_ = item;
+ }
+ compactors_[0].append(std::forward<FwdT>(item)); // forward only after min/max reads of item
+ ++num_retained_;
+ ++n_;
+ if (num_retained_ == max_nom_size_) compress();
+}
+
+// Merges another sketch into this one. Both sketches must be in the same
+// accuracy mode (HRA vs LRA). The other sketch may be an lvalue (items
+// copied) or rvalue (items moved via conditional_forward).
+template<typename T, typename C, typename S, typename A>
+template<typename FwdSk>
+void req_sketch<T, C, S, A>::merge(FwdSk&& other) {
+ if (is_HRA() != other.is_HRA()) throw std::invalid_argument("merging HRA and LRA is not valid");
+ if (other.is_empty()) return;
+ if (is_empty()) {
+ min_value_ = new (allocator_.allocate(1)) T(conditional_forward<FwdSk>(*other.min_value_));
+ max_value_ = new (allocator_.allocate(1)) T(conditional_forward<FwdSk>(*other.max_value_));
+ } else {
+ if (C()(*other.min_value_, *min_value_)) *min_value_ = conditional_forward<FwdSk>(*other.min_value_);
+ if (C()(*max_value_, *other.max_value_)) *max_value_ = conditional_forward<FwdSk>(*other.max_value_);
+ }
+ // grow until this has at least as many compactors as other
+ while (get_num_levels() < other.get_num_levels()) grow();
+ // merge the items in all height compactors
+ for (size_t i = 0; i < other.get_num_levels(); ++i) {
+ compactors_[i].merge(conditional_forward<FwdSk>(other.compactors_[i]));
+ }
+ n_ += other.n_;
+ update_max_nom_size();
+ update_num_retained();
+ if (num_retained_ >= max_nom_size_) compress();
+}
+
+// Returns the smallest item seen, or a type-specific invalid value for an
+// empty sketch (NaN for floating point; throws for other types).
+template<typename T, typename C, typename S, typename A>
+const T& req_sketch<T, C, S, A>::get_min_value() const {
+  return is_empty() ? get_invalid_value() : *min_value_;
+}
+
+// Returns the largest item seen, or a type-specific invalid value for an
+// empty sketch (NaN for floating point; throws for other types).
+template<typename T, typename C, typename S, typename A>
+const T& req_sketch<T, C, S, A>::get_max_value() const {
+  return is_empty() ? get_invalid_value() : *max_value_;
+}
+
+// Normalized rank of the given item: total weight of retained items below
+// (or at, when inclusive) the item, divided by the stream length n.
+template<typename T, typename C, typename S, typename A>
+template<bool inclusive>
+double req_sketch<T, C, S, A>::get_rank(const T& item) const {
+  uint64_t total_weight = 0;
+  for (auto it = compactors_.begin(); it != compactors_.end(); ++it) {
+    total_weight += it->template compute_weight<inclusive>(item);
+  }
+  return static_cast<double>(total_weight) / n_;
+}
+
+// Returns an approximate PMF over the intervals defined by split_points,
+// derived by differencing adjacent buckets of the CDF.
+template<typename T, typename C, typename S, typename A>
+template<bool inclusive>
+auto req_sketch<T, C, S, A>::get_PMF(const T* split_points, uint32_t size) const -> vector_double {
+  auto buckets = get_CDF<inclusive>(split_points, size);
+  // get_CDF returns an empty vector for an empty sketch; without this guard
+  // the loop below would index buckets[size] out of bounds
+  if (buckets.empty()) return buckets;
+  for (uint32_t i = size; i > 0; --i) {
+    buckets[i] -= buckets[i - 1];
+  }
+  return buckets;
+}
+
+// Returns an approximate CDF evaluated at the given split points, with a
+// final bucket of 1.0. Returns an empty vector for an empty sketch.
+// Split points must be unique, monotonically increasing, and not NaN.
+template<typename T, typename C, typename S, typename A>
+template<bool inclusive>
+auto req_sketch<T, C, S, A>::get_CDF(const T* split_points, uint32_t size) const -> vector_double {
+  vector_double ranks(allocator_);
+  if (is_empty()) return ranks;
+  check_split_points(split_points, size);
+  ranks.reserve(size + 1);
+  for (uint32_t i = 0; i < size; ++i) {
+    ranks.push_back(get_rank<inclusive>(split_points[i]));
+  }
+  ranks.push_back(1);
+  return ranks;
+}
+
+// Returns an approximate quantile for the given normalized rank.
+// Ranks 0.0 and 1.0 short-circuit to min/max before range validation and
+// never build a quantile calculator.
+template<typename T, typename C, typename S, typename A>
+template<bool inclusive>
+const T& req_sketch<T, C, S, A>::get_quantile(double rank) const {
+ if (is_empty()) return get_invalid_value();
+ if (rank == 0.0) return *min_value_;
+ if (rank == 1.0) return *max_value_;
+ if ((rank < 0.0) || (rank > 1.0)) {
+ throw std::invalid_argument("Rank cannot be less than zero or greater than 1.0");
+ }
+ // NOTE(review): the calculator is a temporary destroyed at the end of this
+ // full expression; confirm the returned reference does not point into
+ // calculator-owned storage (it would dangle for the caller)
+ return *(get_quantile_calculator<inclusive>()->get_quantile(rank));
+}
+
+// Returns quantiles corresponding to the given array of normalized ranks.
+// The quantile calculator is built lazily, only when a rank strictly
+// between 0 and 1 is encountered, and is then reused for all such ranks.
+template<typename T, typename C, typename S, typename A>
+template<bool inclusive>
+std::vector<T, A> req_sketch<T, C, S, A>::get_quantiles(const double* ranks, uint32_t size) const {
+ std::vector<T, A> quantiles(allocator_);
+ if (is_empty()) return quantiles;
+ QuantileCalculatorPtr quantile_calculator(nullptr, calculator_deleter(allocator_));
+ quantiles.reserve(size);
+ for (uint32_t i = 0; i < size; ++i) {
+ const double rank = ranks[i];
+ if ((rank < 0.0) || (rank > 1.0)) {
+ throw std::invalid_argument("rank cannot be less than zero or greater than 1.0");
+ }
+ if (rank == 0.0) quantiles.push_back(*min_value_);
+ else if (rank == 1.0) quantiles.push_back(*max_value_);
+ else {
+ if (!quantile_calculator) {
+ // has side effect of sorting level zero if needed
+ quantile_calculator = const_cast<req_sketch*>(this)->get_quantile_calculator<inclusive>();
+ }
+ quantiles.push_back(*(quantile_calculator->get_quantile(rank)));
+ }
+ }
+ return quantiles;
+}
+
+// Deleter for QuantileCalculator instances obtained from a rebound
+// allocator: destroys the object in place and returns the memory to the
+// captured allocator instance.
+template<typename T, typename C, typename S, typename A>
+class req_sketch<T, C, S, A>::calculator_deleter {
+ public:
+ calculator_deleter(const AllocCalc& allocator): allocator_(allocator) {}
+ void operator() (QuantileCalculator* ptr) {
+ if (ptr != nullptr) {
+ ptr->~QuantileCalculator();
+ allocator_.deallocate(ptr, 1);
+ }
+ }
+ private:
+ AllocCalc allocator_;
+};
+
+// Builds a quantile calculator covering all levels. Sorting level 0 is a
+// deliberate side effect permitted through const_cast (logical constness).
+template<typename T, typename C, typename S, typename A>
+template<bool inclusive>
+auto req_sketch<T, C, S, A>::get_quantile_calculator() const -> QuantileCalculatorPtr {
+ if (!compactors_[0].is_sorted()) {
+ const_cast<Compactor&>(compactors_[0]).sort(); // allow this side effect
+ }
+ AllocCalc ac(allocator_);
+ QuantileCalculatorPtr quantile_calculator(
+ new (ac.allocate(1)) req_quantile_calculator<T, C, A>(n_, ac),
+ calculator_deleter(ac)
+ );
+
+ for (auto& compactor: compactors_) {
+ // each level contributes its items at that compactor's lg_weight
+ quantile_calculator->add(compactor.begin(), compactor.end(), compactor.get_lg_weight());
+ }
+ quantile_calculator->template convert_to_cummulative<inclusive>();
+ return quantile_calculator;
+}
+
+// Lower bound of the confidence interval for the given normalized rank at
+// the requested number of standard deviations.
+template<typename T, typename C, typename S, typename A>
+double req_sketch<T, C, S, A>::get_rank_lower_bound(double rank, uint8_t num_std_dev) const {
+  return get_rank_lb(k_, get_num_levels(), rank, num_std_dev, n_, hra_);
+}
+
+// Upper bound of the confidence interval for the given normalized rank at
+// the requested number of standard deviations.
+template<typename T, typename C, typename S, typename A>
+double req_sketch<T, C, S, A>::get_rank_upper_bound(double rank, uint8_t num_std_dev) const {
+  return get_rank_ub(k_, get_num_levels(), rank, num_std_dev, n_, hra_);
+}
+
+// A priori relative standard error estimate for the given configuration.
+// NOTE(review): this returns the one-std-dev lower bound at two levels
+// itself rather than a deviation from rank -- confirm against the
+// declaration's documented contract of an RSE in [0,1].
+template<typename T, typename C, typename S, typename A>
+double req_sketch<T, C, S, A>::get_RSE(uint16_t k, double rank, bool hra, uint64_t n) {
+ return get_rank_lb(k, 2, rank, 1, n, hra);
+}
+
+// Lower bound on the true rank: the looser of the relative (rank-dependent)
+// and fixed error components, each scaled by num_std_dev. Exact ranks are
+// returned unchanged.
+template<typename T, typename C, typename S, typename A>
+double req_sketch<T, C, S, A>::get_rank_lb(uint16_t k, uint8_t num_levels, double rank, uint8_t num_std_dev, uint64_t n, bool hra) {
+  if (is_exact_rank(k, num_levels, rank, n, hra)) return rank;
+  const double rel_err = relative_rse_factor() / k * (hra ? 1.0 - rank : rank);
+  const double fix_err = FIXED_RSE_FACTOR / k;
+  return std::max(rank - num_std_dev * rel_err, rank - num_std_dev * fix_err);
+}
+
+template<typename T, typename C, typename S, typename A>
+double req_sketch<T, C, S, A>::get_rank_ub(uint16_t k, uint8_t num_levels, double rank, uint8_t num_std_dev, uint64_t n, bool hra) {
+ if (is_exact_rank(k, num_levels, rank, n, hra)) return rank;
+ const double relative = relative_rse_factor() / k * (hra ? 1.0 - rank : rank);
+ const double fixed = FIXED_RSE_FACTOR / k;
+ const double ub_rel = rank + num_std_dev * relative;
+ const double ub_fix = rank + num_std_dev * fixed;
+ return std::min(ub_rel, ub_fix);
+}
+
+// A rank is exact when the sketch has not compacted yet, or when it falls
+// within the accurately-kept end of the distribution (top for HRA, bottom
+// for LRA) covered by the base capacity.
+template<typename T, typename C, typename S, typename A>
+bool req_sketch<T, C, S, A>::is_exact_rank(uint16_t k, uint8_t num_levels, double rank, uint64_t n, bool hra) {
+  const unsigned base_cap = k * req_constants::INIT_NUM_SECTIONS;
+  if (num_levels == 1 || n <= base_cap) return true;
+  const double thresh = static_cast<double>(base_cap) / n;
+  if (hra) return rank >= 1.0 - thresh;
+  return rank <= thresh;
+}
+
+template<typename T, typename C, typename S, typename A>
+double req_sketch<T, C, S, A>::relative_rse_factor() {
+ return sqrt(0.0512 / req_constants::INIT_NUM_SECTIONS);
+}
+
+// implementation for fixed-size arithmetic types (integral and floating point)
+template<typename T, typename C, typename S, typename A>
+template<typename TT, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type>
+size_t req_sketch<T, C, S, A>::get_serialized_size_bytes() const {
+ size_t size = PREAMBLE_SIZE_BYTES;
+ if (is_empty()) return size;
+ if (is_estimation_mode()) {
+ size += sizeof(n_) + sizeof(TT) * 2; // min and max
+ }
+ if (n_ == 1) {
+ size += sizeof(TT);
+ } else {
+ for (const auto& compactor: compactors_) size += compactor.get_serialized_size_bytes(S());
+ }
+ return size;
+}
+
+// implementation for all other types
+template<typename T, typename C, typename S, typename A>
+template<typename TT, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type>
+size_t req_sketch<T, C, S, A>::get_serialized_size_bytes() const {
+ size_t size = PREAMBLE_SIZE_BYTES;
+ if (is_empty()) return size;
+ if (is_estimation_mode()) {
+ size += sizeof(n_);
+ size += S().size_of_item(*min_value_);
+ size += S().size_of_item(*max_value_);
+ }
+ if (n_ == 1) {
+ size += S().size_of_item(*compactors_[0].begin());
+ } else {
+ for (const auto& compactor: compactors_) size += compactor.get_serialized_size_bytes(S());
+ }
+ return size;
+}
+
+// Serializes the sketch into the given binary stream.
+// Layout: preamble (preamble_ints, serial version, family, flags, k,
+// num_levels, num_raw_items), then optionally n and min/max (estimation
+// mode only), then either raw level-0 items or all compactors.
+template<typename T, typename C, typename S, typename A>
+void req_sketch<T, C, S, A>::serialize(std::ostream& os) const {
+  const uint8_t preamble_ints = is_estimation_mode() ? 4 : 2;
+  write(os, preamble_ints);
+  const uint8_t serial_version = SERIAL_VERSION;
+  write(os, serial_version);
+  const uint8_t family = FAMILY;
+  write(os, family);
+  // small sketches are stored as raw items without compactor headers
+  const bool raw_items = n_ <= req_constants::MIN_K;
+  const uint8_t flags_byte(
+    (is_empty() ? 1 << flags::IS_EMPTY : 0)
+    | (hra_ ? 1 << flags::IS_HIGH_RANK : 0)
+    | (raw_items ? 1 << flags::RAW_ITEMS : 0)
+    | (compactors_[0].is_sorted() ? 1 << flags::IS_LEVEL_ZERO_SORTED : 0)
+  );
+  write(os, flags_byte);
+  write(os, k_);
+  const uint8_t num_levels = is_empty() ? 0 : get_num_levels();
+  write(os, num_levels);
+  // explicit cast: raw_items guarantees n_ <= MIN_K, so this cannot truncate
+  const uint8_t num_raw_items = raw_items ? static_cast<uint8_t>(n_) : 0;
+  write(os, num_raw_items);
+  if (is_empty()) return;
+  if (is_estimation_mode()) {
+    write(os, n_);
+    S().serialize(os, min_value_, 1);
+    S().serialize(os, max_value_, 1);
+  }
+  if (raw_items) {
+    S().serialize(os, compactors_[0].begin(), num_raw_items);
+  } else {
+    for (const auto& compactor: compactors_) compactor.serialize(os, S());
+  }
+}
+
+// Serializes the sketch into a byte vector, optionally reserving a blank
+// header of header_size_bytes in front (used by the PostgreSQL extension).
+template<typename T, typename C, typename S, typename A>
+auto req_sketch<T, C, S, A>::serialize(unsigned header_size_bytes) const -> vector_bytes {
+  const size_t size = header_size_bytes + get_serialized_size_bytes();
+  vector_bytes bytes(size, 0, allocator_);
+  uint8_t* ptr = bytes.data() + header_size_bytes;
+  // bound against the end of the allocated buffer; the previous ptr + size
+  // overshot by header_size_bytes since ptr is already offset past the header
+  const uint8_t* end_ptr = bytes.data() + size;
+
+  const uint8_t preamble_ints = is_estimation_mode() ? 4 : 2;
+  ptr += copy_to_mem(preamble_ints, ptr);
+  const uint8_t serial_version = SERIAL_VERSION;
+  ptr += copy_to_mem(serial_version, ptr);
+  const uint8_t family = FAMILY;
+  ptr += copy_to_mem(family, ptr);
+  // small sketches are stored as raw items without compactor headers
+  const bool raw_items = n_ <= req_constants::MIN_K;
+  const uint8_t flags_byte(
+    (is_empty() ? 1 << flags::IS_EMPTY : 0)
+    | (hra_ ? 1 << flags::IS_HIGH_RANK : 0)
+    | (raw_items ? 1 << flags::RAW_ITEMS : 0)
+    | (compactors_[0].is_sorted() ? 1 << flags::IS_LEVEL_ZERO_SORTED : 0)
+  );
+  ptr += copy_to_mem(flags_byte, ptr);
+  ptr += copy_to_mem(k_, ptr);
+  const uint8_t num_levels = is_empty() ? 0 : get_num_levels();
+  ptr += copy_to_mem(num_levels, ptr);
+  // explicit cast: raw_items guarantees n_ <= MIN_K, so this cannot truncate
+  const uint8_t num_raw_items = raw_items ? static_cast<uint8_t>(n_) : 0;
+  ptr += copy_to_mem(num_raw_items, ptr);
+  if (!is_empty()) {
+    if (is_estimation_mode()) {
+      ptr += copy_to_mem(n_, ptr);
+      ptr += S().serialize(ptr, end_ptr - ptr, min_value_, 1);
+      ptr += S().serialize(ptr, end_ptr - ptr, max_value_, 1);
+    }
+    if (raw_items) {
+      ptr += S().serialize(ptr, end_ptr - ptr, compactors_[0].begin(), num_raw_items);
+    } else {
+      for (const auto& compactor: compactors_) ptr += compactor.serialize(ptr, end_ptr - ptr, S());
+    }
+  }
+  return bytes;
+}
+
+// Deserializes a sketch from the given binary stream.
+// Throws std::runtime_error on stream failure; the check_* helpers reject
+// bad preamble/version/family values. Raw buffers hold uninitialized
+// memory until the serde succeeds, then are repackaged with a destroying
+// deleter so cleanup is exception-safe.
+template<typename T, typename C, typename S, typename A>
+req_sketch<T, C, S, A> req_sketch<T, C, S, A>::deserialize(std::istream& is, const A& allocator) {
+ const auto preamble_ints = read<uint8_t>(is);
+ const auto serial_version = read<uint8_t>(is);
+ const auto family_id = read<uint8_t>(is);
+ const auto flags_byte = read<uint8_t>(is);
+ const auto k = read<uint16_t>(is);
+ const auto num_levels = read<uint8_t>(is);
+ const auto num_raw_items = read<uint8_t>(is);
+
+ check_preamble_ints(preamble_ints, num_levels);
+ check_serial_version(serial_version);
+ check_family_id(family_id);
+
+ if (!is.good()) throw std::runtime_error("error reading from std::istream");
+ const bool is_empty = flags_byte & (1 << flags::IS_EMPTY);
+ const bool hra = flags_byte & (1 << flags::IS_HIGH_RANK);
+ if (is_empty) return req_sketch(k, hra, allocator);
+
+ A alloc(allocator);
+ auto item_buffer_deleter = [&alloc](T* ptr) { alloc.deallocate(ptr, 1); };
+ std::unique_ptr<T, decltype(item_buffer_deleter)> min_value_buffer(alloc.allocate(1), item_buffer_deleter);
+ std::unique_ptr<T, decltype(item_buffer_deleter)> max_value_buffer(alloc.allocate(1), item_buffer_deleter);
+ std::unique_ptr<T, item_deleter> min_value(nullptr, item_deleter(allocator));
+ std::unique_ptr<T, item_deleter> max_value(nullptr, item_deleter(allocator));
+
+ const bool raw_items = flags_byte & (1 << flags::RAW_ITEMS);
+ const bool is_level_0_sorted = flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED);
+ std::vector<Compactor, AllocCompactor> compactors(allocator);
+
+ // single-level sketches do not store n or min/max; recovered below
+ uint64_t n = 1;
+ if (num_levels > 1) {
+ n = read<uint64_t>(is);
+ S().deserialize(is, min_value_buffer.get(), 1);
+ // serde call did not throw, repackage with destructor
+ min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
+ S().deserialize(is, max_value_buffer.get(), 1);
+ // serde call did not throw, repackage with destructor
+ max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
+ }
+
+ if (raw_items) {
+ compactors.push_back(Compactor::deserialize(is, S(), allocator, is_level_0_sorted, k, num_raw_items, hra));
+ } else {
+ for (size_t i = 0; i < num_levels; ++i) {
+ compactors.push_back(Compactor::deserialize(is, S(), allocator, i == 0 ? is_level_0_sorted : true, hra));
+ }
+ }
+ // single level: recover n and min/max by scanning level 0
+ if (num_levels == 1) {
+ const auto begin = compactors[0].begin();
+ const auto end = compactors[0].end();
+ n = compactors[0].get_num_items();
+ auto min_it = begin;
+ auto max_it = begin;
+ for (auto it = begin; it != end; ++it) {
+ if (C()(*it, *min_it)) min_it = it;
+ if (C()(*max_it, *it)) max_it = it;
+ }
+ new (min_value_buffer.get()) T(*min_it);
+ // copy did not throw, repackage with destructor
+ min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
+ new (max_value_buffer.get()) T(*max_it);
+ // copy did not throw, repackage with destructor
+ max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
+ }
+
+ if (!is.good()) throw std::runtime_error("error reading from std::istream");
+ return req_sketch(k, hra, n, std::move(min_value), std::move(max_value), std::move(compactors));
+}
+
+// Deserializes a sketch from the given byte array. Mirrors the stream
+// overload; ensure_minimum_memory guards against reading past the buffer.
+// Raw buffers hold uninitialized memory until the serde succeeds, then are
+// repackaged with a destroying deleter so cleanup is exception-safe.
+template<typename T, typename C, typename S, typename A>
+req_sketch<T, C, S, A> req_sketch<T, C, S, A>::deserialize(const void* bytes, size_t size, const A& allocator) {
+ ensure_minimum_memory(size, 8);
+ const char* ptr = static_cast<const char*>(bytes);
+ const char* end_ptr = static_cast<const char*>(bytes) + size;
+
+ uint8_t preamble_ints;
+ ptr += copy_from_mem(ptr, preamble_ints);
+ uint8_t serial_version;
+ ptr += copy_from_mem(ptr, serial_version);
+ uint8_t family_id;
+ ptr += copy_from_mem(ptr, family_id);
+ uint8_t flags_byte;
+ ptr += copy_from_mem(ptr, flags_byte);
+ uint16_t k;
+ ptr += copy_from_mem(ptr, k);
+ uint8_t num_levels;
+ ptr += copy_from_mem(ptr, num_levels);
+ uint8_t num_raw_items;
+ ptr += copy_from_mem(ptr, num_raw_items);
+
+ check_preamble_ints(preamble_ints, num_levels);
+ check_serial_version(serial_version);
+ check_family_id(family_id);
+
+ const bool is_empty = flags_byte & (1 << flags::IS_EMPTY);
+ const bool hra = flags_byte & (1 << flags::IS_HIGH_RANK);
+ if (is_empty) return req_sketch(k, hra, allocator);
+
+ A alloc(allocator);
+ auto item_buffer_deleter = [&alloc](T* ptr) { alloc.deallocate(ptr, 1); };
+ std::unique_ptr<T, decltype(item_buffer_deleter)> min_value_buffer(alloc.allocate(1), item_buffer_deleter);
+ std::unique_ptr<T, decltype(item_buffer_deleter)> max_value_buffer(alloc.allocate(1), item_buffer_deleter);
+ std::unique_ptr<T, item_deleter> min_value(nullptr, item_deleter(allocator));
+ std::unique_ptr<T, item_deleter> max_value(nullptr, item_deleter(allocator));
+
+ const bool raw_items = flags_byte & (1 << flags::RAW_ITEMS);
+ const bool is_level_0_sorted = flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED);
+ std::vector<Compactor, AllocCompactor> compactors(allocator);
+
+ // single-level sketches do not store n or min/max; recovered below
+ uint64_t n = 1;
+ if (num_levels > 1) {
+ ensure_minimum_memory(end_ptr - ptr, sizeof(n));
+ ptr += copy_from_mem(ptr, n);
+ ptr += S().deserialize(ptr, end_ptr - ptr, min_value_buffer.get(), 1);
+ // serde call did not throw, repackage with destructor
+ min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
+ ptr += S().deserialize(ptr, end_ptr - ptr, max_value_buffer.get(), 1);
+ // serde call did not throw, repackage with destructor
+ max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
+ }
+
+ if (raw_items) {
+ auto pair = Compactor::deserialize(ptr, end_ptr - ptr, S(), allocator, is_level_0_sorted, k, num_raw_items, hra);
+ compactors.push_back(std::move(pair.first));
+ ptr += pair.second;
+ } else {
+ for (size_t i = 0; i < num_levels; ++i) {
+ auto pair = Compactor::deserialize(ptr, end_ptr - ptr, S(), allocator, i == 0 ? is_level_0_sorted : true, hra);
+ compactors.push_back(std::move(pair.first));
+ ptr += pair.second;
+ }
+ }
+ // single level: recover n and min/max by scanning level 0
+ if (num_levels == 1) {
+ const auto begin = compactors[0].begin();
+ const auto end = compactors[0].end();
+ n = compactors[0].get_num_items();
+ auto min_it = begin;
+ auto max_it = begin;
+ for (auto it = begin; it != end; ++it) {
+ if (C()(*it, *min_it)) min_it = it;
+ if (C()(*max_it, *it)) max_it = it;
+ }
+ new (min_value_buffer.get()) T(*min_it);
+ // copy did not throw, repackage with destructor
+ min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
+ new (max_value_buffer.get()) T(*max_it);
+ // copy did not throw, repackage with destructor
+ max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
+ }
+
+ return req_sketch(k, hra, n, std::move(min_value), std::move(max_value), std::move(compactors));
+}
+
+// Appends one compactor level; a new level's lg_weight equals its index
+// (the level count before insertion). Recomputes the nominal capacity.
+template<typename T, typename C, typename S, typename A>
+void req_sketch<T, C, S, A>::grow() {
+  compactors_.push_back(Compactor(hra_, get_num_levels(), k_, allocator_));
+  update_max_nom_size();
+}
+
+// Number of compactor levels currently allocated. Explicit cast documents
+// the size_t -> uint8_t narrowing (level counts are tiny by design) and
+// silences conversion warnings.
+template<typename T, typename C, typename S, typename A>
+uint8_t req_sketch<T, C, S, A>::get_num_levels() const {
+  return static_cast<uint8_t>(compactors_.size());
+}
+
+// Recomputes the total nominal capacity as the sum over all compactors.
+template<typename T, typename C, typename S, typename A>
+void req_sketch<T, C, S, A>::update_max_nom_size() {
+  uint32_t total = 0;
+  for (const auto& c: compactors_) total += c.get_nom_capacity();
+  max_nom_size_ = total;
+}
+
+// Recomputes the retained-item count as the sum over all compactors.
+template<typename T, typename C, typename S, typename A>
+void req_sketch<T, C, S, A>::update_num_retained() {
+  uint32_t total = 0;
+  for (const auto& c: compactors_) total += c.get_num_items();
+  num_retained_ = total;
+}
+
+// Compacts any level that reached its nominal capacity, promoting surviving
+// items to the level above; grows a new top level first when compacting the
+// current top.
+template<typename T, typename C, typename S, typename A>
+void req_sketch<T, C, S, A>::compress() {
+ for (size_t h = 0; h < compactors_.size(); ++h) {
+ if (compactors_[h].get_num_items() >= compactors_[h].get_nom_capacity()) {
+ if (h == 0) compactors_[0].sort(); // level 0 is the only level kept unsorted
+ if (h + 1 >= get_num_levels()) { // at the top?
+ grow(); // add a level, increases max_nom_size
+ }
+ // pair: (items removed from this level, change in nominal capacity --
+ // presumably from section resizing; confirm in Compactor::compact)
+ auto pair = compactors_[h].compact(compactors_[h + 1]);
+ num_retained_ -= pair.first;
+ max_nom_size_ += pair.second;
+ if (LAZY_COMPRESSION && num_retained_ < max_nom_size_) break;
+ }
+ }
+}
+
+template<typename T, typename C, typename S, typename A>
+string<A> req_sketch<T, C, S, A>::to_string(bool print_levels, bool print_items) const {
+ std::basic_ostringstream<char, std::char_traits<char>, AllocChar<A>> os;
+ os << "### REQ sketch summary:" << std::endl;
+ os << " K : " << k_ << std::endl;
+ os << " High Rank Acc : " << (hra_ ? "true" : "false") << std::endl;
+ os << " Empty : " << (is_empty() ? "true" : "false") << std::endl;
+ os << " Estimation mode: " << (is_estimation_mode() ? "true" : "false") << std::endl;
+ os << " Sorted : " << (compactors_[0].is_sorted() ? "true" : "false") << std::endl;
+ os << " N : " << n_ << std::endl;
+ os << " Levels : " << compactors_.size() << std::endl;
+ os << " Retained items : " << num_retained_ << std::endl;
+ os << " Capacity items : " << max_nom_size_ << std::endl;
+ if (!is_empty()) {
+ os << " Min value : " << *min_value_ << std::endl;
+ os << " Max value : " << *max_value_ << std::endl;
+ }
+ os << "### End sketch summary" << std::endl;
+
+ if (print_levels) {
+ os << "### REQ sketch levels:" << std::endl;
+ os << " index: nominal capacity, actual size" << std::endl;
+ for (uint8_t i = 0; i < compactors_.size(); i++) {
+ os << " " << (unsigned int) i << ": "
+ << compactors_[i].get_nom_capacity() << ", "
+ << compactors_[i].get_num_items() << std::endl;
+ }
+ os << "### End sketch levels" << std::endl;
+ }
+
+ if (print_items) {
+ os << "### REQ sketch data:" << std::endl;
+ unsigned level = 0;
+ for (const auto& compactor: compactors_) {
+ os << " level " << level << ": " << std::endl;
+ for (auto it = compactor.begin(); it != compactor.end(); ++it) {
+ os << " " << *it << std::endl;
+ }
+ ++level;
+ }
+ os << "### End sketch data" << std::endl;
+ }
+ return os.str();
+}
+
+template<typename T, typename C, typename S, typename A>
+class req_sketch<T, C, S, A>::item_deleter {
+ public:
+ item_deleter(const A& allocator): allocator_(allocator) {}
+ void operator() (T* ptr) {
+ if (ptr != nullptr) {
+ ptr->~T();
+ allocator_.deallocate(ptr, 1);
+ }
+ }
+ private:
+ A allocator_;
+};
+
+template<typename T, typename C, typename S, typename A>
+req_sketch<T, C, S, A>::req_sketch(uint32_t k, bool hra, uint64_t n, std::unique_ptr<T, item_deleter> min_value, std::unique_ptr<T, item_deleter> max_value, std::vector<Compactor, AllocCompactor>&& compactors):
+allocator_(compactors.get_allocator()),
+k_(k),
+hra_(hra),
+max_nom_size_(0),
+num_retained_(0),
+n_(n),
+compactors_(std::move(compactors)),
+min_value_(min_value.release()),
+max_value_(max_value.release())
+{
+ update_max_nom_size();
+ update_num_retained();
+}
+
+template<typename T, typename C, typename S, typename A>
+void req_sketch<T, C, S, A>::check_preamble_ints(uint8_t preamble_ints, uint8_t num_levels) {
+ const uint8_t expected_preamble_ints = num_levels > 1 ? 4 : 2;
+ if (preamble_ints != expected_preamble_ints) {
+ throw std::invalid_argument("Possible corruption: preamble ints must be "
+ + std::to_string(expected_preamble_ints) + ", got " + std::to_string(preamble_ints));
+ }
+}
+
+template<typename T, typename C, typename S, typename A>
+void req_sketch<T, C, S, A>::check_serial_version(uint8_t serial_version) {
+ if (serial_version != SERIAL_VERSION) {
+ throw std::invalid_argument("Possible corruption: serial version mismatch: expected "
+ + std::to_string(SERIAL_VERSION)
+ + ", got " + std::to_string(serial_version));
+ }
+}
+
+template<typename T, typename C, typename S, typename A>
+void req_sketch<T, C, S, A>::check_family_id(uint8_t family_id) {
+ if (family_id != FAMILY) {
+ throw std::invalid_argument("Possible corruption: family mismatch: expected "
+ + std::to_string(FAMILY) + ", got " + std::to_string(family_id));
+ }
+}
+
+template<typename T, typename C, typename S, typename A>
+auto req_sketch<T, C, S, A>::begin() const -> const_iterator {
+ return const_iterator(compactors_.begin(), compactors_.end());
+}
+
+template<typename T, typename C, typename S, typename A>
+auto req_sketch<T, C, S, A>::end() const -> const_iterator {
+ return const_iterator(compactors_.end(), compactors_.end());
+}
+
+// iterator
+
+template<typename T, typename C, typename S, typename A>
+req_sketch<T, C, S, A>::const_iterator::const_iterator(LevelsIterator begin, LevelsIterator end):
+levels_it_(begin),
+levels_end_(end),
+compactor_it_((*levels_it_).begin())
+{}
+
+template<typename T, typename C, typename S, typename A>
+auto req_sketch<T, C, S, A>::const_iterator::operator++() -> const_iterator& {
+ ++compactor_it_;
+ if (compactor_it_ == (*levels_it_).end()) {
+ ++levels_it_;
+ if (levels_it_ != levels_end_) compactor_it_ = (*levels_it_).begin();
+ }
+ return *this;
+}
+
+template<typename T, typename C, typename S, typename A>
+auto req_sketch<T, C, S, A>::const_iterator::operator++(int) -> const_iterator {
+  const_iterator tmp(*this);
+  operator++();
+  return tmp;
+}
+
+template<typename T, typename C, typename S, typename A>
+bool req_sketch<T, C, S, A>::const_iterator::operator==(const const_iterator& other) const {
+ if (levels_it_ != other.levels_it_) return false;
+ if (levels_it_ == levels_end_) return true;
+ return compactor_it_ == other.compactor_it_;
+}
+
+template<typename T, typename C, typename S, typename A>
+bool req_sketch<T, C, S, A>::const_iterator::operator!=(const const_iterator& other) const {
+ return !operator==(other);
+}
+
+template<typename T, typename C, typename S, typename A>
+std::pair<const T&, const uint64_t> req_sketch<T, C, S, A>::const_iterator::operator*() const {
+ return std::pair<const T&, const uint64_t>(*compactor_it_, 1 << (*levels_it_).get_lg_weight());
+}
+
+} /* namespace datasketches */
+
+#endif
diff --git a/req/test/CMakeLists.txt b/req/test/CMakeLists.txt
new file mode 100755
index 0000000..a509068
--- /dev/null
+++ b/req/test/CMakeLists.txt
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+add_executable(req_test)
+
+target_link_libraries(req_test req common_test)
+
+set_target_properties(req_test PROPERTIES
+ CXX_STANDARD 11
+ CXX_STANDARD_REQUIRED YES
+)
+
+file(TO_CMAKE_PATH "${CMAKE_CURRENT_SOURCE_DIR}" REQ_TEST_BINARY_PATH)
+string(APPEND REQ_TEST_BINARY_PATH "/")
+target_compile_definitions(req_test
+ PRIVATE
+ TEST_BINARY_INPUT_PATH="${REQ_TEST_BINARY_PATH}"
+)
+
+add_test(
+ NAME req_test
+ COMMAND req_test
+)
+
+target_sources(req_test
+ PRIVATE
+ req_sketch_test.cpp
+ req_sketch_custom_type_test.cpp
+)
diff --git a/req/test/req_float_empty_from_java.sk b/req/test/req_float_empty_from_java.sk
new file mode 100644
index 0000000..9b24bcc
--- /dev/null
+++ b/req/test/req_float_empty_from_java.sk
Binary files differ
diff --git a/req/test/req_float_estimation_from_java.sk b/req/test/req_float_estimation_from_java.sk
new file mode 100644
index 0000000..d063b41
--- /dev/null
+++ b/req/test/req_float_estimation_from_java.sk
Binary files differ
diff --git a/req/test/req_float_exact_from_java.sk b/req/test/req_float_exact_from_java.sk
new file mode 100644
index 0000000..d144ac8
--- /dev/null
+++ b/req/test/req_float_exact_from_java.sk
Binary files differ
diff --git a/req/test/req_float_raw_items_from_java.sk b/req/test/req_float_raw_items_from_java.sk
new file mode 100644
index 0000000..0bfe5a9
--- /dev/null
+++ b/req/test/req_float_raw_items_from_java.sk
Binary files differ
diff --git a/req/test/req_float_single_item_from_java.sk b/req/test/req_float_single_item_from_java.sk
new file mode 100644
index 0000000..774db9f
--- /dev/null
+++ b/req/test/req_float_single_item_from_java.sk
Binary files differ
diff --git a/req/test/req_sketch_custom_type_test.cpp b/req/test/req_sketch_custom_type_test.cpp
new file mode 100644
index 0000000..1d76c3c
--- /dev/null
+++ b/req/test/req_sketch_custom_type_test.cpp
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <catch.hpp>
+#include <sstream>
+
+#include <req_sketch.hpp>
+#include <test_allocator.hpp>
+#include <test_type.hpp>
+
+namespace datasketches {
+
+using req_test_type_sketch = req_sketch<test_type, test_type_less, test_type_serde, test_allocator<test_type>>;
+
+TEST_CASE("req sketch custom type", "[req_sketch]") {
+
+ // setup section
+ test_allocator_total_bytes = 0;
+
+ SECTION("compact level zero") {
+ req_test_type_sketch sketch(4);
+ REQUIRE_THROWS_AS(sketch.get_quantile(0), std::runtime_error);
+ REQUIRE_THROWS_AS(sketch.get_min_value(), std::runtime_error);
+ REQUIRE_THROWS_AS(sketch.get_max_value(), std::runtime_error);
+ REQUIRE(sketch.get_serialized_size_bytes() == 8);
+
+ for (int i = 0; i < 24; ++i) sketch.update(i);
+ //std::cout << sketch.to_string(true);
+
+ REQUIRE(sketch.is_estimation_mode());
+ REQUIRE(sketch.get_n() > sketch.get_num_retained());
+ REQUIRE(sketch.get_min_value().get_value() == 0);
+ REQUIRE(sketch.get_max_value().get_value() == 23);
+ }
+
+ SECTION("merge small") {
+ req_test_type_sketch sketch1(4);
+ sketch1.update(1);
+
+ req_test_type_sketch sketch2(4);
+ sketch2.update(2);
+
+ sketch2.merge(sketch1);
+
+ //std::cout << sketch2.to_string(true);
+
+ REQUIRE_FALSE(sketch2.is_estimation_mode());
+ REQUIRE(sketch2.get_num_retained() == sketch2.get_n());
+ REQUIRE(sketch2.get_min_value().get_value() == 1);
+ REQUIRE(sketch2.get_max_value().get_value() == 2);
+ }
+
+ SECTION("merge higher levels") {
+ req_test_type_sketch sketch1(4);
+ for (int i = 0; i < 24; ++i) sketch1.update(i);
+
+ req_test_type_sketch sketch2(4);
+ for (int i = 0; i < 24; ++i) sketch2.update(i);
+
+ sketch2.merge(sketch1);
+
+ //std::cout << sketch2.to_string(true);
+
+ REQUIRE(sketch2.is_estimation_mode());
+ REQUIRE(sketch2.get_n() > sketch2.get_num_retained());
+ REQUIRE(sketch2.get_min_value().get_value() == 0);
+ REQUIRE(sketch2.get_max_value().get_value() == 23);
+ }
+
+ SECTION("serialize deserialize") {
+ req_test_type_sketch sketch1(12);
+
+ const int n = 1000;
+ for (int i = 0; i < n; i++) sketch1.update(i);
+
+ std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
+ sketch1.serialize(s);
+ REQUIRE((size_t) s.tellp() == sketch1.get_serialized_size_bytes());
+ auto sketch2 = req_test_type_sketch::deserialize(s);
+ REQUIRE((size_t) s.tellg() == sketch2.get_serialized_size_bytes());
+ REQUIRE(s.tellg() == s.tellp());
+ REQUIRE(sketch2.is_empty() == sketch1.is_empty());
+ REQUIRE(sketch2.is_estimation_mode() == sketch1.is_estimation_mode());
+ REQUIRE(sketch2.get_n() == sketch1.get_n());
+ REQUIRE(sketch2.get_num_retained() == sketch1.get_num_retained());
+ REQUIRE(sketch2.get_min_value().get_value() == sketch1.get_min_value().get_value());
+ REQUIRE(sketch2.get_max_value().get_value() == sketch1.get_max_value().get_value());
+ REQUIRE(sketch2.get_quantile(0.5).get_value() == sketch1.get_quantile(0.5).get_value());
+ REQUIRE(sketch2.get_rank(0) == sketch1.get_rank(0));
+ REQUIRE(sketch2.get_rank(n) == sketch1.get_rank(n));
+ REQUIRE(sketch2.get_rank(n / 2) == sketch1.get_rank(n / 2));
+ }
+
+ SECTION("moving merge") {
+ req_test_type_sketch sketch1(4);
+ for (int i = 0; i < 10; i++) sketch1.update(i);
+ req_test_type_sketch sketch2(4);
+ sketch2.update(10);
+ sketch2.merge(std::move(sketch1));
+ REQUIRE(sketch2.get_min_value().get_value() == 0);
+ REQUIRE(sketch2.get_max_value().get_value() == 10);
+ REQUIRE(sketch2.get_n() == 11);
+ }
+
+ // cleanup
+ if (test_allocator_total_bytes != 0) {
+ REQUIRE(test_allocator_total_bytes == 0);
+ }
+}
+
+} /* namespace datasketches */
diff --git a/req/test/req_sketch_test.cpp b/req/test/req_sketch_test.cpp
new file mode 100755
index 0000000..0be4aa5
--- /dev/null
+++ b/req/test/req_sketch_test.cpp
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <catch.hpp>
+
+#include <req_sketch.hpp>
+
+#include <fstream>
+#include <sstream>
+#include <limits>
+
+namespace datasketches {
+
+#ifdef TEST_BINARY_INPUT_PATH
+const std::string input_path = TEST_BINARY_INPUT_PATH;
+#else
+const std::string input_path = "test/";
+#endif
+
+TEST_CASE("req sketch: empty", "[req_sketch]") {
+ std::cout << "sizeof(req_float_sketch)=" << sizeof(req_sketch<float>) << "\n";
+ req_sketch<float> sketch(12);
+ REQUIRE(sketch.get_k() == 12);
+ REQUIRE(sketch.is_HRA());
+ REQUIRE(sketch.is_empty());
+ REQUIRE_FALSE(sketch.is_estimation_mode());
+ REQUIRE(sketch.get_n() == 0);
+ REQUIRE(sketch.get_num_retained() == 0);
+ REQUIRE(std::isnan(sketch.get_rank(0)));
+ REQUIRE(std::isnan(sketch.get_rank(std::numeric_limits<float>::infinity())));
+ REQUIRE(std::isnan(sketch.get_min_value()));
+ REQUIRE(std::isnan(sketch.get_max_value()));
+ REQUIRE(std::isnan(sketch.get_quantile(0)));
+ REQUIRE(std::isnan(sketch.get_quantile(0.5)));
+ REQUIRE(std::isnan(sketch.get_quantile(1)));
+ const double ranks[3] {0, 0.5, 1};
+ REQUIRE(sketch.get_quantiles(ranks, 3).size() == 0);
+}
+
+TEST_CASE("req sketch: single value, lra", "[req_sketch]") {
+ req_sketch<float> sketch(12, false);
+ sketch.update(1);
+ REQUIRE_FALSE(sketch.is_HRA());
+ REQUIRE_FALSE(sketch.is_empty());
+ REQUIRE_FALSE(sketch.is_estimation_mode());
+ REQUIRE(sketch.get_n() == 1);
+ REQUIRE(sketch.get_num_retained() == 1);
+ REQUIRE(sketch.get_rank(1) == 0);
+ REQUIRE(sketch.get_rank<true>(1) == 1);
+ REQUIRE(sketch.get_rank(1.1) == 1);
+ REQUIRE(sketch.get_rank(std::numeric_limits<float>::infinity()) == 1);
+ REQUIRE(sketch.get_quantile(0) == 1);
+ REQUIRE(sketch.get_quantile(0.5) == 1);
+ REQUIRE(sketch.get_quantile(1) == 1);
+
+ const double ranks[3] {0, 0.5, 1};
+ auto quantiles = sketch.get_quantiles(ranks, 3);
+ REQUIRE(quantiles.size() == 3);
+ REQUIRE(quantiles[0] == 1);
+ REQUIRE(quantiles[1] == 1);
+ REQUIRE(quantiles[2] == 1);
+
+ unsigned count = 0;
+ for (auto it: sketch) {
+ REQUIRE(it.second == 1);
+ ++count;
+ }
+ REQUIRE(count == 1);
+}
+
+TEST_CASE("req sketch: repeated values", "[req_sketch]") {
+ req_sketch<float> sketch(12);
+ sketch.update(1);
+ sketch.update(1);
+ sketch.update(1);
+ sketch.update(2);
+ sketch.update(2);
+ sketch.update(2);
+ REQUIRE_FALSE(sketch.is_empty());
+ REQUIRE_FALSE(sketch.is_estimation_mode());
+ REQUIRE(sketch.get_n() == 6);
+ REQUIRE(sketch.get_num_retained() == 6);
+ REQUIRE(sketch.get_rank(1) == 0);
+ REQUIRE(sketch.get_rank<true>(1) == 0.5);
+ REQUIRE(sketch.get_rank(2) == 0.5);
+ REQUIRE(sketch.get_rank<true>(2) == 1);
+}
+
+TEST_CASE("req sketch: exact mode", "[req_sketch]") {
+ req_sketch<float> sketch(12);
+ for (size_t i = 1; i <= 10; ++i) sketch.update(i);
+ REQUIRE_FALSE(sketch.is_empty());
+ REQUIRE_FALSE(sketch.is_estimation_mode());
+ REQUIRE(sketch.get_n() == 10);
+ REQUIRE(sketch.get_num_retained() == 10);
+
+ // like KLL
+ REQUIRE(sketch.get_rank(1) == 0);
+ REQUIRE(sketch.get_rank(2) == 0.1);
+ REQUIRE(sketch.get_rank(6) == 0.5);
+ REQUIRE(sketch.get_rank(9) == 0.8);
+ REQUIRE(sketch.get_rank(10) == 0.9);
+
+ // inclusive
+ REQUIRE(sketch.get_rank<true>(1) == 0.1);
+ REQUIRE(sketch.get_rank<true>(2) == 0.2);
+ REQUIRE(sketch.get_rank<true>(5) == 0.5);
+ REQUIRE(sketch.get_rank<true>(9) == 0.9);
+ REQUIRE(sketch.get_rank<true>(10) == 1);
+
+ // like KLL
+ REQUIRE(sketch.get_quantile(0) == 1);
+ REQUIRE(sketch.get_quantile(0.1) == 2);
+ REQUIRE(sketch.get_quantile(0.5) == 6);
+ REQUIRE(sketch.get_quantile(0.9) == 10);
+ REQUIRE(sketch.get_quantile(1) == 10);
+
+ // inclusive
+ REQUIRE(sketch.get_quantile<true>(0) == 1);
+ REQUIRE(sketch.get_quantile<true>(0.1) == 1);
+ REQUIRE(sketch.get_quantile<true>(0.5) == 5);
+ REQUIRE(sketch.get_quantile<true>(0.9) == 9);
+ REQUIRE(sketch.get_quantile<true>(1) == 10);
+
+ const double ranks[3] {0, 0.5, 1};
+ auto quantiles = sketch.get_quantiles(ranks, 3);
+ REQUIRE(quantiles.size() == 3);
+ REQUIRE(quantiles[0] == 1);
+ REQUIRE(quantiles[1] == 6);
+ REQUIRE(quantiles[2] == 10);
+
+ const float splits[3] {2, 6, 9};
+ auto cdf = sketch.get_CDF(splits, 3);
+ REQUIRE(cdf[0] == 0.1);
+ REQUIRE(cdf[1] == 0.5);
+ REQUIRE(cdf[2] == 0.8);
+ REQUIRE(cdf[3] == 1);
+ auto pmf = sketch.get_PMF(splits, 3);
+ REQUIRE(pmf[0] == Approx(0.1).margin(1e-8));
+ REQUIRE(pmf[1] == Approx(0.4).margin(1e-8));
+ REQUIRE(pmf[2] == Approx(0.3).margin(1e-8));
+ REQUIRE(pmf[3] == Approx(0.2).margin(1e-8));
+
+ REQUIRE(sketch.get_rank_lower_bound(0.5, 1) == 0.5);
+ REQUIRE(sketch.get_rank_upper_bound(0.5, 1) == 0.5);
+}
+
+TEST_CASE("req sketch: estimation mode", "[req_sketch]") {
+ req_sketch<float> sketch(12);
+ const size_t n = 100000;
+ for (size_t i = 0; i < n; ++i) sketch.update(i);
+ REQUIRE_FALSE(sketch.is_empty());
+ REQUIRE(sketch.is_estimation_mode());
+ REQUIRE(sketch.get_n() == n);
+// std::cout << sketch.to_string(true);
+ REQUIRE(sketch.get_num_retained() < n);
+ REQUIRE(sketch.get_rank(0) == 0);
+ REQUIRE(sketch.get_rank(n) == 1);
+ REQUIRE(sketch.get_rank(n / 2) == Approx(0.5).margin(0.01));
+ REQUIRE(sketch.get_rank(n - 1) == Approx(1).margin(0.01));
+ REQUIRE(sketch.get_min_value() == 0);
+ REQUIRE(sketch.get_max_value() == n - 1);
+ REQUIRE(sketch.get_rank_lower_bound(0.5, 1) < 0.5);
+ REQUIRE(sketch.get_rank_upper_bound(0.5, 1) > 0.5);
+
+ unsigned count = 0;
+ for (auto it: sketch) {
+ REQUIRE(it.second >= 1);
+ ++count;
+ }
+ REQUIRE(count == sketch.get_num_retained());
+}
+
+TEST_CASE("req sketch: stream serialize-deserialize empty", "[req_sketch]") {
+ req_sketch<float> sketch(12);
+
+ std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
+ sketch.serialize(s);
+ auto sketch2 = req_sketch<float>::deserialize(s);
+ REQUIRE(s.tellg() == s.tellp());
+ REQUIRE(sketch2.is_empty() == sketch.is_empty());
+ REQUIRE(sketch2.is_estimation_mode() == sketch.is_estimation_mode());
+ REQUIRE(sketch2.get_num_retained() == sketch.get_num_retained());
+ REQUIRE(sketch2.get_n() == sketch.get_n());
+ REQUIRE(std::isnan(sketch2.get_min_value()));
+ REQUIRE(std::isnan(sketch2.get_max_value()));
+}
+
+TEST_CASE("req sketch: byte serialize-deserialize empty", "[req_sketch]") {
+ req_sketch<float> sketch(12);
+
+ auto bytes = sketch.serialize();
+ REQUIRE(bytes.size() == sketch.get_serialized_size_bytes());
+ auto sketch2 = req_sketch<float>::deserialize(bytes.data(), bytes.size());
+ REQUIRE(bytes.size() == sketch2.get_serialized_size_bytes());
+ REQUIRE(sketch2.is_empty() == sketch.is_empty());
+ REQUIRE(sketch2.is_estimation_mode() == sketch.is_estimation_mode());
+ REQUIRE(sketch2.get_num_retained() == sketch.get_num_retained());
+ REQUIRE(sketch2.get_n() == sketch.get_n());
+ REQUIRE(std::isnan(sketch2.get_min_value()));
+ REQUIRE(std::isnan(sketch2.get_max_value()));
+}
+
+TEST_CASE("req sketch: stream serialize-deserialize single item", "[req_sketch]") {
+ req_sketch<float> sketch(12);
+ sketch.update(1);
+
+ std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
+ sketch.serialize(s);
+ auto sketch2 = req_sketch<float>::deserialize(s);
+ REQUIRE(s.tellg() == s.tellp());
+ REQUIRE(sketch2.is_empty() == sketch.is_empty());
+ REQUIRE(sketch2.is_estimation_mode() == sketch.is_estimation_mode());
+ REQUIRE(sketch2.get_num_retained() == sketch.get_num_retained());
+ REQUIRE(sketch2.get_n() == sketch.get_n());
+ REQUIRE(sketch2.get_min_value() == sketch.get_min_value());
+ REQUIRE(sketch2.get_max_value() == sketch.get_max_value());
+}
+
+TEST_CASE("req sketch: byte serialize-deserialize single item", "[req_sketch]") {
+ req_sketch<float> sketch(12);
+ sketch.update(1);
+
+ auto bytes = sketch.serialize();
+ REQUIRE(bytes.size() == sketch.get_serialized_size_bytes());
+ auto sketch2 = req_sketch<float>::deserialize(bytes.data(), bytes.size());
+ std::cout << sketch2.to_string(true);
+ REQUIRE(bytes.size() == sketch2.get_serialized_size_bytes());
+ REQUIRE(sketch2.is_empty() == sketch.is_empty());
+ REQUIRE(sketch2.is_estimation_mode() == sketch.is_estimation_mode());
+ REQUIRE(sketch2.get_num_retained() == sketch.get_num_retained());
+ REQUIRE(sketch2.get_n() == sketch.get_n());
+ REQUIRE(sketch2.get_min_value() == sketch.get_min_value());
+ REQUIRE(sketch2.get_max_value() == sketch.get_max_value());
+}
+
+TEST_CASE("req sketch: stream serialize-deserialize exact mode", "[req_sketch]") {
+ req_sketch<float> sketch(12);
+ const size_t n = 50;
+ for (size_t i = 0; i < n; ++i) sketch.update(i);
+ REQUIRE_FALSE(sketch.is_estimation_mode());
+
+ std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
+ sketch.serialize(s);
+ auto sketch2 = req_sketch<float>::deserialize(s);
+ REQUIRE(s.tellg() == s.tellp());
+ REQUIRE(sketch2.is_empty() == sketch.is_empty());
+ REQUIRE(sketch2.is_estimation_mode() == sketch.is_estimation_mode());
+ REQUIRE(sketch2.get_num_retained() == sketch.get_num_retained());
+ REQUIRE(sketch2.get_n() == sketch.get_n());
+ REQUIRE(sketch2.get_min_value() == sketch.get_min_value());
+ REQUIRE(sketch2.get_max_value() == sketch.get_max_value());
+}
+
+TEST_CASE("req sketch: byte serialize-deserialize exact mode", "[req_sketch]") {
+ req_sketch<float> sketch(12);
+ const size_t n = 50;
+ for (size_t i = 0; i < n; ++i) sketch.update(i);
+ REQUIRE_FALSE(sketch.is_estimation_mode());
+
+ auto bytes = sketch.serialize();
+ REQUIRE(bytes.size() == sketch.get_serialized_size_bytes());
+ auto sketch2 = req_sketch<float>::deserialize(bytes.data(), bytes.size());
+ std::cout << sketch2.to_string(true);
+ REQUIRE(bytes.size() == sketch2.get_serialized_size_bytes());
+ REQUIRE(sketch2.is_empty() == sketch.is_empty());
+ REQUIRE(sketch2.is_estimation_mode() == sketch.is_estimation_mode());
+ REQUIRE(sketch2.get_num_retained() == sketch.get_num_retained());
+ REQUIRE(sketch2.get_n() == sketch.get_n());
+ REQUIRE(sketch2.get_min_value() == sketch.get_min_value());
+ REQUIRE(sketch2.get_max_value() == sketch.get_max_value());
+}
+
+TEST_CASE("req sketch: stream serialize-deserialize estimation mode", "[req_sketch]") {
+ req_sketch<float> sketch(12);
+ const size_t n = 100000;
+ for (size_t i = 0; i < n; ++i) sketch.update(i);
+ REQUIRE(sketch.is_estimation_mode());
+
+ std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
+ sketch.serialize(s);
+ auto sketch2 = req_sketch<float>::deserialize(s);
+ REQUIRE(s.tellg() == s.tellp());
+ REQUIRE(sketch2.is_empty() == sketch.is_empty());
+ REQUIRE(sketch2.is_estimation_mode() == sketch.is_estimation_mode());
+ REQUIRE(sketch2.get_num_retained() == sketch.get_num_retained());
+ REQUIRE(sketch2.get_n() == sketch.get_n());
+ REQUIRE(sketch2.get_min_value() == sketch.get_min_value());
+ REQUIRE(sketch2.get_max_value() == sketch.get_max_value());
+}
+
+TEST_CASE("req sketch: byte serialize-deserialize estimation mode", "[req_sketch]") {
+ req_sketch<float> sketch(12);
+ const size_t n = 100000;
+ for (size_t i = 0; i < n; ++i) sketch.update(i);
+ REQUIRE(sketch.is_estimation_mode());
+
+ auto bytes = sketch.serialize();
+ REQUIRE(bytes.size() == sketch.get_serialized_size_bytes());
+ auto sketch2 = req_sketch<float>::deserialize(bytes.data(), bytes.size());
+ REQUIRE(bytes.size() == sketch2.get_serialized_size_bytes());
+ REQUIRE(sketch2.is_empty() == sketch.is_empty());
+ REQUIRE(sketch2.is_estimation_mode() == sketch.is_estimation_mode());
+ REQUIRE(sketch2.get_num_retained() == sketch.get_num_retained());
+ REQUIRE(sketch2.get_n() == sketch.get_n());
+ REQUIRE(sketch2.get_min_value() == sketch.get_min_value());
+ REQUIRE(sketch2.get_max_value() == sketch.get_max_value());
+}
+
+TEST_CASE("req sketch: serialize deserialize stream and bytes equivalence", "[req_sketch]") {
+ req_sketch<float> sketch(12);
+ const size_t n = 100000;
+ for (size_t i = 0; i < n; ++i) sketch.update(i);
+ REQUIRE(sketch.is_estimation_mode());
+
+ std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
+ sketch.serialize(s);
+ auto bytes = sketch.serialize();
+ REQUIRE(bytes.size() == static_cast<size_t>(s.tellp()));
+ for (size_t i = 0; i < bytes.size(); ++i) {
+ REQUIRE(((char*)bytes.data())[i] == (char)s.get());
+ }
+
+ s.seekg(0); // rewind
+ auto sketch1 = req_sketch<float>::deserialize(s);
+ auto sketch2 = req_sketch<float>::deserialize(bytes.data(), bytes.size());
+ REQUIRE(bytes.size() == static_cast<size_t>(s.tellg()));
+ REQUIRE(sketch2.is_empty() == sketch1.is_empty());
+ REQUIRE(sketch2.is_estimation_mode() == sketch.is_estimation_mode());
+ REQUIRE(sketch2.get_num_retained() == sketch.get_num_retained());
+ REQUIRE(sketch2.get_n() == sketch.get_n());
+ REQUIRE(sketch2.get_min_value() == sketch.get_min_value());
+ REQUIRE(sketch2.get_max_value() == sketch.get_max_value());
+}
+
+TEST_CASE("req sketch: stream deserialize from Java - empty", "[req_sketch]") {
+ std::ifstream is;
+ is.exceptions(std::ios::failbit | std::ios::badbit);
+ is.open(input_path + "req_float_empty_from_java.sk", std::ios::binary);
+ auto sketch = req_sketch<float>::deserialize(is);
+ REQUIRE(sketch.is_empty());
+ REQUIRE_FALSE(sketch.is_estimation_mode());
+ REQUIRE(sketch.get_n() == 0);
+ REQUIRE(sketch.get_num_retained() == 0);
+ REQUIRE(std::isnan(sketch.get_min_value()));
+ REQUIRE(std::isnan(sketch.get_max_value()));
+}
+
+TEST_CASE("req sketch: stream deserialize from Java - single item", "[req_sketch]") {
+ std::ifstream is;
+ is.exceptions(std::ios::failbit | std::ios::badbit);
+ is.open(input_path + "req_float_single_item_from_java.sk", std::ios::binary);
+ auto sketch = req_sketch<float>::deserialize(is);
+ REQUIRE_FALSE(sketch.is_empty());
+ REQUIRE_FALSE(sketch.is_estimation_mode());
+ REQUIRE(sketch.get_n() == 1);
+ REQUIRE(sketch.get_num_retained() == 1);
+ REQUIRE(sketch.get_min_value() == 1);
+ REQUIRE(sketch.get_max_value() == 1);
+ REQUIRE(sketch.get_rank(1) == 0);
+ REQUIRE(sketch.get_rank<true>(1) == 1);
+}
+
+TEST_CASE("req sketch: stream deserialize from Java - raw items", "[req_sketch]") {
+ std::ifstream is;
+ is.exceptions(std::ios::failbit | std::ios::badbit);
+ is.open(input_path + "req_float_raw_items_from_java.sk", std::ios::binary);
+ auto sketch = req_sketch<float>::deserialize(is);
+ REQUIRE_FALSE(sketch.is_empty());
+ REQUIRE_FALSE(sketch.is_estimation_mode());
+ REQUIRE(sketch.get_n() == 4);
+ REQUIRE(sketch.get_num_retained() == 4);
+ REQUIRE(sketch.get_min_value() == 0);
+ REQUIRE(sketch.get_max_value() == 3);
+ REQUIRE(sketch.get_rank(2) == 0.5);
+}
+
+TEST_CASE("req sketch: stream deserialize from Java - exact mode", "[req_sketch]") {
+ std::ifstream is;
+ is.exceptions(std::ios::failbit | std::ios::badbit);
+ is.open(input_path + "req_float_exact_from_java.sk", std::ios::binary);
+ auto sketch = req_sketch<float>::deserialize(is);
+ REQUIRE_FALSE(sketch.is_empty());
+ REQUIRE_FALSE(sketch.is_estimation_mode());
+ REQUIRE(sketch.get_n() == 100);
+ REQUIRE(sketch.get_num_retained() == 100);
+ REQUIRE(sketch.get_min_value() == 0);
+ REQUIRE(sketch.get_max_value() == 99);
+ REQUIRE(sketch.get_rank(50) == 0.5);
+}
+
+TEST_CASE("req sketch: stream deserialize from Java - estimation mode", "[req_sketch]") {
+ std::ifstream is;
+ is.exceptions(std::ios::failbit | std::ios::badbit);
+ is.open(input_path + "req_float_estimation_from_java.sk", std::ios::binary);
+ auto sketch = req_sketch<float>::deserialize(is);
+ REQUIRE_FALSE(sketch.is_empty());
+ REQUIRE(sketch.is_estimation_mode());
+ REQUIRE(sketch.get_n() == 10000);
+ REQUIRE(sketch.get_num_retained() == 2942);
+ REQUIRE(sketch.get_min_value() == 0);
+ REQUIRE(sketch.get_max_value() == 9999);
+ REQUIRE(sketch.get_rank(5000) == 0.5);
+}
+
+TEST_CASE("req sketch: merge into empty", "[req_sketch]") {
+ req_sketch<float> sketch1(40);
+
+ req_sketch<float> sketch2(40);
+ for (size_t i = 0; i < 1000; ++i) sketch2.update(i);
+
+ sketch1.merge(sketch2);
+ REQUIRE(sketch1.get_min_value() == 0);
+ REQUIRE(sketch1.get_max_value() == 999);
+ REQUIRE(sketch1.get_quantile(0.25) == Approx(250).margin(3));
+ REQUIRE(sketch1.get_quantile(0.5) == Approx(500).margin(3));
+ REQUIRE(sketch1.get_quantile(0.75) == Approx(750).margin(3));
+ REQUIRE(sketch1.get_rank(500) == Approx(0.5).margin(0.01));
+}
+
+TEST_CASE("req sketch: merge", "[req_sketch]") {
+ req_sketch<float> sketch1(100);
+ for (size_t i = 0; i < 1000; ++i) sketch1.update(i);
+
+ req_sketch<float> sketch2(100);
+ for (size_t i = 1000; i < 2000; ++i) sketch2.update(i);
+
+ sketch1.merge(sketch2);
+ REQUIRE(sketch1.get_min_value() == 0);
+ REQUIRE(sketch1.get_max_value() == 1999);
+ REQUIRE(sketch1.get_quantile(0.25) == Approx(500).margin(3));
+ REQUIRE(sketch1.get_quantile(0.5) == Approx(1000).margin(1));
+ REQUIRE(sketch1.get_quantile(0.75) == Approx(1500).margin(1));
+ REQUIRE(sketch1.get_rank(1000) == Approx(0.5).margin(0.01));
+}
+
+TEST_CASE("req sketch: merge multiple", "[req_sketch]") {
+ req_sketch<float> sketch1(12);
+ for (size_t i = 0; i < 40; ++i) sketch1.update(i);
+
+ req_sketch<float> sketch2(12);
+ for (size_t i = 40; i < 80; ++i) sketch2.update(i);
+
+ req_sketch<float> sketch3(12);
+ for (size_t i = 80; i < 120; ++i) sketch3.update(i);
+
+ req_sketch<float> sketch(12);
+ sketch.merge(sketch1);
+ sketch.merge(sketch2);
+ sketch.merge(sketch3);
+ REQUIRE(sketch.get_min_value() == 0);
+ REQUIRE(sketch.get_max_value() == 119);
+ REQUIRE(sketch.get_quantile(0.5) == Approx(60).margin(3));
+ REQUIRE(sketch.get_rank(60) == Approx(0.5).margin(0.01));
+}
+
+TEST_CASE("req sketch: merge incompatible HRA and LRA", "[req_sketch]") {
+ req_sketch<float> sketch1(12);
+ sketch1.update(1);
+
+ req_sketch<float> sketch2(12, false);
+ sketch2.update(1);
+
+ REQUIRE_THROWS_AS(sketch1.merge(sketch2), std::invalid_argument);
+}
+
+//TEST_CASE("for manual comparison with Java") {
+// req_sketch<float> sketch(12, false);
+// for (size_t i = 0; i < 100000; ++i) sketch.update(i);
+// sketch.merge(sketch);
+// std::ofstream os;
+// os.exceptions(std::ios::failbit | std::ios::badbit);
+// os.open("req_float_lra_12_100000_merged.sk", std::ios::binary);
+// sketch.get_quantile(0.5); // force sorting level 0
+// sketch.serialize(os);
+//}
+
+} /* namespace datasketches */
diff --git a/theta/include/theta_union.hpp b/theta/include/theta_union.hpp
index 7fa2095..6cf8ccc 100644
--- a/theta/include/theta_union.hpp
+++ b/theta/include/theta_union.hpp
@@ -24,7 +24,7 @@
#include <functional>
#include <climits>
-#include <theta_sketch.hpp>
+#include "theta_sketch.hpp"
namespace datasketches {
diff --git a/tuple/include/theta_update_sketch_base_impl.hpp b/tuple/include/theta_update_sketch_base_impl.hpp
index 8fdb3d8..bbf845b 100644
--- a/tuple/include/theta_update_sketch_base_impl.hpp
+++ b/tuple/include/theta_update_sketch_base_impl.hpp
@@ -69,7 +69,7 @@
template<typename EN, typename EK, typename A>
theta_update_sketch_base<EN, EK, A>::theta_update_sketch_base(theta_update_sketch_base&& other) noexcept:
-allocator_(other.allocator_),
+allocator_(std::move(other.allocator_)),
is_empty_(other.is_empty_),
lg_cur_size_(other.lg_cur_size_),
lg_nom_size_(other.lg_nom_size_),
diff --git a/tuple/include/tuple_sketch.hpp b/tuple/include/tuple_sketch.hpp
index 2292937..4966b74 100644
--- a/tuple/include/tuple_sketch.hpp
+++ b/tuple/include/tuple_sketch.hpp
@@ -444,19 +444,21 @@
// for deserialize
class deleter_of_summaries {
public:
- deleter_of_summaries(uint32_t num, bool destroy): num(num), destroy(destroy) {}
- void set_destroy(bool destroy) { this->destroy = destroy; }
- void operator() (Summary* ptr) const {
+ deleter_of_summaries(uint32_t num, bool destroy, const Allocator& allocator):
+ allocator_(allocator), num_(num), destroy_(destroy) {}
+ void set_destroy(bool destroy) { destroy_ = destroy; }
+ void operator() (Summary* ptr) {
if (ptr != nullptr) {
- if (destroy) {
- for (uint32_t i = 0; i < num; ++i) ptr[i].~Summary();
+ if (destroy_) {
+ for (uint32_t i = 0; i < num_; ++i) ptr[i].~Summary();
}
- Allocator().deallocate(ptr, num);
+ allocator_.deallocate(ptr, num_);
}
}
private:
- uint32_t num;
- bool destroy;
+ Allocator allocator_;
+ uint32_t num_;
+ bool destroy_;
};
virtual void print_specifics(std::basic_ostream<char>& os) const;
diff --git a/tuple/include/tuple_sketch_impl.hpp b/tuple/include/tuple_sketch_impl.hpp
index 63552d7..52e2ebf 100644
--- a/tuple/include/tuple_sketch_impl.hpp
+++ b/tuple/include/tuple_sketch_impl.hpp
@@ -470,7 +470,7 @@
std::vector<Entry, AllocEntry> entries(alloc);
if (!is_empty) {
entries.reserve(num_entries);
- std::unique_ptr<S, deleter_of_summaries> summary(alloc.allocate(1), deleter_of_summaries(1, false));
+ std::unique_ptr<S, deleter_of_summaries> summary(alloc.allocate(1), deleter_of_summaries(1, false, allocator));
for (size_t i = 0; i < num_entries; ++i) {
uint64_t key;
is.read(reinterpret_cast<char*>(&key), sizeof(uint64_t));
@@ -533,7 +533,7 @@
std::vector<Entry, AllocEntry> entries(alloc);
if (!is_empty) {
entries.reserve(num_entries);
- std::unique_ptr<S, deleter_of_summaries> summary(alloc.allocate(1), deleter_of_summaries(1, false));
+ std::unique_ptr<S, deleter_of_summaries> summary(alloc.allocate(1), deleter_of_summaries(1, false, allocator));
for (size_t i = 0; i < num_entries; ++i) {
uint64_t key;
ptr += copy_from_mem(ptr, &key, sizeof(key));