Merge branch 'master' of github.com:apache/incubator-datasketches-cpp into 2.1.x-incubating
diff --git a/kll/include/kll_quantile_calculator.hpp b/kll/include/kll_quantile_calculator.hpp
index f77071e..bc60f26 100644
--- a/kll/include/kll_quantile_calculator.hpp
+++ b/kll/include/kll_quantile_calculator.hpp
@@ -26,31 +26,38 @@
template <typename T, typename C, typename A>
class kll_quantile_calculator {
- typedef typename std::allocator_traits<A>::template rebind_alloc<uint32_t> AllocU32;
- typedef typename std::allocator_traits<A>::template rebind_alloc<uint64_t> AllocU64;
public:
// assumes that all levels are sorted including level 0
kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n);
- ~kll_quantile_calculator();
T get_quantile(double fraction) const;
private:
+ using AllocU32 = typename std::allocator_traits<A>::template rebind_alloc<uint32_t>;
+ using vector_u32 = std::vector<uint32_t, AllocU32>;
+ using Entry = std::pair<T, uint64_t>;
+ using AllocEntry = typename std::allocator_traits<A>::template rebind_alloc<Entry>;
+ using Container = std::vector<Entry, AllocEntry>;
uint64_t n_;
- T* items_;
- uint64_t* weights_;
- uint32_t* levels_;
- uint8_t levels_size_;
- uint8_t num_levels_;
+ vector_u32 levels_;
+ Container entries_;
- void populate_from_sketch(const T* items, uint32_t num_items, const uint32_t* levels, uint8_t num_levels);
+ void populate_from_sketch(const T* items, const uint32_t* levels, uint8_t num_levels);
T approximately_answer_positional_query(uint64_t pos) const;
- static void convert_to_preceding_cummulative(uint64_t* weights, uint32_t weights_size);
+ void convert_to_preceding_cummulative();
+ uint32_t chunk_containing_pos(uint64_t pos) const;
+ uint32_t search_for_chunk_containing_pos(uint64_t pos, uint32_t l, uint32_t r) const;
+ static void merge_sorted_blocks(Container& entries, const uint32_t* levels, uint8_t num_levels, uint32_t num_items);
+ static void merge_sorted_blocks_direct(Container& orig, Container& temp, const uint32_t* levels, uint8_t starting_level, uint8_t num_levels);
+ static void merge_sorted_blocks_reversed(Container& orig, Container& temp, const uint32_t* levels, uint8_t starting_level, uint8_t num_levels);
static uint64_t pos_of_phi(double phi, uint64_t n);
- static uint32_t chunk_containing_pos(uint64_t* weights, uint32_t weights_size, uint64_t pos);
- static uint32_t search_for_chunk_containing_pos(const uint64_t* arr, uint64_t pos, uint32_t l, uint32_t r);
- static void blocky_tandem_merge_sort(T* items, uint64_t* weights, uint32_t num_items, const uint32_t* levels, uint8_t num_levels);
- static void blocky_tandem_merge_sort_recursion(T* items_src, uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level, uint8_t num_levels);
- static void tandem_merge(const T* items_src, const uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level_1, uint8_t num_levels_1, uint8_t starting_level_2, uint8_t num_levels_2);
+
+ template<typename Comparator>
+ struct compare_pair_by_first {
+ template<typename Entry1, typename Entry2>
+ bool operator()(Entry1&& a, Entry2&& b) const {
+ return Comparator()(std::forward<Entry1>(a).first, std::forward<Entry2>(b).first);
+ }
+ };
};
} /* namespace datasketches */
diff --git a/kll/include/kll_quantile_calculator_impl.hpp b/kll/include/kll_quantile_calculator_impl.hpp
index d4f4c04..cb92674 100644
--- a/kll/include/kll_quantile_calculator_impl.hpp
+++ b/kll/include/kll_quantile_calculator_impl.hpp
@@ -22,31 +22,22 @@
#include <memory>
#include <cmath>
+#include <algorithm>
#include "kll_helper.hpp"
namespace datasketches {
template <typename T, typename C, typename A>
-kll_quantile_calculator<T, C, A>::kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n) {
- n_ = n;
+kll_quantile_calculator<T, C, A>::kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n):
+n_(n), levels_(num_levels + 1)
+{
const uint32_t num_items = levels[num_levels] - levels[0];
- items_ = A().allocate(num_items);
- weights_ = AllocU64().allocate(num_items + 1); // one more is intentional
- levels_size_ = num_levels + 1;
- levels_ = AllocU32().allocate(levels_size_);
- populate_from_sketch(items, num_items, levels, num_levels);
- blocky_tandem_merge_sort(items_, weights_, num_items, levels_, num_levels_);
- convert_to_preceding_cummulative(weights_, num_items + 1);
-}
-
-template <typename T, typename C, typename A>
-kll_quantile_calculator<T, C, A>::~kll_quantile_calculator() {
- const uint32_t num_items = levels_[num_levels_] - levels_[0];
- for (uint32_t i = 0; i < num_items; i++) items_[i].~T();
- A().deallocate(items_, num_items);
- AllocU64().deallocate(weights_, num_items + 1);
- AllocU32().deallocate(levels_, levels_size_);
+ entries_.reserve(num_items);
+ populate_from_sketch(items, levels, num_levels);
+ merge_sorted_blocks(entries_, levels_.data(), levels_.size() - 1, num_items);
+ if (!is_sorted(entries_.begin(), entries_.end(), compare_pair_by_first<C>())) throw std::logic_error("entries must be sorted");
+ convert_to_preceding_cummulative();
}
template <typename T, typename C, typename A>
@@ -55,17 +46,18 @@
}
template <typename T, typename C, typename A>
-void kll_quantile_calculator<T, C, A>::populate_from_sketch(const T* items, uint32_t num_items, const uint32_t* levels, uint8_t num_levels) {
- kll_helper::copy_construct<T>(items, levels[0], levels[num_levels], items_, 0);
- uint8_t src_level = 0;
- uint8_t dst_level = 0;
+void kll_quantile_calculator<T, C, A>::populate_from_sketch(const T* items, const uint32_t* levels, uint8_t num_levels) {
+ size_t src_level = 0;
+ size_t dst_level = 0;
uint64_t weight = 1;
uint32_t offset = levels[0];
while (src_level < num_levels) {
const uint32_t from_index(levels[src_level] - offset);
const uint32_t to_index(levels[src_level + 1] - offset); // exclusive
if (from_index < to_index) { // skip empty levels
- std::fill(&weights_[from_index], &weights_[to_index], weight);
+ for (uint32_t i = from_index; i < to_index; ++i) {
+ entries_.push_back(Entry(items[i + offset], weight));
+ }
levels_[dst_level] = from_index;
levels_[dst_level + 1] = to_index;
dst_level++;
@@ -73,24 +65,24 @@
src_level++;
weight *= 2;
}
- weights_[num_items] = 0;
- num_levels_ = dst_level;
+ if (levels_.size() > static_cast<size_t>(dst_level + 1)) levels_.resize(dst_level + 1);
}
template <typename T, typename C, typename A>
T kll_quantile_calculator<T, C, A>::approximately_answer_positional_query(uint64_t pos) const {
if (pos >= n_) throw std::logic_error("position out of range");
- const uint32_t weights_size(levels_[num_levels_] + 1);
- const uint32_t index = chunk_containing_pos(weights_, weights_size, pos);
- return items_[index];
+ const uint32_t num_items = levels_[levels_.size() - 1];
+ if (pos > entries_[num_items - 1].second) return entries_[num_items - 1].first;
+ const uint32_t index = chunk_containing_pos(pos);
+ return entries_[index].first;
}
template <typename T, typename C, typename A>
-void kll_quantile_calculator<T, C, A>::convert_to_preceding_cummulative(uint64_t* weights, uint32_t weights_size) {
- uint64_t subtotal(0);
- for (uint32_t i = 0; i < weights_size; i++) {
- const uint32_t new_subtotal = subtotal + weights[i];
- weights[i] = subtotal;
+void kll_quantile_calculator<T, C, A>::convert_to_preceding_cummulative() {
+ uint64_t subtotal = 0;
+ for (auto& entry: entries_) {
+ const uint32_t new_subtotal = subtotal + entry.second;
+ entry.second = subtotal;
subtotal = new_subtotal;
}
}
@@ -102,87 +94,74 @@
}
template <typename T, typename C, typename A>
-uint32_t kll_quantile_calculator<T, C, A>::chunk_containing_pos(uint64_t* weights, uint32_t weights_size, uint64_t pos) {
- if (weights_size <= 1) throw std::logic_error("weights array too short"); // weights_ contains an "extra" position
- const uint32_t nominal_length(weights_size - 1);
- if (pos < weights[0]) throw std::logic_error("position too small");
- if (pos >= weights[nominal_length]) throw std::logic_error("position too large");
- return search_for_chunk_containing_pos(weights, pos, 0, nominal_length);
+uint32_t kll_quantile_calculator<T, C, A>::chunk_containing_pos(uint64_t pos) const {
+ if (entries_.size() < 1) throw std::logic_error("array too short");
+ if (pos < entries_[0].second) throw std::logic_error("position too small");
+ if (pos > entries_[entries_.size() - 1].second) throw std::logic_error("position too large");
+ return search_for_chunk_containing_pos(pos, 0, entries_.size());
}
template <typename T, typename C, typename A>
-uint32_t kll_quantile_calculator<T, C, A>::search_for_chunk_containing_pos(const uint64_t* arr, uint64_t pos, uint32_t l, uint32_t r) {
+uint32_t kll_quantile_calculator<T, C, A>::search_for_chunk_containing_pos(uint64_t pos, uint32_t l, uint32_t r) const {
if (l + 1 == r) {
return l;
}
const uint32_t m(l + (r - l) / 2);
- if (arr[m] <= pos) {
- return search_for_chunk_containing_pos(arr, pos, m, r);
+ if (entries_[m].second <= pos) {
+ return search_for_chunk_containing_pos(pos, m, r);
}
- return search_for_chunk_containing_pos(arr, pos, l, m);
+ return search_for_chunk_containing_pos(pos, l, m);
}
template <typename T, typename C, typename A>
-void kll_quantile_calculator<T, C, A>::blocky_tandem_merge_sort(T* items, uint64_t* weights, uint32_t num_items, const uint32_t* levels, uint8_t num_levels) {
+void kll_quantile_calculator<T, C, A>::merge_sorted_blocks(Container& entries, const uint32_t* levels, uint8_t num_levels, uint32_t num_items) {
if (num_levels == 1) return;
-
- // move the input in preparation for the "ping-pong" reduction strategy
- auto tmp_items_deleter = [num_items](T* ptr) {
- for (uint32_t i = 0; i < num_items; i++) ptr[i].~T();
- A().deallocate(ptr, num_items);
- };
- std::unique_ptr<T, decltype(tmp_items_deleter)> tmp_items(A().allocate(num_items), tmp_items_deleter);
- kll_helper::move_construct<T>(items, 0, num_items, tmp_items.get(), 0, false); // do not destroy since the items will be moved back
- auto tmp_weights_deleter = [num_items](uint64_t* ptr) { AllocU64().deallocate(ptr, num_items); };
- std::unique_ptr<uint64_t[], decltype(tmp_weights_deleter)> tmp_weights(AllocU64().allocate(num_items), tmp_weights_deleter); // don't need the extra one here
- std::copy(weights, &weights[num_items], tmp_weights.get());
- blocky_tandem_merge_sort_recursion(tmp_items.get(), tmp_weights.get(), items, weights, levels, 0, num_levels);
+ Container temporary;
+ temporary.reserve(num_items);
+ merge_sorted_blocks_direct(entries, temporary, levels, 0, num_levels);
}
template <typename T, typename C, typename A>
-void kll_quantile_calculator<T, C, A>::blocky_tandem_merge_sort_recursion(T* items_src, uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level, uint8_t num_levels) {
+void kll_quantile_calculator<T, C, A>::merge_sorted_blocks_direct(Container& orig, Container& temp, const uint32_t* levels,
+ uint8_t starting_level, uint8_t num_levels) {
if (num_levels == 1) return;
const uint8_t num_levels_1 = num_levels / 2;
const uint8_t num_levels_2 = num_levels - num_levels_1;
- if (num_levels_1 < 1) throw std::logic_error("level above 0 expected");
- if (num_levels_2 < num_levels_1) throw std::logic_error("wrong order of levels");
const uint8_t starting_level_1 = starting_level;
const uint8_t starting_level_2 = starting_level + num_levels_1;
- // swap roles of src and dst
- blocky_tandem_merge_sort_recursion(items_dst, weights_dst, items_src, weights_src, levels, starting_level_1, num_levels_1);
- blocky_tandem_merge_sort_recursion(items_dst, weights_dst, items_src, weights_src, levels, starting_level_2, num_levels_2);
- tandem_merge(items_src, weights_src, items_dst, weights_dst, levels, starting_level_1, num_levels_1, starting_level_2, num_levels_2);
+ const auto chunk_begin = temp.begin() + temp.size();
+ merge_sorted_blocks_reversed(orig, temp, levels, starting_level_1, num_levels_1);
+ merge_sorted_blocks_reversed(orig, temp, levels, starting_level_2, num_levels_2);
+ const uint32_t num_items_1 = levels[starting_level_1 + num_levels_1] - levels[starting_level_1];
+ std::merge(
+ std::make_move_iterator(chunk_begin), std::make_move_iterator(chunk_begin + num_items_1),
+ std::make_move_iterator(chunk_begin + num_items_1), std::make_move_iterator(temp.end()),
+ orig.begin() + levels[starting_level], compare_pair_by_first<C>()
+ );
+ temp.erase(chunk_begin, temp.end());
}
template <typename T, typename C, typename A>
-void kll_quantile_calculator<T, C, A>::tandem_merge(const T* items_src, const uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level_1, uint8_t num_levels_1, uint8_t starting_level_2, uint8_t num_levels_2) {
- const auto from_index_1 = levels[starting_level_1];
- const auto to_index_1 = levels[starting_level_1 + num_levels_1]; // exclusive
- const auto from_index_2 = levels[starting_level_2];
- const auto to_index_2 = levels[starting_level_2 + num_levels_2]; // exclusive
- auto i_src_1 = from_index_1;
- auto i_src_2 = from_index_2;
- auto i_dst = from_index_1;
-
- while ((i_src_1 < to_index_1) && (i_src_2 < to_index_2)) {
- if (C()(items_src[i_src_1], items_src[i_src_2])) {
- items_dst[i_dst] = std::move(items_src[i_src_1]);
- weights_dst[i_dst] = weights_src[i_src_1];
- i_src_1++;
- } else {
- items_dst[i_dst] = std::move(items_src[i_src_2]);
- weights_dst[i_dst] = weights_src[i_src_2];
- i_src_2++;
- }
- i_dst++;
+void kll_quantile_calculator<T, C, A>::merge_sorted_blocks_reversed(Container& orig, Container& temp, const uint32_t* levels,
+ uint8_t starting_level, uint8_t num_levels) {
+ if (num_levels == 1) {
+ std::move(orig.begin() + levels[starting_level], orig.begin() + levels[starting_level + 1], std::back_inserter(temp));
+ return;
}
- if (i_src_1 < to_index_1) {
- std::move(&items_src[i_src_1], &items_src[to_index_1], &items_dst[i_dst]);
- std::copy(&weights_src[i_src_1], &weights_src[to_index_1], &weights_dst[i_dst]);
- } else if (i_src_2 < to_index_2) {
- std::move(&items_src[i_src_2], &items_src[to_index_2], &items_dst[i_dst]);
- std::copy(&weights_src[i_src_2], &weights_src[to_index_2], &weights_dst[i_dst]);
- }
+ const uint8_t num_levels_1 = num_levels / 2;
+ const uint8_t num_levels_2 = num_levels - num_levels_1;
+ const uint8_t starting_level_1 = starting_level;
+ const uint8_t starting_level_2 = starting_level + num_levels_1;
+ merge_sorted_blocks_direct(orig, temp, levels, starting_level_1, num_levels_1);
+ merge_sorted_blocks_direct(orig, temp, levels, starting_level_2, num_levels_2);
+ std::merge(
+ std::make_move_iterator(orig.begin() + levels[starting_level_1]),
+ std::make_move_iterator(orig.begin() + levels[starting_level_1 + num_levels_1]),
+ std::make_move_iterator(orig.begin() + levels[starting_level_2]),
+ std::make_move_iterator(orig.begin() + levels[starting_level_2 + num_levels_2]),
+ std::back_inserter(temp),
+ compare_pair_by_first<C>()
+ );
}
} /* namespace datasketches */
diff --git a/kll/include/kll_sketch_impl.hpp b/kll/include/kll_sketch_impl.hpp
index 9b6c68e..d051135 100644
--- a/kll/include/kll_sketch_impl.hpp
+++ b/kll/include/kll_sketch_impl.hpp
@@ -312,9 +312,9 @@
template<typename T, typename C, typename S, typename A>
double kll_sketch<T, C, S, A>::get_rank(const T& value) const {
if (is_empty()) return std::numeric_limits<double>::quiet_NaN();
- uint8_t level(0);
- uint64_t weight(1);
- uint64_t total(0);
+ uint8_t level = 0;
+ uint64_t weight = 1;
+ uint64_t total = 0;
while (level < num_levels_) {
const auto from_index(levels_[level]);
const auto to_index(levels_[level + 1]); // exclusive
@@ -391,7 +391,7 @@
os.write(reinterpret_cast<const char*>(&flags_byte), sizeof(flags_byte));
os.write((char*)&k_, sizeof(k_));
os.write((char*)&m_, sizeof(m_));
- const uint8_t unused(0);
+ const uint8_t unused = 0;
os.write(reinterpret_cast<const char*>(&unused), sizeof(unused));
if (is_empty()) return;
if (!is_single_item) {
@@ -412,7 +412,7 @@
const size_t size = header_size_bytes + get_serialized_size_bytes();
vector_u8<A> bytes(size);
uint8_t* ptr = bytes.data() + header_size_bytes;
- uint8_t* end_ptr = ptr + size;
+ const uint8_t* end_ptr = ptr + size;
const uint8_t preamble_ints(is_empty() || is_single_item ? PREAMBLE_INTS_SHORT : PREAMBLE_INTS_FULL);
ptr += copy_to_mem(&preamble_ints, ptr, sizeof(preamble_ints));
const uint8_t serial_version(is_single_item ? SERIAL_VERSION_2 : SERIAL_VERSION_1);
@@ -427,7 +427,7 @@
ptr += copy_to_mem(&flags_byte, ptr, sizeof(flags_byte));
ptr += copy_to_mem(&k_, ptr, sizeof(k_));
ptr += copy_to_mem(&m_, ptr, sizeof(m_));
- const uint8_t unused(0);
+ const uint8_t unused = 0;
ptr += copy_to_mem(&unused, ptr, sizeof(unused));
if (!is_empty()) {
if (!is_single_item) {
@@ -776,8 +776,8 @@
if (is_empty()) return vector_d<A>();
kll_helper::validate_values<T, C>(split_points, size);
vector_d<A> buckets(size + 1, 0);
- uint8_t level(0);
- uint64_t weight(1);
+ uint8_t level = 0;
+ uint64_t weight = 1;
while (level < num_levels_) {
const auto from_index = levels_[level];
const auto to_index = levels_[level + 1]; // exclusive
@@ -871,8 +871,9 @@
const uint32_t free_space_at_bottom = result.final_capacity - result.final_num_items;
kll_helper::move_construct<T>(workbuf.get(), outlevels[0], outlevels[0] + result.final_num_items, items_, free_space_at_bottom, true);
- if (levels_.size() < (result.final_num_levels + 1)) {
- levels_.resize(result.final_num_levels + 1);
+ const size_t new_levels_size = result.final_num_levels + 1;
+ if (levels_.size() < new_levels_size) {
+ levels_.resize(new_levels_size);
}
const uint32_t offset = free_space_at_bottom - outlevels[0];
for (uint8_t lvl = 0; lvl < levels_.size(); lvl++) { // includes the "extra" index
@@ -1027,7 +1028,7 @@
if (print_items) {
os << "### KLL sketch data:" << std::endl;
- uint8_t level(0);
+ uint8_t level = 0;
while (level < num_levels_) {
const uint32_t from_index = levels_[level];
const uint32_t to_index = levels_[level + 1]; // exclusive