| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #ifndef REQ_COMPACTOR_IMPL_HPP_ |
| #define REQ_COMPACTOR_IMPL_HPP_ |
| |
| #include <stdexcept> |
| #include <cmath> |
| #include <algorithm> |
| |
| #include "count_zeros.hpp" |
| #include "conditional_forward.hpp" |
| |
| #include <iomanip> |
| |
| namespace datasketches { |
| |
| template<typename T, bool H, typename C, typename A> |
| req_compactor<T, H, C, A>::req_compactor(uint8_t lg_weight, uint32_t section_size, const A& allocator, bool sorted): |
| allocator_(allocator), |
| lg_weight_(lg_weight), |
| coin_(false), |
| sorted_(sorted), |
| section_size_raw_(section_size), |
| section_size_(section_size), |
| num_sections_(req_constants::INIT_NUM_SECTIONS), |
| state_(0), |
| num_items_(0), |
| capacity_(2 * get_nom_capacity()), |
| items_(allocator_.allocate(capacity_)) |
| {} |
| |
| template<typename T, bool H, typename C, typename A> |
| req_compactor<T, H, C, A>::~req_compactor() { |
| if (items_ != nullptr) { |
| for (auto it = begin(); it != end(); ++it) (*it).~T(); |
| allocator_.deallocate(items_, capacity_); |
| } |
| } |
| |
| template<typename T, bool H, typename C, typename A> |
| req_compactor<T, H, C, A>::req_compactor(const req_compactor& other): |
| allocator_(other.allocator_), |
| lg_weight_(other.lg_weight_), |
| coin_(other.coin_), |
| sorted_(other.sorted_), |
| section_size_raw_(other.section_size_raw_), |
| section_size_(other.section_size_), |
| num_sections_(other.num_sections_), |
| state_(other.state_), |
| num_items_(other.num_items_), |
| capacity_(other.capacity_), |
| items_(nullptr) |
| { |
| if (other.items_ != nullptr) { |
| items_ = allocator_.allocate(capacity_); |
| const size_t from = H ? capacity_ - num_items_ : 0; |
| const size_t to = H ? capacity_ : num_items_; |
| for (size_t i = from; i < to; ++i) new (items_ + i) T(other.items_[i]); |
| } |
| } |
| |
| template<typename T, bool H, typename C, typename A> |
| req_compactor<T, H, C, A>::req_compactor(req_compactor&& other) noexcept : |
| allocator_(std::move(other.allocator_)), |
| lg_weight_(other.lg_weight_), |
| coin_(other.coin_), |
| sorted_(other.sorted_), |
| section_size_raw_(other.section_size_raw_), |
| section_size_(other.section_size_), |
| num_sections_(other.num_sections_), |
| state_(other.state_), |
| num_items_(other.num_items_), |
| capacity_(other.capacity_), |
| items_(other.items_) |
| { |
| other.items_ = nullptr; |
| } |
| |
| template<typename T, bool H, typename C, typename A> |
| req_compactor<T, H, C, A>& req_compactor<T, H, C, A>::operator=(const req_compactor& other) { |
| req_compactor copy(other); |
| std::swap(allocator_, copy.allocator_); |
| std::swap(lg_weight_, copy.lg_weight_); |
| std::swap(coin_, copy.coin_); |
| std::swap(sorted_, copy.sorted_); |
| std::swap(section_size_raw_, copy.section_size_raw_); |
| std::swap(section_size_, copy.section_size_); |
| std::swap(num_sections_, copy.num_sections_); |
| std::swap(state_, copy.state_); |
| std::swap(num_items_, copy.num_items_); |
| std::swap(capacity_, copy.capacity_); |
| std::swap(items_, copy.items_); |
| return *this; |
| } |
| |
| template<typename T, bool H, typename C, typename A> |
| req_compactor<T, H, C, A>& req_compactor<T, H, C, A>::operator=(req_compactor&& other) { |
| std::swap(allocator_, other.allocator_); |
| std::swap(lg_weight_, other.lg_weight_); |
| std::swap(coin_, other.coin_); |
| std::swap(sorted_, other.sorted_); |
| std::swap(section_size_raw_, other.section_size_raw_); |
| std::swap(section_size_, other.section_size_); |
| std::swap(num_sections_, other.num_sections_); |
| std::swap(state_, other.state_); |
| std::swap(num_items_, other.num_items_); |
| std::swap(capacity_, other.capacity_); |
| std::swap(items_, other.items_); |
| return *this; |
| } |
| |
| template<typename T, bool H, typename C, typename A> |
| bool req_compactor<T, H, C, A>::is_sorted() const { |
| return sorted_; |
| } |
| |
| template<typename T, bool H, typename C, typename A> |
| uint32_t req_compactor<T, H, C, A>::get_num_items() const { |
| return num_items_; |
| } |
| |
| template<typename T, bool H, typename C, typename A> |
| uint32_t req_compactor<T, H, C, A>::get_nom_capacity() const { |
| return req_constants::MULTIPLIER * num_sections_ * section_size_; |
| } |
| |
| template<typename T, bool H, typename C, typename A> |
| uint8_t req_compactor<T, H, C, A>::get_lg_weight() const { |
| return lg_weight_; |
| } |
| |
| template<typename T, bool H, typename C, typename A> |
| template<bool inclusive> |
| uint64_t req_compactor<T, H, C, A>::compute_weight(const T& item) const { |
| if (!sorted_) const_cast<req_compactor*>(this)->sort(); // allow sorting as a side effect |
| auto it = inclusive ? |
| std::upper_bound(begin(), end(), item, C()) : |
| std::lower_bound(begin(), end(), item, C()); |
| return std::distance(begin(), it) << lg_weight_; |
| } |
| |
| template<typename T, bool H, typename C, typename A> |
| template<typename FwdT> |
| void req_compactor<T, H, C, A>::append(FwdT&& item) { |
| if (num_items_ == capacity_) grow(capacity_ + get_nom_capacity()); |
| const size_t i = H ? capacity_ - num_items_ - 1 : num_items_; |
| new (items_ + i) T(std::forward<FwdT>(item)); |
| ++num_items_; |
| if (num_items_ > 1) sorted_ = false; |
| } |
| |
| template<typename T, bool H, typename C, typename A> |
| void req_compactor<T, H, C, A>::grow(size_t new_capacity) { |
| T* new_items = allocator_.allocate(new_capacity); |
| const size_t from = H ? capacity_ - num_items_ : 0; |
| const size_t to = H ? capacity_ : num_items_; |
| size_t new_i = H ? new_capacity - num_items_ : 0; |
| for (size_t i = from; i < to; ++i, ++new_i) { |
| new (new_items + new_i) T(std::move(items_[i])); |
| items_[i].~T(); |
| } |
| allocator_.deallocate(items_, capacity_); |
| items_ = new_items; |
| capacity_ = new_capacity; |
| } |
| |
| template<typename T, bool H, typename C, typename A> |
| void req_compactor<T, H, C, A>::ensure_space(size_t num) { |
| if (num_items_ + num > capacity_) grow(num_items_ + num + get_nom_capacity()); |
| } |
| |
| template<typename T, bool H, typename C, typename A> |
| const T* req_compactor<T, H, C, A>::begin() const { |
| return items_ + (H ? capacity_ - num_items_ : 0); |
| } |
| |
| template<typename T, bool H, typename C, typename A> |
| const T* req_compactor<T, H, C, A>::end() const { |
| return items_ + (H ? capacity_ : num_items_); |
| } |
| |
| template<typename T, bool H, typename C, typename A> |
| T* req_compactor<T, H, C, A>::begin() { |
| return items_ + (H ? capacity_ - num_items_ : 0); |
| } |
| |
| template<typename T, bool H, typename C, typename A> |
| T* req_compactor<T, H, C, A>::end() { |
| return items_ + (H ? capacity_ : num_items_); |
| } |
| |
| template<typename T, bool H, typename C, typename A> |
| template<typename FwdC> |
| void req_compactor<T, H, C, A>::merge(FwdC&& other) { |
| // TODO: swap if other is larger? |
| if (lg_weight_ != other.lg_weight_) throw std::logic_error("weight mismatch"); |
| state_ |= other.state_; |
| while (ensure_enough_sections()) {} |
| ensure_space(other.get_num_items()); |
| sort(); |
| auto middle = H ? begin() : end(); |
| auto from = H ? begin() - other.get_num_items() : end(); |
| auto to = from + other.get_num_items(); |
| auto other_it = other.begin(); |
| for (auto it = from; it != to; ++it, ++other_it) new (it) T(conditional_forward<FwdC>(*other_it)); |
| if (!other.sorted_) std::sort(from, to, C()); |
| std::inplace_merge(H ? from : begin(), middle, H ? end() : to); |
| num_items_ += other.get_num_items(); |
| } |
| |
| template<typename T, bool H, typename C, typename A> |
| void req_compactor<T, H, C, A>::sort() { |
| if (!sorted_) { |
| std::sort(begin(), end(), C()); |
| sorted_ = true; |
| } |
| } |
| |
| template<typename T, bool H, typename C, typename A> |
| void req_compactor<T, H, C, A>::compact(req_compactor& next) { |
| // choose a part of the buffer to compact |
| const uint32_t secs_to_compact = std::min(static_cast<uint32_t>(count_trailing_zeros_in_u32(~state_) + 1), static_cast<uint32_t>(num_sections_)); |
| const size_t compaction_range = compute_compaction_range(secs_to_compact); |
| const uint32_t compact_from = compaction_range & 0xFFFFFFFFLL; // low 32 |
| const uint32_t compact_to = compaction_range >> 32; // high 32 |
| if (compact_to - compact_from < 2) throw std::logic_error("compaction range error"); |
| |
| if ((state_ & 1) == 1) { coin_ = !coin_; } // for odd flip coin; |
| else { coin_ = req_random_bit(); } // random coin flip |
| |
| const auto num = (compact_to - compact_from) / 2; |
| next.ensure_space(num); |
| auto next_middle = H ? next.begin() : next.end(); |
| auto next_empty = H ? next.begin() - num : next.end(); |
| promote_evens_or_odds(begin() + compact_from, begin() + compact_to, coin_, next_empty); |
| next.num_items_ += num; |
| std::inplace_merge(next.begin(), next_middle, next.end(), C()); |
| for (size_t i = compact_from; i < compact_to; ++i) (*(begin() + i)).~T(); |
| num_items_ -= compact_to - compact_from; |
| |
| ++state_; |
| ensure_enough_sections(); |
| } |
| |
| template<typename T, bool H, typename C, typename A> |
| bool req_compactor<T, H, C, A>::ensure_enough_sections() { |
| const float ssr = section_size_raw_ / sqrt(2); |
| const uint32_t ne = nearest_even(ssr); |
| if (state_ >= static_cast<uint64_t>(1 << (num_sections_ - 1)) && ne >= req_constants::MIN_K) { |
| section_size_raw_ = ssr; |
| section_size_ = ne; |
| num_sections_ <<= 1; |
| if (capacity_ < 2 * get_nom_capacity()) grow(2 * get_nom_capacity()); |
| return true; |
| } |
| return false; |
| } |
| |
| template<typename T, bool H, typename C, typename A> |
| size_t req_compactor<T, H, C, A>::compute_compaction_range(uint32_t secs_to_compact) const { |
| uint32_t non_compact = get_nom_capacity() / 2 + (num_sections_ - secs_to_compact) * section_size_; |
| // make compacted region even |
| if (((num_items_ - non_compact) & 1) == 1) ++non_compact; |
| const size_t low = H ? 0 : non_compact; |
| const size_t high = H ? num_items_ - non_compact : num_items_; |
| return (high << 32) + low; |
| } |
| |
| template<typename T, bool H, typename C, typename A> |
| uint32_t req_compactor<T, H, C, A>::nearest_even(float value) { |
| return static_cast<uint32_t>(round(value / 2)) << 1; |
| } |
| |
| template<typename T, bool H, typename C, typename A> |
| template<typename InIter, typename OutIter> |
| void req_compactor<T, H, C, A>::promote_evens_or_odds(InIter from, InIter to, bool odds, OutIter dst) { |
| if (from == to) return; |
| InIter i = from; |
| if (odds) ++i; |
| while (i != to) { |
| new (dst) T(std::move(*i)); |
| ++dst; |
| ++i; |
| if (i == to) break; |
| ++i; |
| } |
| } |
| |
| // helpers for integral types |
| template<typename T> |
| static inline T read(std::istream& is) { |
| T value; |
| is.read(reinterpret_cast<char*>(&value), sizeof(T)); |
| return value; |
| } |
| |
| template<typename T> |
| static inline void write(std::ostream& os, T value) { |
| os.write(reinterpret_cast<const char*>(&value), sizeof(T)); |
| } |
| |
| // implementation for fixed-size arithmetic types (integral and floating point) |
| template<typename T, bool H, typename C, typename A> |
| template<typename S, typename TT, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type> |
| size_t req_compactor<T, H, C, A>::get_serialized_size_bytes(const S&) const { |
| return sizeof(state_) + sizeof(section_size_raw_) + sizeof(lg_weight_) + sizeof(num_sections_) + |
| sizeof(uint16_t) + // padding |
| sizeof(uint32_t) + // num_items |
| sizeof(TT) * num_items_; |
| } |
| |
| // implementation for all other types |
| template<typename T, bool H, typename C, typename A> |
| template<typename S, typename TT, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type> |
| size_t req_compactor<T, H, C, A>::get_serialized_size_bytes(const S& serde) const { |
| size_t size = sizeof(state_) + sizeof(section_size_raw_) + sizeof(lg_weight_) + sizeof(num_sections_) + |
| sizeof(uint16_t) + // padding |
| sizeof(uint32_t); // num_items |
| sizeof(TT) * num_items_; |
| for (const auto& item: items_) size += serde.size_of_item(item); |
| return size; |
| } |
| |
| template<typename T, bool H, typename C, typename A> |
| template<typename S> |
| void req_compactor<T, H, C, A>::serialize(std::ostream& os, const S& serde) const { |
| write(os, state_); |
| write(os, section_size_raw_); |
| write(os, lg_weight_); |
| write(os, num_sections_); |
| const uint16_t padding = 0; |
| write(os, padding); |
| write(os, num_items_); |
| serde.serialize(os, begin(), num_items_); |
| } |
| |
| template<typename T, bool H, typename C, typename A> |
| template<typename S> |
| size_t req_compactor<T, H, C, A>::serialize(void* dst, size_t capacity, const S& serde) const { |
| uint8_t* ptr = static_cast<uint8_t*>(dst); |
| const uint8_t* end_ptr = ptr + capacity; |
| ptr += copy_to_mem(state_, ptr); |
| ptr += copy_to_mem(section_size_raw_, ptr); |
| ptr += copy_to_mem(lg_weight_, ptr); |
| ptr += copy_to_mem(num_sections_, ptr); |
| const uint16_t padding = 0; |
| ptr += copy_to_mem(padding, ptr); |
| ptr += copy_to_mem(num_items_, ptr); |
| ptr += serde.serialize(ptr, end_ptr - ptr, begin(), num_items_); |
| return ptr - static_cast<uint8_t*>(dst); |
| } |
| |
| template<typename T, bool H, typename C, typename A> |
| template<typename S> |
| req_compactor<T, H, C, A> req_compactor<T, H, C, A>::deserialize(std::istream& is, const S& serde, const A& allocator, bool sorted) { |
| auto state = read<decltype(state_)>(is); |
| auto section_size_raw = read<decltype(section_size_raw_)>(is); |
| auto lg_weight = read<decltype(lg_weight_)>(is); |
| auto num_sections = read<decltype(num_sections_)>(is); |
| read<uint16_t>(is); // padding |
| auto num_items = read<uint32_t>(is); |
| auto items = deserialize_items(is, serde, allocator, num_items); |
| return req_compactor(lg_weight, sorted, section_size_raw, num_sections, state, std::move(items), num_items, allocator); |
| } |
| |
| template<typename T, bool H, typename C, typename A> |
| template<typename S> |
| req_compactor<T, H, C, A> req_compactor<T, H, C, A>::deserialize(std::istream& is, const S& serde, const A& allocator, bool sorted, uint16_t k, uint8_t num_items) { |
| auto items = deserialize_items(is, serde, allocator, num_items); |
| return req_compactor(0, sorted, k, req_constants::INIT_NUM_SECTIONS, 0, std::move(items), num_items, allocator); |
| } |
| |
| template<typename T, bool H, typename C, typename A> |
| template<typename S> |
| auto req_compactor<T, H, C, A>::deserialize_items(std::istream& is, const S& serde, const A& allocator, size_t num) |
| -> std::unique_ptr<T, items_deleter> { |
| A alloc(allocator); |
| std::unique_ptr<T, items_deleter> items(alloc.allocate(num), items_deleter(allocator, false, num)); |
| serde.deserialize(is, items.get(), num); |
| // serde did not throw, enable destructors |
| items.get_deleter().set_destroy(true); |
| if (!is.good()) throw std::runtime_error("error reading from std::istream"); |
| return std::move(items); |
| } |
| |
| template<typename T, bool H, typename C, typename A> |
| template<typename S> |
| std::pair<req_compactor<T, H, C, A>, size_t> req_compactor<T, H, C, A>::deserialize(const void* bytes, size_t size, const S& serde, const A& allocator, bool sorted) { |
| ensure_minimum_memory(size, 8); |
| const char* ptr = static_cast<const char*>(bytes); |
| const char* end_ptr = static_cast<const char*>(bytes) + size; |
| |
| uint64_t state; |
| ptr += copy_from_mem(ptr, state); |
| float section_size_raw; |
| ptr += copy_from_mem(ptr, section_size_raw); |
| uint8_t lg_weight; |
| ptr += copy_from_mem(ptr, lg_weight); |
| uint8_t num_sections; |
| ptr += copy_from_mem(ptr, num_sections); |
| ptr += 2; // padding |
| uint32_t num_items; |
| ptr += copy_from_mem(ptr, num_items); |
| auto pair = deserialize_items(ptr, end_ptr - ptr, serde, allocator, num_items); |
| ptr += pair.second; |
| return std::pair<req_compactor, size_t>( |
| req_compactor(lg_weight, sorted, section_size_raw, num_sections, state, std::move(pair.first), num_items, allocator), |
| ptr - static_cast<const char*>(bytes) |
| ); |
| } |
| |
| template<typename T, bool H, typename C, typename A> |
| template<typename S> |
| std::pair<req_compactor<T, H, C, A>, size_t> req_compactor<T, H, C, A>::deserialize(const void* bytes, size_t size, const S& serde, const A& allocator, bool sorted, uint16_t k, uint8_t num_items) { |
| auto pair = deserialize_items(bytes, size, serde, allocator, num_items); |
| return std::pair<req_compactor, size_t>( |
| req_compactor(0, sorted, k, req_constants::INIT_NUM_SECTIONS, 0, std::move(pair.first), num_items, allocator), |
| pair.second |
| ); |
| } |
| |
| template<typename T, bool H, typename C, typename A> |
| template<typename S> |
| auto req_compactor<T, H, C, A>::deserialize_items(const void* bytes, size_t size, const S& serde, const A& allocator, size_t num) |
| -> std::pair<std::unique_ptr<T, items_deleter>, size_t> { |
| const char* ptr = static_cast<const char*>(bytes); |
| const char* end_ptr = static_cast<const char*>(bytes) + size; |
| A alloc(allocator); |
| std::unique_ptr<T, items_deleter> items(alloc.allocate(num), items_deleter(allocator, false, num)); |
| ptr += serde.deserialize(ptr, end_ptr - ptr, items.get(), num); |
| // serde did not throw, enable destructors |
| items.get_deleter().set_destroy(true); |
| return std::pair<std::unique_ptr<T, items_deleter>, size_t>( |
| std::move(items), |
| ptr - static_cast<const char*>(bytes) |
| ); |
| } |
| |
| |
| template<typename T, bool H, typename C, typename A> |
| req_compactor<T, H, C, A>::req_compactor(uint8_t lg_weight, bool sorted, float section_size_raw, uint8_t num_sections, uint64_t state, std::unique_ptr<T, items_deleter> items, uint32_t num_items, const A& allocator): |
| allocator_(allocator), |
| lg_weight_(lg_weight), |
| coin_(req_random_bit()), |
| sorted_(sorted), |
| section_size_raw_(section_size_raw), |
| section_size_(nearest_even(section_size_raw)), |
| num_sections_(num_sections), |
| state_(state), |
| num_items_(num_items), |
| capacity_(num_items), |
| items_(items.release()) |
| {} |
| |
| template<typename T, bool H, typename C, typename A> |
| class req_compactor<T, H, C, A>::items_deleter { |
| public: |
| items_deleter(const A& allocator, bool destroy, uint32_t num): allocator(allocator), destroy(destroy), num(num) {} |
| void operator() (T* ptr) { |
| if (ptr != nullptr) { |
| if (destroy) { |
| for (uint32_t i = 0; i < num; ++i) { |
| ptr[i].~T(); |
| } |
| } |
| allocator.deallocate(ptr, num); |
| } |
| } |
| void set_destroy(bool destroy) { this->destroy = destroy; } |
| private: |
| A allocator; |
| bool destroy; |
| uint32_t num; |
| }; |
| |
| } /* namespace datasketches */ |
| |
| #endif |