blob: bf2ce7b8216119193a7d560bd5d520a87fcc7fb5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef REQ_SKETCH_IMPL_HPP_
#define REQ_SKETCH_IMPL_HPP_
#include <sstream>
namespace datasketches {
template<typename T, bool H, typename C, typename S, typename A>
req_sketch<T, H, C, S, A>::req_sketch(uint16_t k, const A& allocator):
allocator_(allocator),
k_(std::max(static_cast<int>(k) & -2, static_cast<int>(req_constants::MIN_K))), //rounds down one if odd
max_nom_size_(0),
num_retained_(0),
n_(0),
compactors_(allocator),
min_value_(nullptr),
max_value_(nullptr)
{
grow();
}
template<typename T, bool H, typename C, typename S, typename A>
req_sketch<T, H, C, S, A>::~req_sketch() {
if (min_value_ != nullptr) {
min_value_->~T();
allocator_.deallocate(min_value_, 1);
}
if (max_value_ != nullptr) {
max_value_->~T();
allocator_.deallocate(max_value_, 1);
}
}
template<typename T, bool H, typename C, typename S, typename A>
bool req_sketch<T, H, C, S, A>::is_empty() const {
return n_ == 0;
}
template<typename T, bool H, typename C, typename S, typename A>
uint64_t req_sketch<T, H, C, S, A>::get_n() const {
return n_;
}
template<typename T, bool H, typename C, typename S, typename A>
uint32_t req_sketch<T, H, C, S, A>::get_num_retained() const {
return num_retained_;
}
template<typename T, bool H, typename C, typename S, typename A>
bool req_sketch<T, H, C, S, A>::is_estimation_mode() const {
return compactors_.size() > 1;
}
template<typename T, bool H, typename C, typename S, typename A>
template<typename FwdT>
void req_sketch<T, H, C, S, A>::update(FwdT&& item) {
if (!check_update_value(item)) { return; }
if (is_empty()) {
min_value_ = new (allocator_.allocate(1)) T(item);
max_value_ = new (allocator_.allocate(1)) T(item);
} else {
if (C()(item, *min_value_)) *min_value_ = item;
if (C()(*max_value_, item)) *max_value_ = item;
}
compactors_[0].append(item);
++num_retained_;
++n_;
if (num_retained_ == max_nom_size_) {
compactors_[0].sort();
compress();
}
}
template<typename T, bool H, typename C, typename S, typename A>
const T& req_sketch<T, H, C, S, A>::get_min_value() const {
if (is_empty()) return get_invalid_value();
return *min_value_;
}
template<typename T, bool H, typename C, typename S, typename A>
const T& req_sketch<T, H, C, S, A>::get_max_value() const {
if (is_empty()) return get_invalid_value();
return *max_value_;
}
template<typename T, bool H, typename C, typename S, typename A>
template<bool inclusive>
double req_sketch<T, H, C, S, A>::get_rank(const T& item) const {
uint64_t weight = 0;
for (const auto& compactor: compactors_) {
weight += compactor.template compute_weight<inclusive>(item);
}
return static_cast<double>(weight) / n_;
}
template<typename T, bool H, typename C, typename S, typename A>
template<bool inclusive>
const T& req_sketch<T, H, C, S, A>::get_quantile(double rank) const {
if (is_empty()) return get_invalid_value();
if (rank == 0.0) return *min_value_;
if (rank == 1.0) return *max_value_;
if ((rank < 0.0) || (rank > 1.0)) {
throw std::invalid_argument("Rank cannot be less than zero or greater than 1.0");
}
// TODO: min and max
if (!compactors_[0].is_sorted()) {
const_cast<req_compactor<T, H, C, A>&>(compactors_[0]).sort(); // allow this side effect
}
req_quantile_calculator<T, A> quantile_calculator(n_, allocator_);
for (auto& compactor: compactors_) {
quantile_calculator.add(compactor.get_items(), compactor.get_lg_weight());
}
quantile_calculator.template convert_to_cummulative<inclusive>();
return quantile_calculator.get_quantile(rank);
}
template<typename T, bool H, typename C, typename S, typename A>
void req_sketch<T, H, C, S, A>::serialize(std::ostream& os) const {
const uint8_t preamble_longs = 1;
write(os, preamble_longs);
const uint8_t serial_version = SERIAL_VERSION;
write(os, serial_version);
const uint8_t family = FAMILY;
write(os, family);
const bool is_single_item = n_ == 1;
const uint8_t flags_byte(
(is_empty() ? 1 << flags::IS_EMPTY : 0)
| (H ? 1 << flags::IS_HIGH_RANK : 0)
| (compactors_[0].is_sorted() ? 1 << flags::IS_LEVEL_ZERO_SORTED : 0)
| (is_single_item ? 1 << flags::IS_SINGLE_ITEM : 0)
);
write(os, flags_byte);
write(os, k_);
const uint8_t num_levels = get_num_levels();
write(os, num_levels);
const uint8_t unused = 0;
write(os, unused);
if (is_empty()) return;
if (is_estimation_mode()) {
write(os, n_);
S().serialize(os, min_value_, 1);
S().serialize(os, max_value_, 1);
}
if (is_single_item) {
S().serialize(os, min_value_, 1);
} else {
for (const auto& compactor: compactors_) compactor.serialize(os, S());
}
}
template<typename T, bool H, typename C, typename S, typename A>
req_sketch<T, H, C, S, A> req_sketch<T, H, C, S, A>::deserialize(std::istream& is, const A& allocator) {
const auto preamble_longs = read<uint8_t>(is);
const auto serial_version = read<uint8_t>(is);
const auto family_id = read<uint8_t>(is);
const auto flags_byte = read<uint8_t>(is);
const auto k = read<uint16_t>(is);
const auto num_levels = read<uint8_t>(is);
const auto unused = read<uint8_t>(is);
// TODO: checks
if (!is.good()) throw std::runtime_error("error reading from std::istream");
const bool is_empty = flags_byte & (1 << flags::IS_EMPTY);
if (is_empty) return req_sketch(k, allocator);
uint64_t n = 1;
if (num_levels > 1) n = read<uint64_t>(is);
A alloc(allocator);
auto item_buffer_deleter = [&alloc](T* ptr) { alloc.deallocate(ptr, 1); };
std::unique_ptr<T, decltype(item_buffer_deleter)> min_value_buffer(alloc.allocate(1), item_buffer_deleter);
std::unique_ptr<T, decltype(item_buffer_deleter)> max_value_buffer(alloc.allocate(1), item_buffer_deleter);
std::unique_ptr<T, item_deleter> min_value(nullptr, item_deleter(allocator));
std::unique_ptr<T, item_deleter> max_value(nullptr, item_deleter(allocator));
const bool is_single_item = flags_byte & (1 << flags::IS_SINGLE_ITEM);
if (num_levels > 1) {
S().deserialize(is, min_value_buffer.get(), 1);
// serde call did not throw, repackage with destrtuctor
min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
S().deserialize(is, max_value_buffer.get(), 1);
// serde call did not throw, repackage with destrtuctor
max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
}
const bool is_level_0_sorted = flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED);
std::vector<Compactor, AllocCompactor> compactors(allocator);
std::unique_ptr<T, decltype(item_buffer_deleter)> item_buffer(alloc.allocate(1), item_buffer_deleter);
if (is_single_item) {
S().deserialize(is, min_value_buffer.get(), 1);
// serde call did not throw, repackage with destrtuctor
min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
new (max_value_buffer.get()) T(*min_value);
// copy did not throw, repackage with destrtuctor
max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
compactors.push_back(req_compactor<T, H, C, A>(1, k, allocator, min_value.get(), is_level_0_sorted));
} else {
if (num_levels == 1) {
const auto& items = compactors[0].get_items();
n = items.size();
auto min_it = items.begin();
auto max_it = items.begin();
auto it = items.begin();
while (it != items.end()) {
if (C()(*it, *min_it)) min_it = it;
if (C()(*max_it, *it)) max_it = it;
++it;
}
new (min_value_buffer.get()) T(*min_it);
// copy did not throw, repackage with destrtuctor
min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
new (max_value_buffer.get()) T(*max_it);
// copy did not throw, repackage with destrtuctor
max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
} else {
for (size_t i = 0; i < num_levels; ++i) {
auto compactor = req_compactor<T, H, C, A>::deserialize(is, S(), allocator, i == 0 ? is_level_0_sorted : true);
compactors.push_back(std::move(compactor));
}
}
}
if (!is.good()) throw std::runtime_error("error reading from std::istream");
return req_sketch(k, n, std::move(min_value), std::move(max_value), std::move(compactors));
}
//template<typename T, bool H, typename C, typename S, typename A>
//req_sketch<T, H, C, S, A> deserialize(const void* bytes, size_t size) {
//
//}
template<typename T, bool H, typename C, typename S, typename A>
void req_sketch<T, H, C, S, A>::grow() {
const uint8_t lg_weight = get_num_levels();
compactors_.push_back(Compactor(lg_weight, k_, allocator_));
update_max_nom_size();
}
template<typename T, bool H, typename C, typename S, typename A>
uint8_t req_sketch<T, H, C, S, A>::get_num_levels() const {
return compactors_.size();
}
template<typename T, bool H, typename C, typename S, typename A>
void req_sketch<T, H, C, S, A>::update_max_nom_size() {
max_nom_size_ = 0;
for (const auto& compactor: compactors_) max_nom_size_ += compactor.get_nom_capacity();
}
template<typename T, bool H, typename C, typename S, typename A>
void req_sketch<T, H, C, S, A>::update_num_retained() {
num_retained_ = 0;
for (const auto& compactor: compactors_) num_retained_ += compactor.get_num_items();
}
template<typename T, bool H, typename C, typename S, typename A>
void req_sketch<T, H, C, S, A>::compress() {
for (size_t h = 0; h < compactors_.size(); ++h) {
if (compactors_[h].get_num_items() >= compactors_[h].get_nom_capacity()) {
if (h + 1 >= get_num_levels()) { // at the top?
grow(); // add a level, increases max_nom_size
}
auto promoted = compactors_[h].compact();
compactors_[h + 1].merge_sort_in(std::move(promoted));
update_num_retained();
if (num_retained_ < max_nom_size_) break;
}
}
update_max_nom_size();
}
template<typename T, bool H, typename C, typename S, typename A>
string<A> req_sketch<T, H, C, S, A>::to_string(bool print_levels, bool print_items) const {
std::basic_ostringstream<char, std::char_traits<char>, AllocChar<A>> os;
os << "### REQ sketch summary:" << std::endl;
os << " K : " << k_ << std::endl;
os << " High Rank Acc : " << (H ? "true" : "false") << std::endl;
os << " Empty : " << (is_empty() ? "true" : "false") << std::endl;
os << " Estimation mode: " << (is_estimation_mode() ? "true" : "false") << std::endl;
os << " Sorted : " << (compactors_[0].is_sorted() ? "true" : "false") << std::endl;
os << " N : " << n_ << std::endl;
os << " Levels : " << compactors_.size() << std::endl;
os << " Retained items : " << num_retained_ << std::endl;
os << " Capacity items : " << max_nom_size_ << std::endl;
// os << " Storage bytes : " << get_serialized_size_bytes() << std::endl;
if (!is_empty()) {
os << " Min value : " << *min_value_ << std::endl;
os << " Max value : " << *max_value_ << std::endl;
}
os << "### End sketch summary" << std::endl;
if (print_levels) {
os << "### REQ sketch levels:" << std::endl;
os << " index: nominal capacity, actual size" << std::endl;
for (uint8_t i = 0; i < compactors_.size(); i++) {
os << " " << (unsigned int) i << ": "
<< compactors_[i].get_nom_capacity() << ", "
<< compactors_[i].get_num_items() << std::endl;
}
os << "### End sketch levels" << std::endl;
}
if (print_items) {
os << "### REQ sketch data:" << std::endl;
unsigned level = 0;
for (const auto& compactor: compactors_) {
os << " level " << level << ": " << std::endl;
for (const auto& item: compactor.get_items()) {
os << " " << item << std::endl;
}
++level;
}
os << "### End sketch data" << std::endl;
}
return os.str();
}
template<typename T, bool H, typename C, typename S, typename A>
class req_sketch<T, H, C, S, A>::item_deleter {
public:
item_deleter(const A& allocator): allocator_(allocator) {}
void operator() (T* ptr) {
if (ptr != nullptr) {
ptr->~T();
allocator_.deallocate(ptr, 1);
}
}
private:
A allocator_;
};
template<typename T, bool H, typename C, typename S, typename A>
req_sketch<T, H, C, S, A>::req_sketch(uint32_t k, uint64_t n, std::unique_ptr<T, item_deleter> min_value, std::unique_ptr<T, item_deleter> max_value, std::vector<Compactor, AllocCompactor>&& compactors):
allocator_(compactors.get_allocator()),
k_(k),
max_nom_size_(0),
num_retained_(0),
n_(n),
compactors_(std::move(compactors)),
min_value_(min_value.release()),
max_value_(max_value.release())
{
update_max_nom_size();
update_num_retained();
}
} /* namespace datasketches */
#endif