Merge pull request #168 from apache/kll_merge_blocks_fix

KLL merge blocks fix
diff --git a/kll/include/kll_quantile_calculator.hpp b/kll/include/kll_quantile_calculator.hpp
index f77071e..bc60f26 100644
--- a/kll/include/kll_quantile_calculator.hpp
+++ b/kll/include/kll_quantile_calculator.hpp
@@ -26,31 +26,38 @@
 template <typename T, typename C, typename A>
 class kll_quantile_calculator {
-  typedef typename std::allocator_traits<A>::template rebind_alloc<uint32_t> AllocU32;
-  typedef typename std::allocator_traits<A>::template rebind_alloc<uint64_t> AllocU64;
     // assumes that all levels are sorted including level 0
     kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n);
-    ~kll_quantile_calculator();
     T get_quantile(double fraction) const;
+    using AllocU32 = typename std::allocator_traits<A>::template rebind_alloc<uint32_t>;
+    using vector_u32 = std::vector<uint32_t, AllocU32>;
+    using Entry = std::pair<T, uint64_t>;
+    using AllocEntry = typename std::allocator_traits<A>::template rebind_alloc<Entry>;
+    using Container = std::vector<Entry, AllocEntry>;
     uint64_t n_;
-    T* items_;
-    uint64_t* weights_;
-    uint32_t* levels_;
-    uint8_t levels_size_;
-    uint8_t num_levels_;
+    vector_u32 levels_;
+    Container entries_;
-    void populate_from_sketch(const T* items, uint32_t num_items, const uint32_t* levels, uint8_t num_levels);
+    void populate_from_sketch(const T* items, const uint32_t* levels, uint8_t num_levels);
     T approximately_answer_positional_query(uint64_t pos) const;
-    static void convert_to_preceding_cummulative(uint64_t* weights, uint32_t weights_size);
+    void convert_to_preceding_cummulative();
+    uint32_t chunk_containing_pos(uint64_t pos) const;
+    uint32_t search_for_chunk_containing_pos(uint64_t pos, uint32_t l, uint32_t r) const;
+    static void merge_sorted_blocks(Container& entries, const uint32_t* levels, uint8_t num_levels, uint32_t num_items);
+    static void merge_sorted_blocks_direct(Container& orig, Container& temp, const uint32_t* levels, uint8_t starting_level, uint8_t num_levels);
+    static void merge_sorted_blocks_reversed(Container& orig, Container& temp, const uint32_t* levels, uint8_t starting_level, uint8_t num_levels);
     static uint64_t pos_of_phi(double phi, uint64_t n);
-    static uint32_t chunk_containing_pos(uint64_t* weights, uint32_t weights_size, uint64_t pos);
-    static uint32_t search_for_chunk_containing_pos(const uint64_t* arr, uint64_t pos, uint32_t l, uint32_t r);
-    static void blocky_tandem_merge_sort(T* items, uint64_t* weights, uint32_t num_items, const uint32_t* levels, uint8_t num_levels);
-    static void blocky_tandem_merge_sort_recursion(T* items_src, uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level, uint8_t num_levels);
-    static void tandem_merge(const T* items_src, const uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level_1, uint8_t num_levels_1, uint8_t starting_level_2, uint8_t num_levels_2);
+    template<typename Comparator>
+    struct compare_pair_by_first {
+      template<typename Entry1, typename Entry2>
+      bool operator()(Entry1&& a, Entry2&& b) const {
+        return Comparator()(std::forward<Entry1>(a).first, std::forward<Entry2>(b).first);
+      }
+    };
 } /* namespace datasketches */
diff --git a/kll/include/kll_quantile_calculator_impl.hpp b/kll/include/kll_quantile_calculator_impl.hpp
index d4f4c04..cb92674 100644
--- a/kll/include/kll_quantile_calculator_impl.hpp
+++ b/kll/include/kll_quantile_calculator_impl.hpp
@@ -22,31 +22,22 @@
 #include <memory>
 #include <cmath>
+#include <algorithm>
 #include "kll_helper.hpp"
 namespace datasketches {
 template <typename T, typename C, typename A>
-kll_quantile_calculator<T, C, A>::kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n) {
-  n_ = n;
+kll_quantile_calculator<T, C, A>::kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n):
+n_(n), levels_(num_levels + 1)
   const uint32_t num_items = levels[num_levels] - levels[0];
