blob: 88a86ae0a1eddcf8e527220274766d509d7cad7b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef _EBPPS_SAMPLE_IMPL_HPP_
#define _EBPPS_SAMPLE_IMPL_HPP_
#include "common_defs.hpp"
#include "conditional_forward.hpp"
#include "ebpps_sample.hpp"
#include "serde.hpp"
#include <cmath>
#include <limits>
#include <string>
#include <sstream>
namespace datasketches {
// Builds an empty sample: c_ starts at zero, no partial item is held, and the
// full-item vector has room reserved for reserved_size entries.
template<typename T, typename A>
ebpps_sample<T,A>::ebpps_sample(uint32_t reserved_size, const A& allocator) :
  allocator_(allocator),
  c_(0.0),
  partial_item_(),
  data_(allocator)
{
  // reserve up front so early insertions need not reallocate
  data_.reserve(reserved_size);
}
// Builds a sample from already-materialized parts (used when deserializing).
//   data         - full items; its contents are moved into this sample
//   partial_item - optional partial item; its contents are moved into this sample
//   c            - value stored as c_
//   allocator    - allocator used for the internal vector
template<typename T, typename A>
ebpps_sample<T,A>::ebpps_sample(std::vector<T, A>&& data, optional<T>&& partial_item, double c, const A& allocator) :
  allocator_(allocator),
  c_(c),
  // a named rvalue-reference parameter is an lvalue: without std::move these
  // initializers would copy the arguments instead of taking ownership
  partial_item_(std::move(partial_item)),
  data_(std::move(data), allocator)
{}
// Discards everything currently held and stores a single item with c_ = theta.
// The item becomes a full item only when theta is exactly 1.0; for any other
// theta it is kept as the partial item.
template<typename T, typename A>
template<typename TT>
void ebpps_sample<T,A>::replace_content(TT&& item, double theta) {
  data_.clear();
  partial_item_.reset();
  c_ = theta;

  if (theta != 1.0) {
    partial_item_.emplace(std::forward<TT>(item));
    return;
  }
  data_.emplace_back(std::forward<TT>(item));
}
// Returns a copy of all full items, followed by the partial item when a
// uniform random draw falls below the fractional part of c_.
template<typename T, typename A>
auto ebpps_sample<T,A>::get_sample() const -> result_type {
  double whole_part; // required by modf, otherwise unused
  const double frac_part = std::modf(c_, &whole_part);
  const bool with_partial = next_double() < frac_part;

  uint32_t expected_size = static_cast<uint32_t>(data_.size());
  if (with_partial) expected_size += 1;

  result_type result;
  result.reserve(expected_size);
  for (const T& item : data_) {
    result.push_back(item);
  }
  if (with_partial) {
    result.emplace_back(static_cast<const T&>(*partial_item_));
  }
  return result;
}
// Shrinks the sample so that c_ becomes theta * c_.
// A theta of 1.0 or more leaves the sample untouched. Otherwise the integer and
// fractional parts of the old and new c_ decide which of three cases applies,
// and random draws choose which items stay as full items and which one ends up
// as the partial item. The order of the random draws is part of the behavior.
template<typename T, typename A>
void ebpps_sample<T,A>::downsample(double theta) {
if (theta >= 1.0) return;
const double new_c = theta * c_;
double new_c_int;
const double new_c_frac = std::modf(new_c, &new_c_int);
double c_int;
const double c_frac = std::modf(c_, &c_int);
if (new_c_int == 0.0) {
// no full items retained
// with probability 1 - c_frac / c_ a randomly chosen full item takes the
// partial slot before all full items are dropped
if (next_double() > (c_frac / c_)) {
swap_with_partial();
}
data_.clear();
} else if (new_c_int == c_int) {
// no items deleted
// only the identity of the partial item may change
if (next_double() > (1 - theta * c_frac)/(1 - new_c_frac)) {
swap_with_partial();
}
} else {
if (next_double() < theta * c_frac) {
// subsample data in random order; last item is partial
// create sample size new_c_int then swap_with_partial()
subsample(static_cast<uint32_t>(new_c_int));
swap_with_partial();
} else {
// create sample size new_c_int + 1 then move_one_to_partial()
subsample(static_cast<uint32_t>(new_c_int) + 1);
move_one_to_partial();
}
}
// an integral new_c has no fractional part, so no partial item is kept
if (new_c == new_c_int)
partial_item_.reset();
c_ = new_c;
}
// Merges another sample into this one.
// c_ becomes the sum of both c_ values and every full item of other is appended
// to data_ (copied or moved according to how other was passed, via
// conditional_forward). The two partial items are then reconciled based on the
// sum of the two fractional parts: none kept, one promoted to a full item, one
// kept as the partial item, or one promoted and the other kept as partial.
template<typename T, typename A>
template<typename FwdSample>
void ebpps_sample<T,A>::merge(FwdSample&& other) {
double c_int;
const double c_frac = std::modf(c_, &c_int);
double unused;
const double other_c_frac = std::modf(other.c_, &unused);
// update c_ here but do NOT recompute fractional part yet
c_ += other.c_;
for (uint32_t i = 0; i < other.data_.size(); ++i)
data_.emplace_back(conditional_forward<FwdSample>(other.data_[i]));
// This modifies the original algorithm slightly due to numeric
// precision issues. Specifically, the test if c_frac + other_c_frac == 1.0
// happens before tests for < 1.0 or > 1.0 and can also be triggered
// if c_ == floor(c_) (the updated value of c_, not the input).
//
// We can still run into issues where c_frac + other_c_frac == epsilon
// and the first case would have ideally triggered. As a result, we must
// check if the partial item exists before adding to the data_ vector.
if (c_frac == 0.0 && other_c_frac == 0.0) {
// neither side has a fractional part: no partial item afterwards
partial_item_.reset();
} else if (c_frac + other_c_frac == 1.0 || c_ == std::floor(c_)) {
// fractions add up to a whole item: promote one partial item to a full
// item (this sample's with probability c_frac) and keep no partial item
if (next_double() <= c_frac) {
if (partial_item_)
data_.emplace_back(std::move(*partial_item_));
} else {
if (other.partial_item_)
data_.emplace_back(conditional_forward<FwdSample>(*other.partial_item_));
}
partial_item_.reset();
} else if (c_frac + other_c_frac < 1.0) {
// still less than one item in total: keep a single partial item, chosen
// in proportion to each side's fractional part
if (next_double() > c_frac / (c_frac + other_c_frac)) {
set_partial(conditional_forward<FwdSample>(*other.partial_item_));
}
} else { // c_frac + other_c_frac > 1
// more than one item in total: one partial item becomes a full item and
// the other remains as the partial item
if (next_double() <= (1 - c_frac) / ((1 - c_frac) + (1 - other_c_frac))) {
data_.emplace_back(conditional_forward<FwdSample>(*other.partial_item_));
} else {
data_.emplace_back(std::move(*partial_item_));
partial_item_.reset();
set_partial(conditional_forward<FwdSample>(*other.partial_item_));
}
}
}
// Produces a human-readable listing: one indexed line per full item, then the
// partial item, or "NULL" when there is none.
template<typename T, typename A>
string<A> ebpps_sample<T ,A>::to_string() const {
  std::ostringstream out;
  out << " sample:" << std::endl;

  uint32_t position = 0;
  for (auto it = data_.begin(); it != data_.end(); ++it) {
    out << "\t" << position << ":\t" << *it << std::endl;
    ++position;
  }

  out << " partial: ";
  if (!bool(partial_item_)) {
    out << "NULL" << std::endl;
  } else {
    out << (*partial_item_) << std::endl;
  }
  return out.str();
}
// Reduces data_ to num_samples randomly selected full items.
template<typename T, typename A>
void ebpps_sample<T,A>::subsample(uint32_t num_samples) {
  // nothing to drop when every item is kept
  if (num_samples == data_.size()) return;

  // Partial Fisher-Yates shuffle: only the first num_samples positions need
  // to be settled, because later swaps would merely permute items that are
  // about to be erased. Each position draws from everything not yet placed,
  // so any original item can end up in the retained prefix.
  const uint32_t total = static_cast<uint32_t>(data_.size());
  for (uint32_t pos = 0; pos < num_samples; ++pos) {
    const uint32_t pick = pos + random_idx(total - pos);
    std::swap(data_[pos], data_[pick]);
  }

  data_.erase(data_.begin() + num_samples, data_.end());
}
// Stores item as the partial item: constructs it in place when no partial item
// exists yet, otherwise assigns over the existing one.
template<typename T, typename A>
template<typename FwdItem>
void ebpps_sample<T,A>::set_partial(FwdItem&& item) {
  if (!partial_item_) {
    partial_item_.emplace(conditional_forward<FwdItem>(item));
    return;
  }
  *partial_item_ = conditional_forward<FwdItem>(item);
}
// Removes one randomly chosen full item from data_ and makes it the partial item.
template<typename T, typename A>
void ebpps_sample<T,A>::move_one_to_partial() {
  const size_t chosen = random_idx(static_cast<uint32_t>(data_.size()));
  const size_t tail = data_.size() - 1;

  // park the chosen item in the last slot so removal is a cheap pop_back
  if (chosen != tail) {
    std::swap(data_[chosen], data_[tail]);
  }

  set_partial(std::move(data_[tail]));
  data_.pop_back();
}
// Exchanges the partial item with a randomly chosen full item. When there is
// no partial item, a random full item is moved into the partial slot instead.
template<typename T, typename A>
void ebpps_sample<T,A>::swap_with_partial() {
  if (!partial_item_) {
    move_one_to_partial();
    return;
  }
  const size_t chosen = random_idx(static_cast<uint32_t>(data_.size()));
  std::swap(data_[chosen], *partial_item_);
}
// Returns the sample to its empty state: no full items, no partial item, c_ = 0.
template<typename T, typename A>
void ebpps_sample<T,A>::reset() {
  data_.clear();
  partial_item_.reset();
  c_ = 0.0;
}
// Accessor for the current value of c_.
template<typename T, typename A>
double ebpps_sample<T,A>::get_c() const {
  const double current_c = c_;
  return current_c;
}
// Returns a copy of the full items only; the partial item is never included.
template<typename T, typename A>
auto ebpps_sample<T,A>::get_full_items() const -> result_type {
  result_type full_items(data_);
  return full_items;
}
// Reports whether a partial item is currently held.
template<typename T, typename A>
bool ebpps_sample<T,A>::has_partial_item() const {
  return static_cast<bool>(partial_item_);
}
// Returns a copy of the partial item.
// Throws std::runtime_error when no partial item is held.
template<typename T, typename A>
T ebpps_sample<T,A>::get_partial_item() const {
  if (partial_item_) {
    return *partial_item_;
  }
  throw std::runtime_error("Call to get_partial_item() when no partial item exists");
}
// Draws a uniformly distributed index in the closed range [0, max - 1] using
// the shared generator random_utils::rand.
template<typename T, typename A>
uint32_t ebpps_sample<T,A>::random_idx(uint32_t max) {
  using dist_type = std::uniform_int_distribution<uint32_t>;
  // one distribution object is reused; the bounds are supplied per call
  static dist_type dist;
  const dist_type::param_type bounds(0, max - 1);
  return dist(random_utils::rand, bounds);
}
// Convenience wrapper: forwards to random_utils::next_double with the shared
// generator random_utils::rand and returns its result.
template<typename T, typename A>
double ebpps_sample<T,A>::next_double() {
  const double draw = random_utils::next_double(random_utils::rand);
  return draw;
}
// Counts the items physically held: all full items plus the partial item, if any.
template<typename T, typename A>
uint32_t ebpps_sample<T,A>::get_num_retained_items() const {
  const size_t held = data_.size() + (partial_item_ ? 1 : 0);
  return static_cast<uint32_t>(held);
}
// Overload for fixed-size arithmetic item types (integral and floating point).
// Returns 0 when c_ is zero; otherwise sizeof(c_) plus sizeof(T) for every
// held item (full items and, if present, the partial item).
template<typename T, typename A>
template<typename TT, typename SerDe, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type>
size_t ebpps_sample<T, A>::get_serialized_size_bytes(const SerDe&) const {
  if (c_ == 0.0) return 0;

  const size_t num_items = data_.size() + (partial_item_ ? 1 : 0);
  return sizeof(c_) + num_items * sizeof(T);
}
// Overload for all non-arithmetic item types, whose size must be asked of the SerDe.
// Returns 0 when c_ is zero; otherwise sizeof(c_) plus sd.size_of_item() summed
// over every full item and, if present, the partial item.
template<typename T, typename A>
template<typename TT, typename SerDe, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type>
size_t ebpps_sample<T, A>::get_serialized_size_bytes(const SerDe& sd) const {
  if (c_ == 0.0) return 0;
  size_t num_bytes = sizeof(c_);
  // iterate by const reference: items here are non-arithmetic (e.g. strings),
  // so iterating by value would copy each one just to measure it
  for (const auto& item : data_)
    num_bytes += sd.size_of_item(item);
  if (partial_item_)
    num_bytes += sd.size_of_item(*partial_item_);
  return num_bytes;
}
// Writes the sample into the byte range [ptr, end_ptr): first c_, then all full
// items via the SerDe, then the partial item when one exists.
// Returns the number of bytes written.
template<typename T, typename A>
template<typename SerDe>
size_t ebpps_sample<T,A>::serialize(uint8_t* ptr, const uint8_t* end_ptr, const SerDe& sd) const {
  const uint8_t* const begin_ptr = ptr;

  // make sure c_ itself fits before writing it
  ensure_minimum_memory(end_ptr - ptr, sizeof(c_));
  ptr += copy_to_mem(c_, ptr);

  ptr += sd.serialize(ptr, end_ptr - ptr, data_.data(), static_cast<unsigned>(data_.size()));

  if (partial_item_) {
    ptr += sd.serialize(ptr, end_ptr - ptr, &*partial_item_, 1);
  }

  return ptr - begin_ptr;
}
// Writes the sample to a stream: first c_, then all full items via the SerDe,
// then the partial item when one exists.
// Throws std::runtime_error if the stream is not in a good state afterwards.
template<typename T, typename A>
template<typename SerDe>
void ebpps_sample<T,A>::serialize(std::ostream& os, const SerDe& sd) const {
  write(os, c_);

  const unsigned num_full = static_cast<unsigned>(data_.size());
  sd.serialize(os, data_.data(), num_full);

  if (partial_item_) {
    sd.serialize(os, &*partial_item_, 1);
  }

  if (os.good()) return;
  throw std::runtime_error("error writing to std::ostream");
}
// Reconstructs a sample from a byte image.
//   ptr, size - the serialized bytes
//   sd        - SerDe used to read items
//   allocator - allocator for the resulting sample
// Layout read: c (double), then floor(c) full items, then one partial item when
// c has a non-zero fractional part.
// Returns the sample together with the number of bytes consumed.
// Throws std::runtime_error when c is NaN, negative, or too large for the item
// count to fit in uint32_t; other exceptions may come from
// ensure_minimum_memory or the SerDe.
template<typename T, typename A>
template<typename SerDe>
std::pair<ebpps_sample<T, A>, size_t> ebpps_sample<T, A>::deserialize(const uint8_t* ptr, size_t size, const SerDe& sd, const A& allocator) {
  const uint8_t* st_ptr = ptr;
  const uint8_t* end_ptr = ptr + size;
  ensure_minimum_memory(size, sizeof(double));
  double c;
  ptr += copy_from_mem(ptr, c);
  // NaN compares false against everything, so it must be rejected explicitly
  if (std::isnan(c))
    throw std::runtime_error("sketch image has C = NaN during deserialization");
  if (c < 0.0)
    throw std::runtime_error("sketch image has C < 0.0 during deserializaiton");
  double c_int;
  const double c_frac = std::modf(c, &c_int);
  // converting an out-of-range double to uint32_t is undefined behavior
  if (c_int > static_cast<double>(std::numeric_limits<uint32_t>::max()))
    throw std::runtime_error("sketch image has C too large during deserialization");
  const bool has_partial = c_frac != 0.0;
  const uint32_t num_full_items = static_cast<uint32_t>(c_int);

  // raw storage for the SerDe to construct items into; destructors stay
  // disabled until the SerDe has succeeded
  A alloc(allocator);
  std::unique_ptr<T, items_deleter> items(alloc.allocate(num_full_items), items_deleter(allocator, false, num_full_items));
  ptr += sd.deserialize(ptr, end_ptr - ptr, items.get(), num_full_items);
  // serde did not throw, enable destructors
  items.get_deleter().set_destroy(true);
  std::vector<T, A> data(std::make_move_iterator(items.get()),
                         std::make_move_iterator(items.get() + num_full_items),
                         allocator);

  optional<T> partial_item;
  if (has_partial) {
    // NOTE(review): a disengaged optional is used purely as raw storage for the
    // SerDe to construct into; this relies on the project's optional
    // implementation permitting that -- confirm before switching optional types
    optional<T> tmp; // space to deserialize
    ptr += sd.deserialize(ptr, end_ptr - ptr, &*tmp, 1);
    // serde did not throw so place item (moved, not copied) and clean up
    partial_item.emplace(std::move(*tmp));
    (*tmp).~T();
  }

  return std::pair<ebpps_sample<T,A>, size_t>(
    ebpps_sample<T,A>(std::move(data), std::move(partial_item), c, allocator),
    ptr - st_ptr);
}
// Reconstructs a sample from a stream.
//   is        - stream positioned at the serialized sample
//   sd        - SerDe used to read items
//   allocator - allocator for the resulting sample
// Layout read: c (double), then floor(c) full items, then one partial item when
// c has a non-zero fractional part.
// Throws std::runtime_error when c is NaN, negative, or too large for the item
// count to fit in uint32_t, or when the stream is not good after reading; other
// exceptions may come from the SerDe.
template<typename T, typename A>
template<typename SerDe>
ebpps_sample<T, A> ebpps_sample<T, A>::deserialize(std::istream& is, const SerDe& sd, const A& allocator) {
  const double c = read<double>(is);
  // NaN compares false against everything, so it must be rejected explicitly
  if (std::isnan(c))
    throw std::runtime_error("sketch image has C = NaN during deserialization");
  if (c < 0.0)
    throw std::runtime_error("sketch image has C < 0.0 during deserializaiton");
  double c_int;
  const double c_frac = std::modf(c, &c_int);
  // converting an out-of-range double to uint32_t is undefined behavior
  if (c_int > static_cast<double>(std::numeric_limits<uint32_t>::max()))
    throw std::runtime_error("sketch image has C too large during deserialization");
  const bool has_partial = c_frac != 0.0;
  const uint32_t num_full_items = static_cast<uint32_t>(c_int);

  // raw storage for the SerDe to construct items into; destructors stay
  // disabled until the SerDe has succeeded
  A alloc(allocator);
  std::unique_ptr<T, items_deleter> items(alloc.allocate(num_full_items), items_deleter(allocator, false, num_full_items));
  sd.deserialize(is, items.get(), num_full_items);
  // serde did not throw, enable destructors
  items.get_deleter().set_destroy(true);
  std::vector<T, A> data(std::make_move_iterator(items.get()),
                         std::make_move_iterator(items.get() + num_full_items),
                         allocator);

  optional<T> partial_item;
  if (has_partial) {
    // NOTE(review): a disengaged optional is used purely as raw storage for the
    // SerDe to construct into; this relies on the project's optional
    // implementation permitting that -- confirm before switching optional types
    optional<T> tmp; // space to deserialize
    sd.deserialize(is, &*tmp, 1);
    // serde did not throw so place item (moved, not copied) and clean up
    partial_item.emplace(std::move(*tmp));
    (*tmp).~T();
  }

  if (!is.good()) throw std::runtime_error("error reading from std::istream");
  return ebpps_sample<T,A>(std::move(data), std::move(partial_item), c, allocator);
}
// Returns an iterator bound to this sample. Constructing it performs the
// random draw that decides whether the partial item is visited.
template<typename T, typename A>
typename ebpps_sample<T, A>::const_iterator ebpps_sample<T, A>::begin() const {
  const_iterator first(this);
  return first;
}
// Returns the past-the-end iterator, represented by a null sample pointer.
template<typename T, typename A>
typename ebpps_sample<T, A>::const_iterator ebpps_sample<T, A>::end() const {
  const_iterator past_the_end(nullptr);
  return past_the_end;
}
// -------- ebpps_sample::const_iterator implementation ---------

// Creates an iterator over sample, or the end iterator when sample is null.
// One random draw, made here, decides whether this traversal visits the partial
// item (probability = fractional part of c_). The iterator starts at the first
// full item, or at the partial item when there are no full items and the
// partial item was selected. If there is nothing to visit it is the end iterator.
template<typename T, typename A>
ebpps_sample<T, A>::const_iterator::const_iterator(const ebpps_sample* sample) :
  sample_(sample),
  idx_(0),
  use_partial_(false)
{
  if (sample == nullptr)
    return;

  // determine in advance if we use the partial item; the draw is always made,
  // but the partial item can only be used if it actually exists
  double c_int;
  const double c_frac = std::modf(sample_->get_c(), &c_int);
  const bool partial_selected = sample->next_double() < c_frac;
  use_partial_ = partial_selected && sample_->has_partial_item();

  const bool no_full_items = sample_->data_.size() == 0;

  // Nothing to visit: become the end iterator. This must also cover the case
  // of no full items with the partial item present but not selected --
  // otherwise the iterator would compare unequal to end() with idx_ == 0 and
  // dereferencing it would index into an empty data_ vector.
  if (sample_->c_ == 0.0 || (no_full_items && !use_partial_)) {
    sample_ = nullptr;
    return;
  }

  // sample with no full items: start directly at the partial item
  if (no_full_items) {
    idx_ = PARTIAL_IDX;
  }
}
// Copy constructor: duplicates the position and the already-made decision about
// the partial item, without drawing a new random number.
template<typename T, typename A>
ebpps_sample<T, A>::const_iterator::const_iterator(const const_iterator& other) :
  sample_(other.sample_),
  idx_(other.idx_),
  use_partial_(other.use_partial_)
{
}
// Pre-increment: steps to the next full item, then to the partial item if it
// was selected for this traversal, then to the end state (null sample pointer).
// Incrementing an end iterator is a no-op.
template<typename T, typename A>
typename ebpps_sample<T, A>::const_iterator& ebpps_sample<T, A>::const_iterator::operator++() {
  if (sample_ != nullptr) {
    if (idx_ == PARTIAL_IDX) {
      // the partial item is always last: move to the end state
      idx_ = sample_->data_.size();
      sample_ = nullptr;
    } else {
      ++idx_;
      if (idx_ == sample_->data_.size()) {
        // ran out of full items
        if (!use_partial_) {
          sample_ = nullptr;
        } else {
          idx_ = PARTIAL_IDX;
        }
      }
    }
  }
  return *this;
}
// Post-increment: copies the current iterator, advances this one, and returns
// the copy.
// NOTE(review): the return type is a reference, but the object returned is the
// local `tmp`, which no longer exists once this function returns. Any caller
// that uses the result (e.g. `*it++`) therefore has undefined behavior; only
// `it++;` as a discarded expression is safe. The fix is to return
// const_iterator by value, which also requires changing the declaration in the
// class definition -- not done here because the declaration is outside this file.
template<typename T, typename A>
typename ebpps_sample<T, A>::const_iterator& ebpps_sample<T, A>::const_iterator::operator++(int) {
const_iterator tmp(*this);
operator++();
return tmp;
}
// Two iterators are equal when both are end iterators, or when they refer to
// the same sample at the same position.
template<typename T, typename A>
bool ebpps_sample<T, A>::const_iterator::operator==(const const_iterator& other) const {
  const bool same_sample = (sample_ == other.sample_);
  if (!same_sample) return false;
  // both end iterators (other.sample_ is null too): position is irrelevant
  if (sample_ == nullptr) return true;
  return idx_ == other.idx_;
}
// Negation of operator==.
template<typename T, typename A>
bool ebpps_sample<T, A>::const_iterator::operator!=(const const_iterator& other) const {
  const bool equal = operator==(other);
  return !equal;
}
// Dereference: yields the partial item when positioned on it, otherwise the
// full item at the current index. Must not be called on an end iterator.
template<typename T, typename A>
auto ebpps_sample<T, A>::const_iterator::operator*() const -> reference {
  if (idx_ != PARTIAL_IDX) {
    return sample_->data_[idx_];
  }
  return *(sample_->partial_item_);
}
// Member access: builds the `pointer` result from the item that operator*
// refers to.
template<typename T, typename A>
auto ebpps_sample<T, A>::const_iterator::operator->() const -> pointer {
  return operator*();
}
// Deleter for a raw array of num items obtained from allocator A.
// Deallocation always happens; element destructors run only while the destroy
// flag is set, so storage whose elements were never constructed can be
// released safely.
template<typename T, typename A>
class ebpps_sample<T, A>::items_deleter {
  public:
  // allocator: used to release the storage
  // destroy:   whether element destructors should be run before deallocation
  // num:       number of elements in the array
  items_deleter(const A& allocator, bool destroy, size_t num):
    allocator_(allocator),
    destroy_(destroy),
    num_(num)
  {}

  // Releases the array at ptr; a null pointer is ignored.
  void operator() (T* ptr) {
    if (ptr == nullptr) return;
    if (destroy_) {
      T* const stop = ptr + num_;
      for (T* cur = ptr; cur != stop; ++cur) {
        cur->~T();
      }
    }
    allocator_.deallocate(ptr, num_);
  }

  // Switches element destruction on or off.
  void set_destroy(bool destroy) { destroy_ = destroy; }

  private:
  A allocator_;
  bool destroy_;
  size_t num_;
};
} // namespace datasketches
#endif // _EBPPS_SAMPLE_IMPL_HPP_