blob: 5acfe5f59e130c38e127b01b1023544408ee766c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef CPC_UNION_IMPL_HPP_
#define CPC_UNION_IMPL_HPP_
#include "count_zeros.hpp"
namespace datasketches {
template<typename A>
cpc_union_alloc<A>::cpc_union_alloc(uint8_t lg_k, uint64_t seed, const A& allocator):
lg_k(lg_k),
seed(seed),
accumulator(nullptr),
bit_matrix(allocator)
{
if (lg_k < CPC_MIN_LG_K || lg_k > CPC_MAX_LG_K) {
throw std::invalid_argument("lg_k must be >= " + std::to_string(CPC_MIN_LG_K) + " and <= " + std::to_string(CPC_MAX_LG_K) + ": " + std::to_string(lg_k));
}
accumulator = new (AllocCpc().allocate(1)) cpc_sketch_alloc<A>(lg_k, seed, allocator);
}
template<typename A>
cpc_union_alloc<A>::cpc_union_alloc(const cpc_union_alloc<A>& other):
lg_k(other.lg_k),
seed(other.seed),
accumulator(other.accumulator),
bit_matrix(other.bit_matrix)
{
if (accumulator != nullptr) {
accumulator = new (AllocCpc().allocate(1)) cpc_sketch_alloc<A>(*other.accumulator);
}
}
template<typename A>
cpc_union_alloc<A>::cpc_union_alloc(cpc_union_alloc<A>&& other) noexcept:
lg_k(other.lg_k),
seed(other.seed),
accumulator(other.accumulator),
bit_matrix(std::move(other.bit_matrix))
{
other.accumulator = nullptr;
}
template<typename A>
cpc_union_alloc<A>::~cpc_union_alloc() {
if (accumulator != nullptr) {
accumulator->~cpc_sketch_alloc<A>();
AllocCpc().deallocate(accumulator, 1);
}
}
template<typename A>
cpc_union_alloc<A>& cpc_union_alloc<A>::operator=(const cpc_union_alloc<A>& other) {
cpc_union_alloc<A> copy(other);
std::swap(lg_k, copy.lg_k);
seed = copy.seed;
std::swap(accumulator, copy.accumulator);
bit_matrix = std::move(copy.bit_matrix);
return *this;
}
template<typename A>
cpc_union_alloc<A>& cpc_union_alloc<A>::operator=(cpc_union_alloc<A>&& other) noexcept {
std::swap(lg_k, other.lg_k);
seed = other.seed;
std::swap(accumulator, other.accumulator);
bit_matrix = std::move(other.bit_matrix);
return *this;
}
template<typename A>
void cpc_union_alloc<A>::update(const cpc_sketch_alloc<A>& sketch) {
internal_update(sketch);
}
template<typename A>
void cpc_union_alloc<A>::update(cpc_sketch_alloc<A>&& sketch) {
internal_update(std::forward<cpc_sketch_alloc<A>>(sketch));
}
template<typename A>
template<typename S>
void cpc_union_alloc<A>::internal_update(S&& sketch) {
const uint16_t seed_hash_union = compute_seed_hash(seed);
const uint16_t seed_hash_sketch = compute_seed_hash(sketch.seed);
if (seed_hash_union != seed_hash_sketch) {
throw std::invalid_argument("Incompatible seed hashes: " + std::to_string(seed_hash_union) + ", "
+ std::to_string(seed_hash_sketch));
}
const auto src_flavor = sketch.determine_flavor();
if (cpc_sketch_alloc<A>::flavor::EMPTY == src_flavor) return;
if (sketch.get_lg_k() < lg_k) reduce_k(sketch.get_lg_k());
if (sketch.get_lg_k() < lg_k) throw std::logic_error("sketch lg_k < union lg_k");
if (accumulator == nullptr && bit_matrix.size() == 0) throw std::logic_error("both accumulator and bit matrix are absent");
if (cpc_sketch_alloc<A>::flavor::SPARSE == src_flavor && accumulator != nullptr) { // Case A
if (bit_matrix.size() > 0) throw std::logic_error("union bit_matrix is not expected");
const auto initial_dest_flavor = accumulator->determine_flavor();
if (cpc_sketch_alloc<A>::flavor::EMPTY != initial_dest_flavor &&
cpc_sketch_alloc<A>::flavor::SPARSE != initial_dest_flavor) throw std::logic_error("wrong flavor");
// The following partially fixes the snowplow problem provided that the K's are equal.
if (cpc_sketch_alloc<A>::flavor::EMPTY == initial_dest_flavor && lg_k == sketch.get_lg_k()) {
*accumulator = std::forward<S>(sketch);
return;
}
walk_table_updating_sketch(sketch.surprising_value_table);
const auto final_dst_flavor = accumulator->determine_flavor();
// if the accumulator has graduated beyond sparse, switch to a bit matrix representation
if (final_dst_flavor != cpc_sketch_alloc<A>::flavor::EMPTY && final_dst_flavor != cpc_sketch_alloc<A>::flavor::SPARSE) {
switch_to_bit_matrix();
}
return;
}
if (cpc_sketch_alloc<A>::flavor::SPARSE == src_flavor && bit_matrix.size() > 0) { // Case B
if (accumulator != nullptr) throw std::logic_error("union accumulator != null");
or_table_into_matrix(sketch.surprising_value_table);
return;
}
if (cpc_sketch_alloc<A>::flavor::HYBRID != src_flavor && cpc_sketch_alloc<A>::flavor::PINNED != src_flavor
&& cpc_sketch_alloc<A>::flavor::SLIDING != src_flavor) throw std::logic_error("wrong flavor");
// source is past SPARSE mode, so make sure that dest is a bit matrix
if (accumulator != nullptr) {
if (bit_matrix.size() > 0) throw std::logic_error("union bit matrix is not expected");
const auto dst_flavor = accumulator->determine_flavor();
if (cpc_sketch_alloc<A>::flavor::EMPTY != dst_flavor && cpc_sketch_alloc<A>::flavor::SPARSE != dst_flavor) {
throw std::logic_error("wrong flavor");
}
switch_to_bit_matrix();
}
if (bit_matrix.size() == 0) throw std::logic_error("union bit_matrix is expected");
if (cpc_sketch_alloc<A>::flavor::HYBRID == src_flavor || cpc_sketch_alloc<A>::flavor::PINNED == src_flavor) { // Case C
or_window_into_matrix(sketch.sliding_window, sketch.window_offset, sketch.get_lg_k());
or_table_into_matrix(sketch.surprising_value_table);
return;
}
// SLIDING mode involves inverted logic, so we can't just walk the source sketch.
// Instead, we convert it to a bitMatrix that can be OR'ed into the destination.
if (cpc_sketch_alloc<A>::flavor::SLIDING != src_flavor) throw std::logic_error("wrong flavor"); // Case D
vector_u64<A> src_matrix = sketch.build_bit_matrix();
or_matrix_into_matrix(src_matrix, sketch.get_lg_k());
}
template<typename A>
cpc_sketch_alloc<A> cpc_union_alloc<A>::get_result() const {
if (accumulator != nullptr) {
if (bit_matrix.size() > 0) throw std::logic_error("bit_matrix is not expected");
return get_result_from_accumulator();
}
if (bit_matrix.size() == 0) throw std::logic_error("bit_matrix is expected");
return get_result_from_bit_matrix();
}
template<typename A>
cpc_sketch_alloc<A> cpc_union_alloc<A>::get_result_from_accumulator() const {
if (lg_k != accumulator->get_lg_k()) throw std::logic_error("lg_k != accumulator->lg_k");
if (accumulator->get_num_coupons() == 0) {
return cpc_sketch_alloc<A>(lg_k, seed);
}
if (accumulator->determine_flavor() != cpc_sketch_alloc<A>::flavor::SPARSE) throw std::logic_error("wrong flavor");
cpc_sketch_alloc<A> copy(*accumulator);
copy.was_merged = true;
return copy;
}
template<typename A>
cpc_sketch_alloc<A> cpc_union_alloc<A>::get_result_from_bit_matrix() const {
const uint64_t k = 1 << lg_k;
const uint64_t num_coupons = count_bits_set_in_matrix(bit_matrix.data(), k);
const auto flavor = cpc_sketch_alloc<A>::determine_flavor(lg_k, num_coupons);
if (flavor != cpc_sketch_alloc<A>::flavor::HYBRID && flavor != cpc_sketch_alloc<A>::flavor::PINNED
&& flavor != cpc_sketch_alloc<A>::flavor::SLIDING) throw std::logic_error("wrong flavor");
const uint8_t offset = cpc_sketch_alloc<A>::determine_correct_offset(lg_k, num_coupons);
vector_u8<A> sliding_window(k, 0, bit_matrix.get_allocator());
// don't need to zero the window's memory
// dynamically growing caused snowplow effect
uint8_t table_lg_size = lg_k - 4; // K/16; in some cases this will end up being oversized
if (table_lg_size < 2) table_lg_size = 2;
u32_table<A> table(table_lg_size, 6 + lg_k, bit_matrix.get_allocator());
// the following should work even when the offset is zero
const uint64_t mask_for_clearing_window = (static_cast<uint64_t>(0xff) << offset) ^ UINT64_MAX;
const uint64_t mask_for_flipping_early_zone = (static_cast<uint64_t>(1) << offset) - 1;
uint64_t all_surprises_ored = 0;
// The snowplow effect was caused by processing the rows in order,
// but we have fixed it by using a sufficiently large hash table.
for (unsigned i = 0; i < k; i++) {
uint64_t pattern = bit_matrix[i];
sliding_window[i] = (pattern >> offset) & 0xff;
pattern &= mask_for_clearing_window;
pattern ^= mask_for_flipping_early_zone; // this flipping converts surprising 0's to 1's
all_surprises_ored |= pattern;
while (pattern != 0) {
const uint8_t col = count_trailing_zeros_in_u64(pattern);
pattern = pattern ^ (static_cast<uint64_t>(1) << col); // erase the 1
const uint32_t row_col = (i << 6) | col;
bool is_novel = table.maybe_insert(row_col);
if (!is_novel) throw std::logic_error("is_novel != true");
}
}
// at this point we could shrink an oversized hash table, but the relative waste isn't very big
uint8_t first_interesting_column = count_trailing_zeros_in_u64(all_surprises_ored);
if (first_interesting_column > offset) first_interesting_column = offset; // corner case
// HIP-related fields will contain zeros, and that is okay
return cpc_sketch_alloc<A>(lg_k, num_coupons, first_interesting_column, std::move(table), std::move(sliding_window), false, 0, 0, seed);
}
template<typename A>
void cpc_union_alloc<A>::switch_to_bit_matrix() {
bit_matrix = accumulator->build_bit_matrix();
accumulator->~cpc_sketch_alloc<A>();
AllocCpc().deallocate(accumulator, 1);
accumulator = nullptr;
}
template<typename A>
void cpc_union_alloc<A>::walk_table_updating_sketch(const u32_table<A>& table) {
const uint32_t* slots = table.get_slots();
const size_t num_slots = 1 << table.get_lg_size();
const uint64_t dst_mask = (((1 << accumulator->get_lg_k()) - 1) << 6) | 63; // downsamples when dst lgK < src LgK
// Using a golden ratio stride fixes the snowplow effect.
const double golden = 0.6180339887498949025;
size_t stride = static_cast<size_t>(golden * static_cast<double>(num_slots));
if (stride < 2) throw std::logic_error("stride < 2");
if (stride == ((stride >> 1) << 1)) stride += 1; // force the stride to be odd
if (stride < 3 || stride >= num_slots) throw std::out_of_range("stride out of range");
for (size_t i = 0, j = 0; i < num_slots; i++, j += stride) {
j &= num_slots - 1;
const uint32_t row_col = slots[j];
if (row_col != UINT32_MAX) {
accumulator->row_col_update(row_col & dst_mask);
}
}
}
template<typename A>
void cpc_union_alloc<A>::or_table_into_matrix(const u32_table<A>& table) {
const uint32_t* slots = table.get_slots();
const size_t num_slots = 1 << table.get_lg_size();
const uint64_t dest_mask = (1 << lg_k) - 1; // downsamples when dst lgK < sr LgK
for (size_t i = 0; i < num_slots; i++) {
const uint32_t row_col = slots[i];
if (row_col != UINT32_MAX) {
const uint8_t col = row_col & 63;
const size_t row = row_col >> 6;
bit_matrix[row & dest_mask] |= static_cast<uint64_t>(1) << col; // set the bit
}
}
}
template<typename A>
void cpc_union_alloc<A>::or_window_into_matrix(const vector_u8<A>& sliding_window, uint8_t offset, uint8_t src_lg_k) {
if (lg_k > src_lg_k) throw std::logic_error("dst LgK > src LgK");
const uint64_t dst_mask = (1 << lg_k) - 1; // downsamples when dst lgK < src LgK
const size_t src_k = 1 << src_lg_k;
for (size_t src_row = 0; src_row < src_k; src_row++) {
bit_matrix[src_row & dst_mask] |= static_cast<uint64_t>(sliding_window[src_row]) << offset;
}
}
template<typename A>
void cpc_union_alloc<A>::or_matrix_into_matrix(const vector_u64<A>& src_matrix, uint8_t src_lg_k) {
if (lg_k > src_lg_k) throw std::logic_error("dst LgK > src LgK");
const uint64_t dst_mask = (1 << lg_k) - 1; // downsamples when dst lgK < src LgK
const size_t src_k = 1 << src_lg_k;
for (size_t src_row = 0; src_row < src_k; src_row++) {
bit_matrix[src_row & dst_mask] |= src_matrix[src_row];
}
}
template<typename A>
void cpc_union_alloc<A>::reduce_k(uint8_t new_lg_k) {
if (new_lg_k >= lg_k) throw std::logic_error("new LgK >= union lgK");
if (accumulator == nullptr && bit_matrix.size() == 0) throw std::logic_error("both accumulator and bit_matrix are absent");
if (bit_matrix.size() > 0) { // downsample the unioner's bit matrix
if (accumulator != nullptr) throw std::logic_error("accumulator is not null");
vector_u64<A> old_matrix = std::move(bit_matrix);
const uint8_t old_lg_k = lg_k;
const size_t new_k = 1 << new_lg_k;
bit_matrix = vector_u64<A>(new_k, 0, old_matrix.get_allocator());
lg_k = new_lg_k;
or_matrix_into_matrix(old_matrix, old_lg_k);
return;
}
if (accumulator != nullptr) { // downsample the unioner's sketch
if (bit_matrix.size() > 0) throw std::logic_error("bit_matrix is not expected");
if (!accumulator->is_empty()) {
cpc_sketch_alloc<A> old_accumulator(*accumulator);
*accumulator = cpc_sketch_alloc<A>(new_lg_k, seed);
walk_table_updating_sketch(old_accumulator.surprising_value_table);
}
lg_k = new_lg_k;
const auto final_new_flavor = accumulator->determine_flavor();
// if the new sketch has graduated beyond sparse, convert to bit_matrix
if (final_new_flavor != cpc_sketch_alloc<A>::flavor::EMPTY &&
final_new_flavor != cpc_sketch_alloc<A>::flavor::SPARSE) {
switch_to_bit_matrix();
}
return;
}
throw std::logic_error("invalid state");
}
} /* namespace datasketches */
#endif