| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #ifndef KLL_QUANTILE_CALCULATOR_HPP_ |
| #define KLL_QUANTILE_CALCULATOR_HPP_ |
| |
| #include <memory> |
| #include <cmath> |
| #include <assert.h> |
| |
| #include "kll_helper.hpp" |
| |
| namespace datasketches { |
| |
| template <typename T, typename C, typename A> |
| class kll_quantile_calculator { |
| typedef typename std::allocator_traits<A>::template rebind_alloc<uint32_t> AllocU32; |
| typedef typename std::allocator_traits<A>::template rebind_alloc<uint64_t> AllocU64; |
| public: |
| // assumes that all levels are sorted including level 0 |
| kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n) { |
| n_ = n; |
| const uint32_t num_items = levels[num_levels] - levels[0]; |
| items_ = A().allocate(num_items); |
| weights_ = AllocU64().allocate(num_items + 1); // one more is intentional |
| levels_size_ = num_levels + 1; |
| levels_ = AllocU32().allocate(levels_size_); |
| populate_from_sketch(items, num_items, levels, num_levels); |
| blocky_tandem_merge_sort(items_, weights_, num_items, levels_, num_levels_); |
| convert_to_preceding_cummulative(weights_, num_items + 1); |
| } |
| |
| ~kll_quantile_calculator() { |
| const uint32_t num_items = levels_[num_levels_] - levels_[0]; |
| for (uint32_t i = 0; i < num_items; i++) items_[i].~T(); |
| A().deallocate(items_, num_items); |
| AllocU64().deallocate(weights_, num_items + 1); |
| AllocU32().deallocate(levels_, levels_size_); |
| } |
| |
| T get_quantile(double fraction) const { |
| return approximately_answer_positional_query(pos_of_phi(fraction, n_)); |
| } |
| |
| private: |
| uint64_t n_; |
| T* items_; |
| uint64_t* weights_; |
| uint32_t* levels_; |
| uint8_t levels_size_; |
| uint8_t num_levels_; |
| |
| void populate_from_sketch(const T* items, uint32_t num_items, const uint32_t* levels, uint8_t num_levels) { |
| kll_helper::copy_construct<T>(items, levels[0], levels[num_levels], items_, 0); |
| uint8_t src_level = 0; |
| uint8_t dst_level = 0; |
| uint64_t weight = 1; |
| uint32_t offset = levels[0]; |
| while (src_level < num_levels) { |
| const uint32_t from_index(levels[src_level] - offset); |
| const uint32_t to_index(levels[src_level + 1] - offset); // exclusive |
| if (from_index < to_index) { // skip empty levels |
| std::fill(&weights_[from_index], &weights_[to_index], weight); |
| levels_[dst_level] = from_index; |
| levels_[dst_level + 1] = to_index; |
| dst_level++; |
| } |
| src_level++; |
| weight *= 2; |
| } |
| weights_[num_items] = 0; |
| num_levels_ = dst_level; |
| } |
| |
| T approximately_answer_positional_query(uint64_t pos) const { |
| assert (pos < n_); |
| const uint32_t weights_size(levels_[num_levels_] + 1); |
| const uint32_t index = chunk_containing_pos(weights_, weights_size, pos); |
| return items_[index]; |
| } |
| |
| static void convert_to_preceding_cummulative(uint64_t* weights, uint32_t weights_size) { |
| uint64_t subtotal(0); |
| for (uint32_t i = 0; i < weights_size; i++) { |
| const uint32_t new_subtotal = subtotal + weights[i]; |
| weights[i] = subtotal; |
| subtotal = new_subtotal; |
| } |
| } |
| |
| static uint64_t pos_of_phi(double phi, uint64_t n) { |
| const uint64_t pos = std::floor(phi * n); |
| return (pos == n) ? n - 1 : pos; |
| } |
| |
| static uint32_t chunk_containing_pos(uint64_t* weights, uint32_t weights_size, uint64_t pos) { |
| assert (weights_size > 1); // remember, weights_ contains an "extra" position |
| const uint32_t nominal_length(weights_size - 1); |
| assert (weights[0] <= pos); |
| assert (pos < weights[nominal_length]); |
| return search_for_chunk_containing_pos(weights, pos, 0, nominal_length); |
| } |
| |
| static uint32_t search_for_chunk_containing_pos(const uint64_t* arr, uint64_t pos, uint32_t l, uint32_t r) { |
| if (l + 1 == r) { |
| return l; |
| } |
| const uint32_t m(l + (r - l) / 2); |
| if (arr[m] <= pos) { |
| return search_for_chunk_containing_pos(arr, pos, m, r); |
| } |
| return search_for_chunk_containing_pos(arr, pos, l, m); |
| } |
| |
| static void blocky_tandem_merge_sort(T* items, uint64_t* weights, uint32_t num_items, const uint32_t* levels, uint8_t num_levels) { |
| if (num_levels == 1) return; |
| |
| // move the input in preparation for the "ping-pong" reduction strategy |
| auto tmp_items_deleter = [num_items](T* ptr) { |
| for (uint32_t i = 0; i < num_items; i++) ptr[i].~T(); |
| A().deallocate(ptr, num_items); |
| }; |
| std::unique_ptr<T, decltype(tmp_items_deleter)> tmp_items(A().allocate(num_items), tmp_items_deleter); |
| kll_helper::move_construct<T>(items, 0, num_items, tmp_items.get(), 0, false); // do not destroy since the items will be moved back |
| auto tmp_weights_deleter = [num_items](uint64_t* ptr) { AllocU64().deallocate(ptr, num_items); }; |
| std::unique_ptr<uint64_t[], decltype(tmp_weights_deleter)> tmp_weights(AllocU64().allocate(num_items), tmp_weights_deleter); // don't need the extra one here |
| std::copy(weights, &weights[num_items], tmp_weights.get()); |
| blocky_tandem_merge_sort_recursion(tmp_items.get(), tmp_weights.get(), items, weights, levels, 0, num_levels); |
| } |
| |
| static void blocky_tandem_merge_sort_recursion(T* items_src, uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level, uint8_t num_levels) { |
| if (num_levels == 1) return; |
| const uint8_t num_levels_1 = num_levels / 2; |
| const uint8_t num_levels_2 = num_levels - num_levels_1; |
| assert (num_levels_1 >= 1); |
| assert (num_levels_2 >= num_levels_1); |
| const uint8_t starting_level_1 = starting_level; |
| const uint8_t starting_level_2 = starting_level + num_levels_1; |
| // swap roles of src and dst |
| blocky_tandem_merge_sort_recursion(items_dst, weights_dst, items_src, weights_src, levels, starting_level_1, num_levels_1); |
| blocky_tandem_merge_sort_recursion(items_dst, weights_dst, items_src, weights_src, levels, starting_level_2, num_levels_2); |
| tandem_merge(items_src, weights_src, items_dst, weights_dst, levels, starting_level_1, num_levels_1, starting_level_2, num_levels_2); |
| } |
| |
| static void tandem_merge(const T* items_src, const uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level_1, uint8_t num_levels_1, uint8_t starting_level_2, uint8_t num_levels_2) { |
| const auto from_index_1 = levels[starting_level_1]; |
| const auto to_index_1 = levels[starting_level_1 + num_levels_1]; // exclusive |
| const auto from_index_2 = levels[starting_level_2]; |
| const auto to_index_2 = levels[starting_level_2 + num_levels_2]; // exclusive |
| auto i_src_1 = from_index_1; |
| auto i_src_2 = from_index_2; |
| auto i_dst = from_index_1; |
| |
| while ((i_src_1 < to_index_1) and (i_src_2 < to_index_2)) { |
| if (C()(items_src[i_src_1], items_src[i_src_2])) { |
| items_dst[i_dst] = std::move(items_src[i_src_1]); |
| weights_dst[i_dst] = weights_src[i_src_1]; |
| i_src_1++; |
| } else { |
| items_dst[i_dst] = std::move(items_src[i_src_2]); |
| weights_dst[i_dst] = weights_src[i_src_2]; |
| i_src_2++; |
| } |
| i_dst++; |
| } |
| if (i_src_1 < to_index_1) { |
| std::move(&items_src[i_src_1], &items_src[to_index_1], &items_dst[i_dst]); |
| std::copy(&weights_src[i_src_1], &weights_src[to_index_1], &weights_dst[i_dst]); |
| } else if (i_src_2 < to_index_2) { |
| std::move(&items_src[i_src_2], &items_src[to_index_2], &items_dst[i_dst]); |
| std::copy(&weights_src[i_src_2], &weights_src[to_index_2], &weights_dst[i_dst]); |
| } |
| } |
| |
| }; |
| |
| } /* namespace datasketches */ |
| |
| #endif // KLL_QUANTILE_CALCULATOR_HPP_ |