| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #ifndef CPC_UNION_IMPL_HPP_ |
| #define CPC_UNION_IMPL_HPP_ |
| |
| #include "count_zeros.hpp" |
| |
| namespace datasketches { |
| |
| template<typename A> |
| cpc_union_alloc<A>::cpc_union_alloc(uint8_t lg_k, uint64_t seed): |
| lg_k(lg_k), |
| seed(seed), |
| accumulator(nullptr), |
| bit_matrix() |
| { |
| if (lg_k < CPC_MIN_LG_K || lg_k > CPC_MAX_LG_K) { |
| throw std::invalid_argument("lg_k must be >= " + std::to_string(CPC_MIN_LG_K) + " and <= " + std::to_string(CPC_MAX_LG_K) + ": " + std::to_string(lg_k)); |
| } |
| accumulator = new (AllocCpc().allocate(1)) cpc_sketch_alloc<A>(lg_k, seed); |
| } |
| |
| template<typename A> |
| cpc_union_alloc<A>::cpc_union_alloc(const cpc_union_alloc<A>& other): |
| lg_k(other.lg_k), |
| seed(other.seed), |
| accumulator(other.accumulator), |
| bit_matrix(other.bit_matrix) |
| { |
| if (accumulator != nullptr) { |
| accumulator = new (AllocCpc().allocate(1)) cpc_sketch_alloc<A>(*other.accumulator); |
| } |
| } |
| |
| template<typename A> |
| cpc_union_alloc<A>::cpc_union_alloc(cpc_union_alloc<A>&& other) noexcept: |
| lg_k(other.lg_k), |
| seed(other.seed), |
| accumulator(other.accumulator), |
| bit_matrix(std::move(other.bit_matrix)) |
| { |
| other.accumulator = nullptr; |
| } |
| |
| template<typename A> |
| cpc_union_alloc<A>::~cpc_union_alloc() { |
| if (accumulator != nullptr) { |
| accumulator->~cpc_sketch_alloc<A>(); |
| AllocCpc().deallocate(accumulator, 1); |
| } |
| } |
| |
| template<typename A> |
| cpc_union_alloc<A>& cpc_union_alloc<A>::operator=(const cpc_union_alloc<A>& other) { |
| cpc_union_alloc<A> copy(other); |
| std::swap(lg_k, copy.lg_k); |
| seed = copy.seed; |
| std::swap(accumulator, copy.accumulator); |
| bit_matrix = std::move(copy.bit_matrix); |
| return *this; |
| } |
| |
| template<typename A> |
| cpc_union_alloc<A>& cpc_union_alloc<A>::operator=(cpc_union_alloc<A>&& other) noexcept { |
| std::swap(lg_k, other.lg_k); |
| seed = other.seed; |
| std::swap(accumulator, other.accumulator); |
| bit_matrix = std::move(other.bit_matrix); |
| return *this; |
| } |
| |
| template<typename A> |
| void cpc_union_alloc<A>::update(const cpc_sketch_alloc<A>& sketch) { |
| internal_update(sketch); |
| } |
| |
| template<typename A> |
| void cpc_union_alloc<A>::update(cpc_sketch_alloc<A>&& sketch) { |
| internal_update(std::forward<cpc_sketch_alloc<A>>(sketch)); |
| } |
| |
| template<typename A> |
| template<typename S> |
| void cpc_union_alloc<A>::internal_update(S&& sketch) { |
| const uint16_t seed_hash_union = compute_seed_hash(seed); |
| const uint16_t seed_hash_sketch = compute_seed_hash(sketch.seed); |
| if (seed_hash_union != seed_hash_sketch) { |
| throw std::invalid_argument("Incompatible seed hashes: " + std::to_string(seed_hash_union) + ", " |
| + std::to_string(seed_hash_sketch)); |
| } |
| const auto src_flavor = sketch.determine_flavor(); |
| if (cpc_sketch_alloc<A>::flavor::EMPTY == src_flavor) return; |
| |
| if (sketch.get_lg_k() < lg_k) reduce_k(sketch.get_lg_k()); |
| if (sketch.get_lg_k() < lg_k) throw std::logic_error("sketch lg_k < union lg_k"); |
| |
| if (accumulator == nullptr && bit_matrix.size() == 0) throw std::logic_error("both accumulator and bit matrix are absent"); |
| |
| if (cpc_sketch_alloc<A>::flavor::SPARSE == src_flavor && accumulator != nullptr) { // Case A |
| if (bit_matrix.size() > 0) throw std::logic_error("union bit_matrix is not expected"); |
| const auto initial_dest_flavor = accumulator->determine_flavor(); |
| if (cpc_sketch_alloc<A>::flavor::EMPTY != initial_dest_flavor && |
| cpc_sketch_alloc<A>::flavor::SPARSE != initial_dest_flavor) throw std::logic_error("wrong flavor"); |
| |
| // The following partially fixes the snowplow problem provided that the K's are equal. |
| if (cpc_sketch_alloc<A>::flavor::EMPTY == initial_dest_flavor && lg_k == sketch.get_lg_k()) { |
| *accumulator = std::forward<S>(sketch); |
| return; |
| } |
| |
| walk_table_updating_sketch(sketch.surprising_value_table); |
| const auto final_dst_flavor = accumulator->determine_flavor(); |
| // if the accumulator has graduated beyond sparse, switch to a bit matrix representation |
| if (final_dst_flavor != cpc_sketch_alloc<A>::flavor::EMPTY && final_dst_flavor != cpc_sketch_alloc<A>::flavor::SPARSE) { |
| switch_to_bit_matrix(); |
| } |
| return; |
| } |
| |
| if (cpc_sketch_alloc<A>::flavor::SPARSE == src_flavor && bit_matrix.size() > 0) { // Case B |
| if (accumulator != nullptr) throw std::logic_error("union accumulator != null"); |
| or_table_into_matrix(sketch.surprising_value_table); |
| return; |
| } |
| |
| if (cpc_sketch_alloc<A>::flavor::HYBRID != src_flavor && cpc_sketch_alloc<A>::flavor::PINNED != src_flavor |
| && cpc_sketch_alloc<A>::flavor::SLIDING != src_flavor) throw std::logic_error("wrong flavor"); |
| |
| // source is past SPARSE mode, so make sure that dest is a bit matrix |
| if (accumulator != nullptr) { |
| if (bit_matrix.size() > 0) throw std::logic_error("union bit matrix is not expected"); |
| const auto dst_flavor = accumulator->determine_flavor(); |
| if (cpc_sketch_alloc<A>::flavor::EMPTY != dst_flavor && cpc_sketch_alloc<A>::flavor::SPARSE != dst_flavor) { |
| throw std::logic_error("wrong flavor"); |
| } |
| switch_to_bit_matrix(); |
| } |
| if (bit_matrix.size() == 0) throw std::logic_error("union bit_matrix is expected"); |
| |
| if (cpc_sketch_alloc<A>::flavor::HYBRID == src_flavor || cpc_sketch_alloc<A>::flavor::PINNED == src_flavor) { // Case C |
| or_window_into_matrix(sketch.sliding_window, sketch.window_offset, sketch.get_lg_k()); |
| or_table_into_matrix(sketch.surprising_value_table); |
| return; |
| } |
| |
| // SLIDING mode involves inverted logic, so we can't just walk the source sketch. |
| // Instead, we convert it to a bitMatrix that can be OR'ed into the destination. |
| if (cpc_sketch_alloc<A>::flavor::SLIDING != src_flavor) throw std::logic_error("wrong flavor"); // Case D |
| vector_u64<A> src_matrix = sketch.build_bit_matrix(); |
| or_matrix_into_matrix(src_matrix, sketch.get_lg_k()); |
| } |
| |
| template<typename A> |
| cpc_sketch_alloc<A> cpc_union_alloc<A>::get_result() const { |
| if (accumulator != nullptr) { |
| if (bit_matrix.size() > 0) throw std::logic_error("bit_matrix is not expected"); |
| return get_result_from_accumulator(); |
| } |
| if (bit_matrix.size() == 0) throw std::logic_error("bit_matrix is expected"); |
| return get_result_from_bit_matrix(); |
| } |
| |
| template<typename A> |
| cpc_sketch_alloc<A> cpc_union_alloc<A>::get_result_from_accumulator() const { |
| if (lg_k != accumulator->get_lg_k()) throw std::logic_error("lg_k != accumulator->lg_k"); |
| if (accumulator->get_num_coupons() == 0) { |
| return cpc_sketch_alloc<A>(lg_k, seed); |
| } |
| if (accumulator->determine_flavor() != cpc_sketch_alloc<A>::flavor::SPARSE) throw std::logic_error("wrong flavor"); |
| cpc_sketch_alloc<A> copy(*accumulator); |
| copy.was_merged = true; |
| return copy; |
| } |
| |
| template<typename A> |
| cpc_sketch_alloc<A> cpc_union_alloc<A>::get_result_from_bit_matrix() const { |
| const uint64_t k = 1 << lg_k; |
| const uint64_t num_coupons = count_bits_set_in_matrix(bit_matrix.data(), k); |
| |
| const auto flavor = cpc_sketch_alloc<A>::determine_flavor(lg_k, num_coupons); |
| if (flavor != cpc_sketch_alloc<A>::flavor::HYBRID && flavor != cpc_sketch_alloc<A>::flavor::PINNED |
| && flavor != cpc_sketch_alloc<A>::flavor::SLIDING) throw std::logic_error("wrong flavor"); |
| |
| const uint8_t offset = cpc_sketch_alloc<A>::determine_correct_offset(lg_k, num_coupons); |
| |
| vector_u8<A> sliding_window(k); |
| // don't need to zero the window's memory |
| |
| // dynamically growing caused snowplow effect |
| uint8_t table_lg_size = lg_k - 4; // K/16; in some cases this will end up being oversized |
| if (table_lg_size < 2) table_lg_size = 2; |
| u32_table<A> table(table_lg_size, 6 + lg_k); |
| |
| // the following should work even when the offset is zero |
| const uint64_t mask_for_clearing_window = (static_cast<uint64_t>(0xff) << offset) ^ UINT64_MAX; |
| const uint64_t mask_for_flipping_early_zone = (static_cast<uint64_t>(1) << offset) - 1; |
| uint64_t all_surprises_ored = 0; |
| |
| // The snowplow effect was caused by processing the rows in order, |
| // but we have fixed it by using a sufficiently large hash table. |
| for (unsigned i = 0; i < k; i++) { |
| uint64_t pattern = bit_matrix[i]; |
| sliding_window[i] = (pattern >> offset) & 0xff; |
| pattern &= mask_for_clearing_window; |
| pattern ^= mask_for_flipping_early_zone; // this flipping converts surprising 0's to 1's |
| all_surprises_ored |= pattern; |
| while (pattern != 0) { |
| const uint8_t col = count_trailing_zeros_in_u64(pattern); |
| pattern = pattern ^ (static_cast<uint64_t>(1) << col); // erase the 1 |
| const uint32_t row_col = (i << 6) | col; |
| bool is_novel = table.maybe_insert(row_col); |
| if (!is_novel) throw std::logic_error("is_novel != true"); |
| } |
| } |
| |
| // at this point we could shrink an oversized hash table, but the relative waste isn't very big |
| |
| uint8_t first_interesting_column = count_trailing_zeros_in_u64(all_surprises_ored); |
| if (first_interesting_column > offset) first_interesting_column = offset; // corner case |
| |
| // HIP-related fields will contain zeros, and that is okay |
| return cpc_sketch_alloc<A>(lg_k, num_coupons, first_interesting_column, std::move(table), std::move(sliding_window), false, 0, 0, seed); |
| } |
| |
| template<typename A> |
| void cpc_union_alloc<A>::switch_to_bit_matrix() { |
| bit_matrix = accumulator->build_bit_matrix(); |
| accumulator->~cpc_sketch_alloc<A>(); |
| AllocCpc().deallocate(accumulator, 1); |
| accumulator = nullptr; |
| } |
| |
| template<typename A> |
| void cpc_union_alloc<A>::walk_table_updating_sketch(const u32_table<A>& table) { |
| const uint32_t* slots = table.get_slots(); |
| const size_t num_slots = 1 << table.get_lg_size(); |
| const uint64_t dst_mask = (((1 << accumulator->get_lg_k()) - 1) << 6) | 63; // downsamples when dst lgK < src LgK |
| |
| // Using a golden ratio stride fixes the snowplow effect. |
| const double golden = 0.6180339887498949025; |
| size_t stride = static_cast<size_t>(golden * static_cast<double>(num_slots)); |
| if (stride < 2) throw std::logic_error("stride < 2"); |
| if (stride == ((stride >> 1) << 1)) stride += 1; // force the stride to be odd |
| if (stride < 3 || stride >= num_slots) throw std::out_of_range("stride out of range"); |
| |
| for (size_t i = 0, j = 0; i < num_slots; i++, j += stride) { |
| j &= num_slots - 1; |
| const uint32_t row_col = slots[j]; |
| if (row_col != UINT32_MAX) { |
| accumulator->row_col_update(row_col & dst_mask); |
| } |
| } |
| } |
| |
| template<typename A> |
| void cpc_union_alloc<A>::or_table_into_matrix(const u32_table<A>& table) { |
| const uint32_t* slots = table.get_slots(); |
| const size_t num_slots = 1 << table.get_lg_size(); |
| const uint64_t dest_mask = (1 << lg_k) - 1; // downsamples when dst lgK < sr LgK |
| for (size_t i = 0; i < num_slots; i++) { |
| const uint32_t row_col = slots[i]; |
| if (row_col != UINT32_MAX) { |
| const uint8_t col = row_col & 63; |
| const size_t row = row_col >> 6; |
| bit_matrix[row & dest_mask] |= static_cast<uint64_t>(1) << col; // set the bit |
| } |
| } |
| } |
| |
| template<typename A> |
| void cpc_union_alloc<A>::or_window_into_matrix(const vector_u8<A>& sliding_window, uint8_t offset, uint8_t src_lg_k) { |
| if (lg_k > src_lg_k) throw std::logic_error("dst LgK > src LgK"); |
| const uint64_t dst_mask = (1 << lg_k) - 1; // downsamples when dst lgK < src LgK |
| const size_t src_k = 1 << src_lg_k; |
| for (size_t src_row = 0; src_row < src_k; src_row++) { |
| bit_matrix[src_row & dst_mask] |= static_cast<uint64_t>(sliding_window[src_row]) << offset; |
| } |
| } |
| |
| template<typename A> |
| void cpc_union_alloc<A>::or_matrix_into_matrix(const vector_u64<A>& src_matrix, uint8_t src_lg_k) { |
| if (lg_k > src_lg_k) throw std::logic_error("dst LgK > src LgK"); |
| const uint64_t dst_mask = (1 << lg_k) - 1; // downsamples when dst lgK < src LgK |
| const size_t src_k = 1 << src_lg_k; |
| for (size_t src_row = 0; src_row < src_k; src_row++) { |
| bit_matrix[src_row & dst_mask] |= src_matrix[src_row]; |
| } |
| } |
| |
| template<typename A> |
| void cpc_union_alloc<A>::reduce_k(uint8_t new_lg_k) { |
| if (new_lg_k >= lg_k) throw std::logic_error("new LgK >= union lgK"); |
| if (accumulator == nullptr && bit_matrix.size() == 0) throw std::logic_error("both accumulator and bit_matrix are absent"); |
| |
| if (bit_matrix.size() > 0) { // downsample the unioner's bit matrix |
| if (accumulator != nullptr) throw std::logic_error("accumulator is not null"); |
| vector_u64<A> old_matrix = std::move(bit_matrix); |
| const uint8_t old_lg_k = lg_k; |
| const size_t new_k = 1 << new_lg_k; |
| bit_matrix = vector_u64<A>(new_k, 0); |
| lg_k = new_lg_k; |
| or_matrix_into_matrix(old_matrix, old_lg_k); |
| return; |
| } |
| |
| if (accumulator != nullptr) { // downsample the unioner's sketch |
| if (bit_matrix.size() > 0) throw std::logic_error("bit_matrix is not expected"); |
| if (!accumulator->is_empty()) { |
| cpc_sketch_alloc<A> old_accumulator(*accumulator); |
| *accumulator = cpc_sketch_alloc<A>(new_lg_k, seed); |
| walk_table_updating_sketch(old_accumulator.surprising_value_table); |
| } |
| lg_k = new_lg_k; |
| |
| const auto final_new_flavor = accumulator->determine_flavor(); |
| // if the new sketch has graduated beyond sparse, convert to bit_matrix |
| if (final_new_flavor != cpc_sketch_alloc<A>::flavor::EMPTY && |
| final_new_flavor != cpc_sketch_alloc<A>::flavor::SPARSE) { |
| switch_to_bit_matrix(); |
| } |
| return; |
| } |
| |
| throw std::logic_error("invalid state"); |
| } |
| |
| } /* namespace datasketches */ |
| |
| #endif |