/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#ifndef _VAR_OPT_SKETCH_IMPL_HPP_
#define _VAR_OPT_SKETCH_IMPL_HPP_

#include <memory>
#include <sstream>
#include <cmath>
#include <random>
#include <algorithm>

#include "var_opt_sketch.hpp"
#include "serde.hpp"
#include "bounds_binomial_proportions.hpp"
#include "count_zeros.hpp"
#include "memory_operations.hpp"
#include "ceiling_power_of_2.hpp"

namespace datasketches {

/**
 * Implementation code for the VarOpt sketch.
 * 
 * author Kevin Lang 
 * author Jon Malkin
 */
template<typename T, typename S, typename A>
var_opt_sketch<T,S,A>::var_opt_sketch(uint32_t k, resize_factor rf) :
  var_opt_sketch<T,S,A>(k, rf, false) {}

template<typename T, typename S, typename A>
var_opt_sketch<T,S,A>::var_opt_sketch(const var_opt_sketch& other) :
  k_(other.k_),
  h_(other.h_),
  m_(other.m_),
  r_(other.r_),
  n_(other.n_),
  total_wt_r_(other.total_wt_r_),
  rf_(other.rf_),
  curr_items_alloc_(other.curr_items_alloc_),
  filled_data_(other.filled_data_),
  data_(nullptr),
  weights_(nullptr),
  num_marks_in_h_(other.num_marks_in_h_),
  marks_(nullptr)
  {
    data_ = A().allocate(curr_items_alloc_);
    // skip gap or anything unused at the end
    for (size_t i = 0; i < h_; ++i)
      new (&data_[i]) T(other.data_[i]);
    for (size_t i = h_ + 1; i < h_ + r_ + 1; ++i)
      new (&data_[i]) T(other.data_[i]);

    // we skipped the gap
    filled_data_ = false;

    weights_ = AllocDouble().allocate(curr_items_alloc_);
    // doubles so can successfully copy regardless of the internal state
    std::copy(&other.weights_[0], &other.weights_[curr_items_alloc_], weights_);
    
    if (other.marks_ != nullptr) {
      marks_ = AllocBool().allocate(curr_items_alloc_);
      std::copy(&other.marks_[0], &other.marks_[curr_items_alloc_], marks_);
    }
  }

template<typename T, typename S, typename A>
var_opt_sketch<T,S,A>::var_opt_sketch(const var_opt_sketch& other, bool as_sketch, uint64_t adjusted_n) :
  k_(other.k_),
  h_(other.h_),
  m_(other.m_),
  r_(other.r_),
  n_(adjusted_n),
  total_wt_r_(other.total_wt_r_),
  rf_(other.rf_),
  curr_items_alloc_(other.curr_items_alloc_),
  filled_data_(other.filled_data_),
  data_(nullptr),
  weights_(nullptr),
  num_marks_in_h_(other.num_marks_in_h_),
  marks_(nullptr)
  {
    data_ = A().allocate(curr_items_alloc_);
    // skip gap or anything unused at the end
    for (size_t i = 0; i < h_; ++i)
      new (&data_[i]) T(other.data_[i]);
    for (size_t i = h_ + 1; i < h_ + r_ + 1; ++i)
      new (&data_[i]) T(other.data_[i]);
    
    // we skipped the gap
    filled_data_ = false;

    weights_ = AllocDouble().allocate(curr_items_alloc_);
    // doubles so can successfully copy regardless of the internal state
    std::copy(&other.weights_[0], &other.weights_[curr_items_alloc_], weights_);

    if (!as_sketch && other.marks_ != nullptr) {
      marks_ = AllocBool().allocate(curr_items_alloc_);
      std::copy(&other.marks_[0], &other.marks_[curr_items_alloc_], marks_);
    }
  }

template<typename T, typename S, typename A>
var_opt_sketch<T,S,A>::var_opt_sketch(T* data, double* weights, size_t len,
                                      uint32_t k, uint64_t n, uint32_t h_count, uint32_t r_count, double total_wt_r) :
  k_(k),
  h_(h_count),
  m_(0),
  r_(r_count),
  n_(n),
  total_wt_r_(total_wt_r),
  rf_(DEFAULT_RESIZE_FACTOR),
  curr_items_alloc_(len),
  filled_data_(n > k),
  data_(data),
  weights_(weights),
  num_marks_in_h_(0),
  marks_(nullptr)
  {}

template<typename T, typename S, typename A>
var_opt_sketch<T,S,A>::var_opt_sketch(var_opt_sketch&& other) noexcept :
  k_(other.k_),
  h_(other.h_),
  m_(other.m_),
  r_(other.r_),
  n_(other.n_),
  total_wt_r_(other.total_wt_r_),
  rf_(other.rf_),
  curr_items_alloc_(other.curr_items_alloc_),
  filled_data_(other.filled_data_),
  data_(other.data_),
  weights_(other.weights_),
  num_marks_in_h_(other.num_marks_in_h_),
  marks_(other.marks_)
  {
    other.data_ = nullptr;
    other.weights_ = nullptr;
    other.marks_ = nullptr;
  }

template<typename T, typename S, typename A>
var_opt_sketch<T,S,A>::var_opt_sketch(uint32_t k, resize_factor rf, bool is_gadget) :
  k_(k), h_(0), m_(0), r_(0), n_(0), total_wt_r_(0.0), rf_(rf) {
  if (k == 0 || k_ > MAX_K) {
    throw std::invalid_argument("k must be at least 1 and less than 2^31 - 1");
  }

  uint32_t ceiling_lg_k = to_log_2(ceiling_power_of_2(k_));
  uint32_t initial_lg_size = starting_sub_multiple(ceiling_lg_k, rf_, MIN_LG_ARR_ITEMS);
  curr_items_alloc_ = get_adjusted_size(k_, 1 << initial_lg_size);
  if (curr_items_alloc_ == k_) { // if full size, need to leave 1 for the gap
    ++curr_items_alloc_;
  }

  allocate_data_arrays(curr_items_alloc_, is_gadget);
  num_marks_in_h_ = 0;
}

template<typename T, typename S, typename A>
var_opt_sketch<T,S,A>::var_opt_sketch(uint32_t k, uint32_t h, uint32_t m, uint32_t r, uint64_t n, double total_wt_r, resize_factor rf,
                                      uint32_t curr_items_alloc, bool filled_data, std::unique_ptr<T, items_deleter> items,
                                      std::unique_ptr<double, weights_deleter> weights, uint32_t num_marks_in_h,
                                      std::unique_ptr<bool, marks_deleter> marks) :
  k_(k),
  h_(h),
  m_(m),
  r_(r),
  n_(n),
  total_wt_r_(total_wt_r),
  rf_(rf),
  curr_items_alloc_(curr_items_alloc),
  filled_data_(filled_data),
  data_(items.release()),
  weights_(weights.release()),
  num_marks_in_h_(num_marks_in_h),
  marks_(marks.release())
{}


template<typename T, typename S, typename A>
var_opt_sketch<T,S,A>::~var_opt_sketch() {
  if (data_ != nullptr) {
    if (filled_data_) {
      // destroy everything
      const size_t num_to_destroy = std::min(k_ + 1, curr_items_alloc_);
      for (size_t i = 0; i < num_to_destroy; ++i) {
        A().destroy(data_ + i);
      }
    } else {
      // skip gap or anything unused at the end
      for (size_t i = 0; i < h_; ++i) {
        A().destroy(data_+ i);
      }
    
      for (size_t i = h_ + 1; i < h_ + r_ + 1; ++i) {
        A().destroy(data_ + i);
      }
    }
    A().deallocate(data_, curr_items_alloc_);
  }

  if (weights_ != nullptr) {
    AllocDouble().deallocate(weights_, curr_items_alloc_);
  }
  
  if (marks_ != nullptr) {
    AllocBool().deallocate(marks_, curr_items_alloc_);
  }
}

template<typename T, typename S, typename A>
var_opt_sketch<T,S,A>& var_opt_sketch<T,S,A>::operator=(const var_opt_sketch& other) {
  var_opt_sketch<T,S,A> sk_copy(other);
  std::swap(k_, sk_copy.k_);
  std::swap(h_, sk_copy.h_);
  std::swap(m_, sk_copy.m_);
  std::swap(r_, sk_copy.r_);
  std::swap(n_, sk_copy.n_);
  std::swap(total_wt_r_, sk_copy.total_wt_r_);
  std::swap(rf_, sk_copy.rf_);
  std::swap(curr_items_alloc_, sk_copy.curr_items_alloc_);
  std::swap(filled_data_, sk_copy.filled_data_);
  std::swap(data_, sk_copy.data_);
  std::swap(weights_, sk_copy.weights_);
  std::swap(num_marks_in_h_, sk_copy.num_marks_in_h_);
  std::swap(marks_, sk_copy.marks_);
  return *this;
}

template<typename T, typename S, typename A>
var_opt_sketch<T,S,A>& var_opt_sketch<T,S,A>::operator=(var_opt_sketch&& other) {
  std::swap(k_, other.k_);
  std::swap(h_, other.h_);
  std::swap(m_, other.m_);
  std::swap(r_, other.r_);
  std::swap(n_, other.n_);
  std::swap(total_wt_r_, other.total_wt_r_);
  std::swap(rf_, other.rf_);
  std::swap(curr_items_alloc_, other.curr_items_alloc_);
  std::swap(filled_data_, other.filled_data_);
  std::swap(data_, other.data_);
  std::swap(weights_, other.weights_);
  std::swap(num_marks_in_h_, other.num_marks_in_h_);
  std::swap(marks_, other.marks_);
  return *this;
}

/*
 * An empty sketch requires 8 bytes.
 *
 * <pre>
 * Long || Start Byte Adr:
 * Adr:
 *      ||       0        |    1   |    2   |    3   |    4   |    5   |    6   |    7   |
 *  0   || Preamble_Longs | SerVer | FamID  |  Flags |---------Max Res. Size (K)---------|
 * </pre>
 *
 * A non-empty sketch requires 24 bytes of preamble for an under-full sample; once there are
 * at least k items the sketch uses 32 bytes of preamble.
 *
 * The count of items seen is limited to 48 bits (~256 trillion) even though there are adjacent
 * unused preamble bits. The acceptance probability for an item is a double in the range [0,1),
 * limiting us to 53 bits of randomness due to details of the IEEE floating point format. To
 * ensure meaningful probabilities as the items seen count approaches capacity, we intentionally
 * use slightly fewer bits.
 * 
 * Following the header are weights for the heavy items, then marks in the event this is a gadget.
 * The serialized items come last.
 * 
 * <pre>
 * Long || Start Byte Adr:
 * Adr:
 *      ||       0        |    1   |    2   |    3   |    4   |    5   |    6   |    7   |
 *  0   || Preamble_Longs | SerVer | FamID  |  Flags |---------Max Res. Size (K)---------|
 *
 *      ||       8        |    9   |   10   |   11   |   12   |   13   |   14   |   15   |
 *  1   ||---------------------------Items Seen Count (N)--------------------------------|
 *
 *      ||      16        |   17   |   18   |   19   |   20   |   21   |   22   |   23   |
 *  2   ||-------------Item Count in H---------------|-------Item Count in R-------------|
 *
 *      ||      24        |   25   |   26   |   27   |   28   |   29   |   30   |   31   |
 *  3   ||-------------------------------Total Weight in R-------------------------------|
 * </pre>
 */

// implementation for fixed-size arithmetic types (integral and floating point)
template<typename T, typename S, typename A>
template<typename TT, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type>
size_t var_opt_sketch<T,S,A>::get_serialized_size_bytes() const {
  if (is_empty()) { return PREAMBLE_LONGS_EMPTY << 3; }
  size_t num_bytes = (r_ == 0 ? PREAMBLE_LONGS_WARMUP : PREAMBLE_LONGS_FULL) << 3;
  num_bytes += h_ * sizeof(double);    // weights
  if (marks_ != nullptr) {             // marks
    num_bytes += (h_ / 8) + (h_ % 8 > 0);
  }
  num_bytes += (h_ + r_) * sizeof(TT); // the actual items
  return num_bytes;
}

// implementation for all other types
template<typename T, typename S, typename A>
template<typename TT, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type>
size_t var_opt_sketch<T,S,A>::get_serialized_size_bytes() const {
  if (is_empty()) { return PREAMBLE_LONGS_EMPTY << 3; }
  size_t num_bytes = (r_ == 0 ? PREAMBLE_LONGS_WARMUP : PREAMBLE_LONGS_FULL) << 3;
  num_bytes += h_ * sizeof(double);    // weights
  if (marks_ != nullptr) {             // marks
    num_bytes += (h_ / 8) + (h_ % 8 > 0);
  }
  // must iterate over the items
  for (auto& it: *this)
    num_bytes += S().size_of_item(it.first);
  return num_bytes;
}

template<typename T, typename S, typename A>
std::vector<uint8_t, AllocU8<A>> var_opt_sketch<T,S,A>::serialize(unsigned header_size_bytes) const {
  const size_t size = header_size_bytes + get_serialized_size_bytes();
  std::vector<uint8_t, AllocU8<A>> bytes(size);
  uint8_t* ptr = bytes.data() + header_size_bytes;
  uint8_t* end_ptr = ptr + size;

  bool empty = is_empty();
  uint8_t preLongs = (empty ? PREAMBLE_LONGS_EMPTY
                                  : (r_ == 0 ? PREAMBLE_LONGS_WARMUP : PREAMBLE_LONGS_FULL));
  uint8_t first_byte = (preLongs & 0x3F) | ((static_cast<uint8_t>(rf_)) << 6);
  uint8_t flags = (marks_ != nullptr ? GADGET_FLAG_MASK : 0);

  if (empty) {
    flags |= EMPTY_FLAG_MASK;
  }

  // first prelong
  uint8_t ser_ver(SER_VER);
  uint8_t family(FAMILY_ID);
  ptr += copy_to_mem(&first_byte, ptr, sizeof(uint8_t));
  ptr += copy_to_mem(&ser_ver, ptr, sizeof(uint8_t));
  ptr += copy_to_mem(&family, ptr, sizeof(uint8_t));
  ptr += copy_to_mem(&flags, ptr, sizeof(uint8_t));
  ptr += copy_to_mem(&k_, ptr, sizeof(uint32_t));

  if (!empty) {
    // second and third prelongs
    ptr += copy_to_mem(&n_, ptr, sizeof(uint64_t));
    ptr += copy_to_mem(&h_, ptr, sizeof(uint32_t));
    ptr += copy_to_mem(&r_, ptr, sizeof(uint32_t));

    // fourth prelong, if needed
    if (r_ > 0) {
      ptr += copy_to_mem(&total_wt_r_, ptr, sizeof(double));
    }

    // first h_ weights
    ptr += copy_to_mem(weights_, ptr, h_ * sizeof(double));

    // first h_ marks as packed bytes iff we have a gadget
    if (marks_ != nullptr) {
      uint8_t val = 0;
      for (uint32_t i = 0; i < h_; ++i) {
        if (marks_[i]) {
          val |= 0x1 << (i & 0x7);
        }

        if ((i & 0x7) == 0x7) {
          ptr += copy_to_mem(&val, ptr, sizeof(uint8_t));
          val = 0;
        }
      }

      // write out any remaining values
      if ((h_ & 0x7) > 0) {
        ptr += copy_to_mem(&val, ptr, sizeof(uint8_t));
      }
    }

    // write the sample items, skipping the gap. Either h_ or r_ may be 0
    ptr += S().serialize(ptr, end_ptr - ptr, data_, h_);
    ptr += S().serialize(ptr, end_ptr - ptr, &data_[h_ + 1], r_);
  }
  
  size_t bytes_written = ptr - bytes.data();
  if (bytes_written != size) {
    throw std::logic_error("serialized size mismatch: " + std::to_string(bytes_written) + " != " + std::to_string(size));
  }

  return bytes;
}

template<typename T, typename S, typename A>
void var_opt_sketch<T,S,A>::serialize(std::ostream& os) const {
  const bool empty = (h_ == 0) && (r_ == 0);

  const uint8_t preLongs = (empty ? PREAMBLE_LONGS_EMPTY
                                  : (r_ == 0 ? PREAMBLE_LONGS_WARMUP : PREAMBLE_LONGS_FULL));
  const uint8_t first_byte = (preLongs & 0x3F) | ((static_cast<uint8_t>(rf_)) << 6);
  uint8_t flags = (marks_ != nullptr ? GADGET_FLAG_MASK : 0);

  if (empty) {
    flags |= EMPTY_FLAG_MASK;
  }

  // first prelong
  const uint8_t ser_ver(SER_VER);
  const uint8_t family(FAMILY_ID);
  os.write((char*)&first_byte, sizeof(uint8_t));
  os.write((char*)&ser_ver, sizeof(uint8_t));
  os.write((char*)&family, sizeof(uint8_t));
  os.write((char*)&flags, sizeof(uint8_t));
  os.write((char*)&k_, sizeof(uint32_t));

  if (!empty) {
    // second and third prelongs
    os.write((char*)&n_, sizeof(uint64_t));
    os.write((char*)&h_, sizeof(uint32_t));
    os.write((char*)&r_, sizeof(uint32_t));
    
    // fourth prelong, if needed
    if (r_ > 0) {
      os.write((char*)&total_wt_r_, sizeof(double));
    }

    // write the first h_ weights
    os.write((char*)weights_, h_ * sizeof(double));

    // write the first h_ marks as packed bytes iff we have a gadget
    if (marks_ != nullptr) {
      uint8_t val = 0;
      for (uint32_t i = 0; i < h_; ++i) {
        if (marks_[i]) {
          val |= 0x1 << (i & 0x7);
        }

        if ((i & 0x7) == 0x7) {
          os.write((char*)&val, sizeof(uint8_t));
          val = 0;
        }
      }

      // write out any remaining values
      if ((h_ & 0x7) > 0) {
        os.write((char*)&val, sizeof(uint8_t));
      }
    }

    // write the sample items, skipping the gap. Either h_ or r_ may be 0
    S().serialize(os, data_, h_);
    S().serialize(os, &data_[h_ + 1], r_);
  }
}

template<typename T, typename S, typename A>
var_opt_sketch<T,S,A> var_opt_sketch<T,S,A>::deserialize(const void* bytes, size_t size) {
  ensure_minimum_memory(size, 8);
  const char* ptr = static_cast<const char*>(bytes);
  const char* base = ptr;
  const char* end_ptr = ptr + size;
  uint8_t first_byte;
  ptr += copy_from_mem(ptr, &first_byte, sizeof(first_byte));
  uint8_t preamble_longs = first_byte & 0x3f;
  resize_factor rf = static_cast<resize_factor>((first_byte >> 6) & 0x03);
  uint8_t serial_version;
  ptr += copy_from_mem(ptr, &serial_version, sizeof(serial_version));
  uint8_t family_id;
  ptr += copy_from_mem(ptr, &family_id, sizeof(family_id));
  uint8_t flags;
  ptr += copy_from_mem(ptr, &flags, sizeof(flags));
  uint32_t k;
  ptr += copy_from_mem(ptr, &k, sizeof(k));

  check_preamble_longs(preamble_longs, flags);
  check_family_and_serialization_version(family_id, serial_version);
  ensure_minimum_memory(size, preamble_longs << 3);

  const bool is_empty = flags & EMPTY_FLAG_MASK;
  const bool is_gadget = flags & GADGET_FLAG_MASK;

  if (is_empty) {
    return var_opt_sketch<T,S,A>(k, rf, is_gadget);
  }

  // second and third prelongs
  uint64_t n;
  uint32_t h, r;
  ptr += copy_from_mem(ptr, &n, sizeof(n));
  ptr += copy_from_mem(ptr, &h, sizeof(h));
  ptr += copy_from_mem(ptr, &r, sizeof(r));

  const uint32_t array_size = validate_and_get_target_size(preamble_longs, k, n, h, r, rf);
  
  // current_items_alloc_ is set but validate R region weight (4th prelong), if needed, before allocating
  double total_wt_r = 0.0;
  if (preamble_longs == PREAMBLE_LONGS_FULL) {
    ptr += copy_from_mem(ptr, &total_wt_r, sizeof(total_wt_r));
    if (std::isnan(total_wt_r) || r == 0 || total_wt_r <= 0.0) {
      throw std::invalid_argument("Possible corruption: deserializing in full mode but r = 0 or invalid R weight. "
       "Found r = " + std::to_string(r) + ", R region weight = " + std::to_string(total_wt_r));
    }
  } else {
    total_wt_r = 0.0;
  }

  // read the first h_ weights, fill in rest of array with -1.0
  check_memory_size(ptr - base + (h * sizeof(double)), size);
  std::unique_ptr<double, weights_deleter> weights(AllocDouble().allocate(array_size), weights_deleter(array_size));
  double* wts = weights.get(); // to avoid lots of .get() calls -- do not delete
  ptr += copy_from_mem(ptr, wts, h * sizeof(double));
  for (size_t i = 0; i < h; ++i) {
    if (!(wts[i] > 0.0)) {
      throw std::invalid_argument("Possible corruption: Non-positive weight when deserializing: " + std::to_string(wts[i]));
    }
  }
  std::fill(&wts[h], &wts[array_size], -1.0);
  
  // read the first h_ marks as packed bytes iff we have a gadget
  uint32_t num_marks_in_h = 0;
  std::unique_ptr<bool, marks_deleter> marks(nullptr, marks_deleter(array_size));
  if (is_gadget) {
    uint8_t val = 0;
    marks = std::unique_ptr<bool, marks_deleter>(AllocBool().allocate(array_size), marks_deleter(array_size));
    const size_t size_marks = (h / 8) + (h % 8 > 0 ? 1 : 0);
    check_memory_size(ptr - base + size_marks, size);
    for (uint32_t i = 0; i < h; ++i) {
     if ((i & 0x7) == 0x0) { // should trigger on first iteration
        ptr += copy_from_mem(ptr, &val, sizeof(val));
      }
      marks.get()[i] = ((val >> (i & 0x7)) & 0x1) == 1;
      num_marks_in_h += (marks.get()[i] ? 1 : 0);
    }
  }

  // read the sample items, skipping the gap. Either h_ or r_ may be 0
  items_deleter deleter(array_size);
  std::unique_ptr<T, items_deleter> items(A().allocate(array_size), deleter);
  
  ptr += S().deserialize(ptr, end_ptr - ptr, items.get(), h);
  items.get_deleter().set_h(h); // serde didn't throw, so the items are now valid
  
  ptr += S().deserialize(ptr, end_ptr - ptr, &(items.get()[h + 1]), r);
  items.get_deleter().set_r(r); // serde didn't throw, so the items are now valid

  return var_opt_sketch(k, h, (r > 0 ? 1 : 0), r, n, total_wt_r, rf, array_size, false,
                        std::move(items), std::move(weights), num_marks_in_h, std::move(marks));
}

template<typename T, typename S, typename A>
var_opt_sketch<T,S,A> var_opt_sketch<T,S,A>::deserialize(std::istream& is) {
  uint8_t first_byte;
  is.read((char*)&first_byte, sizeof(first_byte));
  uint8_t preamble_longs = first_byte & 0x3f;
  resize_factor rf = static_cast<resize_factor>((first_byte >> 6) & 0x03);
  uint8_t serial_version;
  is.read((char*)&serial_version, sizeof(serial_version));
  uint8_t family_id;
  is.read((char*)&family_id, sizeof(family_id));
  uint8_t flags;
  is.read((char*)&flags, sizeof(flags));
  uint32_t k;
  is.read((char*)&k, sizeof(k));

  check_preamble_longs(preamble_longs, flags);
  check_family_and_serialization_version(family_id, serial_version);

  const bool is_empty = flags & EMPTY_FLAG_MASK;
  const bool is_gadget = flags & GADGET_FLAG_MASK;

  if (is_empty) {
    if (!is.good())
      throw std::runtime_error("error reading from std::istream"); 
    else
      return var_opt_sketch<T,S,A>(k, rf, is_gadget);
  }

  // second and third prelongs
  uint64_t n;
  uint32_t h, r;
  is.read((char*)&n, sizeof(n));
  is.read((char*)&h, sizeof(h));
  is.read((char*)&r, sizeof(r));

  const uint32_t array_size = validate_and_get_target_size(preamble_longs, k, n, h, r, rf);

  // current_items_alloc_ is set but validate R region weight (4th prelong), if needed, before allocating
  double total_wt_r = 0.0;
  if (preamble_longs == PREAMBLE_LONGS_FULL) { 
    is.read((char*)&total_wt_r, sizeof(total_wt_r));
    if (std::isnan(total_wt_r) || r == 0 || total_wt_r <= 0.0) {
      throw std::invalid_argument("Possible corruption: deserializing in full mode but r = 0 or invalid R weight. "
       "Found r = " + std::to_string(r) + ", R region weight = " + std::to_string(total_wt_r));
    }
  } else {
    total_wt_r = 0.0;
  }

  // read the first h weights, fill remainder with -1.0
  std::unique_ptr<double, weights_deleter> weights(AllocDouble().allocate(array_size), weights_deleter(array_size));
  double* wts = weights.get(); // to avoid lots of .get() calls -- do not delete
  is.read((char*)wts, h * sizeof(double));
  for (size_t i = 0; i < h; ++i) {
    if (!(wts[i] > 0.0)) {
      throw std::invalid_argument("Possible corruption: Non-positive weight when deserializing: " + std::to_string(wts[i]));
    }
  }
  std::fill(&wts[h], &wts[array_size], -1.0);

  // read the first h_ marks as packed bytes iff we have a gadget
  uint32_t num_marks_in_h = 0;
  std::unique_ptr<bool, marks_deleter> marks(nullptr, marks_deleter(array_size));
  if (is_gadget) {
    marks = std::unique_ptr<bool, marks_deleter>(AllocBool().allocate(array_size), marks_deleter(array_size));
    uint8_t val = 0;
    for (uint32_t i = 0; i < h; ++i) {
      if ((i & 0x7) == 0x0) { // should trigger on first iteration
        is.read((char*)&val, sizeof(val));
      }
      marks.get()[i] = ((val >> (i & 0x7)) & 0x1) == 1;
      num_marks_in_h += (marks.get()[i] ? 1 : 0);
    }
  }

  // read the sample items, skipping the gap. Either h or r may be 0
  items_deleter deleter(array_size);
  std::unique_ptr<T, items_deleter> items(A().allocate(array_size), deleter);
  
  S().deserialize(is, items.get(), h); // aka &data_[0]
  items.get_deleter().set_h(h); // serde didn't throw, so the items are now valid
  
  S().deserialize(is, &(items.get()[h + 1]), r);
  items.get_deleter().set_r(r); // serde didn't throw, so the items are now valid

  if (!is.good())
    throw std::runtime_error("error reading from std::istream"); 

  return var_opt_sketch(k, h, (r > 0 ? 1 : 0), r, n, total_wt_r, rf, array_size, false,
                        std::move(items), std::move(weights), num_marks_in_h, std::move(marks));
}

template<typename T, typename S, typename A>
bool var_opt_sketch<T,S,A>::is_empty() const {
  return (h_ == 0 && r_ == 0);
}

template<typename T, typename S, typename A>
void var_opt_sketch<T,S,A>::reset() {
  const uint32_t prev_alloc = curr_items_alloc_;
  const uint32_t ceiling_lg_k = to_log_2(ceiling_power_of_2(k_));
  const uint32_t initial_lg_size = starting_sub_multiple(ceiling_lg_k, rf_, MIN_LG_ARR_ITEMS);
  curr_items_alloc_ = get_adjusted_size(k_, 1 << initial_lg_size);
  if (curr_items_alloc_ == k_) { // if full size, need to leave 1 for the gap
    ++curr_items_alloc_;
  }

  if (filled_data_) {
    // destroy everything
    const size_t num_to_destroy = std::min(k_ + 1, prev_alloc);
    for (size_t i = 0; i < num_to_destroy; ++i) 
      A().destroy(data_ + i);      
  } else {
    // skip gap or anything unused at the end
    for (size_t i = 0; i < h_; ++i)
      A().destroy(data_+ i);
    
    for (size_t i = h_ + 1; i < h_ + r_ + 1; ++i)
      A().destroy(data_ + i);
  }

  if (curr_items_alloc_ < prev_alloc) {
    const bool is_gadget = (marks_ != nullptr);
  
    A().deallocate(data_, prev_alloc);
    AllocDouble().deallocate(weights_, prev_alloc);
  
    if (marks_ != nullptr)
      AllocBool().deallocate(marks_, prev_alloc);

    allocate_data_arrays(curr_items_alloc_, is_gadget);
  }
  
  n_ = 0;
  h_ = 0;
  m_ = 0;
  r_ = 0;
  num_marks_in_h_ = 0;
  total_wt_r_ = 0.0;
  filled_data_ = false;
}

template<typename T, typename S, typename A>
uint64_t var_opt_sketch<T,S,A>::get_n() const {
  return n_;
}

template<typename T, typename S, typename A>
uint32_t var_opt_sketch<T,S,A>::get_k() const {
  return k_;
}

template<typename T, typename S, typename A>
uint32_t var_opt_sketch<T,S,A>::get_num_samples() const {
  const uint32_t num_in_sketch = h_ + r_;
  return (num_in_sketch < k_ ? num_in_sketch : k_);
}

template<typename T, typename S, typename A>
void var_opt_sketch<T,S,A>::update(const T& item, double weight) {
  update(item, weight, false);
}

template<typename T, typename S, typename A>
void var_opt_sketch<T,S,A>::update(T&& item, double weight) {
  update(std::move(item), weight, false);
}

template<typename T, typename S, typename A>
string<A> var_opt_sketch<T,S,A>::to_string() const {
  std::basic_ostringstream<char, std::char_traits<char>, AllocChar<A>> os;
  os << "### VarOpt SUMMARY: " << std::endl;
  os << "   k            : " << k_ << std::endl;
  os << "   h            : " << h_ << std::endl;
  os << "   r            : " << r_ << std::endl;
  os << "   weight_r     : " << total_wt_r_ << std::endl;
  os << "   Current size : " << curr_items_alloc_ << std::endl;
  os << "   Resize factor: " << (1 << rf_) << std::endl;
  os << "### END SKETCH SUMMARY" << std::endl;
  return os.str();
}

template<typename T, typename S, typename A>
string<A> var_opt_sketch<T,S,A>::items_to_string() const {
  std::basic_ostringstream<char, std::char_traits<char>, AllocChar<A>> os;
  os << "### Sketch Items" << std::endl;
  int idx = 0;
  for (auto record : *this) {
    os << idx << ": " << record.first << "\twt = " << record.second << std::endl;
    ++idx;
  }
  return os.str();
}

template<typename T, typename S, typename A>
string<A> var_opt_sketch<T,S,A>::items_to_string(bool print_gap) const {
  std::basic_ostringstream<char, std::char_traits<char>, AllocChar<A>> os;
  os << "### Sketch Items" << std::endl;
  const uint32_t array_length = (n_ < k_ ? n_ : k_ + 1);
  for (uint32_t i = 0, display_idx = 0; i < array_length; ++i) {
    if (i == h_ && print_gap) {
      os << i << ": GAP" << std::endl;
      ++display_idx;
    } else {
      os << i << ": " << data_[i] << "\twt = ";
      if (weights_[i] == -1.0) {
        os << get_tau() << "\t(-1.0)" << std::endl;
      } else {
        os << weights_[i] << std::endl;
      }
      ++display_idx;
    }
  }
  return os.str();
}

template<typename T, typename S, typename A>
template<typename O>
void var_opt_sketch<T,S,A>::update(O&& item, double weight, bool mark) {
  if (weight < 0.0 || std::isnan(weight) || std::isinf(weight)) {
    throw std::invalid_argument("Item weights must be nonnegative and finite. Found: "
                                + std::to_string(weight));
  } else if (weight == 0.0) {
    return;
  }
  ++n_;

  if (r_ == 0) {
    // exact mode
    update_warmup_phase(std::forward<O>(item), weight, mark);
  } else {
    // sketch is in estimation mode so we can make the following check,
    // although very conservative to check every time
    if ((h_ != 0) && (peek_min() < get_tau()))
      throw std::logic_error("sketch not in valid estimation mode");

    // what tau would be if deletion candidates turn out to be R plus the new item
    // note: (r_ + 1) - 1 is intentional
    const double hypothetical_tau = (weight + total_wt_r_) / ((r_ + 1) - 1);

    // is new item's turn to be considered for reservoir?
    const double condition1 = (h_ == 0) || (weight <= peek_min());

    // is new item light enough for reservoir?
    const double condition2 = weight < hypothetical_tau;
  
    if (condition1 && condition2) {
      update_light(std::forward<O>(item), weight, mark);
    } else if (r_ == 1) {
      update_heavy_r_eq1(std::forward<O>(item), weight, mark);
    } else {
      update_heavy_general(std::forward<O>(item), weight, mark);
    }
  }
}

template<typename T, typename S, typename A>
template<typename O>
void var_opt_sketch<T,S,A>::update_warmup_phase(O&& item, double weight, bool mark) {
  // seems overly cautious
  if (r_ > 0 || m_ != 0 || h_ > k_) throw std::logic_error("invalid sketch state during warmup");

  if (h_ >= curr_items_alloc_) {
    grow_data_arrays();
  }

  // store items as they come in until full
  new (&data_[h_]) T(std::forward<O>(item));
  weights_[h_] = weight;
  if (marks_ != nullptr) {
    marks_[h_] = mark;
  }
  ++h_;
  num_marks_in_h_ += mark ? 1 : 0;

  // check if need to heapify
  if (h_ > k_) {
    filled_data_ = true;
    transition_from_warmup();
  }
}

/* In the "light" case the new item has weight <= old_tau, so
   would appear to the right of the R items in a hypothetical reverse-sorted
   list. It is easy to prove that it is light enough to be part of this
   round's downsampling */
template<typename T, typename S, typename A>
template<typename O>
void var_opt_sketch<T,S,A>::update_light(O&& item, double weight, bool mark) {
  if (r_ == 0 || (r_ + h_) != k_) throw std::logic_error("invalid sketch state during light warmup");

  const uint32_t m_slot = h_; // index of the gap, which becomes the M region
  if (filled_data_) {
    data_[m_slot] = std::forward<O>(item);
  } else {
    new (&data_[m_slot]) T(std::forward<O>(item));
    filled_data_ = true;
  }
  weights_[m_slot] = weight;
  if (marks_ != nullptr) { marks_[m_slot] = mark; }
  ++m_;
  
  grow_candidate_set(total_wt_r_ + weight, r_ + 1);
}

/* In the "heavy" case the new item has weight > old_tau, so would
   appear to the left of items in R in a hypothetical reverse-sorted list and
   might or might not be light enough be part of this round's downsampling.
   [After first splitting off the R=1 case] we greatly simplify the code by
   putting the new item into the H heap whether it needs to be there or not.
   In other words, it might go into the heap and then come right back out,
   but that should be okay because pseudo_heavy items cannot predominate
   in long streams unless (max wt) / (min wt) > o(exp(N)) */
template<typename T, typename S, typename A>
template<typename O>
void var_opt_sketch<T,S,A>::update_heavy_general(O&& item, double weight, bool mark) {
  if (r_ < 2 || m_ != 0 || (r_ + h_) != k_) throw std::logic_error("invalid sketch state during heavy general update");

  // put into H, although may come back out momentarily
  push(std::forward<O>(item), weight, mark);

  grow_candidate_set(total_wt_r_, r_);
}

/* The analysis of this case is similar to that of the general heavy case.
   The one small technical difference is that since R < 2, we must grab an M item
   to have a valid starting point for continue_by_growing_candidate_set () */
template<typename T, typename S, typename A>
template<typename O>
void var_opt_sketch<T,S,A>::update_heavy_r_eq1(O&& item, double weight, bool mark) {
  if (r_ != 1 || m_ != 0 || (r_ + h_) != k_) throw std::logic_error("invalid sketch state during heavy r=1 update");

  push(std::forward<O>(item), weight, mark);  // new item into H
  pop_min_to_m_region();     // pop lightest back into M

  // Any set of two items is downsample-able to one item,
  // so the two lightest items are a valid starting point for the following
  const uint32_t m_slot = k_ - 1; // array is k+1, 1 in R, so slot before is M
  grow_candidate_set(weights_[m_slot] + total_wt_r_, 2);
}

/**
 * Decreases sketch's value of k by 1, updating stored values as needed.
 *
 * <p>Subject to certain pre-conditions, decreasing k causes tau to increase. This fact is used by
 * the unioning algorithm to force "marked" items out of H and into the reservoir region.</p>
 */
template<typename T, typename S, typename A>
void var_opt_sketch<T,S,A>::decrease_k_by_1() {
  if (k_ <= 1) {
    throw std::logic_error("Cannot decrease k below 1 in union");
  }

  if ((h_ == 0) && (r_ == 0)) {
    // exact mode, but no data yet; this reduction is somewhat gratuitous
    --k_;
  } else if ((h_ > 0) && (r_ == 0)) {
    // exact mode, but we have some data
    --k_;
    if (h_ > k_) {
      transition_from_warmup();
    }
  } else if ((h_ > 0) && (r_ > 0)) {
    // reservoir mode, but we have some exact samples.
    // Our strategy will be to pull an item out of H (which we are allowed to do since it's
    // still just data), reduce k, and then re-insert the item

    // first, slide the R zone to the left by 1, temporarily filling the gap
    const uint32_t old_gap_idx = h_;
    const uint32_t old_final_r_idx = (h_ + 1 + r_) - 1;
    //if (old_final_r_idx != k_) throw std::logic_error("gadget in invalid state");
    
    swap_values(old_final_r_idx, old_gap_idx);

    // now we pull an item out of H; any item is ok, but if we grab the rightmost and then
    // reduce h_, the heap invariant will be preserved (and the gap will be restored), plus
    // the push() of the item that will probably happen later will be cheap.

    const uint32_t pulled_idx = h_ - 1;
    double pulled_weight = weights_[pulled_idx];
    bool pulled_mark = marks_[pulled_idx];
    // will move the pulled item below; don't do antying to it here

    if (pulled_mark) { --num_marks_in_h_; }
    weights_[pulled_idx] = -1.0; // to make bugs easier to spot

    --h_;
    --k_;
    --n_; // will be re-incremented with the update

    update(std::move(data_[pulled_idx]), pulled_weight, pulled_mark);
  } else if ((h_ == 0) && (r_ > 0)) {
    // pure reservoir mode, so can simply eject a randomly chosen sample from the reservoir
    if (r_ < 2) throw std::logic_error("r_ too small for pure reservoir mode");

    const uint32_t r_idx_to_delete = 1 + next_int(r_); // 1 for the gap
    const uint32_t rightmost_r_idx = (1 + r_) - 1;
    swap_values(r_idx_to_delete, rightmost_r_idx);
    weights_[rightmost_r_idx] = -1.0;

    --k_;
    --r_;
  }
}

template<typename T, typename S, typename A>
void var_opt_sketch<T,S,A>::allocate_data_arrays(uint32_t tgt_size, bool use_marks) {
  filled_data_ = false;

  data_ = A().allocate(tgt_size);
  weights_ = AllocDouble().allocate(tgt_size);

  if (use_marks) {
    marks_ = AllocBool().allocate(tgt_size);
  } else {
    marks_ = nullptr;
  }
}

template<typename T, typename S, typename A>
void var_opt_sketch<T,S,A>::grow_data_arrays() {
  const uint32_t prev_size = curr_items_alloc_;
  curr_items_alloc_ = get_adjusted_size(k_, curr_items_alloc_ << rf_);
  if (curr_items_alloc_ == k_) {
    ++curr_items_alloc_;
  }

  if (prev_size < curr_items_alloc_) {
    filled_data_ = false;

    T* tmp_data = A().allocate(curr_items_alloc_);
    double* tmp_weights = AllocDouble().allocate(curr_items_alloc_);

    for (uint32_t i = 0; i < prev_size; ++i) {
      new (&tmp_data[i]) T(std::move(data_[i]));
      A().destroy(data_ + i);
      tmp_weights[i] = weights_[i];
    }

    A().deallocate(data_, prev_size);
    AllocDouble().deallocate(weights_, prev_size);

    data_ = tmp_data;
    weights_ = tmp_weights;

    if (marks_ != nullptr) {
      bool* tmp_marks = AllocBool().allocate(curr_items_alloc_);
      for (uint32_t i = 0; i < prev_size; ++i) {
        tmp_marks[i] = marks_[i];
      }
      AllocBool().deallocate(marks_, prev_size);
      marks_ = tmp_marks;
    }
  }
}

template<typename T, typename S, typename A>
void var_opt_sketch<T,S,A>::transition_from_warmup() {
  // Move the 2 lightest items from H to M
  // But the lighter really belongs in R, so update counts to reflect that
  convert_to_heap();
  pop_min_to_m_region();
  pop_min_to_m_region();
  --m_;
  ++r_;

  if (h_ != (k_ -1) || m_ != 1 || r_ != 1)
    throw std::logic_error("invalid state for transitioning from warmup");

  // Update total weight in R and then, having grabbed the value, overwrite
  // in weight_ array to help make bugs more obvious
  total_wt_r_ = weights_[k_]; // only one item, known location
  weights_[k_] = -1.0;

  // The two lightest items are ncessarily downsample-able to one item,
  // and are therefore a valid initial candidate set
  grow_candidate_set(weights_[k_ - 1] + total_wt_r_, 2);
}

template<typename T, typename S, typename A>
void var_opt_sketch<T,S,A>::convert_to_heap() {
  if (h_ < 2) {
    return; // nothing to do
  }

  const uint32_t last_slot = h_ - 1;
  const int last_non_leaf = ((last_slot + 1) / 2) - 1;
  
  for (int j = last_non_leaf; j >= 0; --j) {
    restore_towards_leaves(j);
  }

  // validates heap, used for initial debugging
  //for (uint32_t j = h_ - 1; j >= 1; --j) {
  //  uint32_t p = ((j + 1) / 2) - 1;
  //  if (weights_[p] > weights_[j]) throw std::logic_error("invalid heap");
  //}
}

template<typename T, typename S, typename A>
void var_opt_sketch<T,S,A>::restore_towards_leaves(uint32_t slot_in) {
  const uint32_t last_slot = h_ - 1;
  if (h_ == 0 || slot_in > last_slot) throw std::logic_error("invalid heap state");

  uint32_t slot = slot_in;
  uint32_t child = (2 * slot_in) + 1; // might be invalid, need to check

  while (child <= last_slot) {
    uint32_t child2 = child + 1; // might also be invalid
    if ((child2 <= last_slot) && (weights_[child2] < weights_[child])) {
      // siwtch to other child if it's both valid and smaller
      child = child2;
    }

    if (weights_[slot] <= weights_[child]) {
      // invariant holds so we're done
      break;
    }

    // swap and continue
    swap_values(slot, child);

    slot = child;
    child = (2 * slot) + 1; // might be invalid, checked on next loop
  }
}

template<typename T, typename S, typename A>
void var_opt_sketch<T,S,A>::restore_towards_root(uint32_t slot_in) {
  uint32_t slot = slot_in;
  uint32_t p = (((slot + 1) / 2) - 1); // valid if slot >= 1
  while ((slot > 0) && (weights_[slot] < weights_[p])) {
    swap_values(slot, p);
    slot = p;
    p = (((slot + 1) / 2) - 1); // valid if slot >= 1
  }
}

template<typename T, typename S, typename A>
template<typename O>
void var_opt_sketch<T,S,A>::push(O&& item, double wt, bool mark) {
  if (filled_data_) {
    data_[h_] = std::forward<O>(item);
  } else {
    new (&data_[h_]) T(std::forward<O>(item));
    filled_data_ = true;
  }
  weights_[h_] = wt;
  if (marks_ != nullptr) {
    marks_[h_] = mark;
    num_marks_in_h_ += (mark ? 1 : 0);
  }
  ++h_;

  restore_towards_root(h_ - 1); // need use old h_, but want accurate h_
}

template<typename T, typename S, typename A>
void var_opt_sketch<T,S,A>::pop_min_to_m_region() {
  if (h_ == 0 || (h_ + m_ + r_ != k_ + 1))
    throw std::logic_error("invalid heap state popping min to M region");

  if (h_ == 1) {
    // just update bookkeeping
    ++m_;
    --h_;
  } else {
    // main case
    uint32_t tgt = h_ - 1; // last slot, will swap with root
    swap_values(0, tgt);
    ++m_;
    --h_;

    restore_towards_leaves(0);
  }

  if (is_marked(h_)) {
    --num_marks_in_h_;
  }
}


template<typename T, typename S, typename A>
void var_opt_sketch<T,S,A>::swap_values(uint32_t src, uint32_t dst) {
  std::swap(data_[src], data_[dst]);
  std::swap(weights_[src], weights_[dst]);

  if (marks_ != nullptr) {
    std::swap(marks_[src], marks_[dst]);
  }
}

/* When entering here we should be in a well-characterized state where the
   new item has been placed in either h or m and we have a valid but not necessarily
   maximal sampling plan figured out. The array is completely full at this point.
   Everyone in h and m has an explicit weight. The candidates are right-justified
   and are either just the r set or the r set + exactly one m item. The number
   of cands is at least 2. We will now grow the candidate set as much as possible
   by pulling sufficiently light items from h to m.
*/
template<typename T, typename S, typename A>
void var_opt_sketch<T,S,A>::grow_candidate_set(double wt_cands, uint32_t num_cands) {
  if ((h_ + m_ + r_ != k_ + 1) || (num_cands < 1) || (num_cands != m_ + r_) || (m_ >= 2))
    throw std::logic_error("invariant violated when growing candidate set");

  while (h_ > 0) {
    const double next_wt = peek_min();
    const double next_tot_wt = wt_cands + next_wt;

    // test for strict lightness of next prospect (denominator multiplied through)
    // ideally: (next_wt * (next_num_cands-1) < next_tot_wt)
    //          but can use num_cands directly
    if ((next_wt * num_cands) < next_tot_wt) {
      wt_cands = next_tot_wt;
      ++num_cands;
      pop_min_to_m_region(); // adjusts h_ and m_
    } else {
      break;
    }
  }

  downsample_candidate_set(wt_cands, num_cands);
}

template<typename T, typename S, typename A>
void var_opt_sketch<T,S,A>::downsample_candidate_set(double wt_cands, uint32_t num_cands) {
  if (num_cands < 2 || h_ + num_cands != k_ + 1)
    throw std::logic_error("invalid num_cands when downsampling");

  // need this before overwriting anything
  const uint32_t delete_slot = choose_delete_slot(wt_cands, num_cands);
  const uint32_t leftmost_cand_slot = h_;
  if (delete_slot < leftmost_cand_slot || delete_slot > k_)
    throw std::logic_error("invalid delete slot index when downsampling");

  // Overwrite weights for items from M moving into R,
  // to make bugs more obvious. Also needed so anyone reading the
  // weight knows if it's invalid without checking h_ and m_
  const uint32_t stop_idx = leftmost_cand_slot + m_;
  for (uint32_t j = leftmost_cand_slot; j < stop_idx; ++j) {
    weights_[j] = -1.0;
  }

  // The next two lines work even when delete_slot == leftmost_cand_slot
  data_[delete_slot] = std::move(data_[leftmost_cand_slot]);
  // cannot set data_[leftmost_cand_slot] to null since not uisng T*

  m_ = 0;
  r_ = num_cands - 1;
  total_wt_r_ = wt_cands;
}

template<typename T, typename S, typename A>
uint32_t var_opt_sketch<T,S,A>::choose_delete_slot(double wt_cands, uint32_t num_cands) const {
  if (r_ == 0) throw std::logic_error("choosing delete slot while in exact mode");

  if (m_ == 0) {
    // this happens if we insert a really heavy item
    return pick_random_slot_in_r();
  } else if (m_ == 1) {
    // check if we keep th item in M or pick oen from R
    // p(keep) = (num_cand - 1) * wt_M / wt_cand
    double wt_m_cand = weights_[h_]; // slot of item in M is h_
    if ((wt_cands * next_double_exclude_zero()) < ((num_cands - 1) * wt_m_cand)) {
      return pick_random_slot_in_r(); // keep item in M
    } else {
      return h_; // indext of item in M
    }
  } else {
    // general case
    const uint32_t delete_slot = choose_weighted_delete_slot(wt_cands, num_cands);
    const uint32_t first_r_slot = h_ + m_;
    if (delete_slot == first_r_slot) {
      return pick_random_slot_in_r();
    } else {
      return delete_slot;
    }
  }
}

template<typename T, typename S, typename A>
uint32_t var_opt_sketch<T,S,A>::choose_weighted_delete_slot(double wt_cands, uint32_t num_cands) const {
  if (m_ < 1) throw std::logic_error("must have weighted delete slot");

  const uint32_t offset = h_;
  const uint32_t final_m = (offset + m_) - 1;
  const uint32_t num_to_keep = num_cands - 1;

  double left_subtotal = 0.0;
  double right_subtotal = -1.0 * wt_cands * next_double_exclude_zero();

  for (uint32_t i = offset; i <= final_m; ++i) {
    left_subtotal += num_to_keep * weights_[i];
    right_subtotal += wt_cands;

    if (left_subtotal < right_subtotal) {
      return i;
    }
  }

  // this slot tells caller that we need to delete out of R
  return final_m + 1;
}

template<typename T, typename S, typename A>
uint32_t var_opt_sketch<T,S,A>::pick_random_slot_in_r() const {
  if (r_ == 0) throw std::logic_error("r_ = 0 when picking slot in R region");
  const uint32_t offset = h_ + m_;
  if (r_ == 1) {
    return offset;
  } else {
    return offset + next_int(r_);
  }
}

template<typename T, typename S, typename A>
double var_opt_sketch<T,S,A>::peek_min() const {
  if (h_ == 0) throw std::logic_error("h_ = 0 when checking min in H region");
  return weights_[0];
}

template<typename T, typename S, typename A>
inline bool var_opt_sketch<T,S,A>::is_marked(uint32_t idx) const {
  return marks_ == nullptr ? false : marks_[idx];
}

template<typename T, typename S, typename A>
double var_opt_sketch<T,S,A>::get_tau() const {
  return r_ == 0 ? std::nan("1") : (total_wt_r_ / r_);
}

template<typename T, typename S, typename A>
void var_opt_sketch<T,S,A>::strip_marks() {
  if (marks_ == nullptr) throw std::logic_error("request to strip marks from non-gadget");
  num_marks_in_h_ = 0;
  AllocBool().deallocate(marks_, curr_items_alloc_);
  marks_ = nullptr;
}

template<typename T, typename S, typename A>
void var_opt_sketch<T,S,A>::check_preamble_longs(uint8_t preamble_longs, uint8_t flags) {
  const bool is_empty(flags & EMPTY_FLAG_MASK);
  
  if (is_empty) {
    if (preamble_longs != PREAMBLE_LONGS_EMPTY) {
      throw std::invalid_argument("Possible corruption: Preamble longs must be "
        + std::to_string(PREAMBLE_LONGS_EMPTY) + " for an empty sketch. Found: "
        + std::to_string(preamble_longs));
    }
  } else {
    if (preamble_longs != PREAMBLE_LONGS_WARMUP
        && preamble_longs != PREAMBLE_LONGS_FULL) {
      throw std::invalid_argument("Possible corruption: Preamble longs must be "
        + std::to_string(PREAMBLE_LONGS_WARMUP) + " or "
        + std::to_string(PREAMBLE_LONGS_FULL)
        + " for a non-empty sketch. Found: " + std::to_string(preamble_longs));
    }
  }
}

template<typename T, typename S, typename A>
void var_opt_sketch<T,S,A>::check_family_and_serialization_version(uint8_t family_id, uint8_t ser_ver) {
  if (family_id == FAMILY_ID) {
    if (ser_ver != SER_VER) {
      throw std::invalid_argument("Possible corruption: VarOpt serialization version must be "
        + std::to_string(SER_VER) + ". Found: " + std::to_string(ser_ver));
    }
    return;
  }
  // TODO: extend to handle reservoir sampling

  throw std::invalid_argument("Possible corruption: VarOpt family id must be "
    + std::to_string(FAMILY_ID) + ". Found: " + std::to_string(family_id));
}

template<typename T, typename S, typename A>
uint32_t var_opt_sketch<T, S, A>::validate_and_get_target_size(uint32_t preamble_longs, uint32_t k, uint64_t n,
                                                               uint32_t h, uint32_t r, resize_factor rf) {
  if (k == 0 || k > MAX_K) {
    throw std::invalid_argument("k must be at least 1 and less than 2^31 - 1");
  }

  uint32_t array_size;

  if (n <= k) {
    if (preamble_longs != PREAMBLE_LONGS_WARMUP) {
      throw std::invalid_argument("Possible corruption: deserializing with n <= k but not in warmup mode. "
       "Found n = " + std::to_string(n) + ", k = " + std::to_string(k));
    }
    if (n != h) {
      throw std::invalid_argument("Possible corruption: deserializing in warmup mode but n != h. "
       "Found n = " + std::to_string(n) + ", h = " + std::to_string(h));
    }
    if (r > 0) {
      throw std::invalid_argument("Possible corruption: deserializing in warmup mode but r > 0. "
       "Found r = " + std::to_string(r));
    }

    const uint32_t ceiling_lg_k = to_log_2(ceiling_power_of_2(k));
    const uint32_t min_lg_size = to_log_2(ceiling_power_of_2(h));
    const uint32_t initial_lg_size = starting_sub_multiple(ceiling_lg_k, rf, min_lg_size);
    array_size = get_adjusted_size(k, 1 << initial_lg_size);
    if (array_size == k) { // if full size, need to leave 1 for the gap
      ++array_size;
    }
  } else { // n > k
    if (preamble_longs != PREAMBLE_LONGS_FULL) { 
      throw std::invalid_argument("Possible corruption: deserializing with n > k but not in full mode. "
       "Found n = " + std::to_string(n) + ", k = " + std::to_string(k));
    }
    if (h + r != k) {
      throw std::invalid_argument("Possible corruption: deserializing in full mode but h + r != n. "
       "Found h = " + std::to_string(h) + ", r = " + std::to_string(r) + ", n = " + std::to_string(n));
    }

    array_size = k + 1;
  }

  return array_size;
}

template<typename T, typename S, typename A>
template<typename P>
subset_summary var_opt_sketch<T, S, A>::estimate_subset_sum(P predicate) const {
  if (n_ == 0) {
    return {0.0, 0.0, 0.0, 0.0};
  }

  double total_wt_h = 0.0;
  double h_true_wt = 0.0;
  size_t idx = 0;
  for (; idx < h_; ++idx) {
    double wt = weights_[idx];
    total_wt_h += wt;
    if (predicate(data_[idx])) {
      h_true_wt += wt;
    }
  }

  // if only heavy items, we have an exact answer
  if (r_ == 0) {
    return {h_true_wt, h_true_wt, h_true_wt, h_true_wt};
  }

  // since r_ > 0, we know we have samples
  const uint64_t num_samples = n_ - h_;
  double effective_sampling_rate = r_ / static_cast<double>(num_samples);
  if (effective_sampling_rate < 0.0 || effective_sampling_rate > 1.0)
    throw std::logic_error("invalid sampling rate outside [0.0, 1.0]");

  size_t r_true_count = 0;
  ++idx; // skip the gap
  for (; idx < (k_ + 1); ++idx) {
    if (predicate(data_[idx])) {
      ++r_true_count;
    }
  }

  double lb_true_fraction = pseudo_hypergeometric_lb_on_p(r_, r_true_count, effective_sampling_rate);
  double estimated_true_fraction = (1.0 * r_true_count) / r_;
  double ub_true_fraction = pseudo_hypergeometric_ub_on_p(r_, r_true_count, effective_sampling_rate);

  return {  h_true_wt + (total_wt_r_ * lb_true_fraction),
            h_true_wt + (total_wt_r_ * estimated_true_fraction),
            h_true_wt + (total_wt_r_ * ub_true_fraction),
            total_wt_h + total_wt_r_
         };
}

template<typename T, typename S, typename A>
class var_opt_sketch<T, S, A>::items_deleter {
  public:
  items_deleter(uint32_t num) : num(num), h_count(0), r_count(0) {}
  void set_h(uint32_t h) { h_count = h; }
  void set_r(uint32_t r) { r_count = r; }  
  void operator() (T* ptr) const {
    if (h_count > 0) {
      for (size_t i = 0; i < h_count; ++i) {
        ptr[i].~T();
      }
    }
    if (r_count > 0) {
      uint32_t end = h_count + r_count + 1;
      for (size_t i = h_count + 1; i < end; ++i) {
        ptr[i].~T();
      }
    }
    if (ptr != nullptr) {
      A().deallocate(ptr, num);
    }
  }
  private:
  uint32_t num;
  uint32_t h_count;
  uint32_t r_count;
};

template<typename T, typename S, typename A>
class var_opt_sketch<T, S, A>::weights_deleter {
  public:
  weights_deleter(uint32_t num) : num(num) {}
  void operator() (double* ptr) const {
    if (ptr != nullptr) {
      AllocDouble().deallocate(ptr, num);
    }
  }
  private:
  uint32_t num;
};

template<typename T, typename S, typename A>
class var_opt_sketch<T, S, A>::marks_deleter {
  public:
  marks_deleter(uint32_t num) : num(num) {}
  void operator() (bool* ptr) const {
    if (ptr != nullptr) {
      AllocBool().deallocate(ptr, 1);
    }
  }
  private:
  uint32_t num;
};


template<typename T, typename S, typename A>
typename var_opt_sketch<T, S, A>::const_iterator var_opt_sketch<T, S, A>::begin() const {
  return var_opt_sketch<T, S, A>::const_iterator(*this, false);
}

template<typename T, typename S, typename A>
typename var_opt_sketch<T, S, A>::const_iterator var_opt_sketch<T, S, A>::end() const {
  return var_opt_sketch<T, S, A>::const_iterator(*this, true);
}

// -------- var_opt_sketch::const_iterator implementation ---------

template<typename T, typename S, typename A>
var_opt_sketch<T,S,A>::const_iterator::const_iterator(const var_opt_sketch<T,S,A>& sk, bool is_end) :
  sk_(&sk),
  cum_r_weight_(0.0),
  r_item_wt_(sk.get_tau()),
  final_idx_(sk.r_ > 0 ? sk.h_ + sk.r_ + 1 : sk.h_)
{
  // index logic easier to read if not inline
  if (is_end) {
    idx_ = final_idx_;
    sk_ = nullptr;
  } else {
    idx_ = (sk.h_ == 0 && sk.r_ > 0 ? 1 : 0); // skip if gap is at start
  }

  // should only apply if sketch is empty
  if (idx_ == final_idx_) { sk_ = nullptr; }
}

template<typename T, typename S, typename A>
var_opt_sketch<T,S,A>::const_iterator::const_iterator(const var_opt_sketch<T,S,A>& sk, bool is_end, bool use_r_region) :
  sk_(&sk),
  cum_r_weight_(0.0),
  r_item_wt_(sk.get_tau()),
  final_idx_(sk.h_ + (use_r_region ? 1 + sk.r_ : 0))
{
  if (use_r_region) {
    idx_ = sk.h_ + 1 + (is_end ? sk.r_ : 0);
  } else { // H region
    // gap at start only if h_ == 0, so index always starts at 0
    idx_ = (is_end ? sk.h_ : 0);
  }
  
  // unlike in full iterator case, may happen even if sketch is not empty
  if (idx_ == final_idx_) { sk_ = nullptr; }
}


template<typename T,  typename S, typename A>
var_opt_sketch<T, S, A>::const_iterator::const_iterator(const const_iterator& other) :
  sk_(other.sk_),
  cum_r_weight_(other.cum_r_weight_),
  r_item_wt_(other.r_item_wt_),
  idx_(other.idx_),
  final_idx_(other.final_idx_)
{}

template<typename T,  typename S, typename A>
typename var_opt_sketch<T, S, A>::const_iterator& var_opt_sketch<T, S, A>::const_iterator::operator++() {
  ++idx_;
  
  if (idx_ == final_idx_) {
    sk_ = nullptr;
    return *this;
  } else if (idx_ == sk_->h_ && sk_->r_ > 0) { // check for the gap
    ++idx_;
  }
  if (idx_ > sk_->h_) { cum_r_weight_ += r_item_wt_; }
  return *this;
}

template<typename T,  typename S, typename A>
typename var_opt_sketch<T, S, A>::const_iterator& var_opt_sketch<T, S, A>::const_iterator::operator++(int) {
  const_iterator tmp(*this);
  operator++();
  return tmp;
}

template<typename T, typename S, typename A>
bool var_opt_sketch<T, S, A>::const_iterator::operator==(const const_iterator& other) const {
  if (sk_ != other.sk_) return false;
  if (sk_ == nullptr) return true; // end (and we know other.sk_ is also null)
  return idx_ == other.idx_;
}

template<typename T, typename S, typename A>
bool var_opt_sketch<T, S, A>::const_iterator::operator!=(const const_iterator& other) const {
  return !operator==(other);
}

template<typename T, typename S, typename A>
const std::pair<const T&, const double> var_opt_sketch<T, S, A>::const_iterator::operator*() const {
  double wt;
  if (idx_ < sk_->h_) {
    wt = sk_->weights_[idx_];
  } else {
    wt = r_item_wt_;
  }
  return std::pair<const T&, const double>(sk_->data_[idx_], wt);
}

template<typename T, typename S, typename A>
bool var_opt_sketch<T, S, A>::const_iterator::get_mark() const {
  return sk_->marks_ == nullptr ? false : sk_->marks_[idx_];
}


// -------- var_opt_sketch::iterator implementation ---------

template<typename T, typename S, typename A>
var_opt_sketch<T,S,A>::iterator::iterator(const var_opt_sketch<T,S,A>& sk, bool is_end, bool use_r_region) :
  sk_(&sk),
  cum_r_weight_(0.0),
  r_item_wt_(sk.get_tau()),
  final_idx_(sk.h_ + (use_r_region ? 1 + sk.r_ : 0))
{
  if (use_r_region) {
    idx_ = sk.h_ + 1 + (is_end ? sk.r_ : 0);
  } else { // H region
    // gap at start only if h_ == 0, so index always starts at 0
    idx_ = (is_end ? sk.h_ : 0);
  }
  
  // unlike in full iterator case, may happen even if sketch is not empty
  if (idx_ == final_idx_) { sk_ = nullptr; }
}

template<typename T,  typename S, typename A>
var_opt_sketch<T, S, A>::iterator::iterator(const iterator& other) :
  sk_(other.sk_),
  cum_r_weight_(other.cum_r_weight_),
  r_item_wt_(other.r_item_wt_),
  idx_(other.idx_),
  final_idx_(other.final_idx_)
{}

template<typename T,  typename S, typename A>
typename var_opt_sketch<T, S, A>::iterator& var_opt_sketch<T, S, A>::iterator::operator++() {
  ++idx_;
  
  if (idx_ == final_idx_) {
    sk_ = nullptr;
    return *this;
  } else if (idx_ == sk_->h_ && sk_->r_ > 0) { // check for the gap
    ++idx_;
  }
  if (idx_ > sk_->h_) { cum_r_weight_ += r_item_wt_; }
  return *this;
}

template<typename T,  typename S, typename A>
typename var_opt_sketch<T, S, A>::iterator& var_opt_sketch<T, S, A>::iterator::operator++(int) {
  const_iterator tmp(*this);
  operator++();
  return tmp;
}

template<typename T, typename S, typename A>
bool var_opt_sketch<T, S, A>::iterator::operator==(const iterator& other) const {
  if (sk_ != other.sk_) return false;
  if (sk_ == nullptr) return true; // end (and we know other.sk_ is also null)
  return idx_ == other.idx_;
}

template<typename T, typename S, typename A>
bool var_opt_sketch<T, S, A>::iterator::operator!=(const iterator& other) const {
  return !operator==(other);
}

template<typename T, typename S, typename A>
std::pair<T&, double> var_opt_sketch<T, S, A>::iterator::operator*() {
  double wt;
  if (idx_ < sk_->h_) {
    wt = sk_->weights_[idx_];
  } else if (idx_ == final_idx_ - 1) {
    wt = sk_->total_wt_r_ - cum_r_weight_;
  } else {
    wt = r_item_wt_;
  }
  return std::pair<T&, double>(sk_->data_[idx_], wt);
}

template<typename T, typename S, typename A>
bool var_opt_sketch<T, S, A>::iterator::get_mark() const {
  return sk_->marks_ == nullptr ? false : sk_->marks_[idx_];
}



// ******************** MOVE TO COMMON UTILS AREA EVENTUALLY *********************

namespace random_utils {
  static std::random_device rd; // possibly unsafe in MinGW with GCC < 9.2
  static std::mt19937_64 rand(rd());
  static std::uniform_real_distribution<> next_double(0.0, 1.0);
}

/**
 * Checks if target sampling allocation is more than 50% of max sampling size.
 * If so, returns max sampling size, otherwise passes through target size.
 */
template<typename T, typename S, typename A>
uint32_t var_opt_sketch<T,S,A>::get_adjusted_size(uint32_t max_size, uint32_t resize_target) {
  if (max_size - (resize_target << 1) < 0L) {
    return max_size;
  }
  return resize_target;
}

template<typename T, typename S, typename A>
uint32_t var_opt_sketch<T,S,A>::starting_sub_multiple(uint32_t lg_target, uint32_t lg_rf, uint32_t lg_min) {
  return (lg_target <= lg_min)
          ? lg_min : (lg_rf == 0) ? lg_target
          : (lg_target - lg_min) % lg_rf + lg_min;
}

template<typename T, typename S, typename A>
double var_opt_sketch<T,S,A>::pseudo_hypergeometric_ub_on_p(uint64_t n, uint32_t k, double sampling_rate) {
  const double adjusted_kappa = DEFAULT_KAPPA * sqrt(1 - sampling_rate);
  return bounds_binomial_proportions::approximate_upper_bound_on_p(n, k, adjusted_kappa);
}

template<typename T, typename S, typename A>
double var_opt_sketch<T,S,A>::pseudo_hypergeometric_lb_on_p(uint64_t n, uint32_t k, double sampling_rate) {
  const double adjusted_kappa = DEFAULT_KAPPA * sqrt(1 - sampling_rate);
  return bounds_binomial_proportions::approximate_lower_bound_on_p(n, k, adjusted_kappa);
}

template<typename T, typename S, typename A>
bool var_opt_sketch<T,S,A>::is_power_of_2(uint32_t v) {
  return v && !(v & (v - 1));
}

template<typename T, typename S, typename A>
uint32_t var_opt_sketch<T,S,A>::to_log_2(uint32_t v) {
  if (is_power_of_2(v)) {
    return count_trailing_zeros_in_u32(v);
  } else {
    throw std::invalid_argument("Attempt to compute integer log2 of non-positive or non-power of 2");
  }
}

// Returns an integer in the range [0, max_value) -- excludes max_value
template<typename T, typename S, typename A>
uint32_t var_opt_sketch<T,S,A>::next_int(uint32_t max_value) {
  std::uniform_int_distribution<uint32_t> dist(0, max_value - 1);
  return dist(random_utils::rand);
}

template<typename T, typename S, typename A>
double var_opt_sketch<T,S,A>::next_double_exclude_zero() {
  double r = random_utils::next_double(random_utils::rand);
  while (r == 0.0) {
    r = random_utils::next_double(random_utils::rand);
  }
  return r;
}

}

// namespace datasketches

#endif // _VAR_OPT_SKETCH_IMPL_HPP_