-  items_ = A().allocate(num_items);
-  weights_ = AllocU64().allocate(num_items + 1); // one more is intentional
-  levels_size_ = num_levels + 1;
-  levels_ = AllocU32().allocate(levels_size_);
-  populate_from_sketch(items, num_items, levels, num_levels);
-  blocky_tandem_merge_sort(items_, weights_, num_items, levels_, num_levels_);
-  convert_to_preceding_cummulative(weights_, num_items + 1);
-template <typename T, typename C, typename A>
-kll_quantile_calculator<T, C, A>::~kll_quantile_calculator() {
-  const uint32_t num_items = levels_[num_levels_] - levels_[0];
-  for (uint32_t i = 0; i < num_items; i++) items_[i].~T();
-  A().deallocate(items_, num_items);
-  AllocU64().deallocate(weights_, num_items + 1);
-  AllocU32().deallocate(levels_, levels_size_);
+  entries_.reserve(num_items);
+  populate_from_sketch(items, levels, num_levels);
+  merge_sorted_blocks(entries_,, levels_.size() - 1, num_items);
+  if (!is_sorted(entries_.begin(), entries_.end(), compare_pair_by_first<C>())) throw std::logic_error("entries must be sorted");
+  convert_to_preceding_cummulative();
 template <typename T, typename C, typename A>
@@ -55,17 +46,18 @@
 template <typename T, typename C, typename A>
-void kll_quantile_calculator<T, C, A>::populate_from_sketch(const T* items, uint32_t num_items, const uint32_t* levels, uint8_t num_levels) {
-  kll_helper::copy_construct<T>(items, levels[0], levels[num_levels], items_, 0);
-  uint8_t src_level = 0;
-  uint8_t dst_level = 0;
+void kll_quantile_calculator<T, C, A>::populate_from_sketch(const T* items, const uint32_t* levels, uint8_t num_levels) {
+  size_t src_level = 0;
+  size_t dst_level = 0;
   uint64_t weight = 1;
   uint32_t offset = levels[0];
   while (src_level < num_levels) {
     const uint32_t from_index(levels[src_level] - offset);
     const uint32_t to_index(levels[src_level + 1] - offset); // exclusive
     if (from_index < to_index) { // skip empty levels
-      std::fill(&weights_[from_index], &weights_[to_index], weight);
+      for (uint32_t i = from_index; i < to_index; ++i) {
+        entries_.push_back(Entry(items[i + offset], weight));
+      }
       levels_[dst_level] = from_index;
       levels_[dst_level + 1] = to_index;
@@ -73,24 +65,24 @@
     weight *= 2;
-  weights_[num_items] = 0;
-  num_levels_ = dst_level;
+  if (levels_.size() > static_cast<size_t>(dst_level + 1)) levels_.resize(dst_level + 1);
 template <typename T, typename C, typename A>
 T kll_quantile_calculator<T, C, A>::approximately_answer_positional_query(uint64_t pos) const {
   if (pos >= n_) throw std::logic_error("position out of range");
-  const uint32_t weights_size(levels_[num_levels_] + 1);
-  const uint32_t index = chunk_containing_pos(weights_, weights_size, pos);
-  return items_[index];
+  const uint32_t num_items = levels_[levels_.size() - 1];
+  if (pos > entries_[num_items - 1].second) return entries_[num_items - 1].first;
+  const uint32_t index = chunk_containing_pos(pos);
+  return entries_[index].first;
 template <typename T, typename C, typename A>
-void kll_quantile_calculator<T, C, A>::convert_to_preceding_cummulative(uint64_t* weights, uint32_t weights_size) {
-  uint64_t subtotal(0);
-  for (uint32_t i = 0; i < weights_size; i++) {
-    const uint32_t new_subtotal = subtotal + weights[i];
-    weights[i] = subtotal;
+void kll_quantile_calculator<T, C, A>::convert_to_preceding_cummulative() {
+  uint64_t subtotal = 0;
+  for (auto& entry: entries_) {
+    const uint32_t new_subtotal = subtotal + entry.second;
+    entry.second = subtotal;
     subtotal = new_subtotal;
@@ -102,87 +94,74 @@
 template <typename T, typename C, typename A>
-uint32_t kll_quantile_calculator<T, C, A>::chunk_containing_pos(uint64_t* weights, uint32_t weights_size, uint64_t pos) {
-  if (weights_size <= 1) throw std::logic_error("weights array too short"); // weights_ contains an "extra" position
-  const uint32_t nominal_length(weights_size - 1);
-  if (pos < weights[0]) throw std::logic_error("position too small");
-  if (pos >= weights[nominal_length]) throw std::logic_error("position too large");
-  return search_for_chunk_containing_pos(weights, pos, 0, nominal_length);
+uint32_t kll_quantile_calculator<T, C, A>::chunk_containing_pos(uint64_t pos) const {
+  if (entries_.size() < 1) throw std::logic_error("array too short");
+  if (pos < entries_[0].second) throw std::logic_error("position too small");
+  if (pos > entries_[entries_.size() - 1].second) throw std::logic_error("position too large");
+  return search_for_chunk_containing_pos(pos, 0, entries_.size());
 template <typename T, typename C, typename A>
-uint32_t kll_quantile_calculator<T, C, A>::search_for_chunk_containing_pos(const uint64_t* arr, uint64_t pos, uint32_t l, uint32_t r) {
+uint32_t kll_quantile_calculator<T, C, A>::search_for_chunk_containing_pos(uint64_t pos, uint32_t l, uint32_t r) const {
   if (l + 1 == r) {
     return l;
   const uint32_t m(l + (r - l) / 2);
-  if (arr[m] <= pos) {
-    return search_for_chunk_containing_pos(arr, pos, m, r);
+  if (entries_[m].second <= pos) {
+    return search_for_chunk_containing_pos(pos, m, r);
-  return search_for_chunk_containing_pos(arr, pos, l, m);
+  return search_for_chunk_containing_pos(pos, l, m);
 template <typename T, typename C, typename A>
-void kll_quantile_calculator<T, C, A>::blocky_tandem_merge_sort(T* items, uint64_t* weights, uint32_t num_items, const uint32_t* levels, uint8_t num_levels) {
+void kll_quantile_calculator<T, C, A>::merge_sorted_blocks(Container& entries, const uint32_t* levels, uint8_t num_levels, uint32_t num_items) {
   if (num_levels == 1) return;
-  // move the input in preparation for the "ping-pong" reduction strategy
-  auto tmp_items_deleter = [num_items](T* ptr) {
-    for (uint32_t i = 0; i < num_items; i++) ptr[i].~T();
-    A().deallocate(ptr, num_items);
-  };
-  std::unique_ptr<T, decltype(tmp_items_deleter)> tmp_items(A().allocate(num_items), tmp_items_deleter);
-  kll_helper::move_construct<T>(items, 0, num_items, tmp_items.get(), 0, false); // do not destroy since the items will be moved back
-  auto tmp_weights_deleter = [num_items](uint64_t* ptr) { AllocU64().deallocate(ptr, num_items); };
-  std::unique_ptr<uint64_t[], decltype(tmp_weights_deleter)> tmp_weights(AllocU64().allocate(num_items), tmp_weights_deleter); // don't need the extra one here
-  std::copy(weights, &weights[num_items], tmp_weights.get());
-  blocky_tandem_merge_sort_recursion(tmp_items.get(), tmp_weights.get(), items, weights, levels, 0, num_levels);
+  Container temporary;
+  temporary.reserve(num_items);
+  merge_sorted_blocks_direct(entries, temporary, levels, 0, num_levels);
 template <typename T, typename C, typename A>
-void kll_quantile_calculator<T, C, A>::blocky_tandem_merge_sort_recursion(T* items_src, uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level, uint8_t num_levels) {
+void kll_quantile_calculator<T, C, A>::merge_sorted_blocks_direct(Container& orig, Container& temp, const uint32_t* levels,
+    uint8_t starting_level, uint8_t num_levels) {
   if (num_levels == 1) return;
   const uint8_t num_levels_1 = num_levels / 2;
   const uint8_t num_levels_2 = num_levels - num_levels_1;
-  if (num_levels_1 < 1) throw std::logic_error("level above 0 expected");
-  if (num_levels_2 < num_levels_1) throw std::logic_error("wrong order of levels");
   const uint8_t starting_level_1 = starting_level;
   const uint8_t starting_level_2 = starting_level + num_levels_1;
-  // swap roles of src and dst
-  blocky_tandem_merge_sort_recursion(items_dst, weights_dst, items_src, weights_src, levels, starting_level_1, num_levels_1);
-  blocky_tandem_merge_sort_recursion(items_dst, weights_dst, items_src, weights_src, levels, starting_level_2, num_levels_2);
-  tandem_merge(items_src, weights_src, items_dst, weights_dst, levels, starting_level_1, num_levels_1, starting_level_2, num_levels_2);
+  const auto chunk_begin = temp.begin() + temp.size();
+  merge_sorted_blocks_reversed(orig, temp, levels, starting_level_1, num_levels_1);
+  merge_sorted_blocks_reversed(orig, temp, levels, starting_level_2, num_levels_2);
+  const uint32_t num_items_1 = levels[starting_level_1 + num_levels_1] - levels[starting_level_1];
+  std::merge(
+    std::make_move_iterator(chunk_begin), std::make_move_iterator(chunk_begin + num_items_1),
+    std::make_move_iterator(chunk_begin + num_items_1), std::make_move_iterator(temp.end()),
+    orig.begin() + levels[starting_level], compare_pair_by_first<C>()
+  );
+  temp.erase(chunk_begin, temp.end());
 template <typename T, typename C, typename A>
-void kll_quantile_calculator<T, C, A>::tandem_merge(const T* items_src, const uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level_1, uint8_t num_levels_1, uint8_t starting_level_2, uint8_t num_levels_2) {
-  const auto from_index_1 = levels[starting_level_1];
-  const auto to_index_1 = levels[starting_level_1 + num_levels_1]; // exclusive
-  const auto from_index_2 = levels[starting_level_2];
-  const auto to_index_2 = levels[starting_level_2 + num_levels_2]; // exclusive
-  auto i_src_1 = from_index_1;
-  auto i_src_2 = from_index_2;
-  auto i_dst = from_index_1;
-  while ((i_src_1 < to_index_1) && (i_src_2 < to_index_2)) {
-    if (C()(items_src[i_src_1], items_src[i_src_2])) {
-      items_dst[i_dst] = std::move(items_src[i_src_1]);
-      weights_dst[i_dst] = weights_src[i_src_1];
-      i_src_1++;
-    } else {
-      items_dst[i_dst] = std::move(items_src[i_src_2]);
-      weights_dst[i_dst] = weights_src[i_src_2];
-      i_src_2++;
-    }
-    i_dst++;
+void kll_quantile_calculator<T, C, A>::merge_sorted_blocks_reversed(Container& orig, Container& temp, const uint32_t* levels,
+    uint8_t starting_level, uint8_t num_levels) {
+  if (num_levels == 1) {
+    std::move(orig.begin() + levels[starting_level], orig.begin() + levels[starting_level + 1], std::back_inserter(temp));
+    return;
-  if (i_src_1 < to_index_1) {
-    std::move(&items_src[i_src_1], &items_src[to_index_1], &items_dst[i_dst]);
-    std::copy(&weights_src[i_src_1], &weights_src[to_index_1], &weights_dst[i_dst]);
-  } else if (i_src_2 < to_index_2) {
-    std::move(&items_src[i_src_2], &items_src[to_index_2], &items_dst[i_dst]);
-    std::copy(&weights_src[i_src_2], &weights_src[to_index_2], &weights_dst[i_dst]);
-  }
+  const uint8_t num_levels_1 = num_levels / 2;
+  const uint8_t num_levels_2 = num_levels - num_levels_1;
+  const uint8_t starting_level_1 = starting_level;
+  const uint8_t starting_level_2 = starting_level + num_levels_1;
+  merge_sorted_blocks_direct(orig, temp, levels, starting_level_1, num_levels_1);
+  merge_sorted_blocks_direct(orig, temp, levels, starting_level_2, num_levels_2);
+  std::merge(
+    std::make_move_iterator(orig.begin() + levels[starting_level_1]),
+    std::make_move_iterator(orig.begin() + levels[starting_level_1 + num_levels_1]),
+    std::make_move_iterator(orig.begin() + levels[starting_level_2]),
+    std::make_move_iterator(orig.begin() + levels[starting_level_2 + num_levels_2]),
+    std::back_inserter(temp),
+    compare_pair_by_first<C>()
+  );
 } /* namespace datasketches */
diff --git a/kll/include/kll_sketch_impl.hpp b/kll/include/kll_sketch_impl.hpp
index 9b6c68e..d051135 100644
--- a/kll/include/kll_sketch_impl.hpp
+++ b/kll/include/kll_sketch_impl.hpp
@@ -312,9 +312,9 @@
 template<typename T, typename C, typename S, typename A>
 double kll_sketch<T, C, S, A>::get_rank(const T& value) const {
   if (is_empty()) return std::numeric_limits<double>::quiet_NaN();
-  uint8_t level(0);
-  uint64_t weight(1);
-  uint64_t total(0);
+  uint8_t level = 0;
+  uint64_t weight = 1;
+  uint64_t total = 0;
   while (level < num_levels_) {
     const auto from_index(levels_[level]);
     const auto to_index(levels_[level + 1]); // exclusive
@@ -391,7 +391,7 @@
   os.write(reinterpret_cast<const char*>(&flags_byte), sizeof(flags_byte));
   os.write((char*)&k_, sizeof(k_));
   os.write((char*)&m_, sizeof(m_));
-  const uint8_t unused(0);
+  const uint8_t unused = 0;
   os.write(reinterpret_cast<const char*>(&unused), sizeof(unused));
   if (is_empty()) return;
   if (!is_single_item) {
@@ -412,7 +412,7 @@
   const size_t size = header_size_bytes + get_serialized_size_bytes();
   vector_u8<A> bytes(size);
   uint8_t* ptr = + header_size_bytes;
-  uint8_t* end_ptr = ptr + size;
+  const uint8_t* end_ptr = ptr + size;
   const uint8_t preamble_ints(is_empty() || is_single_item ? PREAMBLE_INTS_SHORT : PREAMBLE_INTS_FULL);
   ptr += copy_to_mem(&preamble_ints, ptr, sizeof(preamble_ints));
   const uint8_t serial_version(is_single_item ? SERIAL_VERSION_2 : SERIAL_VERSION_1);
@@ -427,7 +427,7 @@
   ptr += copy_to_mem(&flags_byte, ptr, sizeof(flags_byte));
   ptr += copy_to_mem(&k_, ptr, sizeof(k_));
   ptr += copy_to_mem(&m_, ptr, sizeof(m_));
-  const uint8_t unused(0);
+  const uint8_t unused = 0;
   ptr += copy_to_mem(&unused, ptr, sizeof(unused));
   if (!is_empty()) {
     if (!is_single_item) {
@@ -776,8 +776,8 @@
   if (is_empty()) return vector_d<A>();
   kll_helper::validate_values<T, C>(split_points, size);
   vector_d<A> buckets(size + 1, 0);
-  uint8_t level(0);
-  uint64_t weight(1);
+  uint8_t level = 0;
+  uint64_t weight = 1;
   while (level < num_levels_) {
     const auto from_index = levels_[level];
     const auto to_index = levels_[level + 1]; // exclusive
@@ -871,8 +871,9 @@
   const uint32_t free_space_at_bottom = result.final_capacity - result.final_num_items;
   kll_helper::move_construct<T>(workbuf.get(), outlevels[0], outlevels[0] + result.final_num_items, items_, free_space_at_bottom, true);
-  if (levels_.size() < (result.final_num_levels + 1)) {
-    levels_.resize(result.final_num_levels + 1);
+  const size_t new_levels_size = result.final_num_levels + 1;
+  if (levels_.size() < new_levels_size) {
+    levels_.resize(new_levels_size);
   const uint32_t offset = free_space_at_bottom - outlevels[0];
   for (uint8_t lvl = 0; lvl < levels_.size(); lvl++) { // includes the "extra" index
@@ -1027,7 +1028,7 @@
   if (print_items) {
     os << "### KLL sketch data:" << std::endl;
-    uint8_t level(0);
+    uint8_t level = 0;
     while (level < num_levels_) {
       const uint32_t from_index = levels_[level];
       const uint32_t to_index = levels_[level + 1]; // exclusive