Merge pull request #183 from apache/cpc_allocator

Support allocator instance
diff --git a/CMakeLists.txt b/CMakeLists.txt
index ff1d4a5..d3c1fef 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -96,6 +96,7 @@
 add_subdirectory(theta)
 add_subdirectory(sampling)
 add_subdirectory(tuple)
+add_subdirectory(req)
 
 if (WITH_PYTHON)
   add_subdirectory(python)
diff --git a/common/include/memory_operations.hpp b/common/include/memory_operations.hpp
index 80dc3a3..986b2b0 100644
--- a/common/include/memory_operations.hpp
+++ b/common/include/memory_operations.hpp
@@ -52,6 +52,18 @@
   return size;
 }
 
+template<typename T>
+static inline size_t copy_to_mem(const T& item, void* dst) {
+  memcpy(dst, &item, sizeof(T));
+  return sizeof(T);
+}
+
+template<typename T>
+static inline size_t copy_from_mem(const void* src, T& item) {
+  memcpy(&item, src, sizeof(T));
+  return sizeof(T);
+}
+
 } // namespace
 
 #endif // _MEMORY_OPERATIONS_HPP_
diff --git a/fi/include/frequent_items_sketch.hpp b/fi/include/frequent_items_sketch.hpp
index b2a891f..6efe2b9 100644
--- a/fi/include/frequent_items_sketch.hpp
+++ b/fi/include/frequent_items_sketch.hpp
@@ -40,15 +40,20 @@
 
 enum frequent_items_error_type { NO_FALSE_POSITIVES, NO_FALSE_NEGATIVES };
 
-// for serialization as raw bytes
-template<typename A> using AllocU8 = typename std::allocator_traits<A>::template rebind_alloc<uint8_t>;
-template<typename A> using vector_u8 = std::vector<uint8_t, AllocU8<A>>;
-
 // type W for weight must be an arithmetic type (integral or floating point)
-template<typename T, typename W = uint64_t, typename H = std::hash<T>, typename E = std::equal_to<T>, typename S = serde<T>, typename A = std::allocator<T>>
+template<
+  typename T,
+  typename W = uint64_t,
+  typename H = std::hash<T>,
+  typename E = std::equal_to<T>,
+  typename S = serde<T>,
+  typename A = std::allocator<T>
+>
 class frequent_items_sketch {
 public:
 
+  static const uint8_t LG_MIN_MAP_SIZE = 3;
+
   /**
    * Construct this sketch with parameters lg_max_map_size and lg_start_map_size.
    *
@@ -59,7 +64,7 @@
    * @param lg_start_map_size Log2 of the starting physical size of the internal hash
    * map managed by this sketch.
    */
-  explicit frequent_items_sketch(uint8_t lg_max_map_size, uint8_t lg_start_map_size = LG_MIN_MAP_SIZE);
+  explicit frequent_items_sketch(uint8_t lg_max_map_size, uint8_t lg_start_map_size = LG_MIN_MAP_SIZE, const A& allocator = A());
 
   /**
    * Update this sketch with an item and a positive weight (frequency count).
@@ -232,7 +237,8 @@
 
   // This is a convenience alias for users
   // The type returned by the following serialize method
-  typedef vector_u8<A> vector_bytes;
+  using vector_bytes = std::vector<uint8_t, typename std::allocator_traits<A>::template rebind_alloc<uint8_t>>;
+
 
   /**
    * This method serializes the sketch as a vector of bytes.
@@ -249,7 +255,7 @@
    * @param is input stream
    * @return an instance of the sketch
    */
-  static frequent_items_sketch deserialize(std::istream& is);
+  static frequent_items_sketch deserialize(std::istream& is, const A& allocator = A());
 
   /**
    * This method deserializes a sketch from a given array of bytes.
@@ -257,7 +263,7 @@
    * @param size the size of the array
    * @return an instance of the sketch
    */
-  static frequent_items_sketch deserialize(const void* bytes, size_t size);
+  static frequent_items_sketch deserialize(const void* bytes, size_t size, const A& allocator = A());
 
   /**
    * Returns a human readable summary of this sketch
@@ -266,7 +272,6 @@
   string<A> to_string(bool print_items = false) const;
 
 private:
-  static const uint8_t LG_MIN_MAP_SIZE = 3;
   static const uint8_t SERIAL_VERSION = 1;
   static const uint8_t FAMILY_ID = 10;
   static const uint8_t PREAMBLE_LONGS_EMPTY = 1;
diff --git a/fi/include/frequent_items_sketch_impl.hpp b/fi/include/frequent_items_sketch_impl.hpp
index bb05aa9..df4352e 100644
--- a/fi/include/frequent_items_sketch_impl.hpp
+++ b/fi/include/frequent_items_sketch_impl.hpp
@@ -33,10 +33,14 @@
 const uint8_t frequent_items_sketch<T, W, H, E, S, A>::LG_MIN_MAP_SIZE;
 
 template<typename T, typename W, typename H, typename E, typename S, typename A>
-frequent_items_sketch<T, W, H, E, S, A>::frequent_items_sketch(uint8_t lg_max_map_size, uint8_t lg_start_map_size):
+frequent_items_sketch<T, W, H, E, S, A>::frequent_items_sketch(uint8_t lg_max_map_size, uint8_t lg_start_map_size, const A& allocator):
 total_weight(0),
 offset(0),
-map(std::max(lg_start_map_size, frequent_items_sketch::LG_MIN_MAP_SIZE), std::max(lg_max_map_size, frequent_items_sketch::LG_MIN_MAP_SIZE))
+map(
+  std::max(lg_start_map_size, frequent_items_sketch::LG_MIN_MAP_SIZE),
+  std::max(lg_max_map_size, frequent_items_sketch::LG_MIN_MAP_SIZE),
+  allocator
+)
 {
   if (lg_start_map_size > lg_max_map_size) throw std::invalid_argument("starting size must not be greater than maximum size");
 }
@@ -142,7 +146,7 @@
 template<typename T, typename W, typename H, typename E, typename S, typename A>
 typename frequent_items_sketch<T, W, H, E, S, A>::vector_row
 frequent_items_sketch<T, W, H, E, S, A>::get_frequent_items(frequent_items_error_type err_type, W threshold) const {
-  vector_row items;
+  vector_row items(map.get_allocator());
   for (auto &it: map) {
     const W lb = it.second;
     const W ub = it.second + offset;
@@ -182,19 +186,21 @@
     os.write((char*)&offset, sizeof(offset));
 
     // copy active items and their weights to use batch serialization
-    typedef typename std::allocator_traits<A>::template rebind_alloc<W> AllocW;
-    W* weights = AllocW().allocate(num_items);
-    T* items = A().allocate(num_items);
+    using AllocW = typename std::allocator_traits<A>::template rebind_alloc<W>;
+    AllocW aw(map.get_allocator());
+    W* weights = aw.allocate(num_items);
+    A alloc(map.get_allocator());
+    T* items = alloc.allocate(num_items);
     uint32_t i = 0;
     for (auto &it: map) {
       new (&items[i]) T(it.first);
       weights[i++] = it.second;
     }
     os.write((char*)weights, sizeof(W) * num_items);
-    AllocW().deallocate(weights, num_items);
+    aw.deallocate(weights, num_items);
     S().serialize(os, items, num_items);
     for (unsigned i = 0; i < num_items; i++) items[i].~T();
-    A().deallocate(items, num_items);
+    alloc.deallocate(items, num_items);
   }
 }
 
@@ -207,9 +213,9 @@
 }
 
 template<typename T, typename W, typename H, typename E, typename S, typename A>
-vector_u8<A> frequent_items_sketch<T, W, H, E, S, A>::serialize(unsigned header_size_bytes) const {
+auto frequent_items_sketch<T, W, H, E, S, A>::serialize(unsigned header_size_bytes) const -> vector_bytes {
   const size_t size = header_size_bytes + get_serialized_size_bytes();
-  vector_u8<A> bytes(size);
+  vector_bytes bytes(size, 0, map.get_allocator());
   uint8_t* ptr = bytes.data() + header_size_bytes;
   uint8_t* end_ptr = ptr + size;
 
@@ -238,20 +244,22 @@
     ptr += copy_to_mem(&offset, ptr, sizeof(offset));
 
     // copy active items and their weights to use batch serialization
-    typedef typename std::allocator_traits<A>::template rebind_alloc<W> AllocW;
-    W* weights = AllocW().allocate(num_items);
-    T* items = A().allocate(num_items);
+    using AllocW = typename std::allocator_traits<A>::template rebind_alloc<W>;
+    AllocW aw(map.get_allocator());
+    W* weights = aw.allocate(num_items);
+    A alloc(map.get_allocator());
+    T* items = alloc.allocate(num_items);
     uint32_t i = 0;
     for (auto &it: map) {
       new (&items[i]) T(it.first);
       weights[i++] = it.second;
     }
     ptr += copy_to_mem(weights, ptr, sizeof(W) * num_items);
-    AllocW().deallocate(weights, num_items);
+    aw.deallocate(weights, num_items);
     const size_t bytes_remaining = end_ptr - ptr;
     ptr += S().serialize(ptr, bytes_remaining, items, num_items);
     for (unsigned i = 0; i < num_items; i++) items[i].~T();
-    A().deallocate(items, num_items);
+    alloc.deallocate(items, num_items);
   }
   return bytes;
 }
@@ -259,23 +267,25 @@
 template<typename T, typename W, typename H, typename E, typename S, typename A>
 class frequent_items_sketch<T, W, H, E, S, A>::items_deleter {
 public:
-  items_deleter(uint32_t num, bool destroy): num(num), destroy(destroy) {}
+  items_deleter(uint32_t num, bool destroy, const A& allocator):
+    allocator(allocator), num(num), destroy(destroy) {}
   void set_destroy(bool destroy) { this->destroy = destroy; }
-  void operator() (T* ptr) const {
+  void operator() (T* ptr) {
     if (ptr != nullptr) {
       if (destroy) {
         for (uint32_t i = 0; i < num; ++i) ptr[i].~T();
       }
-      A().deallocate(ptr, num);
+      allocator.deallocate(ptr, num);
     }
   }
 private:
+  A allocator;
   uint32_t num;
   bool destroy;
 };
 
 template<typename T, typename W, typename H, typename E, typename S, typename A>
-frequent_items_sketch<T, W, H, E, S, A> frequent_items_sketch<T, W, H, E, S, A>::deserialize(std::istream& is) {
+frequent_items_sketch<T, W, H, E, S, A> frequent_items_sketch<T, W, H, E, S, A>::deserialize(std::istream& is, const A& allocator) {
   uint8_t preamble_longs;
   is.read((char*)&preamble_longs, sizeof(preamble_longs));
   uint8_t serial_version;
@@ -298,7 +308,7 @@
   check_family_id(family_id);
   check_size(lg_cur_size, lg_max_size);
 
-  frequent_items_sketch<T, W, H, E, S, A> sketch(lg_max_size, lg_cur_size);
+  frequent_items_sketch<T, W, H, E, S, A> sketch(lg_max_size, lg_cur_size, allocator);
   if (!is_empty) {
     uint32_t num_items;
     is.read((char*)&num_items, sizeof(num_items));
@@ -310,10 +320,11 @@
     is.read((char*)&offset, sizeof(offset));
 
     // batch deserialization with intermediate array of items and weights
-    typedef typename std::allocator_traits<A>::template rebind_alloc<W> AllocW;
-    std::vector<W, AllocW> weights(num_items);
+    using AllocW = typename std::allocator_traits<A>::template rebind_alloc<W>;
+    std::vector<W, AllocW> weights(num_items, 0, allocator);
     is.read((char*)weights.data(), sizeof(W) * num_items);
-    std::unique_ptr<T, items_deleter> items(A().allocate(num_items), items_deleter(num_items, false));
+    A alloc(allocator);
+    std::unique_ptr<T, items_deleter> items(alloc.allocate(num_items), items_deleter(num_items, false, alloc));
     S().deserialize(is, items.get(), num_items);
     items.get_deleter().set_destroy(true); // serde did not throw, so the items must be constructed
     for (uint32_t i = 0; i < num_items; i++) {
@@ -328,7 +339,7 @@
 }
 
 template<typename T, typename W, typename H, typename E, typename S, typename A>
-frequent_items_sketch<T, W, H, E, S, A> frequent_items_sketch<T, W, H, E, S, A>::deserialize(const void* bytes, size_t size) {
+frequent_items_sketch<T, W, H, E, S, A> frequent_items_sketch<T, W, H, E, S, A>::deserialize(const void* bytes, size_t size, const A& allocator) {
   ensure_minimum_memory(size, 8);
   const char* ptr = static_cast<const char*>(bytes);
   const char* base = static_cast<const char*>(bytes);
@@ -355,7 +366,7 @@
   check_size(lg_cur_size, lg_max_size);
   ensure_minimum_memory(size, 1 << preamble_longs);
 
-  frequent_items_sketch<T, W, H, E, S, A> sketch(lg_max_size, lg_cur_size);
+  frequent_items_sketch<T, W, H, E, S, A> sketch(lg_max_size, lg_cur_size, allocator);
   if (!is_empty) {
     uint32_t num_items;
     ptr += copy_from_mem(ptr, &num_items, sizeof(uint32_t));
@@ -368,10 +379,11 @@
 
     ensure_minimum_memory(size, ptr - base + (sizeof(W) * num_items));
     // batch deserialization with intermediate array of items and weights
-    typedef typename std::allocator_traits<A>::template rebind_alloc<W> AllocW;
-    std::vector<W, AllocW> weights(num_items);
+    using AllocW = typename std::allocator_traits<A>::template rebind_alloc<W>;
+    std::vector<W, AllocW> weights(num_items, 0, allocator);
     ptr += copy_from_mem(ptr, weights.data(), sizeof(W) * num_items);
-    std::unique_ptr<T, items_deleter> items(A().allocate(num_items), items_deleter(num_items, false));
+    A alloc(allocator);
+    std::unique_ptr<T, items_deleter> items(alloc.allocate(num_items), items_deleter(num_items, false, alloc));
     const size_t bytes_remaining = size - (ptr - base);
     ptr += S().deserialize(ptr, bytes_remaining, items.get(), num_items);
     items.get_deleter().set_destroy(true); // serde did not throw, so the items must be constructed
diff --git a/fi/include/reverse_purge_hash_map.hpp b/fi/include/reverse_purge_hash_map.hpp
index e3e7bfb..fc4cd83 100644
--- a/fi/include/reverse_purge_hash_map.hpp
+++ b/fi/include/reverse_purge_hash_map.hpp
@@ -39,33 +39,39 @@
   using AllocV = typename std::allocator_traits<A>::template rebind_alloc<V>;
   using AllocU16 = typename std::allocator_traits<A>::template rebind_alloc<uint16_t>;
 
-  reverse_purge_hash_map(uint8_t lg_size, uint8_t lg_max_size);
+  reverse_purge_hash_map(uint8_t lg_size, uint8_t lg_max_size, const A& allocator);
   reverse_purge_hash_map(const reverse_purge_hash_map& other);
   reverse_purge_hash_map(reverse_purge_hash_map&& other) noexcept;
   ~reverse_purge_hash_map();
   reverse_purge_hash_map& operator=(reverse_purge_hash_map other);
   reverse_purge_hash_map& operator=(reverse_purge_hash_map&& other);
-  V adjust_or_insert(const K& key, V value);
-  V adjust_or_insert(K&& key, V value);
+
+  template<typename FwdK>
+  V adjust_or_insert(FwdK&& key, V value);
+
   V get(const K& key) const;
   uint8_t get_lg_cur_size() const;
   uint8_t get_lg_max_size() const;
   uint32_t get_capacity() const;
   uint32_t get_num_active() const;
+  const A& get_allocator() const;
+
   class iterator;
   iterator begin() const;
   iterator end() const;
+
 private:
   static constexpr double LOAD_FACTOR = 0.75;
   static constexpr uint16_t DRIFT_LIMIT = 1024; // used only for stress testing
   static constexpr uint32_t MAX_SAMPLE_SIZE = 1024; // number of samples to compute approximate median during purge
 
-  uint8_t lg_cur_size;
-  uint8_t lg_max_size;
-  uint32_t num_active;
-  K* keys;
-  V* values;
-  uint16_t* states;
+  A allocator_;
+  uint8_t lg_cur_size_;
+  uint8_t lg_max_size_;
+  uint32_t num_active_;
+  K* keys_;
+  V* values_;
+  uint16_t* states_;
 
   inline bool is_active(uint32_t probe) const;
   void subtract_and_keep_positive_only(V amount);
@@ -83,8 +89,8 @@
   friend class reverse_purge_hash_map<K, V, H, E, A>;
   iterator& operator++() {
     ++count;
-    if (count < map->num_active) {
-      const uint32_t mask = (1 << map->lg_cur_size) - 1;
+    if (count < map->num_active_) {
+      const uint32_t mask = (1 << map->lg_cur_size_) - 1;
       do {
         index = (index + stride) & mask;
       } while (!map->is_active(index));
@@ -95,7 +101,7 @@
   bool operator==(const iterator& rhs) const { return count == rhs.count; }
   bool operator!=(const iterator& rhs) const { return count != rhs.count; }
   const std::pair<K&, V> operator*() const {
-    return std::pair<K&, V>(map->keys[index], map->values[index]);
+    return std::pair<K&, V>(map->keys_[index], map->values_[index]);
   }
 private:
   static constexpr double GOLDEN_RATIO_RECIPROCAL = 0.6180339887498949; // = (sqrt(5) - 1) / 2
@@ -104,7 +110,7 @@
   uint32_t count;
   uint32_t stride;
   iterator(const reverse_purge_hash_map<K, V, H, E, A>* map, uint32_t index, uint32_t count):
-    map(map), index(index), count(count), stride(static_cast<uint32_t>((1 << map->lg_cur_size) * GOLDEN_RATIO_RECIPROCAL) | 1) {}
+    map(map), index(index), count(count), stride(static_cast<uint32_t>((1 << map->lg_cur_size_) * GOLDEN_RATIO_RECIPROCAL) | 1) {}
 };
 
 } /* namespace datasketches */
diff --git a/fi/include/reverse_purge_hash_map_impl.hpp b/fi/include/reverse_purge_hash_map_impl.hpp
index 732d923..beccea4 100644
--- a/fi/include/reverse_purge_hash_map_impl.hpp
+++ b/fi/include/reverse_purge_hash_map_impl.hpp
@@ -34,113 +34,121 @@
 constexpr uint32_t reverse_purge_hash_map<K, V, H, E, A>::MAX_SAMPLE_SIZE;
 
 template<typename K, typename V, typename H, typename E, typename A>
-reverse_purge_hash_map<K, V, H, E, A>::reverse_purge_hash_map(uint8_t lg_cur_size, uint8_t lg_max_size):
-lg_cur_size(lg_cur_size),
-lg_max_size(lg_max_size),
-num_active(0),
-keys(A().allocate(1 << lg_cur_size)),
-values(AllocV().allocate(1 << lg_cur_size)),
-states(AllocU16().allocate(1 << lg_cur_size))
+reverse_purge_hash_map<K, V, H, E, A>::reverse_purge_hash_map(uint8_t lg_cur_size, uint8_t lg_max_size, const A& allocator):
+allocator_(allocator),
+lg_cur_size_(lg_cur_size),
+lg_max_size_(lg_max_size),
+num_active_(0),
+keys_(allocator_.allocate(1 << lg_cur_size)),
+values_(nullptr),
+states_(nullptr)
 {
-  std::fill(states, &states[1 << lg_cur_size], 0);
+  AllocV av(allocator_);
+  values_ = av.allocate(1 << lg_cur_size);
+  AllocU16 au16(allocator_);
+  states_ = au16.allocate(1 << lg_cur_size);
+  std::fill(states_, states_ + (1 << lg_cur_size), 0);
 }
 
 template<typename K, typename V, typename H, typename E, typename A>
 reverse_purge_hash_map<K, V, H, E, A>::reverse_purge_hash_map(const reverse_purge_hash_map<K, V, H, E, A>& other):
-lg_cur_size(other.lg_cur_size),
-lg_max_size(other.lg_max_size),
-num_active(other.num_active),
-keys(A().allocate(1 << lg_cur_size)),
-values(AllocV().allocate(1 << lg_cur_size)),
-states(AllocU16().allocate(1 << lg_cur_size))
+allocator_(other.allocator_),
+lg_cur_size_(other.lg_cur_size_),
+lg_max_size_(other.lg_max_size_),
+num_active_(other.num_active_),
+keys_(allocator_.allocate(1 << lg_cur_size_)),
+values_(nullptr),
+states_(nullptr)
 {
-  const uint32_t size = 1 << lg_cur_size;
-  if (num_active > 0) {
-    auto num = num_active;
+  AllocV av(allocator_);
+  values_ = av.allocate(1 << lg_cur_size_);
+  AllocU16 au16(allocator_);
+  states_ = au16.allocate(1 << lg_cur_size_);
+  const uint32_t size = 1 << lg_cur_size_;
+  if (num_active_ > 0) {
+    auto num = num_active_;
     for (uint32_t i = 0; i < size; i++) {
-      if (other.states[i] > 0) {
-        new (&keys[i]) K(other.keys[i]);
-        values[i] = other.values[i];
+      if (other.states_[i] > 0) {
+        new (&keys_[i]) K(other.keys_[i]);
+        values_[i] = other.values_[i];
       }
       if (--num == 0) break;
     }
   }
-  std::copy(&other.states[0], &other.states[size], states);
+  std::copy(other.states_, other.states_ + size, states_);
 }
 
 template<typename K, typename V, typename H, typename E, typename A>
 reverse_purge_hash_map<K, V, H, E, A>::reverse_purge_hash_map(reverse_purge_hash_map<K, V, H, E, A>&& other) noexcept:
-lg_cur_size(other.lg_cur_size),
-lg_max_size(other.lg_max_size),
-num_active(other.num_active),
-keys(nullptr),
-values(nullptr),
-states(nullptr)
+allocator_(std::move(other.allocator_)),
+lg_cur_size_(other.lg_cur_size_),
+lg_max_size_(other.lg_max_size_),
+num_active_(other.num_active_),
+keys_(nullptr),
+values_(nullptr),
+states_(nullptr)
 {
-  std::swap(keys, other.keys);
-  std::swap(values, other.values);
-  std::swap(states, other.states);
-  other.num_active = 0;
+  std::swap(keys_, other.keys_);
+  std::swap(values_, other.values_);
+  std::swap(states_, other.states_);
+  other.num_active_ = 0;
 }
 
 template<typename K, typename V, typename H, typename E, typename A>
 reverse_purge_hash_map<K, V, H, E, A>::~reverse_purge_hash_map() {
-  const uint32_t size = 1 << lg_cur_size;
-  if (num_active > 0) {
+  const uint32_t size = 1 << lg_cur_size_;
+  if (num_active_ > 0) {
     for (uint32_t i = 0; i < size; i++) {
       if (is_active(i)) {
-        keys[i].~K();
-        if (--num_active == 0) break;
+        keys_[i].~K();
+        if (--num_active_ == 0) break;
       }
     }
   }
-  if (keys != nullptr)
-    A().deallocate(keys, size);
-  if (values != nullptr)
-    AllocV().deallocate(values, size);
-  if (states != nullptr)
-    AllocU16().deallocate(states, size);
+  if (keys_ != nullptr) {
+    allocator_.deallocate(keys_, size);
+  }
+  if (values_ != nullptr) {
+    AllocV av(allocator_);
+    av.deallocate(values_, size);
+  }
+  if (states_ != nullptr) {
+    AllocU16 au16(allocator_);
+    au16.deallocate(states_, size);
+  }
 }
 
 template<typename K, typename V, typename H, typename E, typename A>
 reverse_purge_hash_map<K, V, H, E, A>& reverse_purge_hash_map<K, V, H, E, A>::operator=(reverse_purge_hash_map<K, V, H, E, A> other) {
-  std::swap(lg_cur_size, other.lg_cur_size);
-  std::swap(lg_max_size, other.lg_max_size);
-  std::swap(num_active, other.num_active);
-  std::swap(keys, other.keys);
-  std::swap(values, other.values);
-  std::swap(states, other.states);
+  std::swap(allocator_, other.allocator_);
+  std::swap(lg_cur_size_, other.lg_cur_size_);
+  std::swap(lg_max_size_, other.lg_max_size_);
+  std::swap(num_active_, other.num_active_);
+  std::swap(keys_, other.keys_);
+  std::swap(values_, other.values_);
+  std::swap(states_, other.states_);
   return *this;
 }
 
 template<typename K, typename V, typename H, typename E, typename A>
 reverse_purge_hash_map<K, V, H, E, A>& reverse_purge_hash_map<K, V, H, E, A>::operator=(reverse_purge_hash_map<K, V, H, E, A>&& other) {
-  std::swap(lg_cur_size, other.lg_cur_size);
-  std::swap(lg_max_size, other.lg_max_size);
-  std::swap(num_active, other.num_active);
-  std::swap(keys, other.keys);
-  std::swap(values, other.values);
-  std::swap(states, other.states);
+  std::swap(allocator_, other.allocator_);
+  std::swap(lg_cur_size_, other.lg_cur_size_);
+  std::swap(lg_max_size_, other.lg_max_size_);
+  std::swap(num_active_, other.num_active_);
+  std::swap(keys_, other.keys_);
+  std::swap(values_, other.values_);
+  std::swap(states_, other.states_);
   return *this;
 }
 
 template<typename K, typename V, typename H, typename E, typename A>
-V reverse_purge_hash_map<K, V, H, E, A>::adjust_or_insert(const K& key, V value) {
-  const uint32_t num_active_before = num_active;
+template<typename FwdK>
+V reverse_purge_hash_map<K, V, H, E, A>::adjust_or_insert(FwdK&& key, V value) {
+  const uint32_t num_active_before = num_active_;
   const uint32_t index = internal_adjust_or_insert(key, value);
-  if (num_active > num_active_before) {
-    new (&keys[index]) K(key);
-    return resize_or_purge_if_needed();
-  }
-  return 0;
-}
-
-template<typename K, typename V, typename H, typename E, typename A>
-V reverse_purge_hash_map<K, V, H, E, A>::adjust_or_insert(K&& key, V value) {
-  const uint32_t num_active_before = num_active;
-  const uint32_t index = internal_adjust_or_insert(key, value);
-  if (num_active > num_active_before) {
-    new (&keys[index]) K(std::move(key));
+  if (num_active_ > num_active_before) {
+    new (&keys_[index]) K(std::forward<FwdK>(key));
     return resize_or_purge_if_needed();
   }
   return 0;
@@ -148,10 +156,10 @@
 
 template<typename K, typename V, typename H, typename E, typename A>
 V reverse_purge_hash_map<K, V, H, E, A>::get(const K& key) const {
-  const uint32_t mask = (1 << lg_cur_size) - 1;
+  const uint32_t mask = (1 << lg_cur_size_) - 1;
   uint32_t probe = fmix64(H()(key)) & mask;
   while (is_active(probe)) {
-    if (E()(keys[probe], key)) return values[probe];
+    if (E()(keys_[probe], key)) return values_[probe];
     probe = (probe + 1) & mask;
   }
   return 0;
@@ -159,27 +167,32 @@
 
 template<typename K, typename V, typename H, typename E, typename A>
 uint8_t reverse_purge_hash_map<K, V, H, E, A>::get_lg_cur_size() const {
-  return lg_cur_size;
+  return lg_cur_size_;
 }
 
 template<typename K, typename V, typename H, typename E, typename A>
 uint8_t reverse_purge_hash_map<K, V, H, E, A>::get_lg_max_size() const {
-  return lg_max_size;
+  return lg_max_size_;
 }
 
 template<typename K, typename V, typename H, typename E, typename A>
 uint32_t reverse_purge_hash_map<K, V, H, E, A>::get_capacity() const {
-  return (1 << lg_cur_size) * LOAD_FACTOR;
+  return (1 << lg_cur_size_) * LOAD_FACTOR;
 }
 
 template<typename K, typename V, typename H, typename E, typename A>
 uint32_t reverse_purge_hash_map<K, V, H, E, A>::get_num_active() const {
-  return num_active;
+  return num_active_;
+}
+
+template<typename K, typename V, typename H, typename E, typename A>
+const A& reverse_purge_hash_map<K, V, H, E, A>::get_allocator() const {
+  return allocator_;
 }
 
 template<typename K, typename V, typename H, typename E, typename A>
 typename reverse_purge_hash_map<K, V, H, E, A>::iterator reverse_purge_hash_map<K, V, H, E, A>::begin() const {
-  const uint32_t size = 1 << lg_cur_size;
+  const uint32_t size = 1 << lg_cur_size_;
   uint32_t i = 0;
   while (i < size && !is_active(i)) i++;
   return reverse_purge_hash_map<K, V, H, E, A>::iterator(this, i, 0);
@@ -187,40 +200,40 @@
 
 template<typename K, typename V, typename H, typename E, typename A>
 typename reverse_purge_hash_map<K, V, H, E, A>::iterator reverse_purge_hash_map<K, V, H, E, A>::end() const {
-  return reverse_purge_hash_map<K, V, H, E, A>::iterator(this, 1 << lg_cur_size, num_active);
+  return reverse_purge_hash_map<K, V, H, E, A>::iterator(this, 1 << lg_cur_size_, num_active_);
 }
 
 template<typename K, typename V, typename H, typename E, typename A>
 bool reverse_purge_hash_map<K, V, H, E, A>::is_active(uint32_t index) const {
-  return states[index] > 0;
+  return states_[index] > 0;
 }
 
 template<typename K, typename V, typename H, typename E, typename A>
 void reverse_purge_hash_map<K, V, H, E, A>::subtract_and_keep_positive_only(V amount) {
   // starting from the back, find the first empty cell,
   // which establishes the high end of a cluster.
-  uint32_t first_probe = (1 << lg_cur_size) - 1;
+  uint32_t first_probe = (1 << lg_cur_size_) - 1;
   while (is_active(first_probe)) first_probe--;
   // when we find the next non-empty cell, we know we are at the high end of a cluster
   // work towards the front, delete any non-positive entries.
   for (uint32_t probe = first_probe; probe-- > 0;) {
     if (is_active(probe)) {
-      if (values[probe] <= amount) {
+      if (values_[probe] <= amount) {
         hash_delete(probe); // does the work of deletion and moving higher items towards the front
-        num_active--;
+        num_active_--;
       } else {
-        values[probe] -= amount;
+        values_[probe] -= amount;
       }
     }
   }
   // now work on the first cluster that was skipped
-  for (uint32_t probe = (1 << lg_cur_size); probe-- > first_probe;) {
+  for (uint32_t probe = (1 << lg_cur_size_); probe-- > first_probe;) {
     if (is_active(probe)) {
-      if (values[probe] <= amount) {
+      if (values_[probe] <= amount) {
         hash_delete(probe);
-        num_active--;
+        num_active_--;
       } else {
-        values[probe] -= amount;
+        values_[probe] -= amount;
       }
     }
   }
@@ -231,20 +244,20 @@
   // Looks ahead in the table to search for another
   // item to move to this location
   // if none are found, the status is changed
-  states[delete_index] = 0; // mark as empty
-  keys[delete_index].~K();
+  states_[delete_index] = 0; // mark as empty
+  keys_[delete_index].~K();
   uint32_t drift = 1;
-  const uint32_t mask = (1 << lg_cur_size) - 1;
+  const uint32_t mask = (1 << lg_cur_size_) - 1;
   uint32_t probe = (delete_index + drift) & mask; // map length must be a power of 2
   // advance until we find a free location replacing locations as needed
   while (is_active(probe)) {
-    if (states[probe] > drift) {
+    if (states_[probe] > drift) {
       // move current element
-      new (&keys[delete_index]) K(std::move(keys[probe]));
-      values[delete_index] = values[probe];
-      states[delete_index] = states[probe] - drift;
-      states[probe] = 0; // mark as empty
-      keys[probe].~K();
+      new (&keys_[delete_index]) K(std::move(keys_[probe]));
+      values_[delete_index] = values_[probe];
+      states_[delete_index] = states_[probe] - drift;
+      states_[probe] = 0; // mark as empty
+      keys_[probe].~K();
       drift = 0;
       delete_index = probe;
     }
@@ -257,13 +270,13 @@
 
 template<typename K, typename V, typename H, typename E, typename A>
 uint32_t reverse_purge_hash_map<K, V, H, E, A>::internal_adjust_or_insert(const K& key, V value) {
-  const uint32_t mask = (1 << lg_cur_size) - 1;
+  const uint32_t mask = (1 << lg_cur_size_) - 1;
   uint32_t index = fmix64(H()(key)) & mask;
   uint16_t drift = 1;
   while (is_active(index)) {
-    if (E()(keys[index], key)) {
+    if (E()(keys_[index], key)) {
       // adjusting the value of an existing key
-      values[index] += value;
+      values_[index] += value;
       return index;
     }
     index = (index + 1) & mask;
@@ -272,23 +285,23 @@
     if (drift >= DRIFT_LIMIT) throw std::logic_error("drift limit reached");
   }
   // adding the key and value to the table
-  if (num_active > get_capacity()) {
-    throw std::logic_error("num_active " + std::to_string(num_active) + " > capacity " + std::to_string(get_capacity()));
+  if (num_active_ > get_capacity()) {
+    throw std::logic_error("num_active " + std::to_string(num_active_) + " > capacity " + std::to_string(get_capacity()));
   }
-  values[index] = value;
-  states[index] = drift;
-  num_active++;
+  values_[index] = value;
+  states_[index] = drift;
+  num_active_++;
   return index;
 }
 
 template<typename K, typename V, typename H, typename E, typename A>
 V reverse_purge_hash_map<K, V, H, E, A>::resize_or_purge_if_needed() {
-  if (num_active > get_capacity()) {
-    if (lg_cur_size < lg_max_size) { // can grow
-      resize(lg_cur_size + 1);
+  if (num_active_ > get_capacity()) {
+    if (lg_cur_size_ < lg_max_size_) { // can grow
+      resize(lg_cur_size_ + 1);
     } else { // at target size, must purge
       const V offset = purge();
-      if (num_active > get_capacity()) {
+      if (num_active_ > get_capacity()) {
         throw std::logic_error("purge did not reduce number of active items");
       }
       return offset;
@@ -299,43 +312,46 @@
 
 template<typename K, typename V, typename H, typename E, typename A>
 void reverse_purge_hash_map<K, V, H, E, A>::resize(uint8_t lg_new_size) {
-  const uint32_t old_size = 1 << lg_cur_size;
-  K* old_keys = keys;
-  V* old_values = values;
-  uint16_t* old_states = states;
+  const uint32_t old_size = 1 << lg_cur_size_;
+  K* old_keys = keys_;
+  V* old_values = values_;
+  uint16_t* old_states = states_;
   const uint32_t new_size = 1 << lg_new_size;
-  keys = A().allocate(new_size);
-  values = AllocV().allocate(new_size);
-  states = AllocU16().allocate(new_size);
-  std::fill(states, &states[new_size], 0);
-  num_active = 0;
-  lg_cur_size = lg_new_size;
+  keys_ = allocator_.allocate(new_size);
+  AllocV av(allocator_);
+  values_ = av.allocate(new_size);
+  AllocU16 au16(allocator_);
+  states_ = au16.allocate(new_size);
+  std::fill(states_, states_ + new_size, 0);
+  num_active_ = 0;
+  lg_cur_size_ = lg_new_size;
   for (uint32_t i = 0; i < old_size; i++) {
     if (old_states[i] > 0) {
       adjust_or_insert(std::move(old_keys[i]), old_values[i]);
       old_keys[i].~K();
     }
   }
-  A().deallocate(old_keys, old_size);
-  AllocV().deallocate(old_values, old_size);
-  AllocU16().deallocate(old_states, old_size);
+  allocator_.deallocate(old_keys, old_size);
+  av.deallocate(old_values, old_size);
+  au16.deallocate(old_states, old_size);
 }
 
 template<typename K, typename V, typename H, typename E, typename A>
 V reverse_purge_hash_map<K, V, H, E, A>::purge() {
-  const uint32_t limit = std::min(MAX_SAMPLE_SIZE, num_active);
+  const uint32_t limit = std::min(MAX_SAMPLE_SIZE, num_active_);
   uint32_t num_samples = 0;
   uint32_t i = 0;
-  V* samples = AllocV().allocate(limit);
+  AllocV av(allocator_);
+  V* samples = av.allocate(limit);
   while (num_samples < limit) {
     if (is_active(i)) {
-      samples[num_samples++] = values[i];
+      samples[num_samples++] = values_[i];
     }
     i++;
   }
-  std::nth_element(&samples[0], &samples[num_samples / 2], &samples[num_samples]);
+  std::nth_element(samples, samples + (num_samples / 2), samples + num_samples);
   const V median = samples[num_samples / 2];
-  AllocV().deallocate(samples, limit);
+  av.deallocate(samples, limit);
   subtract_and_keep_positive_only(median);
   return median;
 }
diff --git a/fi/test/reverse_purge_hash_map_test.cpp b/fi/test/reverse_purge_hash_map_test.cpp
index e372171..a74345c 100644
--- a/fi/test/reverse_purge_hash_map_test.cpp
+++ b/fi/test/reverse_purge_hash_map_test.cpp
@@ -24,20 +24,20 @@
 namespace datasketches {
 
 TEST_CASE("reverse purge hash map: empty", "[frequent_items_sketch]") {
-  reverse_purge_hash_map<int> map(3, 3);
+  reverse_purge_hash_map<int> map(3, 3, std::allocator<int>());
   REQUIRE(map.get_num_active() == 0);
   REQUIRE(map.get_lg_cur_size() == 3);  // static_cast<uint8_t>(3)
 }
 
 TEST_CASE("reverse purge hash map: one item", "[frequent_items_sketch]") {
-  reverse_purge_hash_map<int> map(3, 3);
+  reverse_purge_hash_map<int> map(3, 3, std::allocator<int>());
   map.adjust_or_insert(1, 1);
   REQUIRE(map.get_num_active() == 1);
   REQUIRE(map.get(1) == 1);
 }
 
 TEST_CASE("reverse purge hash map: iterator", "[frequent_items_sketch]") {
-  reverse_purge_hash_map<int> map(3, 4);
+  reverse_purge_hash_map<int> map(3, 4, std::allocator<int>());
   for (int i = 0; i < 11; i++) map.adjust_or_insert(i, 1); // this should fit with no purge
   int sum = 0;
   for (auto &it: map) sum += it.second;
diff --git a/kll/include/kll_quantile_calculator.hpp b/kll/include/kll_quantile_calculator.hpp
index bc60f26..5114399 100644
--- a/kll/include/kll_quantile_calculator.hpp
+++ b/kll/include/kll_quantile_calculator.hpp
@@ -28,7 +28,7 @@
 class kll_quantile_calculator {
   public:
     // assumes that all levels are sorted including level 0
-    kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n);
+    kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n, const A& allocator);
     T get_quantile(double fraction) const;
 
   private:
diff --git a/kll/include/kll_quantile_calculator_impl.hpp b/kll/include/kll_quantile_calculator_impl.hpp
index f580819..23efa4d 100644
--- a/kll/include/kll_quantile_calculator_impl.hpp
+++ b/kll/include/kll_quantile_calculator_impl.hpp
@@ -29,8 +29,8 @@
 namespace datasketches {
 
 template <typename T, typename C, typename A>
-kll_quantile_calculator<T, C, A>::kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n):
-n_(n), levels_(num_levels + 1)
+kll_quantile_calculator<T, C, A>::kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n, const A& allocator):
+n_(n), levels_(num_levels + 1, 0, allocator), entries_(allocator)
 {
   const uint32_t num_items = levels[num_levels] - levels[0];
   entries_.reserve(num_items);
@@ -116,7 +116,7 @@
 template <typename T, typename C, typename A>
 void kll_quantile_calculator<T, C, A>::merge_sorted_blocks(Container& entries, const uint32_t* levels, uint8_t num_levels, uint32_t num_items) {
   if (num_levels == 1) return;
-  Container temporary;
+  Container temporary(entries.get_allocator());
   temporary.reserve(num_items);
   merge_sorted_blocks_direct(entries, temporary, levels, 0, num_levels);
 }
diff --git a/kll/include/kll_sketch.hpp b/kll/include/kll_sketch.hpp
index a4530c9..af36ceb 100644
--- a/kll/include/kll_sketch.hpp
+++ b/kll/include/kll_sketch.hpp
@@ -161,7 +161,7 @@
     static const uint16_t MIN_K = DEFAULT_M;
     static const uint16_t MAX_K = (1 << 16) - 1;
 
-    explicit kll_sketch(uint16_t k = DEFAULT_K);
+    explicit kll_sketch(uint16_t k = DEFAULT_K, const A& allocator = A());
     kll_sketch(const kll_sketch& other);
     kll_sketch(kll_sketch&& other) noexcept;
     ~kll_sketch();
@@ -401,7 +401,7 @@
      * @param is input stream
      * @return an instance of a sketch
      */
-    static kll_sketch deserialize(std::istream& is);
+    static kll_sketch<T, C, S, A> deserialize(std::istream& is, const A& allocator = A());
 
     /**
      * This method deserializes a sketch from a given array of bytes.
@@ -409,7 +409,7 @@
      * @param size the size of the array
      * @return an instance of a sketch
      */
-    static kll_sketch deserialize(const void* bytes, size_t size);
+    static kll_sketch<T, C, S, A> deserialize(const void* bytes, size_t size, const A& allocator = A());
 
     /*
      * Gets the normalized rank error given k and pmf.
@@ -461,6 +461,7 @@
     static const uint8_t PREAMBLE_INTS_SHORT = 2; // for empty and single item
     static const uint8_t PREAMBLE_INTS_FULL = 5;
 
+    A allocator_;
     uint16_t k_;
     uint8_t m_; // minimum buffer "width"
     uint16_t min_k_; // for error estimation after merging with different k
diff --git a/kll/include/kll_sketch_impl.hpp b/kll/include/kll_sketch_impl.hpp
index f0c5ff3..90d3be7 100644
--- a/kll/include/kll_sketch_impl.hpp
+++ b/kll/include/kll_sketch_impl.hpp
@@ -30,13 +30,14 @@
 namespace datasketches {
 
 template<typename T, typename C, typename S, typename A>
-kll_sketch<T, C, S, A>::kll_sketch(uint16_t k):
+kll_sketch<T, C, S, A>::kll_sketch(uint16_t k, const A& allocator):
+allocator_(allocator),
 k_(k),
 m_(DEFAULT_M),
 min_k_(k),
 n_(0),
 num_levels_(1),
-levels_(2),
+levels_(2, 0, allocator),
 items_(nullptr),
 items_size_(k_),
 min_value_(nullptr),
@@ -47,11 +48,12 @@
     throw std::invalid_argument("K must be >= " + std::to_string(MIN_K) + " and <= " + std::to_string(MAX_K) + ": " + std::to_string(k));
   }
   levels_[0] = levels_[1] = k;
-  items_ = A().allocate(items_size_);
+  items_ = allocator_.allocate(items_size_);
 }
 
 template<typename T, typename C, typename S, typename A>
 kll_sketch<T, C, S, A>::kll_sketch(const kll_sketch& other):
+allocator_(other.allocator_),
 k_(other.k_),
 m_(other.m_),
 min_k_(other.min_k_),
@@ -64,14 +66,15 @@
 max_value_(nullptr),
 is_level_zero_sorted_(other.is_level_zero_sorted_)
 {
-  items_ = A().allocate(items_size_);
+  items_ = allocator_.allocate(items_size_);
   std::copy(&other.items_[levels_[0]], &other.items_[levels_[num_levels_]], &items_[levels_[0]]);
-  if (other.min_value_ != nullptr) min_value_ = new (A().allocate(1)) T(*other.min_value_);
-  if (other.max_value_ != nullptr) max_value_ = new (A().allocate(1)) T(*other.max_value_);
+  if (other.min_value_ != nullptr) min_value_ = new (allocator_.allocate(1)) T(*other.min_value_);
+  if (other.max_value_ != nullptr) max_value_ = new (allocator_.allocate(1)) T(*other.max_value_);
 }
 
 template<typename T, typename C, typename S, typename A>
 kll_sketch<T, C, S, A>::kll_sketch(kll_sketch&& other) noexcept:
+allocator_(std::move(other.allocator_)),
 k_(other.k_),
 m_(other.m_),
 min_k_(other.min_k_),
@@ -91,7 +94,8 @@
 
 template<typename T, typename C, typename S, typename A>
 kll_sketch<T, C, S, A>& kll_sketch<T, C, S, A>::operator=(const kll_sketch& other) {
-  kll_sketch copy(other);
+  kll_sketch<T, C, S, A> copy(other);
+  std::swap(allocator_, copy.allocator_);
   std::swap(k_, copy.k_);
   std::swap(m_, copy.m_);
   std::swap(min_k_, copy.min_k_);
@@ -108,6 +112,7 @@
 
 template<typename T, typename C, typename S, typename A>
 kll_sketch<T, C, S, A>& kll_sketch<T, C, S, A>::operator=(kll_sketch&& other) {
+  std::swap(allocator_, other.allocator_);
   std::swap(k_, other.k_);
   std::swap(m_, other.m_);
   std::swap(min_k_, other.min_k_);
@@ -128,15 +133,15 @@
     const uint32_t begin = levels_[0];
     const uint32_t end = levels_[num_levels_];
     for (uint32_t i = begin; i < end; i++) items_[i].~T();
-    A().deallocate(items_, items_size_);
+    allocator_.deallocate(items_, items_size_);
   }
   if (min_value_ != nullptr) {
     min_value_->~T();
-    A().deallocate(min_value_, 1);
+    allocator_.deallocate(min_value_, 1);
   }
   if (max_value_ != nullptr) {
     max_value_->~T();
-    A().deallocate(max_value_, 1);
+    allocator_.deallocate(max_value_, 1);
   }
 }
 
@@ -159,8 +164,8 @@
 template<typename T, typename C, typename S, typename A>
 void kll_sketch<T, C, S, A>::update_min_max(const T& value) {
   if (is_empty()) {
-    min_value_ = new (A().allocate(1)) T(value);
-    max_value_ = new (A().allocate(1)) T(value);
+    min_value_ = new (allocator_.allocate(1)) T(value);
+    max_value_ = new (allocator_.allocate(1)) T(value);
   } else {
     if (C()(value, *min_value_)) *min_value_ = value;
     if (C()(*max_value_, value)) *max_value_ = value;
@@ -182,8 +187,8 @@
     throw std::invalid_argument("incompatible M: " + std::to_string(m_) + " and " + std::to_string(other.m_));
   }
   if (is_empty()) {
-    min_value_ = new (A().allocate(1)) T(*other.min_value_);
-    max_value_ = new (A().allocate(1)) T(*other.max_value_);
+    min_value_ = new (allocator_.allocate(1)) T(*other.min_value_);
+    max_value_ = new (allocator_.allocate(1)) T(*other.max_value_);
   } else {
     if (C()(*other.min_value_, *min_value_)) *min_value_ = *other.min_value_;
     if (C()(*max_value_, *other.max_value_)) *max_value_ = *other.max_value_;
@@ -206,8 +211,8 @@
     throw std::invalid_argument("incompatible M: " + std::to_string(m_) + " and " + std::to_string(other.m_));
   }
   if (is_empty()) {
-    min_value_ = new (A().allocate(1)) T(std::move(*other.min_value_));
-    max_value_ = new (A().allocate(1)) T(std::move(*other.max_value_));
+    min_value_ = new (allocator_.allocate(1)) T(std::move(*other.min_value_));
+    max_value_ = new (allocator_.allocate(1)) T(std::move(*other.max_value_));
   } else {
     if (C()(*other.min_value_, *min_value_)) *min_value_ = std::move(*other.min_value_);
     if (C()(*max_value_, *other.max_value_)) *max_value_ = std::move(*other.max_value_);
@@ -270,8 +275,7 @@
 
 template<typename T, typename C, typename S, typename A>
 std::vector<T, A> kll_sketch<T, C, S, A>::get_quantiles(const double* fractions, uint32_t size) const {
-  std::vector<T, A> quantiles;
-  quantiles.reserve(size);
+  std::vector<T, A> quantiles(allocator_);
   if (is_empty()) return quantiles;
   std::unique_ptr<kll_quantile_calculator<T, C, A>, std::function<void(kll_quantile_calculator<T, C, A>*)>> quantile_calculator;
   quantiles.reserve(size);
@@ -295,11 +299,11 @@
 
 template<typename T, typename C, typename S, typename A>
 std::vector<T, A> kll_sketch<T, C, S, A>::get_quantiles(size_t num) const {
-  if (is_empty()) return std::vector<T, A>();
+  if (is_empty()) return std::vector<T, A>(allocator_);
   if (num == 0) {
     throw std::invalid_argument("num must be > 0");
   }
-  std::vector<double> fractions(num);
+  vector_d<A> fractions(num, 0, allocator_);
   fractions[0] = 0.0;
   for (size_t i = 1; i < num; i++) {
     fractions[i] = static_cast<double>(i) / (num - 1);
@@ -411,7 +415,7 @@
 vector_u8<A> kll_sketch<T, C, S, A>::serialize(unsigned header_size_bytes) const {
   const bool is_single_item = n_ == 1;
   const size_t size = header_size_bytes + get_serialized_size_bytes();
-  vector_u8<A> bytes(size);
+  vector_u8<A> bytes(size, 0, allocator_);
   uint8_t* ptr = bytes.data() + header_size_bytes;
   const uint8_t* end_ptr = ptr + size;
   const uint8_t preamble_ints(is_empty() || is_single_item ? PREAMBLE_INTS_SHORT : PREAMBLE_INTS_FULL);
@@ -449,7 +453,7 @@
 }
 
 template<typename T, typename C, typename S, typename A>
-kll_sketch<T, C, S, A> kll_sketch<T, C, S, A>::deserialize(std::istream& is) {
+kll_sketch<T, C, S, A> kll_sketch<T, C, S, A>::deserialize(std::istream& is, const A& allocator) {
   uint8_t preamble_ints;
   is.read((char*)&preamble_ints, sizeof(preamble_ints));
   uint8_t serial_version;
@@ -472,7 +476,7 @@
 
   if (!is.good()) throw std::runtime_error("error reading from std::istream");
   const bool is_empty(flags_byte & (1 << flags::IS_EMPTY));
-  if (is_empty) return kll_sketch(k);
+  if (is_empty) return kll_sketch(k, allocator);
 
   uint64_t n;
   uint16_t min_k;
@@ -488,7 +492,7 @@
     is.read((char*)&num_levels, sizeof(num_levels));
     is.read((char*)&unused, sizeof(unused));
   }
-  vector_u32<A> levels(num_levels + 1);
+  vector_u32<A> levels(num_levels + 1, 0, allocator);
   const uint32_t capacity(kll_helper::compute_total_capacity(k, m, num_levels));
   if (is_single_item) {
     levels[0] = capacity - 1;
@@ -497,41 +501,43 @@
     is.read((char*)levels.data(), sizeof(levels[0]) * num_levels);
   }
   levels[num_levels] = capacity;
-  auto item_buffer_deleter = [](T* ptr) { A().deallocate(ptr, 1); };
-  std::unique_ptr<T, decltype(item_buffer_deleter)> min_value_buffer(A().allocate(1), item_buffer_deleter);
-  std::unique_ptr<T, decltype(item_buffer_deleter)> max_value_buffer(A().allocate(1), item_buffer_deleter);
-  std::unique_ptr<T, item_deleter> min_value;
-  std::unique_ptr<T, item_deleter> max_value;
+  A alloc(allocator);
+  auto item_buffer_deleter = [&alloc](T* ptr) { alloc.deallocate(ptr, 1); };
+  std::unique_ptr<T, decltype(item_buffer_deleter)> min_value_buffer(alloc.allocate(1), item_buffer_deleter);
+  std::unique_ptr<T, decltype(item_buffer_deleter)> max_value_buffer(alloc.allocate(1), item_buffer_deleter);
+  std::unique_ptr<T, item_deleter> min_value(nullptr, item_deleter(allocator));
+  std::unique_ptr<T, item_deleter> max_value(nullptr, item_deleter(allocator));
   if (!is_single_item) {
     S().deserialize(is, min_value_buffer.get(), 1);
    // serde call did not throw, repackage with destructor
-    min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter());
+    min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
     S().deserialize(is, max_value_buffer.get(), 1);
    // serde call did not throw, repackage with destructor
-    max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter());
+    max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
   }
-  auto items_buffer_deleter = [capacity](T* ptr) { A().deallocate(ptr, capacity); };
-  std::unique_ptr<T, decltype(items_buffer_deleter)> items_buffer(A().allocate(capacity), items_buffer_deleter);
+  auto items_buffer_deleter = [capacity, &alloc](T* ptr) { alloc.deallocate(ptr, capacity); };
+  std::unique_ptr<T, decltype(items_buffer_deleter)> items_buffer(alloc.allocate(capacity), items_buffer_deleter);
   const auto num_items = levels[num_levels] - levels[0];
   S().deserialize(is, &items_buffer.get()[levels[0]], num_items);
  // serde call did not throw, repackage with destructors
-  std::unique_ptr<T, items_deleter> items(items_buffer.release(), items_deleter(levels[0], capacity));
+  std::unique_ptr<T, items_deleter> items(items_buffer.release(), items_deleter(levels[0], capacity, allocator));
   const bool is_level_zero_sorted = (flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED)) > 0;
   if (is_single_item) {
     new (min_value_buffer.get()) T(items.get()[levels[0]]);
    // copy did not throw, repackage with destructor
-    min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter());
+    min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
     new (max_value_buffer.get()) T(items.get()[levels[0]]);
    // copy did not throw, repackage with destructor
-    max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter());
+    max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
   }
-  if (!is.good()) throw std::runtime_error("error reading from std::istream");
+  if (!is.good())
+    throw std::runtime_error("error reading from std::istream");
   return kll_sketch(k, min_k, n, num_levels, std::move(levels), std::move(items), capacity,
       std::move(min_value), std::move(max_value), is_level_zero_sorted);
 }
 
 template<typename T, typename C, typename S, typename A>
-kll_sketch<T, C, S, A> kll_sketch<T, C, S, A>::deserialize(const void* bytes, size_t size) {
+kll_sketch<T, C, S, A> kll_sketch<T, C, S, A>::deserialize(const void* bytes, size_t size, const A& allocator) {
   ensure_minimum_memory(size, 8);
   const char* ptr = static_cast<const char*>(bytes);
   uint8_t preamble_ints;
@@ -555,7 +561,7 @@
   ensure_minimum_memory(size, 1 << preamble_ints);
 
   const bool is_empty(flags_byte & (1 << flags::IS_EMPTY));
-  if (is_empty) return kll_sketch<T, C, S, A>(k);
+  if (is_empty) return kll_sketch<T, C, S, A>(k, allocator);
 
   uint64_t n;
   uint16_t min_k;
@@ -572,7 +578,7 @@
     ptr += copy_from_mem(ptr, &num_levels, sizeof(num_levels));
     ptr++; // skip unused byte
   }
-  vector_u32<A> levels(num_levels + 1);
+  vector_u32<A> levels(num_levels + 1, 0, allocator);
   const uint32_t capacity(kll_helper::compute_total_capacity(k, m, num_levels));
   if (is_single_item) {
     levels[0] = capacity - 1;
@@ -581,35 +587,36 @@
     ptr += copy_from_mem(ptr, levels.data(), sizeof(levels[0]) * num_levels);
   }
   levels[num_levels] = capacity;
-  auto item_buffer_deleter = [](T* ptr) { A().deallocate(ptr, 1); };
-  std::unique_ptr<T, decltype(item_buffer_deleter)> min_value_buffer(A().allocate(1), item_buffer_deleter);
-  std::unique_ptr<T, decltype(item_buffer_deleter)> max_value_buffer(A().allocate(1), item_buffer_deleter);
-  std::unique_ptr<T, item_deleter> min_value;
-  std::unique_ptr<T, item_deleter> max_value;
+  A alloc(allocator);
+  auto item_buffer_deleter = [&alloc](T* ptr) { alloc.deallocate(ptr, 1); };
+  std::unique_ptr<T, decltype(item_buffer_deleter)> min_value_buffer(alloc.allocate(1), item_buffer_deleter);
+  std::unique_ptr<T, decltype(item_buffer_deleter)> max_value_buffer(alloc.allocate(1), item_buffer_deleter);
+  std::unique_ptr<T, item_deleter> min_value(nullptr, item_deleter(allocator));
+  std::unique_ptr<T, item_deleter> max_value(nullptr, item_deleter(allocator));
   if (!is_single_item) {
     ptr += S().deserialize(ptr, end_ptr - ptr, min_value_buffer.get(), 1);
    // serde call did not throw, repackage with destructor
-    min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter());
+    min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
     ptr += S().deserialize(ptr, end_ptr - ptr, max_value_buffer.get(), 1);
    // serde call did not throw, repackage with destructor
-    max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter());
+    max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
   }
-  auto items_buffer_deleter = [capacity](T* ptr) { A().deallocate(ptr, capacity); };
-  std::unique_ptr<T, decltype(items_buffer_deleter)> items_buffer(A().allocate(capacity), items_buffer_deleter);
+  auto items_buffer_deleter = [capacity, &alloc](T* ptr) { alloc.deallocate(ptr, capacity); };
+  std::unique_ptr<T, decltype(items_buffer_deleter)> items_buffer(alloc.allocate(capacity), items_buffer_deleter);
   const auto num_items = levels[num_levels] - levels[0];
   ptr += S().deserialize(ptr, end_ptr - ptr, &items_buffer.get()[levels[0]], num_items);
  // serde call did not throw, repackage with destructors
-  std::unique_ptr<T, items_deleter> items(items_buffer.release(), items_deleter(levels[0], capacity));
+  std::unique_ptr<T, items_deleter> items(items_buffer.release(), items_deleter(levels[0], capacity, allocator));
   const size_t delta = ptr - static_cast<const char*>(bytes);
   if (delta != size) throw std::logic_error("deserialized size mismatch: " + std::to_string(delta) + " != " + std::to_string(size));
   const bool is_level_zero_sorted = (flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED)) > 0;
   if (is_single_item) {
     new (min_value_buffer.get()) T(items.get()[levels[0]]);
    // copy did not throw, repackage with destructor
-    min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter());
+    min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
     new (max_value_buffer.get()) T(items.get()[levels[0]]);
    // copy did not throw, repackage with destructor
-    max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter());
+    max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
   }
   return kll_sketch(k, min_k, n, num_levels, std::move(levels), std::move(items), capacity,
       std::move(min_value), std::move(max_value), is_level_zero_sorted);
@@ -634,6 +641,7 @@
 kll_sketch<T, C, S, A>::kll_sketch(uint16_t k, uint16_t min_k, uint64_t n, uint8_t num_levels, vector_u32<A>&& levels,
     std::unique_ptr<T, items_deleter> items, uint32_t items_size, std::unique_ptr<T, item_deleter> min_value,
     std::unique_ptr<T, item_deleter> max_value, bool is_level_zero_sorted):
+allocator_(levels.get_allocator()),
 k_(k),
 m_(DEFAULT_M),
 min_k_(min_k),
@@ -735,9 +743,9 @@
   const uint32_t new_total_cap = cur_total_cap + delta_cap;
 
   // move (and shift) the current data into the new buffer
-  T* new_buf = A().allocate(new_total_cap);
+  T* new_buf = allocator_.allocate(new_total_cap);
   kll_helper::move_construct<T>(items_, 0, cur_total_cap, new_buf, delta_cap, true);
-  A().deallocate(items_, items_size_);
+  allocator_.deallocate(items_, items_size_);
   items_ = new_buf;
   items_size_ = new_total_cap;
 
@@ -763,19 +771,20 @@
 template<typename T, typename C, typename S, typename A>
 std::unique_ptr<kll_quantile_calculator<T, C, A>, std::function<void(kll_quantile_calculator<T, C, A>*)>> kll_sketch<T, C, S, A>::get_quantile_calculator() {
   sort_level_zero();
-  typedef typename std::allocator_traits<A>::template rebind_alloc<kll_quantile_calculator<T, C, A>> AllocCalc;
+  using AllocCalc = typename std::allocator_traits<A>::template rebind_alloc<kll_quantile_calculator<T, C, A>>;
+  AllocCalc alloc(allocator_);
   std::unique_ptr<kll_quantile_calculator<T, C, A>, std::function<void(kll_quantile_calculator<T, C, A>*)>> quantile_calculator(
-    new (AllocCalc().allocate(1)) kll_quantile_calculator<T, C, A>(items_, levels_.data(), num_levels_, n_),
-    [](kll_quantile_calculator<T, C, A>* ptr){ ptr->~kll_quantile_calculator<T, C, A>(); AllocCalc().deallocate(ptr, 1); }
+    new (alloc.allocate(1)) kll_quantile_calculator<T, C, A>(items_, levels_.data(), num_levels_, n_, allocator_),
+    [&alloc](kll_quantile_calculator<T, C, A>* ptr){ ptr->~kll_quantile_calculator<T, C, A>(); alloc.deallocate(ptr, 1); }
   );
   return quantile_calculator;
 }
 
 template<typename T, typename C, typename S, typename A>
 vector_d<A> kll_sketch<T, C, S, A>::get_PMF_or_CDF(const T* split_points, uint32_t size, bool is_CDF) const {
-  if (is_empty()) return vector_d<A>();
+  if (is_empty()) return vector_d<A>(allocator_);
   kll_helper::validate_values<T, C>(split_points, size);
-  vector_d<A> buckets(size + 1, 0);
+  vector_d<A> buckets(size + 1, 0, allocator_);
   uint8_t level = 0;
   uint64_t weight = 1;
   while (level < num_levels_) {
@@ -845,12 +854,13 @@
 template<typename O>
 void kll_sketch<T, C, S, A>::merge_higher_levels(O&& other, uint64_t final_n) {
   const uint32_t tmp_num_items = get_num_retained() + other.get_num_retained_above_level_zero();
-  auto tmp_items_deleter = [tmp_num_items](T* ptr) { A().deallocate(ptr, tmp_num_items); }; // no destructor needed
-  const std::unique_ptr<T, decltype(tmp_items_deleter)> workbuf(A().allocate(tmp_num_items), tmp_items_deleter);
+  A alloc(allocator_);
+  auto tmp_items_deleter = [tmp_num_items, &alloc](T* ptr) { alloc.deallocate(ptr, tmp_num_items); }; // no destructor needed
+  const std::unique_ptr<T, decltype(tmp_items_deleter)> workbuf(allocator_.allocate(tmp_num_items), tmp_items_deleter);
   const uint8_t ub = kll_helper::ub_on_num_levels(final_n);
   const size_t work_levels_size = ub + 2; // ub+1 does not work
-  vector_u32<A> worklevels(work_levels_size);
-  vector_u32<A> outlevels(work_levels_size);
+  vector_u32<A> worklevels(work_levels_size, 0, allocator_);
+  vector_u32<A> outlevels(work_levels_size, 0, allocator_);
 
   const uint8_t provisional_num_levels = std::max(num_levels_, other.num_levels_);
 
@@ -864,9 +874,9 @@
 
   // now we need to transfer the results back into "this" sketch
   if (result.final_capacity != items_size_) {
-    A().deallocate(items_, items_size_);
+    allocator_.deallocate(items_, items_size_);
     items_size_ = result.final_capacity;
-    items_ = A().allocate(items_size_);
+    items_ = allocator_.allocate(items_size_);
   }
   const uint32_t free_space_at_bottom = result.final_capacity - result.final_num_items;
   kll_helper::move_construct<T>(workbuf.get(), outlevels[0], outlevels[0] + result.final_num_items, items_, free_space_at_bottom, true);
@@ -1101,29 +1111,32 @@
 template<typename T, typename C, typename S, typename A>
 class kll_sketch<T, C, S, A>::item_deleter {
   public:
-  void operator() (T* ptr) const {
+  item_deleter(const A& allocator): allocator_(allocator) {}
+  void operator() (T* ptr) {
     if (ptr != nullptr) {
       ptr->~T();
-      A().deallocate(ptr, 1);
+      allocator_.deallocate(ptr, 1);
     }
   }
+  private:
+  A allocator_;
 };
 
 template<typename T, typename C, typename S, typename A>
 class kll_sketch<T, C, S, A>::items_deleter {
   public:
-  items_deleter(uint32_t start, uint32_t num): start(start), num(num) {}
-  void operator() (T* ptr) const {
+  items_deleter(uint32_t start, uint32_t num, const A& allocator):
+    allocator_(allocator), start_(start), num_(num) {}
+  void operator() (T* ptr) {
     if (ptr != nullptr) {
-      for (uint32_t i = start; i < num; ++i) {
-        ptr[i].~T();
-      }
-      A().deallocate(ptr, num);
+      for (uint32_t i = start_; i < num_; ++i) ptr[i].~T();
+      allocator_.deallocate(ptr, num_);
     }
   }
   private:
-  uint32_t start;
-  uint32_t num;
+  A allocator_;
+  uint32_t start_;
+  uint32_t num_;
 };
 
 } /* namespace datasketches */
diff --git a/req/CMakeLists.txt b/req/CMakeLists.txt
new file mode 100755
index 0000000..7ebefca
--- /dev/null
+++ b/req/CMakeLists.txt
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+add_library(req INTERFACE)
+
+add_library(${PROJECT_NAME}::REQ ALIAS req)
+
+if (BUILD_TESTS)
+  add_subdirectory(test)
+endif()
+
+target_include_directories(req
+  INTERFACE
+    $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
+    $<INSTALL_INTERFACE:$<INSTALL_PREFIX>/include>
+)
+
+target_link_libraries(req INTERFACE common)
+target_compile_features(req INTERFACE cxx_std_11)
+
+set(req_HEADERS "")
+list(APPEND req_HEADERS "include/req_common.hpp")
+list(APPEND req_HEADERS "include/req_sketch.hpp")
+list(APPEND req_HEADERS "include/req_sketch_impl.hpp")
+list(APPEND req_HEADERS "include/req_compactor.hpp")
+list(APPEND req_HEADERS "include/req_compactor_impl.hpp")
+list(APPEND req_HEADERS "include/req_quantile_calculator.hpp")
+list(APPEND req_HEADERS "include/req_quantile_calculator_impl.hpp")
+
+install(TARGETS req
+  EXPORT ${PROJECT_NAME}
+)
+
+install(FILES ${req_HEADERS}
+  DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/DataSketches")
+
+target_sources(req
+  INTERFACE
+    ${CMAKE_CURRENT_SOURCE_DIR}/include/req_common.hpp
+    ${CMAKE_CURRENT_SOURCE_DIR}/include/req_sketch.hpp
+    ${CMAKE_CURRENT_SOURCE_DIR}/include/req_sketch_impl.hpp
+    ${CMAKE_CURRENT_SOURCE_DIR}/include/req_compactor.hpp
+    ${CMAKE_CURRENT_SOURCE_DIR}/include/req_compactor_impl.hpp
+    ${CMAKE_CURRENT_SOURCE_DIR}/include/req_quantile_calculator.hpp
+    ${CMAKE_CURRENT_SOURCE_DIR}/include/req_quantile_calculator_impl.hpp
+)
diff --git a/req/include/req_common.hpp b/req/include/req_common.hpp
new file mode 100755
index 0000000..d2dc518
--- /dev/null
+++ b/req/include/req_common.hpp
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef REQ_COMMON_HPP_
+#define REQ_COMMON_HPP_
+
+#include <random>
+#include <chrono>
+
+#include "serde.hpp"
+#include "common_defs.hpp"
+
+namespace datasketches {
+
+// TODO: have a common random bit with KLL
+static std::independent_bits_engine<std::mt19937, 1, unsigned> req_random_bit(std::chrono::system_clock::now().time_since_epoch().count());
+
+namespace req_constants {
+  static const uint16_t MIN_K = 4;
+  static const uint8_t INIT_NUM_SECTIONS = 3;
+  static const unsigned MULTIPLIER = 2;
+}
+
+} /* namespace datasketches */
+
+#endif
diff --git a/req/include/req_compactor.hpp b/req/include/req_compactor.hpp
new file mode 100755
index 0000000..2ca768b
--- /dev/null
+++ b/req/include/req_compactor.hpp
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef REQ_COMPACTOR_HPP_
+#define REQ_COMPACTOR_HPP_
+
+#include <memory>
+
+namespace datasketches {
+
+template<
+typename T,
+typename Comparator,
+typename Allocator
+>
+class req_compactor {
+public:
+  req_compactor(bool hra, uint8_t lg_weight, uint32_t section_size, const Allocator& allocator, bool sorted = true);
+  ~req_compactor();
+  req_compactor(const req_compactor& other);
+  req_compactor(req_compactor&& other) noexcept;
+  req_compactor& operator=(const req_compactor& other);
+  req_compactor& operator=(req_compactor&& other);
+
+  bool is_sorted() const;
+  uint32_t get_num_items() const;
+  uint32_t get_nom_capacity() const;
+  uint8_t get_lg_weight() const;
+  const T* begin() const;
+  const T* end() const;
+  T* begin();
+  T* end();
+
+  template<bool inclusive>
+  uint64_t compute_weight(const T& item) const;
+
+  template<typename FwdT>
+  void append(FwdT&& item);
+
+  template<typename FwdC>
+  void merge(FwdC&& other);
+
+  void sort();
+
+  std::pair<uint32_t, uint32_t> compact(req_compactor& next);
+
+  /**
+   * Computes size needed to serialize the current state of the compactor.
+   * This version is for fixed-size arithmetic types (integral and floating point).
+   * @return size in bytes needed to serialize this compactor
+   */
+  template<typename S, typename TT = T, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type = 0>
+  size_t get_serialized_size_bytes(const S& serde) const;
+
+  /**
+   * Computes size needed to serialize the current state of the compactor.
+   * This version is for all other types and can be expensive since every item needs to be looked at.
+   * @return size in bytes needed to serialize this compactor
+   */
+  template<typename S, typename TT = T, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type = 0>
+  size_t get_serialized_size_bytes(const S& serde) const;
+
+  template<typename S>
+  void serialize(std::ostream& os, const S& serde) const;
+
+  template<typename S>
+  size_t serialize(void* dst, size_t capacity, const S& serde) const;
+
+  template<typename S>
+  static req_compactor deserialize(std::istream& is, const S& serde, const Allocator& allocator, bool sorted, bool hra);
+
+  template<typename S>
+  static std::pair<req_compactor, size_t> deserialize(const void* bytes, size_t size, const S& serde, const Allocator& allocator, bool sorted, bool hra);
+
+  template<typename S>
+  static req_compactor deserialize(std::istream& is, const S& serde, const Allocator& allocator, bool sorted, uint16_t k, uint8_t num_items, bool hra);
+
+  template<typename S>
+  static std::pair<req_compactor, size_t> deserialize(const void* bytes, size_t size, const S& serde, const Allocator& allocator, bool sorted, uint16_t k, uint8_t num_items, bool hra);
+
+private:
+  Allocator allocator_;
+  uint8_t lg_weight_;
+  bool hra_;
+  bool coin_; // random bit for compaction
+  bool sorted_;
+  float section_size_raw_;
+  uint32_t section_size_;
+  uint8_t num_sections_;
+  uint64_t state_; // state of the deterministic compaction schedule
+  uint32_t num_items_;
+  uint32_t capacity_;
+  T* items_;
+
+  bool ensure_enough_sections();
+  std::pair<uint32_t, uint32_t> compute_compaction_range(uint32_t secs_to_compact) const;
+  void grow(size_t new_capacity);
+  void ensure_space(size_t num);
+
+  static uint32_t nearest_even(float value);
+
+  template<typename InIter, typename OutIter>
+  static void promote_evens_or_odds(InIter from, InIter to, bool flag, OutIter dst);
+
+  // for deserialization
+  class items_deleter;
+  req_compactor(bool hra, uint8_t lg_weight, bool sorted, float section_size_raw, uint8_t num_sections, uint64_t state, std::unique_ptr<T, items_deleter> items, uint32_t num_items, const Allocator& allocator);
+
+  template<typename S>
+  static std::unique_ptr<T, items_deleter> deserialize_items(std::istream& is, const S& serde, const Allocator& allocator, size_t num);
+
+  template<typename S>
+  static std::pair<std::unique_ptr<T, items_deleter>, size_t> deserialize_items(const void* bytes, size_t size, const S& serde, const Allocator& allocator, size_t num);
+
+};
+
+} /* namespace datasketches */
+
+#include "req_compactor_impl.hpp"
+
+#endif
diff --git a/req/include/req_compactor_impl.hpp b/req/include/req_compactor_impl.hpp
new file mode 100755
index 0000000..63a7dcf
--- /dev/null
+++ b/req/include/req_compactor_impl.hpp
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef REQ_COMPACTOR_IMPL_HPP_
+#define REQ_COMPACTOR_IMPL_HPP_
+
+#include <stdexcept>
+#include <cmath>
+#include <algorithm>
+
+#include "count_zeros.hpp"
+#include "conditional_forward.hpp"
+
+#include <iomanip>
+
+namespace datasketches {
+
+// Constructs an empty compactor for one level of the REQ sketch.
+// hra: high-rank-accuracy mode (occupied region sits at the top of the buffer)
+// lg_weight: log2 of the weight carried by items at this level
+// section_size: initial section size k (also seeds the float raw size)
+// sorted: whether the (initially empty) buffer is considered sorted
+template<typename T, typename C, typename A>
+req_compactor<T, C, A>::req_compactor(bool hra, uint8_t lg_weight, uint32_t section_size, const A& allocator, bool sorted):
+allocator_(allocator),
+lg_weight_(lg_weight),
+hra_(hra),
+coin_(false),
+sorted_(sorted),
+section_size_raw_(section_size),
+section_size_(section_size),
+num_sections_(req_constants::INIT_NUM_SECTIONS),
+state_(0),
+num_items_(0),
+// twice the nominal capacity delays the first compaction; depends on
+// section_size_ and num_sections_ already being initialized above
+capacity_(2 * get_nom_capacity()),
+items_(allocator_.allocate(capacity_)) // raw storage; items are constructed on append
+{}
+
+// Destroys only the occupied range [begin(), end()) — the rest of the
+// buffer is raw storage — then releases the full allocation.
+template<typename T, typename C, typename A>
+req_compactor<T, C, A>::~req_compactor() {
+  if (items_ != nullptr) { // nullptr after move-from
+    for (auto it = begin(); it != end(); ++it) (*it).~T();
+    allocator_.deallocate(items_, capacity_);
+  }
+}
+
+// Copy constructor: allocates a buffer of the same capacity and
+// copy-constructs only the occupied region, preserving its placement
+// (top of the buffer in hra mode, bottom otherwise).
+template<typename T, typename C, typename A>
+req_compactor<T, C, A>::req_compactor(const req_compactor& other):
+allocator_(other.allocator_),
+lg_weight_(other.lg_weight_),
+hra_(other.hra_),
+coin_(other.coin_),
+sorted_(other.sorted_),
+section_size_raw_(other.section_size_raw_),
+section_size_(other.section_size_),
+num_sections_(other.num_sections_),
+state_(other.state_),
+num_items_(other.num_items_),
+capacity_(other.capacity_),
+items_(nullptr)
+{
+  if (other.items_ != nullptr) {
+    items_ = allocator_.allocate(capacity_);
+    // copy items at the same offsets they occupy in the source
+    const size_t from = hra_ ? capacity_ - num_items_ : 0;
+    const size_t to = hra_ ? capacity_ : num_items_;
+    for (size_t i = from; i < to; ++i) new (items_ + i) T(other.items_[i]);
+  }
+}
+
+// Move constructor: steals the buffer; the source is left with a null
+// items_ pointer so its destructor is a no-op.
+template<typename T, typename C, typename A>
+req_compactor<T, C, A>::req_compactor(req_compactor&& other) noexcept :
+allocator_(std::move(other.allocator_)),
+lg_weight_(other.lg_weight_),
+hra_(other.hra_),
+coin_(other.coin_),
+sorted_(other.sorted_),
+section_size_raw_(other.section_size_raw_),
+section_size_(other.section_size_),
+num_sections_(other.num_sections_),
+state_(other.state_),
+num_items_(other.num_items_),
+capacity_(other.capacity_),
+items_(other.items_)
+{
+  other.items_ = nullptr;
+}
+
+// Copy assignment via copy-and-swap: the copy may throw, but *this is
+// only modified by the non-throwing swaps (strong exception safety).
+// The old state is released by the temporary's destructor.
+template<typename T, typename C, typename A>
+req_compactor<T, C, A>& req_compactor<T, C, A>::operator=(const req_compactor& other) {
+  req_compactor copy(other);
+  std::swap(allocator_, copy.allocator_);
+  std::swap(lg_weight_, copy.lg_weight_);
+  std::swap(hra_, copy.hra_);
+  std::swap(coin_, copy.coin_);
+  std::swap(sorted_, copy.sorted_);
+  std::swap(section_size_raw_, copy.section_size_raw_);
+  std::swap(section_size_, copy.section_size_);
+  std::swap(num_sections_, copy.num_sections_);
+  std::swap(state_, copy.state_);
+  std::swap(num_items_, copy.num_items_);
+  std::swap(capacity_, copy.capacity_);
+  std::swap(items_, copy.items_);
+  return *this;
+}
+
+// Move assignment by swapping all state: *this takes the source's buffer
+// and the source ends up owning this object's old state, which its
+// destructor will clean up.
+template<typename T, typename C, typename A>
+req_compactor<T, C, A>& req_compactor<T, C, A>::operator=(req_compactor&& other) {
+  std::swap(allocator_, other.allocator_);
+  std::swap(lg_weight_, other.lg_weight_);
+  std::swap(hra_, other.hra_);
+  std::swap(coin_, other.coin_);
+  std::swap(sorted_, other.sorted_);
+  std::swap(section_size_raw_, other.section_size_raw_);
+  std::swap(section_size_, other.section_size_);
+  std::swap(num_sections_, other.num_sections_);
+  std::swap(state_, other.state_);
+  std::swap(num_items_, other.num_items_);
+  std::swap(capacity_, other.capacity_);
+  std::swap(items_, other.items_);
+  return *this;
+}
+
+// True if the occupied region is currently sorted.
+template<typename T, typename C, typename A>
+bool req_compactor<T, C, A>::is_sorted() const {
+  return sorted_;
+}
+
+// Number of items currently retained at this level.
+template<typename T, typename C, typename A>
+uint32_t req_compactor<T, C, A>::get_num_items() const {
+  return num_items_;
+}
+
+// Nominal (target) capacity derived from the current section configuration;
+// exceeding it triggers a compaction at the sketch level.
+template<typename T, typename C, typename A>
+uint32_t req_compactor<T, C, A>::get_nom_capacity() const {
+  return req_constants::MULTIPLIER * num_sections_ * section_size_;
+}
+
+// log2 of the weight each retained item represents.
+template<typename T, typename C, typename A>
+uint8_t req_compactor<T, C, A>::get_lg_weight() const {
+  return lg_weight_;
+}
+
+// Returns the weighted count of retained items below the given item:
+// (number of items <= item if inclusive, < item otherwise) * 2^lg_weight.
+// Sorts lazily first; the const_cast is a deliberate mutable-cache-style
+// side effect that does not change the logical state.
+template<typename T, typename C, typename A>
+template<bool inclusive>
+uint64_t req_compactor<T, C, A>::compute_weight(const T& item) const {
+  if (!sorted_) const_cast<req_compactor*>(this)->sort(); // allow sorting as a side effect
+  auto it = inclusive ?
+      std::upper_bound(begin(), end(), item, C()) :
+      std::lower_bound(begin(), end(), item, C());
+  return std::distance(begin(), it) << lg_weight_;
+}
+
+// Appends one item, growing the buffer if full. In hra mode the occupied
+// region grows downward from the top, otherwise upward from the bottom.
+// Appending a second item conservatively clears the sorted flag.
+template<typename T, typename C, typename A>
+template<typename FwdT>
+void req_compactor<T, C, A>::append(FwdT&& item) {
+  if (num_items_ == capacity_) grow(capacity_ + get_nom_capacity());
+  const size_t i = hra_ ? capacity_ - num_items_ - 1 : num_items_;
+  new (items_ + i) T(std::forward<FwdT>(item));
+  ++num_items_;
+  if (num_items_ > 1) sorted_ = false;
+}
+
+// Reallocates to new_capacity, move-constructing the occupied items into
+// their new placement (top-aligned for hra, bottom-aligned otherwise)
+// and destroying the originals before freeing the old buffer.
+template<typename T, typename C, typename A>
+void req_compactor<T, C, A>::grow(size_t new_capacity) {
+  T* new_items = allocator_.allocate(new_capacity);
+  size_t new_i = hra_ ? new_capacity - num_items_ : 0;
+  for (auto it = begin(); it != end(); ++it, ++new_i) {
+    new (new_items + new_i) T(std::move(*it));
+    (*it).~T();
+  }
+  allocator_.deallocate(items_, capacity_);
+  items_ = new_items;
+  capacity_ = new_capacity;
+}
+
+// Guarantees room for 'num' more items, growing with one extra nominal
+// capacity of headroom to amortize future insertions.
+template<typename T, typename C, typename A>
+void req_compactor<T, C, A>::ensure_space(size_t num) {
+  if (num_items_ + num > capacity_) grow(num_items_ + num + get_nom_capacity());
+}
+
+// Iterators over the occupied region only: the items live at the top of
+// the buffer in hra mode and at the bottom otherwise.
+template<typename T, typename C, typename A>
+const T* req_compactor<T, C, A>::begin() const {
+  return items_ + (hra_ ? capacity_ - num_items_ : 0);
+}
+
+template<typename T, typename C, typename A>
+const T* req_compactor<T, C, A>::end() const {
+  return items_ + (hra_ ? capacity_ : num_items_);
+}
+
+template<typename T, typename C, typename A>
+T* req_compactor<T, C, A>::begin() {
+  return items_ + (hra_ ? capacity_ - num_items_ : 0);
+}
+
+template<typename T, typename C, typename A>
+T* req_compactor<T, C, A>::end() {
+  return items_ + (hra_ ? capacity_ : num_items_);
+}
+
+// Merges another compactor of the same level (lg_weight) into this one.
+// The other's items are copied or moved (via conditional_forward) into the
+// free space adjacent to the occupied region, sorted if necessary, and
+// then inplace_merge'd with the existing sorted items.
+// Throws std::logic_error on a level-weight mismatch.
+template<typename T, typename C, typename A>
+template<typename FwdC>
+void req_compactor<T, C, A>::merge(FwdC&& other) {
+  // TODO: swap if other is larger?
+  if (lg_weight_ != other.lg_weight_) throw std::logic_error("weight mismatch");
+  state_ |= other.state_; // union of compaction schedules
+  while (ensure_enough_sections()) {}
+  ensure_space(other.get_num_items());
+  sort();
+  // incoming items go just below begin() in hra mode, just above end() otherwise
+  auto middle = hra_ ? begin() : end();
+  auto from = hra_ ? begin() - other.get_num_items() : end();
+  auto to = from + other.get_num_items();
+  auto other_it = other.begin();
+  for (auto it = from; it != to; ++it, ++other_it) new (it) T(conditional_forward<FwdC>(*other_it));
+  if (!other.sorted_) std::sort(from, to, C());
+  if (num_items_ > 0) std::inplace_merge(hra_ ? from : begin(), middle, hra_ ? end() : to, C());
+  num_items_ += other.get_num_items();
+}
+
+// Sorts the occupied region with the sketch comparator; no-op if already sorted.
+template<typename T, typename C, typename A>
+void req_compactor<T, C, A>::sort() {
+  if (!sorted_) {
+    std::sort(begin(), end(), C());
+    sorted_ = true;
+  }
+}
+
+// Performs one compaction step: selects a range of sections based on the
+// deterministic schedule in state_, promotes every other item (evens or
+// odds chosen by a coin flip) into 'next' (the level above), destroys the
+// compacted items, and advances the schedule.
+// Returns (number of items promoted, change in nominal capacity).
+template<typename T, typename C, typename A>
+std::pair<uint32_t, uint32_t> req_compactor<T, C, A>::compact(req_compactor& next) {
+  const uint32_t starting_nom_capacity = get_nom_capacity();
+  // choose a part of the buffer to compact
+  const uint32_t secs_to_compact = std::min(static_cast<uint32_t>(count_trailing_zeros_in_u32(~state_) + 1), static_cast<uint32_t>(num_sections_));
+  auto compaction_range = compute_compaction_range(secs_to_compact);
+  if (compaction_range.second - compaction_range.first < 2) throw std::logic_error("compaction range error");
+
+  if ((state_ & 1) == 1) { coin_ = !coin_; } // for odd flip coin;
+  else { coin_ = req_random_bit(); } // random coin flip
+
+  // half of the (even-sized) range survives into the next level
+  const auto num = (compaction_range.second - compaction_range.first) / 2;
+  next.ensure_space(num);
+  auto next_middle = hra_ ? next.begin() : next.end();
+  auto next_empty = hra_ ? next.begin() - num : next.end();
+  promote_evens_or_odds(begin() + compaction_range.first, begin() + compaction_range.second, coin_, next_empty);
+  next.num_items_ += num;
+  std::inplace_merge(next.begin(), next_middle, next.end(), C());
+  // destroy the moved-from items in the compacted range
+  for (size_t i = compaction_range.first; i < compaction_range.second; ++i) (*(begin() + i)).~T();
+  num_items_ -= compaction_range.second - compaction_range.first;
+
+  ++state_;
+  ensure_enough_sections();
+  return std::pair<uint32_t, uint32_t>(
+    num,
+    get_nom_capacity() - starting_nom_capacity
+  );
+}
+
+// Checks the deterministic compaction schedule and, when state_ has
+// reached 2^(num_sections_-1), shrinks the section size by sqrt(2),
+// doubles the number of sections and grows the buffer if needed.
+// Returns true if an adjustment was made (callers may loop until false).
+template<typename T, typename C, typename A>
+bool req_compactor<T, C, A>::ensure_enough_sections() {
+  const float ssr = section_size_raw_ / sqrt(2);
+  const uint32_t ne = nearest_even(ssr);
+  // the shift must be performed in 64 bits: num_sections_ doubles over time,
+  // so (num_sections_ - 1) can reach 31 or more, and shifting a 32-bit int
+  // literal 1 that far is undefined behavior; state_ is 64-bit anyway
+  if (state_ >= (static_cast<uint64_t>(1) << (num_sections_ - 1)) && ne >= req_constants::MIN_K) {
+    section_size_raw_ = ssr;
+    section_size_ = ne;
+    num_sections_ <<= 1;
+    if (capacity_ < 2 * get_nom_capacity()) grow(2 * get_nom_capacity());
+    return true;
+  }
+  return false;
+}
+
+// Computes the half-open range of offsets (relative to begin()) to compact.
+// Keeps half the nominal capacity plus the sections not scheduled for
+// compaction; the range is widened by one if needed so its length is even
+// (required for the evens/odds promotion). In hra mode the compacted
+// region is at the low end of the occupied range, otherwise at the high end.
+template<typename T, typename C, typename A>
+std::pair<uint32_t, uint32_t> req_compactor<T, C, A>::compute_compaction_range(uint32_t secs_to_compact) const {
+  uint32_t non_compact = get_nom_capacity() / 2 + (num_sections_ - secs_to_compact) * section_size_;
+  // make compacted region even
+  if (((num_items_ - non_compact) & 1) == 1) ++non_compact;
+  const size_t low = hra_ ? 0 : non_compact;
+  const size_t high = hra_ ? num_items_ - non_compact : num_items_;
+  return std::pair<uint32_t, uint32_t>(low, high);
+}
+
+// Rounds a positive float to the nearest even integer
+// (round half to the nearest multiple of 2, then scale back up).
+template<typename T, typename C, typename A>
+uint32_t req_compactor<T, C, A>::nearest_even(float value) {
+  return static_cast<uint32_t>(round(value / 2)) << 1;
+}
+
+// Move-constructs every other item of [from, to) into dst: items at even
+// offsets when odds == false, odd offsets when odds == true. The skipped
+// and moved-from source items are left for the caller to destroy.
+template<typename T, typename C, typename A>
+template<typename InIter, typename OutIter>
+void req_compactor<T, C, A>::promote_evens_or_odds(InIter from, InIter to, bool odds, OutIter dst) {
+  if (from == to) return;
+  InIter i = from;
+  if (odds) ++i;
+  while (i != to) {
+    new (dst) T(std::move(*i));
+    ++dst;
+    ++i;
+    if (i == to) break;
+    ++i; // skip the next item
+  }
+}
+
+// helpers for integral types
+// Reads one fixed-size value from the stream in host byte order.
+// NOTE(review): the value is returned even if the read fails (left
+// uninitialized); callers check is.good() afterwards — confirm all do.
+template<typename T>
+static inline T read(std::istream& is) {
+  T value;
+  is.read(reinterpret_cast<char*>(&value), sizeof(T));
+  return value;
+}
+
+// Writes one fixed-size value to the stream in host byte order.
+template<typename T>
+static inline void write(std::ostream& os, T value) {
+  os.write(reinterpret_cast<const char*>(&value), sizeof(T));
+}
+
+// implementation for fixed-size arithmetic types (integral and floating point)
+// Serialized size = 20-byte header (state 8 + section_size_raw 4 +
+// lg_weight 1 + num_sections 1 + padding 2 + num_items 4) plus the items.
+template<typename T, typename C, typename A>
+template<typename S, typename TT, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type>
+size_t req_compactor<T, C, A>::get_serialized_size_bytes(const S&) const {
+  return sizeof(state_) + sizeof(section_size_raw_) + sizeof(lg_weight_) + sizeof(num_sections_) +
+      sizeof(uint16_t) + // padding
+      sizeof(uint32_t) + // num_items
+      sizeof(TT) * num_items_;
+}
+
+// implementation for all other types
+// Same 20-byte header, but each retained item must be measured via the
+// serde, so this is O(num_items).
+template<typename T, typename C, typename A>
+template<typename S, typename TT, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type>
+size_t req_compactor<T, C, A>::get_serialized_size_bytes(const S& serde) const {
+  size_t size = sizeof(state_) + sizeof(section_size_raw_) + sizeof(lg_weight_) + sizeof(num_sections_) +
+      sizeof(uint16_t) + // padding
+      sizeof(uint32_t); // num_items
+  for (auto it = begin(); it != end(); ++it) size += serde.size_of_item(*it);
+  return size;
+}
+
+// Writes the 20-byte header followed by the items, in the same order
+// the byte-array overload and the deserializers use.
+template<typename T, typename C, typename A>
+template<typename S>
+void req_compactor<T, C, A>::serialize(std::ostream& os, const S& serde) const {
+  write(os, state_);
+  write(os, section_size_raw_);
+  write(os, lg_weight_);
+  write(os, num_sections_);
+  const uint16_t padding = 0;
+  write(os, padding);
+  write(os, num_items_);
+  serde.serialize(os, begin(), num_items_);
+}
+
+// Writes the header and items into a caller-provided buffer and returns
+// the number of bytes written.
+// NOTE(review): only the serde call is bounds-limited (end_ptr - ptr);
+// the header writes assume the caller sized 'capacity' via
+// get_serialized_size_bytes — confirm callers guarantee this.
+template<typename T, typename C, typename A>
+template<typename S>
+size_t req_compactor<T, C, A>::serialize(void* dst, size_t capacity, const S& serde) const {
+  uint8_t* ptr = static_cast<uint8_t*>(dst);
+  const uint8_t* end_ptr = ptr + capacity;
+  ptr += copy_to_mem(state_, ptr);
+  ptr += copy_to_mem(section_size_raw_, ptr);
+  ptr += copy_to_mem(lg_weight_, ptr);
+  ptr += copy_to_mem(num_sections_, ptr);
+  const uint16_t padding = 0;
+  ptr += copy_to_mem(padding, ptr);
+  ptr += copy_to_mem(num_items_, ptr);
+  ptr += serde.serialize(ptr, end_ptr - ptr, begin(), num_items_);
+  return ptr - static_cast<uint8_t*>(dst);
+}
+
+// Reads a compactor in the current serial format from the stream:
+// 20-byte header (mirroring serialize) followed by the items.
+template<typename T, typename C, typename A>
+template<typename S>
+req_compactor<T, C, A> req_compactor<T, C, A>::deserialize(std::istream& is, const S& serde, const A& allocator, bool sorted, bool hra) {
+  auto state = read<decltype(state_)>(is);
+  auto section_size_raw = read<decltype(section_size_raw_)>(is);
+  auto lg_weight = read<decltype(lg_weight_)>(is);
+  auto num_sections = read<decltype(num_sections_)>(is);
+  read<uint16_t>(is); // padding
+  auto num_items = read<uint32_t>(is);
+  auto items = deserialize_items(is, serde, allocator, num_items);
+  return req_compactor(hra, lg_weight, sorted, section_size_raw, num_sections, state, std::move(items), num_items, allocator);
+}
+
+// Reads a compactor that was serialized as raw items only (no per-level
+// header): level parameters are reset to defaults, with k seeding the
+// section size and lg_weight/state set to 0.
+template<typename T, typename C, typename A>
+template<typename S>
+req_compactor<T, C, A> req_compactor<T, C, A>::deserialize(std::istream& is, const S& serde, const A& allocator, bool sorted, uint16_t k, uint8_t num_items, bool hra) {
+  auto items = deserialize_items(is, serde, allocator, num_items);
+  return req_compactor(hra, 0, sorted, k, req_constants::INIT_NUM_SECTIONS, 0, std::move(items), num_items, allocator);
+}
+
+// Reads 'num' serialized items from the stream into freshly allocated
+// storage. The unique_ptr owns the allocation immediately and the
+// constructed items once the serde succeeds, so a throw during
+// deserialization cannot leak either.
+// Throws std::runtime_error if the stream is bad after reading.
+template<typename T, typename C, typename A>
+template<typename S>
+auto req_compactor<T, C, A>::deserialize_items(std::istream& is, const S& serde, const A& allocator, size_t num)
+-> std::unique_ptr<T, items_deleter> {
+  A alloc(allocator);
+  std::unique_ptr<T, items_deleter> items(alloc.allocate(num), items_deleter(allocator, false, num));
+  serde.deserialize(is, items.get(), num);
+  // serde did not throw, enable destructors
+  items.get_deleter().set_destroy(true);
+  if (!is.good()) throw std::runtime_error("error reading from std::istream");
+  // return the local directly: it is implicitly moved, and a plain return
+  // (unlike return std::move(items)) also allows NRVO
+  return items;
+}
+
+// Reads a compactor in the current serial format from raw bytes.
+// Header layout: state (8) + section_size_raw (4) + lg_weight (1) +
+// num_sections (1) + padding (2) + num_items (4) = 20 bytes, then items.
+// Returns the compactor and the total number of bytes consumed.
+template<typename T, typename C, typename A>
+template<typename S>
+std::pair<req_compactor<T, C, A>, size_t> req_compactor<T, C, A>::deserialize(const void* bytes, size_t size, const S& serde, const A& allocator, bool sorted, bool hra) {
+  // the fixed preamble below reads 20 bytes, not 8; check it all up front
+  // so a truncated buffer throws instead of being read out of bounds
+  ensure_minimum_memory(size, 20);
+  const char* ptr = static_cast<const char*>(bytes);
+  const char* end_ptr = static_cast<const char*>(bytes) + size;
+
+  uint64_t state;
+  ptr += copy_from_mem(ptr, state);
+  float section_size_raw;
+  ptr += copy_from_mem(ptr, section_size_raw);
+  uint8_t lg_weight;
+  ptr += copy_from_mem(ptr, lg_weight);
+  uint8_t num_sections;
+  ptr += copy_from_mem(ptr, num_sections);
+  ptr += 2; // padding
+  uint32_t num_items;
+  ptr += copy_from_mem(ptr, num_items);
+  auto pair = deserialize_items(ptr, end_ptr - ptr, serde, allocator, num_items);
+  ptr += pair.second;
+  return std::pair<req_compactor, size_t>(
+      req_compactor(hra, lg_weight, sorted, section_size_raw, num_sections, state, std::move(pair.first), num_items, allocator),
+      ptr - static_cast<const char*>(bytes)
+  );
+}
+
+// Byte-array counterpart of the raw-items stream overload: deserializes
+// 'num_items' items with default level parameters (k seeds the section
+// size, lg_weight/state are 0) and returns the bytes consumed.
+template<typename T, typename C, typename A>
+template<typename S>
+std::pair<req_compactor<T, C, A>, size_t> req_compactor<T, C, A>::deserialize(const void* bytes, size_t size, const S& serde, const A& allocator, bool sorted, uint16_t k, uint8_t num_items, bool hra) {
+  auto pair = deserialize_items(bytes, size, serde, allocator, num_items);
+  return std::pair<req_compactor, size_t>(
+      req_compactor(hra, 0, sorted, k, req_constants::INIT_NUM_SECTIONS, 0, std::move(pair.first), num_items, allocator),
+      pair.second
+  );
+}
+
+// Deserializes 'num' items from raw bytes into freshly allocated storage.
+// Exception-safe like the stream overload: the deleter destroys items only
+// after the serde succeeds. Returns the items and the bytes consumed.
+template<typename T, typename C, typename A>
+template<typename S>
+auto req_compactor<T, C, A>::deserialize_items(const void* bytes, size_t size, const S& serde, const A& allocator, size_t num)
+-> std::pair<std::unique_ptr<T, items_deleter>, size_t> {
+  const char* ptr = static_cast<const char*>(bytes);
+  const char* end_ptr = static_cast<const char*>(bytes) + size;
+  A alloc(allocator);
+  std::unique_ptr<T, items_deleter> items(alloc.allocate(num), items_deleter(allocator, false, num));
+  ptr += serde.deserialize(ptr, end_ptr - ptr, items.get(), num);
+  // serde did not throw, enable destructors
+  items.get_deleter().set_destroy(true);
+  return std::pair<std::unique_ptr<T, items_deleter>, size_t>(
+      std::move(items),
+      ptr - static_cast<const char*>(bytes)
+  );
+}
+
+
+// Private constructor used by the deserializers: adopts an already
+// constructed item buffer. capacity_ equals num_items (no headroom;
+// a later append/merge will grow as needed), and section_size_ is
+// recomputed from the raw float value.
+template<typename T, typename C, typename A>
+req_compactor<T, C, A>::req_compactor(bool hra, uint8_t lg_weight, bool sorted, float section_size_raw, uint8_t num_sections, uint64_t state, std::unique_ptr<T, items_deleter> items, uint32_t num_items, const A& allocator):
+allocator_(allocator),
+lg_weight_(lg_weight),
+hra_(hra),
+coin_(req_random_bit()),
+sorted_(sorted),
+section_size_raw_(section_size_raw),
+section_size_(nearest_even(section_size_raw)),
+num_sections_(num_sections),
+state_(state),
+num_items_(num_items),
+capacity_(num_items),
+items_(items.release()) // ownership transferred; dtor handles cleanup from here
+{}
+
+// Deleter for buffers built during deserialization: always deallocates,
+// but destroys the items only once 'destroy' has been enabled (set after
+// the serde succeeds), so partially-filled buffers are released safely.
+template<typename T, typename C, typename A>
+class req_compactor<T, C, A>::items_deleter {
+  public:
+  items_deleter(const A& allocator, bool destroy, uint32_t num): allocator(allocator), destroy(destroy), num(num) {}
+  void operator() (T* ptr) {
+    if (ptr != nullptr) {
+      if (destroy) {
+        for (uint32_t i = 0; i < num; ++i) {
+          ptr[i].~T();
+        }
+      }
+      allocator.deallocate(ptr, num);
+    }
+  }
+  // called once deserialization has constructed all items
+  void set_destroy(bool destroy) { this->destroy = destroy; }
+  private:
+  A allocator;
+  bool destroy;
+  uint32_t num;
+};
+
+} /* namespace datasketches */
+
+#endif
diff --git a/req/include/req_quantile_calculator.hpp b/req/include/req_quantile_calculator.hpp
new file mode 100755
index 0000000..2a6e245
--- /dev/null
+++ b/req/include/req_quantile_calculator.hpp
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef REQ_QUANTILE_CALCULATOR_HPP_
+#define REQ_QUANTILE_CALCULATOR_HPP_
+
+#include <functional>
+
+namespace datasketches {
+
+// Helper that aggregates (item pointer, weight) pairs from all compactor
+// levels of a REQ sketch, converts the weights into a cumulative
+// distribution, and answers quantile queries by rank.
+template<
+  typename T,
+  typename Comparator,
+  typename Allocator
+>
+class req_quantile_calculator {
+public:
+  // n: total stream length of the sketch being queried
+  req_quantile_calculator(uint64_t n, const Allocator& allocator);
+
+  // adds a sorted run of items, each carrying weight 2^lg_weight
+  void add(const T* begin, const T* end, uint8_t lg_weight);
+
+  // turns per-item weights into a running (cumulative) total;
+  // inclusive controls whether an item's own weight counts toward its rank
+  template<bool inclusive>
+  void convert_to_cummulative();
+
+  // returns a pointer to the item at the given normalized rank
+  const T* get_quantile(double rank) const;
+
+private:
+  using Entry = std::pair<const T*, uint64_t>;
+  using AllocEntry = typename std::allocator_traits<Allocator>::template rebind_alloc<Entry>;
+  using Container = std::vector<Entry, AllocEntry>;
+
+  // orders entries by the pointed-to item using the sketch comparator
+  template<typename C>
+  struct compare_pairs_by_first_ptr {
+    bool operator()(const Entry& a, const Entry& b) {
+      return C()(*a.first, *b.first);
+    }
+  };
+
+  // orders entries by (cumulative) weight, for rank lookups
+  struct compare_pairs_by_second {
+    bool operator()(const Entry& a, const Entry& b) {
+      return a.second < b.second;
+    }
+  };
+
+  uint64_t n_;
+  Container entries_;
+};
+
+} /* namespace datasketches */
+
+#include "req_quantile_calculator_impl.hpp"
+
+#endif
diff --git a/req/include/req_quantile_calculator_impl.hpp b/req/include/req_quantile_calculator_impl.hpp
new file mode 100755
index 0000000..63f0e92
--- /dev/null
+++ b/req/include/req_quantile_calculator_impl.hpp
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef REQ_QUANTILE_CALCULATOR_IMPL_HPP_
+#define REQ_QUANTILE_CALCULATOR_IMPL_HPP_
+
+namespace datasketches {
+
+// Constructs an empty calculator for a sketch with stream length n.
+template<typename T, typename C, typename A>
+req_quantile_calculator<T, C, A>::req_quantile_calculator(uint64_t n, const A& allocator):
+n_(n),
+entries_(allocator)
+{}
+
+// Appends a sorted run of items from one compactor level, each carrying
+// weight 2^lg_weight, and keeps entries_ globally sorted by item via a
+// single inplace_merge with the previously accumulated (sorted) entries.
+template<typename T, typename C, typename A>
+void req_quantile_calculator<T, C, A>::add(const T* begin, const T* end, uint8_t lg_weight) {
+  if (entries_.capacity() < entries_.size() + std::distance(begin, end)) entries_.reserve(entries_.size() + std::distance(begin, end));
+  const size_t size_before = entries_.size();
+  // the weight must be computed in 64 bits: Entry::second is uint64_t and
+  // 1 << lg_weight on a 32-bit int is undefined behavior for lg_weight >= 31
+  for (auto it = begin; it != end; ++it) entries_.push_back(Entry(it, static_cast<uint64_t>(1) << lg_weight));
+  if (size_before > 0) std::inplace_merge(entries_.begin(), entries_.begin() + size_before, entries_.end(), compare_pairs_by_first_ptr<C>());
+}
+
+// Replaces each entry's weight with a running total: the inclusive rank
+// (subtotal including the entry's own weight) when inclusive, otherwise
+// the exclusive prefix sum. Must be called once, after all add() calls.
+// (The misspelling "cummulative" is part of the declared interface.)
+template<typename T, typename C, typename A>
+template<bool inclusive>
+void req_quantile_calculator<T, C, A>::convert_to_cummulative() {
+  uint64_t subtotal = 0;
+  for (auto& entry: entries_) {
+    const uint64_t new_subtotal = subtotal + entry.second;
+    entry.second = inclusive ? new_subtotal : subtotal;
+    subtotal = new_subtotal;
+  }
+}
+
+// Returns a pointer to the first entry whose cumulative weight is at
+// least rank * n_, clamping to the last entry when the rank maps past
+// the end. Requires convert_to_cummulative() to have been called.
+// NOTE(review): assumes entries_ is non-empty — the fallback indexes
+// entries_[size - 1]; confirm callers check for an empty sketch first.
+template<typename T, typename C, typename A>
+const T* req_quantile_calculator<T, C, A>::get_quantile(double rank) const {
+  uint64_t weight = static_cast<uint64_t>(rank * n_);
+  auto it = std::lower_bound(entries_.begin(), entries_.end(), Entry(nullptr, weight), compare_pairs_by_second());
+  if (it == entries_.end()) return entries_[entries_.size() - 1].first;
+  return it->first;
+}
+
+} /* namespace datasketches */
+
+#endif
diff --git a/req/include/req_sketch.hpp b/req/include/req_sketch.hpp
new file mode 100755
index 0000000..ca806cc
--- /dev/null
+++ b/req/include/req_sketch.hpp
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef REQ_SKETCH_HPP_
+#define REQ_SKETCH_HPP_
+
+#include "req_common.hpp"
+#include "req_compactor.hpp"
+#include "req_quantile_calculator.hpp"
+
+namespace datasketches {
+
+template<
+  typename T,
+  typename Comparator = std::less<T>,
+  typename SerDe = serde<T>,
+  typename Allocator = std::allocator<T>
+>
+class req_sketch {
+public:
+  using Compactor = req_compactor<T, Comparator, Allocator>;
+  using AllocCompactor = typename std::allocator_traits<Allocator>::template rebind_alloc<Compactor>;
+  using AllocDouble = typename std::allocator_traits<Allocator>::template rebind_alloc<double>;
+  using vector_double = std::vector<double, AllocDouble>;
+
+  /**
+   * Constructor
+   * @param k Controls the size and error of the sketch. It must be even and in the range [4, 1024], inclusive.
+   * Value of 12 roughly corresponds to 1% relative error guarantee at 95% confidence.
+   * @param hra if true, the default, the high ranks are prioritized for better
+   * accuracy. Otherwise the low ranks are prioritized for better accuracy.
+   * @param allocator to use by this instance
+   */
+  explicit req_sketch(uint16_t k, bool hra = true, const Allocator& allocator = Allocator());
+
+  ~req_sketch();
+  req_sketch(const req_sketch& other);
+  req_sketch(req_sketch&& other) noexcept;
+  req_sketch& operator=(const req_sketch& other);
+  req_sketch& operator=(req_sketch&& other);
+
+  /**
+   * Returns configured parameter K
+   * @return parameter K
+   */
+  uint16_t get_k() const;
+
+  /**
+   * Returns configured parameter High Rank Accuracy
+   * @return parameter HRA
+   */
+  bool is_HRA() const;
+
+  /**
+   * Returns true if this sketch is empty.
+   * @return empty flag
+   */
+  bool is_empty() const;
+
+  /**
+   * Returns the length of the input stream.
+   * @return stream length
+   */
+  uint64_t get_n() const;
+
+  /**
+   * Returns the number of retained items in the sketch.
+   * @return number of retained items
+   */
+  uint32_t get_num_retained() const;
+
+  /**
+   * Returns true if this sketch is in estimation mode.
+   * @return estimation mode flag
+   */
+  bool is_estimation_mode() const;
+
+  template<typename FwdT>
+  void update(FwdT&& item);
+
+  template<typename FwdSk>
+  void merge(FwdSk&& other);
+
+  /**
+   * Returns the min value of the stream.
+   * For floating point types: if the sketch is empty this returns NaN.
+   * For other types: if the sketch is empty this throws runtime_error.
+   * @return the min value of the stream
+   */
+  const T& get_min_value() const;
+
+  /**
+   * Returns the max value of the stream.
+   * For floating point types: if the sketch is empty this returns NaN.
+   * For other types: if the sketch is empty this throws runtime_error.
+   * @return the max value of the stream
+   */
+  const T& get_max_value() const;
+
+  /**
+   * Returns an approximation to the normalized (fractional) rank of the given item from 0 to 1 inclusive.
+   * With the template parameter inclusive=true the weight of the given item is included into the rank.
+   * Otherwise the rank equals the sum of the weights of items less than the given item according to the Comparator.
+   *
+   * <p>If the sketch is empty this returns NaN.
+   *
+   * @param item to be ranked
+   * @return an approximate rank of the given item
+   */
+
+  template<bool inclusive = false>
+  double get_rank(const T& item) const;
+
+  /**
+   * Returns an approximation to the Probability Mass Function (PMF) of the input stream
+   * given a set of split points (values).
+   *
+   * <p>If the sketch is empty this returns an empty vector.
+   *
+   * @param split_points an array of <i>m</i> unique, monotonically increasing values
+   * that divide the input domain into <i>m+1</i> consecutive disjoint intervals.
+   * The definition of an "interval" is inclusive of the left split point (or minimum value) and
+   * exclusive of the right split point, with the exception that the last interval will include
+   * the maximum value.
+   * It is not necessary to include either the min or max values in these split points.
+   *
+   * @return an array of m+1 doubles each of which is an approximation
+   * to the fraction of the input stream values (the mass) that fall into one of those intervals.
+   * If the template parameter inclusive=false, the definition of an "interval" is inclusive of the left split point and exclusive of the right
+   * split point, with the exception that the last interval will include the maximum value.
+   * If the template parameter inclusive=true, the definition of an "interval" is exclusive of the left split point and inclusive of the right
+   * split point.
+   */
+  template<bool inclusive = false>
+  vector_double get_PMF(const T* split_points, uint32_t size) const;
+
+  /**
+   * Returns an approximation to the Cumulative Distribution Function (CDF), which is the
+   * cumulative analog of the PMF, of the input stream given a set of split points (values).
+   *
+   * <p>If the sketch is empty this returns an empty vector.
+   *
+   * @param split_points an array of <i>m</i> unique, monotonically increasing float values
+   * that divide the input domain into <i>m+1</i> consecutive disjoint intervals.
+   * If the template parameter inclusive=false, the definition of an "interval" is inclusive of the left split point and exclusive of the right
+   * split point, with the exception that the last interval will include the maximum value.
+   * If the template parameter inclusive=true, the definition of an "interval" is exclusive of the left split point and inclusive of the right
+   * split point.
+   * It is not necessary to include either the min or max values in these split points.
+   *
+   * @return an array of m+1 double values, which are a consecutive approximation to the CDF
+   * of the input stream given the split_points. The value at array position j of the returned
+   * CDF array is the sum of the returned values in positions 0 through j of the returned PMF
+   * array.
+   */
+  template<bool inclusive = false>
+  vector_double get_CDF(const T* split_points, uint32_t size) const;
+
+  /**
+   * Returns an approximate quantile of the given normalized rank.
+   * The normalized rank must be in the range [0.0, 1.0] (both inclusive).
+   * @param rank the given normalized rank
+   * @return approximate quantile given the normalized rank
+   */
+  template<bool inclusive = false>
+  const T& get_quantile(double rank) const;
+
+  /**
+   * Returns an array of quantiles that correspond to the given array of normalized ranks.
+   * @param ranks given array of normalized ranks.
+   * @return array of quantiles that correspond to the given array of normalized ranks
+   */
+  template<bool inclusive = false>
+  std::vector<T, Allocator> get_quantiles(const double* ranks, uint32_t size) const;
+
+  /**
+   * Returns an approximate lower bound of the given noramalized rank.
+   * @param rank the given rank, a value between 0 and 1.0.
+   * @param num_std_dev the number of standard deviations. Must be 1, 2, or 3.
+   * @return an approximate lower bound rank.
+   */
+  double get_rank_lower_bound(double rank, uint8_t num_std_dev) const;
+
+  /**
+   * Returns an approximate upper bound of the given noramalized rank.
+   * @param rank the given rank, a value between 0 and 1.0.
+   * @param num_std_dev the number of standard deviations. Must be 1, 2, or 3.
+   * @return an approximate upper bound rank.
+   */
+  double get_rank_upper_bound(double rank, uint8_t num_std_dev) const;
+
+  /**
+   * Returns an a priori estimate of relative standard error (RSE, expressed as a number in [0,1]).
+   * Derived from Lemma 12 in https://arxiv.org/abs/2004.01668v2, but the constant factors were
+   * modified based on empirical measurements.
+   *
+   * @param k the given value of k
+   * @param rank the given normalized rank, a number in [0,1].
+   * @param hra if true High Rank Accuracy mode is being selected, otherwise, Low Rank Accuracy.
+   * @param n an estimate of the total number of items submitted to the sketch.
+   * @return an a priori estimate of relative standard error (RSE, expressed as a number in [0,1]).
+   */
+  static double get_RSE(uint16_t k, double rank, bool hra, uint64_t n);
+
+  /**
+   * Computes size needed to serialize the current state of the sketch.
+   * This version is for fixed-size arithmetic types (integral and floating point).
+   * @return size in bytes needed to serialize this sketch
+   */
+  template<typename TT = T, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type = 0>
+  size_t get_serialized_size_bytes() const;
+
+  /**
+   * Computes size needed to serialize the current state of the sketch.
+   * This version is for all other types and can be expensive since every item needs to be looked at.
+   * @return size in bytes needed to serialize this sketch
+   */
+  template<typename TT = T, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type = 0>
+  size_t get_serialized_size_bytes() const;
+
+  /**
+   * This method serializes the sketch into a given stream in a binary form
+   * @param os output stream
+   */
+  void serialize(std::ostream& os) const;
+
+  // This is a convenience alias for users
+  // The type returned by the following serialize method
+  using vector_bytes = std::vector<uint8_t, typename std::allocator_traits<Allocator>::template rebind_alloc<uint8_t>>;
+
+  /**
+   * This method serializes the sketch as a vector of bytes.
+   * An optional header can be reserved in front of the sketch.
+   * It is a blank space of a given size.
+   * This header is used in Datasketches PostgreSQL extension.
+   * @param header_size_bytes space to reserve in front of the sketch
+   */
+  vector_bytes serialize(unsigned header_size_bytes = 0) const;
+
+  /**
+   * This method deserializes a sketch from a given stream.
+   * @param is input stream
+   * @return an instance of a sketch
+   */
+  static req_sketch deserialize(std::istream& is, const Allocator& allocator = Allocator());
+
+  /**
+   * This method deserializes a sketch from a given array of bytes.
+   * @param bytes pointer to the array of bytes
+   * @param size the size of the array
+   * @return an instance of a sketch
+   */
+  static req_sketch deserialize(const void* bytes, size_t size, const Allocator& allocator = Allocator());
+
+  /**
+   * Prints a summary of the sketch.
+   * @param print_levels if true include information about levels
+   * @param print_items if true include sketch data
+   */
+  string<Allocator> to_string(bool print_levels = false, bool print_items = false) const;
+
+  class const_iterator;
+  const_iterator begin() const;
+  const_iterator end() const;
+
+private:
+  Allocator allocator_;
+  uint16_t k_;
+  bool hra_;
+  uint32_t max_nom_size_;
+  uint32_t num_retained_;
+  uint64_t n_;
+  std::vector<Compactor, AllocCompactor> compactors_;
+  T* min_value_;
+  T* max_value_;
+
+  static const bool LAZY_COMPRESSION = false;
+
+  static const uint8_t SERIAL_VERSION = 1;
+  static const uint8_t FAMILY = 17;
+  static const size_t PREAMBLE_SIZE_BYTES = 8;
+  enum flags { RESERVED1, RESERVED2, IS_EMPTY, IS_HIGH_RANK, RAW_ITEMS, IS_LEVEL_ZERO_SORTED };
+
+  static constexpr double FIXED_RSE_FACTOR = 0.084;
+  static double relative_rse_factor();
+
+  uint8_t get_num_levels() const;
+  void grow();
+  void update_max_nom_size();
+  void update_num_retained();
+  void compress();
+
+  static double get_rank_lb(uint16_t k, uint8_t num_levels, double rank, uint8_t num_std_dev, uint64_t n, bool hra);
+  static double get_rank_ub(uint16_t k, uint8_t num_levels, double rank, uint8_t num_std_dev, uint64_t n, bool hra);
+  static bool is_exact_rank(uint16_t k, uint8_t num_levels, double rank, uint64_t n, bool hra);
+
+  using QuantileCalculator = req_quantile_calculator<T, Comparator, Allocator>;
+  using AllocCalc = typename std::allocator_traits<Allocator>::template rebind_alloc<QuantileCalculator>;
+  class calculator_deleter;
+  using QuantileCalculatorPtr = typename std::unique_ptr<QuantileCalculator, calculator_deleter>;
+  template<bool inclusive>
+  QuantileCalculatorPtr get_quantile_calculator() const;
+
+  // for deserialization
+  class item_deleter;
+  req_sketch(uint32_t k, bool hra, uint64_t n, std::unique_ptr<T, item_deleter> min_value, std::unique_ptr<T, item_deleter> max_value, std::vector<Compactor, AllocCompactor>&& compactors);
+
+  static void check_preamble_ints(uint8_t preamble_ints, uint8_t num_levels);
+  static void check_serial_version(uint8_t serial_version);
+  static void check_family_id(uint8_t family_id);
+
+  // implementations for floating point types
+  template<typename TT = T, typename std::enable_if<std::is_floating_point<TT>::value, int>::type = 0>
+  static const TT& get_invalid_value() {
+    static TT value = std::numeric_limits<TT>::quiet_NaN();
+    return value;
+  }
+
+  template<typename TT = T, typename std::enable_if<std::is_floating_point<TT>::value, int>::type = 0>
+  static inline bool check_update_value(const TT& value) {
+    return !std::isnan(value);
+  }
+
+  template<typename TT = T, typename std::enable_if<std::is_floating_point<TT>::value, int>::type = 0>
+  static inline void check_split_points(const T* values, uint32_t size) {
+    for (uint32_t i = 0; i < size ; i++) {
+      if (std::isnan(values[i])) {
+        throw std::invalid_argument("Values must not be NaN");
+      }
+      if ((i < (size - 1)) && !(Comparator()(values[i], values[i + 1]))) {
+        throw std::invalid_argument("Values must be unique and monotonically increasing");
+      }
+    }
+  }
+
+  // implementations for all other types
+  template<typename TT = T, typename std::enable_if<!std::is_floating_point<TT>::value, int>::type = 0>
+  static const TT& get_invalid_value() {
+    throw std::runtime_error("getting quantiles from empty sketch is not supported for this type of values");
+  }
+
+  template<typename TT = T, typename std::enable_if<!std::is_floating_point<TT>::value, int>::type = 0>
+  static inline bool check_update_value(const TT&) {
+    return true;
+  }
+
+  template<typename TT = T, typename std::enable_if<!std::is_floating_point<TT>::value, int>::type = 0>
+  static inline void check_split_points(const T* values, uint32_t size) {
+    for (uint32_t i = 0; i < size ; i++) {
+      if ((i < (size - 1)) && !(Comparator()(values[i], values[i + 1]))) {
+        throw std::invalid_argument("Values must be unique and monotonically increasing");
+      }
+    }
+  }
+
+};
+
+template<typename T, typename C, typename S, typename A>
+// Input iterator over all retained items across all compactor levels.
+// operator* yields a (item, weight) pair — presumably weight = 2^lg_weight
+// of the owning compactor; confirm against the out-of-line definition.
+// NOTE(review): deriving from std::iterator is deprecated in C++17 —
+// consider declaring the five member typedefs directly.
+// NOTE(review): operator++(int) (post-increment) conventionally returns by
+// value; returning a reference here is unusual — confirm intended.
+class req_sketch<T, C, S, A>::const_iterator: public std::iterator<std::input_iterator_tag, T> {
+public:
+  const_iterator& operator++();
+  const_iterator& operator++(int);
+  bool operator==(const const_iterator& other) const;
+  bool operator!=(const const_iterator& other) const;
+  std::pair<const T&, const uint64_t> operator*() const;
+private:
+  using LevelsIterator = typename std::vector<Compactor, AllocCompactor>::const_iterator;
+  LevelsIterator levels_it_;   // current compactor level
+  LevelsIterator levels_end_;  // one past the last level
+  const T* compactor_it_;      // position within the current level
+  friend class req_sketch<T, C, S, A>;
+  const_iterator(LevelsIterator begin, LevelsIterator end);
+};
+
+} /* namespace datasketches */
+
+#include "req_sketch_impl.hpp"
+
+#endif
diff --git a/req/include/req_sketch_impl.hpp b/req/include/req_sketch_impl.hpp
new file mode 100755
index 0000000..3c90908
--- /dev/null
+++ b/req/include/req_sketch_impl.hpp
@@ -0,0 +1,810 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef REQ_SKETCH_IMPL_HPP_
+#define REQ_SKETCH_IMPL_HPP_
+
+#include <cmath>
+#include <sstream>
+#include <stdexcept>
+
+namespace datasketches {
+
+template<typename T, typename C, typename S, typename A>
+// Constructs an empty sketch.
+// k is rounded down to an even value (& -2 clears the low bit) and clamped
+// below by req_constants::MIN_K. hra selects High Rank Accuracy mode.
+// The allocator instance is stored and used for all item allocations.
+req_sketch<T, C, S, A>::req_sketch(uint16_t k, bool hra, const A& allocator):
+allocator_(allocator),
+k_(std::max(static_cast<int>(k) & -2, static_cast<int>(req_constants::MIN_K))), //rounds down one if odd
+hra_(hra),
+max_nom_size_(0),
+num_retained_(0),
+n_(0),
+compactors_(allocator),
+min_value_(nullptr),
+max_value_(nullptr)
+{
+  grow(); // create the level-0 compactor and initialize max_nom_size_
+}
+
+template<typename T, typename C, typename S, typename A>
+// Destroys and deallocates the heap-held min/max items (if any) using the
+// stored allocator instance; compactors_ cleans itself up.
+req_sketch<T, C, S, A>::~req_sketch() {
+  if (min_value_ != nullptr) {
+    min_value_->~T();
+    allocator_.deallocate(min_value_, 1);
+  }
+  if (max_value_ != nullptr) {
+    max_value_->~T();
+    allocator_.deallocate(max_value_, 1);
+  }
+}
+
+template<typename T, typename C, typename S, typename A>
+// Copy constructor: deep-copies the min/max items.
+// Uses this sketch's own allocator instance (allocator_, copied from
+// other) rather than a default-constructed A(), so stateful allocators
+// work and the memory is correctly paired with the
+// allocator_.deallocate(...) calls in the destructor.
+req_sketch<T, C, S, A>::req_sketch(const req_sketch& other):
+allocator_(other.allocator_),
+k_(other.k_),
+hra_(other.hra_),
+max_nom_size_(other.max_nom_size_),
+num_retained_(other.num_retained_),
+n_(other.n_),
+compactors_(other.compactors_),
+min_value_(nullptr),
+max_value_(nullptr)
+{
+  if (other.min_value_ != nullptr) min_value_ = new (allocator_.allocate(1)) T(*other.min_value_);
+  if (other.max_value_ != nullptr) max_value_ = new (allocator_.allocate(1)) T(*other.max_value_);
+}
+
+template<typename T, typename C, typename S, typename A>
+// Move constructor: steals the compactors and the min/max pointers, then
+// nulls the source's pointers so its destructor does not double-free.
+req_sketch<T, C, S, A>::req_sketch(req_sketch&& other) noexcept :
+allocator_(std::move(other.allocator_)),
+k_(other.k_),
+hra_(other.hra_),
+max_nom_size_(other.max_nom_size_),
+num_retained_(other.num_retained_),
+n_(other.n_),
+compactors_(std::move(other.compactors_)),
+min_value_(other.min_value_),
+max_value_(other.max_value_)
+{
+  other.min_value_ = nullptr;
+  other.max_value_ = nullptr;
+}
+
+template<typename T, typename C, typename S, typename A>
+// Copy assignment via copy-and-swap: strongly exception-safe; the old
+// state is released when the local 'copy' goes out of scope.
+req_sketch<T, C, S, A>& req_sketch<T, C, S, A>::operator=(const req_sketch& other) {
+  req_sketch copy(other);
+  std::swap(allocator_, copy.allocator_);
+  std::swap(k_, copy.k_);
+  std::swap(hra_, copy.hra_);
+  std::swap(max_nom_size_, copy.max_nom_size_);
+  std::swap(num_retained_, copy.num_retained_);
+  std::swap(n_, copy.n_);
+  std::swap(compactors_, copy.compactors_);
+  std::swap(min_value_, copy.min_value_);
+  std::swap(max_value_, copy.max_value_);
+  return *this;
+}
+
+template<typename T, typename C, typename S, typename A>
+// Move assignment by member-wise swap; the old state is released when
+// 'other' is destroyed by the caller.
+req_sketch<T, C, S, A>& req_sketch<T, C, S, A>::operator=(req_sketch&& other) {
+  std::swap(allocator_, other.allocator_);
+  std::swap(k_, other.k_);
+  std::swap(hra_, other.hra_);
+  std::swap(max_nom_size_, other.max_nom_size_);
+  std::swap(num_retained_, other.num_retained_);
+  std::swap(n_, other.n_);
+  std::swap(compactors_, other.compactors_);
+  std::swap(min_value_, other.min_value_);
+  std::swap(max_value_, other.max_value_);
+  return *this;
+}
+
+// --- trivial accessors ---
+
+template<typename T, typename C, typename S, typename A>
+// Configured (even, clamped) value of k.
+uint16_t req_sketch<T, C, S, A>::get_k() const {
+  return k_;
+}
+
+template<typename T, typename C, typename S, typename A>
+// True if the sketch is in High Rank Accuracy mode.
+bool req_sketch<T, C, S, A>::is_HRA() const {
+  return hra_;
+}
+
+template<typename T, typename C, typename S, typename A>
+// True if no items have been seen.
+bool req_sketch<T, C, S, A>::is_empty() const {
+  return n_ == 0;
+}
+
+template<typename T, typename C, typename S, typename A>
+// Total number of items presented to the sketch (stream length).
+uint64_t req_sketch<T, C, S, A>::get_n() const {
+  return n_;
+}
+
+template<typename T, typename C, typename S, typename A>
+// Number of items currently retained across all levels.
+uint32_t req_sketch<T, C, S, A>::get_num_retained() const {
+  return num_retained_;
+}
+
+template<typename T, typename C, typename S, typename A>
+// With more than one level, compactions have occurred and ranks are
+// approximate rather than exact.
+bool req_sketch<T, C, S, A>::is_estimation_mode() const {
+  return compactors_.size() > 1;
+}
+
+template<typename T, typename C, typename S, typename A>
+template<typename FwdT>
+// Inserts one item. For floating-point T, NaN is silently ignored
+// (check_update_value); for other types all values are accepted.
+// min/max are copied before the item is forwarded into level 0, so a
+// moved-from argument is never read.
+void req_sketch<T, C, S, A>::update(FwdT&& item) {
+  if (!check_update_value(item)) { return; }
+  if (is_empty()) {
+    min_value_ = new (allocator_.allocate(1)) T(item);
+    max_value_ = new (allocator_.allocate(1)) T(item);
+  } else {
+    if (C()(item, *min_value_)) *min_value_ = item;
+    if (C()(*max_value_, item)) *max_value_ = item;
+  }
+  compactors_[0].append(std::forward<FwdT>(item));
+  ++num_retained_;
+  ++n_;
+  if (num_retained_ == max_nom_size_) compress(); // buffer full: compact
+}
+
+template<typename T, typename C, typename S, typename A>
+template<typename FwdSk>
+// Merges another sketch into this one. Both sketches must be in the same
+// accuracy mode (HRA vs LRA) or an exception is thrown. When FwdSk is an
+// rvalue sketch, conditional_forward moves items out of 'other'; otherwise
+// they are copied.
+void req_sketch<T, C, S, A>::merge(FwdSk&& other) {
+  if (is_HRA() != other.is_HRA()) throw std::invalid_argument("merging HRA and LRA is not valid");
+  if (other.is_empty()) return;
+  if (is_empty()) {
+    min_value_ = new (allocator_.allocate(1)) T(conditional_forward<FwdSk>(*other.min_value_));
+    max_value_ = new (allocator_.allocate(1)) T(conditional_forward<FwdSk>(*other.max_value_));
+  } else {
+    if (C()(*other.min_value_, *min_value_)) *min_value_ = conditional_forward<FwdSk>(*other.min_value_);
+    if (C()(*max_value_, *other.max_value_)) *max_value_ = conditional_forward<FwdSk>(*other.max_value_);
+  }
+  // grow until this has at least as many compactors as other
+  while (get_num_levels() < other.get_num_levels()) grow();
+  // merge the items in all height compactors
+  for (size_t i = 0; i < other.get_num_levels(); ++i) {
+    compactors_[i].merge(conditional_forward<FwdSk>(other.compactors_[i]));
+  }
+  n_ += other.n_;
+  update_max_nom_size();
+  update_num_retained();
+  if (num_retained_ >= max_nom_size_) compress();
+}
+
+template<typename T, typename C, typename S, typename A>
+// Smallest item seen. On an empty sketch returns get_invalid_value():
+// NaN for floating-point T, throws for other types.
+const T& req_sketch<T, C, S, A>::get_min_value() const {
+  if (is_empty()) return get_invalid_value();
+  return *min_value_;
+}
+
+template<typename T, typename C, typename S, typename A>
+// Largest item seen. Same empty-sketch behavior as get_min_value().
+const T& req_sketch<T, C, S, A>::get_max_value() const {
+  if (is_empty()) return get_invalid_value();
+  return *max_value_;
+}
+
+template<typename T, typename C, typename S, typename A>
+template<bool inclusive>
+// Normalized rank of 'item': sum of weights of retained items below it
+// (or at/below it if inclusive) divided by the stream length n_.
+// NOTE(review): on an empty sketch n_ == 0, so this divides by zero
+// (0.0/0 -> NaN for double) — confirm callers check is_empty() first.
+double req_sketch<T, C, S, A>::get_rank(const T& item) const {
+  uint64_t weight = 0;
+  for (const auto& compactor: compactors_) {
+    weight += compactor.template compute_weight<inclusive>(item);
+  }
+  return static_cast<double>(weight) / n_;
+}
+
+template<typename T, typename C, typename S, typename A>
+template<bool inclusive>
+// Returns the PMF over the size+1 intervals defined by the split points,
+// obtained by differencing the CDF. For an empty sketch get_CDF() returns
+// an empty vector, so return early instead of indexing buckets[size] out
+// of range (previously undefined behavior for any size > 0).
+auto req_sketch<T, C, S, A>::get_PMF(const T* split_points, uint32_t size) const -> vector_double {
+  auto buckets = get_CDF<inclusive>(split_points, size);
+  if (buckets.empty()) return buckets; // empty sketch
+  for (uint32_t i = size; i > 0; --i) {
+    buckets[i] -= buckets[i - 1]; // cumulative values -> per-interval masses
+  }
+  return buckets;
+}
+
+template<typename T, typename C, typename S, typename A>
+template<bool inclusive>
+// Returns size+1 cumulative rank values, one per split point plus a final
+// 1.0. Split points must be unique, monotonically increasing and (for
+// floating-point T) non-NaN, or an exception is thrown.
+// Returns an empty vector for an empty sketch.
+auto req_sketch<T, C, S, A>::get_CDF(const T* split_points, uint32_t size) const -> vector_double {
+  vector_double buckets(allocator_);
+  if (is_empty()) return buckets;
+  check_split_points(split_points, size);
+  buckets.reserve(size + 1);
+  for (uint32_t i = 0; i < size; ++i) buckets.push_back(get_rank<inclusive>(split_points[i]));
+  buckets.push_back(1);
+  return buckets;
+}
+
+template<typename T, typename C, typename S, typename A>
+template<bool inclusive>
+// Approximate quantile for a normalized rank in [0, 1].
+// Ranks 0 and 1 short-circuit to min/max before the range check; other
+// out-of-range ranks throw. Builds a fresh quantile calculator per call —
+// use get_quantiles() for batch queries.
+const T& req_sketch<T, C, S, A>::get_quantile(double rank) const {
+  if (is_empty()) return get_invalid_value();
+  if (rank == 0.0) return *min_value_;
+  if (rank == 1.0) return *max_value_;
+  if ((rank < 0.0) || (rank > 1.0)) {
+    throw std::invalid_argument("Rank cannot be less than zero or greater than 1.0");
+  }
+  return *(get_quantile_calculator<inclusive>()->get_quantile(rank));
+}
+
+template<typename T, typename C, typename S, typename A>
+template<bool inclusive>
+// Batch quantile query. The (expensive) cumulative-weight calculator is
+// built lazily, only if some rank is strictly between 0 and 1, and is
+// reused for all such ranks.
+std::vector<T, A> req_sketch<T, C, S, A>::get_quantiles(const double* ranks, uint32_t size) const {
+  std::vector<T, A> quantiles(allocator_);
+  if (is_empty()) return quantiles;
+  QuantileCalculatorPtr quantile_calculator(nullptr, calculator_deleter(allocator_));
+  quantiles.reserve(size);
+  for (uint32_t i = 0; i < size; ++i) {
+    const double rank = ranks[i];
+    if ((rank < 0.0) || (rank > 1.0)) {
+      throw std::invalid_argument("rank cannot be less than zero or greater than 1.0");
+    }
+    if      (rank == 0.0) quantiles.push_back(*min_value_);
+    else if (rank == 1.0) quantiles.push_back(*max_value_);
+    else {
+      if (!quantile_calculator) {
+        // has side effect of sorting level zero if needed
+        // NOTE(review): get_quantile_calculator() is itself const (it does
+        // its own const_cast internally), so this outer const_cast appears
+        // redundant — confirm and simplify.
+        quantile_calculator = const_cast<req_sketch*>(this)->get_quantile_calculator<inclusive>();
+      }
+      quantiles.push_back(*(quantile_calculator->get_quantile(rank)));
+    }
+  }
+  return quantiles;
+}
+
+template<typename T, typename C, typename S, typename A>
+// Custom deleter for QuantileCalculatorPtr: destroys the calculator and
+// returns its memory through the rebound allocator instance it was
+// allocated from (required for stateful allocators).
+class req_sketch<T, C, S, A>::calculator_deleter {
+  public:
+  calculator_deleter(const AllocCalc& allocator): allocator_(allocator) {}
+  void operator() (QuantileCalculator* ptr) {
+    if (ptr != nullptr) {
+      ptr->~QuantileCalculator();
+      allocator_.deallocate(ptr, 1);
+    }
+  }
+  private:
+  AllocCalc allocator_;
+};
+
+template<typename T, typename C, typename S, typename A>
+template<bool inclusive>
+// Builds a quantile calculator from all levels' items and their weights.
+// Although const, it lazily sorts level 0 via const_cast (a deliberate,
+// observable-state-preserving side effect).
+auto req_sketch<T, C, S, A>::get_quantile_calculator() const -> QuantileCalculatorPtr {
+  if (!compactors_[0].is_sorted()) {
+    const_cast<Compactor&>(compactors_[0]).sort(); // allow this side effect
+  }
+  AllocCalc ac(allocator_);
+  QuantileCalculatorPtr quantile_calculator(
+    new (ac.allocate(1)) req_quantile_calculator<T, C, A>(n_, ac),
+    calculator_deleter(ac)
+  );
+
+  for (auto& compactor: compactors_) {
+    quantile_calculator->add(compactor.begin(), compactor.end(), compactor.get_lg_weight());
+  }
+  quantile_calculator->template convert_to_cummulative<inclusive>();
+  return quantile_calculator;
+}
+
+template<typename T, typename C, typename S, typename A>
+// Lower confidence bound on a normalized rank at num_std_dev (1, 2 or 3)
+// standard deviations; delegates to the static helper with this sketch's
+// current parameters.
+double req_sketch<T, C, S, A>::get_rank_lower_bound(double rank, uint8_t num_std_dev) const {
+  return get_rank_lb(get_k(), get_num_levels(), rank, num_std_dev, get_n(), hra_);
+}
+
+template<typename T, typename C, typename S, typename A>
+// Upper confidence bound on a normalized rank; see get_rank_lower_bound.
+double req_sketch<T, C, S, A>::get_rank_upper_bound(double rank, uint8_t num_std_dev) const {
+  return get_rank_ub(get_k(), get_num_levels(), rank, num_std_dev, get_n(), hra_);
+}
+
+template<typename T, typename C, typename S, typename A>
+// A priori error estimate for a hypothetical sketch with the given k and n
+// (2 levels, 1 standard deviation).
+// NOTE(review): this returns get_rank_lb(...) directly — a lower-bound
+// rank, not a deviation from 'rank' — which does not obviously match the
+// declared "RSE in [0,1]" contract; confirm against the reference
+// implementation.
+double req_sketch<T, C, S, A>::get_RSE(uint16_t k, double rank, bool hra, uint64_t n) {
+  return get_rank_lb(k, 2, rank, 1, n, hra);
+}
+
+template<typename T, typename C, typename S, typename A>
+// Static lower-bound computation. Exact ranks are returned unchanged.
+// Otherwise the bound subtracts num_std_dev times the smaller of a
+// rank-relative error term and a fixed error term (max of the two lower
+// bounds).
+double req_sketch<T, C, S, A>::get_rank_lb(uint16_t k, uint8_t num_levels, double rank, uint8_t num_std_dev, uint64_t n, bool hra) {
+  if (is_exact_rank(k, num_levels, rank, n, hra)) return rank;
+  const double relative = relative_rse_factor() / k * (hra ? 1.0 - rank : rank);
+  const double fixed = FIXED_RSE_FACTOR / k;
+  const double lb_rel = rank - num_std_dev * relative;
+  const double lb_fix = rank - num_std_dev * fixed;
+  return std::max(lb_rel, lb_fix);
+}
+
+template<typename T, typename C, typename S, typename A>
+// Static upper-bound computation: mirror image of get_rank_lb (adds the
+// error terms and takes the tighter, i.e. smaller, upper bound).
+double req_sketch<T, C, S, A>::get_rank_ub(uint16_t k, uint8_t num_levels, double rank, uint8_t num_std_dev, uint64_t n, bool hra) {
+  if (is_exact_rank(k, num_levels, rank, n, hra)) return rank;
+  const double relative = relative_rse_factor() / k * (hra ? 1.0 - rank : rank);
+  const double fixed = FIXED_RSE_FACTOR / k;
+  const double ub_rel = rank + num_std_dev * relative;
+  const double ub_fix = rank + num_std_dev * fixed;
+  return std::min(ub_rel, ub_fix);
+}
+
+template<typename T, typename C, typename S, typename A>
+// True when the given rank is known exactly: either the sketch never
+// compacted (one level, or n within the base capacity), or the rank falls
+// in the region kept exact by the accuracy mode (high ranks for HRA, low
+// ranks for LRA).
+bool req_sketch<T, C, S, A>::is_exact_rank(uint16_t k, uint8_t num_levels, double rank, uint64_t n, bool hra) {
+  const unsigned base_cap = k * req_constants::INIT_NUM_SECTIONS;
+  if (num_levels == 1 || n <= base_cap) return true;
+  const double exact_rank_thresh = static_cast<double>(base_cap) / n;
+  return (hra && rank >= 1.0 - exact_rank_thresh) || (!hra && rank <= exact_rank_thresh);
+}
+
+template<typename T, typename C, typename S, typename A>
+// A priori relative-error factor sqrt(0.0512 / INIT_NUM_SECTIONS); the
+// 0.0512 constant is empirical (see get_RSE docs). Qualify as std::sqrt
+// (with <cmath>) instead of relying on an unqualified name pulled in
+// transitively by other headers.
+double req_sketch<T, C, S, A>::relative_rse_factor() {
+  return std::sqrt(0.0512 / req_constants::INIT_NUM_SECTIONS);
+}
+
+// implementation for fixed-size arithmetic types (integral and floating point)
+// Size = preamble + (estimation mode: n plus fixed-width min and max)
+// + either one raw item (n == 1) or all compactors' payloads.
+template<typename T, typename C, typename S, typename A>
+template<typename TT, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type>
+size_t req_sketch<T, C, S, A>::get_serialized_size_bytes() const {
+  size_t size = PREAMBLE_SIZE_BYTES;
+  if (is_empty()) return size;
+  if (is_estimation_mode()) {
+    size += sizeof(n_) + sizeof(TT) * 2; // min and max
+  }
+  if (n_ == 1) {
+    size += sizeof(TT);
+  } else {
+    for (const auto& compactor: compactors_) size += compactor.get_serialized_size_bytes(S());
+  }
+  return size;
+}
+
+// implementation for all other types
+// Same layout as the arithmetic overload, but item sizes come from the
+// serde (S::size_of_item), so every retained item must be inspected —
+// this can be expensive.
+template<typename T, typename C, typename S, typename A>
+template<typename TT, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type>
+size_t req_sketch<T, C, S, A>::get_serialized_size_bytes() const {
+  size_t size = PREAMBLE_SIZE_BYTES;
+  if (is_empty()) return size;
+  if (is_estimation_mode()) {
+    size += sizeof(n_);
+    size += S().size_of_item(*min_value_);
+    size += S().size_of_item(*max_value_);
+  }
+  if (n_ == 1) {
+    size += S().size_of_item(*compactors_[0].begin());
+  } else {
+    for (const auto& compactor: compactors_) size += compactor.get_serialized_size_bytes(S());
+  }
+  return size;
+}
+
+template<typename T, typename C, typename S, typename A>
+// Serializes the sketch into a stream. Preamble layout (byte offsets):
+// 0: preamble_ints, 1: serial version, 2: family, 3: flags,
+// 4-5: k, 6: num_levels, 7: num_raw_items.
+// Small sketches (n_ <= MIN_K) store items "raw" without compactor state;
+// n and min/max are only stored in estimation mode (they are recomputed
+// from the items on deserialization otherwise).
+void req_sketch<T, C, S, A>::serialize(std::ostream& os) const {
+  const uint8_t preamble_ints = is_estimation_mode() ? 4 : 2;
+  write(os, preamble_ints);
+  const uint8_t serial_version = SERIAL_VERSION;
+  write(os, serial_version);
+  const uint8_t family = FAMILY;
+  write(os, family);
+  const bool raw_items = n_ <= req_constants::MIN_K;
+  const uint8_t flags_byte(
+      (is_empty() ? 1 << flags::IS_EMPTY : 0)
+    | (hra_ ? 1 << flags::IS_HIGH_RANK : 0)
+    | (raw_items ? 1 << flags::RAW_ITEMS : 0)
+    | (compactors_[0].is_sorted() ? 1 << flags::IS_LEVEL_ZERO_SORTED : 0)
+  );
+  write(os, flags_byte);
+  write(os, k_);
+  const uint8_t num_levels = is_empty() ? 0 : get_num_levels();
+  write(os, num_levels);
+  const uint8_t num_raw_items = raw_items ? n_ : 0; // fits: raw implies n_ <= MIN_K
+  write(os, num_raw_items);
+  if (is_empty()) return;
+  if (is_estimation_mode()) {
+    write(os, n_);
+    S().serialize(os, min_value_, 1);
+    S().serialize(os, max_value_, 1);
+  }
+  if (raw_items) {
+    S().serialize(os, compactors_[0].begin(), num_raw_items);
+  } else {
+    for (const auto& compactor: compactors_) compactor.serialize(os, S());
+  }
+}
+
+template<typename T, typename C, typename S, typename A>
+// Serializes the sketch into a byte vector, optionally reserving a blank
+// header of header_size_bytes in front (used by the PostgreSQL extension).
+// Layout matches serialize(std::ostream&).
+auto req_sketch<T, C, S, A>::serialize(unsigned header_size_bytes) const -> vector_bytes {
+  const size_t size = header_size_bytes + get_serialized_size_bytes();
+  vector_bytes bytes(size, 0, allocator_);
+  uint8_t* ptr = bytes.data() + header_size_bytes;
+  // 'size' already includes the header, so the buffer ends at
+  // bytes.data() + size. The previous 'ptr + size' overshot the buffer by
+  // header_size_bytes, loosening the bounds passed to the serde below.
+  const uint8_t* end_ptr = bytes.data() + size;
+
+  const uint8_t preamble_ints = is_estimation_mode() ? 4 : 2;
+  ptr += copy_to_mem(preamble_ints, ptr);
+  const uint8_t serial_version = SERIAL_VERSION;
+  ptr += copy_to_mem(serial_version, ptr);
+  const uint8_t family = FAMILY;
+  ptr += copy_to_mem(family, ptr);
+  const bool raw_items = n_ <= req_constants::MIN_K;
+  const uint8_t flags_byte(
+      (is_empty() ? 1 << flags::IS_EMPTY : 0)
+    | (hra_ ? 1 << flags::IS_HIGH_RANK : 0)
+    | (raw_items ? 1 << flags::RAW_ITEMS : 0)
+    | (compactors_[0].is_sorted() ? 1 << flags::IS_LEVEL_ZERO_SORTED : 0)
+  );
+  ptr += copy_to_mem(flags_byte, ptr);
+  ptr += copy_to_mem(k_, ptr);
+  const uint8_t num_levels = is_empty() ? 0 : get_num_levels();
+  ptr += copy_to_mem(num_levels, ptr);
+  const uint8_t num_raw_items = raw_items ? n_ : 0; // fits: raw implies n_ <= MIN_K
+  ptr += copy_to_mem(num_raw_items, ptr);
+  if (!is_empty()) {
+    if (is_estimation_mode()) {
+      ptr += copy_to_mem(n_, ptr);
+      ptr += S().serialize(ptr, end_ptr - ptr, min_value_, 1);
+      ptr += S().serialize(ptr, end_ptr - ptr, max_value_, 1);
+    }
+    if (raw_items) {
+      ptr += S().serialize(ptr, end_ptr - ptr, compactors_[0].begin(), num_raw_items);
+    } else {
+      for (const auto& compactor: compactors_) ptr += compactor.serialize(ptr, end_ptr - ptr, S());
+    }
+  }
+  return bytes;
+}
+
+template<typename T, typename C, typename S, typename A>
+// Deserializes a sketch from a stream. Validates preamble, serial version
+// and family, then reconstructs compactors. For exception safety, min/max
+// are first held in buffers whose deleter only deallocates raw memory;
+// once an item is successfully constructed the pointer is repackaged into
+// a unique_ptr whose deleter also runs the destructor.
+req_sketch<T, C, S, A> req_sketch<T, C, S, A>::deserialize(std::istream& is, const A& allocator) {
+  const auto preamble_ints = read<uint8_t>(is);
+  const auto serial_version = read<uint8_t>(is);
+  const auto family_id = read<uint8_t>(is);
+  const auto flags_byte = read<uint8_t>(is);
+  const auto k = read<uint16_t>(is);
+  const auto num_levels = read<uint8_t>(is);
+  const auto num_raw_items = read<uint8_t>(is);
+
+  check_preamble_ints(preamble_ints, num_levels);
+  check_serial_version(serial_version);
+  check_family_id(family_id);
+
+  if (!is.good()) throw std::runtime_error("error reading from std::istream");
+  const bool is_empty = flags_byte & (1 << flags::IS_EMPTY);
+  const bool hra = flags_byte & (1 << flags::IS_HIGH_RANK);
+  if (is_empty) return req_sketch(k, hra, allocator);
+
+  A alloc(allocator);
+  auto item_buffer_deleter = [&alloc](T* ptr) { alloc.deallocate(ptr, 1); };
+  std::unique_ptr<T, decltype(item_buffer_deleter)> min_value_buffer(alloc.allocate(1), item_buffer_deleter);
+  std::unique_ptr<T, decltype(item_buffer_deleter)> max_value_buffer(alloc.allocate(1), item_buffer_deleter);
+  std::unique_ptr<T, item_deleter> min_value(nullptr, item_deleter(allocator));
+  std::unique_ptr<T, item_deleter> max_value(nullptr, item_deleter(allocator));
+
+  const bool raw_items = flags_byte & (1 << flags::RAW_ITEMS);
+  const bool is_level_0_sorted = flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED);
+  std::vector<Compactor, AllocCompactor> compactors(allocator);
+
+  uint64_t n = 1;
+  // n and min/max are only present in estimation mode (num_levels > 1);
+  // otherwise they are recomputed from the items below
+  if (num_levels > 1) {
+    n = read<uint64_t>(is);
+    S().deserialize(is, min_value_buffer.get(), 1);
+    // serde call did not throw, repackage with destructor
+    min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
+    S().deserialize(is, max_value_buffer.get(), 1);
+    // serde call did not throw, repackage with destructor
+    max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
+  }
+
+  if (raw_items) {
+    compactors.push_back(Compactor::deserialize(is, S(), allocator, is_level_0_sorted, k, num_raw_items, hra));
+  } else {
+    for (size_t i = 0; i < num_levels; ++i) {
+      // only level 0 may be unsorted; higher levels are always sorted
+      compactors.push_back(Compactor::deserialize(is, S(), allocator, i == 0 ? is_level_0_sorted : true, hra));
+    }
+  }
+  if (num_levels == 1) {
+    // single-level sketch: recover n and min/max by scanning the items
+    const auto begin = compactors[0].begin();
+    const auto end = compactors[0].end();
+    n = compactors[0].get_num_items();
+    auto min_it = begin;
+    auto max_it = begin;
+    for (auto it = begin; it != end; ++it) {
+      if (C()(*it, *min_it)) min_it = it;
+      if (C()(*max_it, *it)) max_it = it;
+    }
+    new (min_value_buffer.get()) T(*min_it);
+    // copy did not throw, repackage with destructor
+    min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
+    new (max_value_buffer.get()) T(*max_it);
+    // copy did not throw, repackage with destructor
+    max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
+  }
+
+  if (!is.good()) throw std::runtime_error("error reading from std::istream");
+  return req_sketch(k, hra, n, std::move(min_value), std::move(max_value), std::move(compactors));
+}
+
+template<typename T, typename C, typename S, typename A>
+req_sketch<T, C, S, A> req_sketch<T, C, S, A>::deserialize(const void* bytes, size_t size, const A& allocator) {
+  ensure_minimum_memory(size, 8);
+  const char* ptr = static_cast<const char*>(bytes);
+  const char* end_ptr = static_cast<const char*>(bytes) + size;
+
+  uint8_t preamble_ints;
+  ptr += copy_from_mem(ptr, preamble_ints);
+  uint8_t serial_version;
+  ptr += copy_from_mem(ptr, serial_version);
+  uint8_t family_id;
+  ptr += copy_from_mem(ptr, family_id);
+  uint8_t flags_byte;
+  ptr += copy_from_mem(ptr, flags_byte);
+  uint16_t k;
+  ptr += copy_from_mem(ptr, k);
+  uint8_t num_levels;
+  ptr += copy_from_mem(ptr, num_levels);
+  uint8_t num_raw_items;
+  ptr += copy_from_mem(ptr, num_raw_items);
+
+  check_preamble_ints(preamble_ints, num_levels);
+  check_serial_version(serial_version);
+  check_family_id(family_id);
+
+  const bool is_empty = flags_byte & (1 << flags::IS_EMPTY);
+  const bool hra = flags_byte & (1 << flags::IS_HIGH_RANK);
+  if (is_empty) return req_sketch(k, hra, allocator);
+
+  A alloc(allocator);
+  auto item_buffer_deleter = [&alloc](T* ptr) { alloc.deallocate(ptr, 1); };
+  std::unique_ptr<T, decltype(item_buffer_deleter)> min_value_buffer(alloc.allocate(1), item_buffer_deleter);
+  std::unique_ptr<T, decltype(item_buffer_deleter)> max_value_buffer(alloc.allocate(1), item_buffer_deleter);
+  std::unique_ptr<T, item_deleter> min_value(nullptr, item_deleter(allocator));
+  std::unique_ptr<T, item_deleter> max_value(nullptr, item_deleter(allocator));
+
+  const bool raw_items = flags_byte & (1 << flags::RAW_ITEMS);
+  const bool is_level_0_sorted = flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED);
+  std::vector<Compactor, AllocCompactor> compactors(allocator);
+
+  uint64_t n = 1;
+  if (num_levels > 1) {
+    ensure_minimum_memory(end_ptr - ptr, sizeof(n));
+    ptr += copy_from_mem(ptr, n);
+    ptr += S().deserialize(ptr, end_ptr - ptr, min_value_buffer.get(), 1);
+    // serde call did not throw, repackage with destrtuctor
+    min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
+    ptr += S().deserialize(ptr, end_ptr - ptr, max_value_buffer.get(), 1);
+    // serde call did not throw, repackage with destrtuctor
+    max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
+  }
+
+  if (raw_items) {
+    auto pair = Compactor::deserialize(ptr, end_ptr - ptr, S(), allocator, is_level_0_sorted, k, num_raw_items, hra);
+    compactors.push_back(std::move(pair.first));
+    ptr += pair.second;
+  } else {
+    for (size_t i = 0; i < num_levels; ++i) {
+      auto pair = Compactor::deserialize(ptr, end_ptr - ptr, S(), allocator, i == 0 ? is_level_0_sorted : true, hra);
+      compactors.push_back(std::move(pair.first));
+      ptr += pair.second;
+    }
+  }
+  if (num_levels == 1) {
+    const auto begin = compactors[0].begin();
+    const auto end = compactors[0].end();
+    n = compactors[0].get_num_items();
+    auto min_it = begin;
+    auto max_it = begin;
+    for (auto it = begin; it != end; ++it) {
+      if (C()(*it, *min_it)) min_it = it;
+      if (C()(*max_it, *it)) max_it = it;
+    }
+    new (min_value_buffer.get()) T(*min_it);
+    // copy did not throw, repackage with destructor
+    min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
+    new (max_value_buffer.get()) T(*max_it);
+    // copy did not throw, repackage with destructor
+    max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
+  }
+
+  return req_sketch(k, hra, n, std::move(min_value), std::move(max_value), std::move(compactors));
+}
+
+template<typename T, typename C, typename S, typename A>
+void req_sketch<T, C, S, A>::grow() {
+  const uint8_t lg_weight = get_num_levels();
+  compactors_.push_back(Compactor(hra_, lg_weight, k_, allocator_));
+  update_max_nom_size();
+}
+
+template<typename T, typename C, typename S, typename A>
+uint8_t req_sketch<T, C, S, A>::get_num_levels() const {
+  return compactors_.size();
+}
+
+template<typename T, typename C, typename S, typename A>
+void req_sketch<T, C, S, A>::update_max_nom_size() {
+  max_nom_size_ = 0;
+  for (const auto& compactor: compactors_) max_nom_size_ += compactor.get_nom_capacity();
+}
+
+template<typename T, typename C, typename S, typename A>
+void req_sketch<T, C, S, A>::update_num_retained() {
+  num_retained_ = 0;
+  for (const auto& compactor: compactors_) num_retained_ += compactor.get_num_items();
+}
+
+template<typename T, typename C, typename S, typename A>
+void req_sketch<T, C, S, A>::compress() {
+  for (size_t h = 0; h < compactors_.size(); ++h) {
+    if (compactors_[h].get_num_items() >= compactors_[h].get_nom_capacity()) {
+      if (h == 0) compactors_[0].sort();
+      if (h + 1 >= get_num_levels()) { // at the top?
+        grow(); // add a level, increases max_nom_size
+      }
+      auto pair = compactors_[h].compact(compactors_[h + 1]);
+      num_retained_ -= pair.first;
+      max_nom_size_ += pair.second;
+      if (LAZY_COMPRESSION && num_retained_ < max_nom_size_) break;
+    }
+  }
+}
+
+template<typename T, typename C, typename S, typename A>
+string<A> req_sketch<T, C, S, A>::to_string(bool print_levels, bool print_items) const {
+  std::basic_ostringstream<char, std::char_traits<char>, AllocChar<A>> os;
+  os << "### REQ sketch summary:" << std::endl;
+  os << "   K              : " << k_ << std::endl;
+  os << "   High Rank Acc  : " << (hra_ ? "true" : "false") << std::endl;
+  os << "   Empty          : " << (is_empty() ? "true" : "false") << std::endl;
+  os << "   Estimation mode: " << (is_estimation_mode() ? "true" : "false") << std::endl;
+  os << "   Sorted         : " << (compactors_[0].is_sorted() ? "true" : "false") << std::endl;
+  os << "   N              : " << n_ << std::endl;
+  os << "   Levels         : " << compactors_.size() << std::endl;
+  os << "   Retained items : " << num_retained_ << std::endl;
+  os << "   Capacity items : " << max_nom_size_ << std::endl;
+  if (!is_empty()) {
+    os << "   Min value      : " << *min_value_ << std::endl;
+    os << "   Max value      : " << *max_value_ << std::endl;
+  }
+  os << "### End sketch summary" << std::endl;
+
+  if (print_levels) {
+    os << "### REQ sketch levels:" << std::endl;
+    os << "   index: nominal capacity, actual size" << std::endl;
+    for (uint8_t i = 0; i < compactors_.size(); i++) {
+      os << "   " << (unsigned int) i << ": "
+        << compactors_[i].get_nom_capacity() << ", "
+        << compactors_[i].get_num_items() << std::endl;
+    }
+    os << "### End sketch levels" << std::endl;
+  }
+
+  if (print_items) {
+    os << "### REQ sketch data:" << std::endl;
+    unsigned level = 0;
+    for (const auto& compactor: compactors_) {
+      os << " level " << level << ": " << std::endl;
+      for (auto it = compactor.begin(); it != compactor.end(); ++it) {
+        os << "   " << *it << std::endl;
+      }
+      ++level;
+    }
+    os << "### End sketch data" << std::endl;
+  }
+  return os.str();
+}
+
+template<typename T, typename C, typename S, typename A>
+class req_sketch<T, C, S, A>::item_deleter {
+  public:
+  item_deleter(const A& allocator): allocator_(allocator) {}
+  void operator() (T* ptr) {
+    if (ptr != nullptr) {
+      ptr->~T();
+      allocator_.deallocate(ptr, 1);
+    }
+  }
+  private:
+  A allocator_;
+};
+
+template<typename T, typename C, typename S, typename A>
+req_sketch<T, C, S, A>::req_sketch(uint32_t k, bool hra, uint64_t n, std::unique_ptr<T, item_deleter> min_value, std::unique_ptr<T, item_deleter> max_value, std::vector<Compactor, AllocCompactor>&& compactors):
+allocator_(compactors.get_allocator()),
+k_(k),
+hra_(hra),
+max_nom_size_(0),
+num_retained_(0),
+n_(n),
+compactors_(std::move(compactors)),
+min_value_(min_value.release()),
+max_value_(max_value.release())
+{
+  update_max_nom_size();
+  update_num_retained();
+}
+
+template<typename T, typename C, typename S, typename A>
+void req_sketch<T, C, S, A>::check_preamble_ints(uint8_t preamble_ints, uint8_t num_levels) {
+  const uint8_t expected_preamble_ints = num_levels > 1 ? 4 : 2;
+  if (preamble_ints != expected_preamble_ints) {
+    throw std::invalid_argument("Possible corruption: preamble ints must be "
+        + std::to_string(expected_preamble_ints) + ", got " + std::to_string(preamble_ints));
+  }
+}
+
+template<typename T, typename C, typename S, typename A>
+void req_sketch<T, C, S, A>::check_serial_version(uint8_t serial_version) {
+  if (serial_version != SERIAL_VERSION) {
+    throw std::invalid_argument("Possible corruption: serial version mismatch: expected "
+        + std::to_string(SERIAL_VERSION)
+        + ", got " + std::to_string(serial_version));
+  }
+}
+
+template<typename T, typename C, typename S, typename A>
+void req_sketch<T, C, S, A>::check_family_id(uint8_t family_id) {
+  if (family_id != FAMILY) {
+    throw std::invalid_argument("Possible corruption: family mismatch: expected "
+        + std::to_string(FAMILY) + ", got " + std::to_string(family_id));
+  }
+}
+
+template<typename T, typename C, typename S, typename A>
+auto req_sketch<T, C, S, A>::begin() const -> const_iterator {
+  return const_iterator(compactors_.begin(), compactors_.end());
+}
+
+template<typename T, typename C, typename S, typename A>
+auto req_sketch<T, C, S, A>::end() const -> const_iterator {
+  return const_iterator(compactors_.end(), compactors_.end());
+}
+
+// iterator
+
+template<typename T, typename C, typename S, typename A>
+req_sketch<T, C, S, A>::const_iterator::const_iterator(LevelsIterator begin, LevelsIterator end):
+levels_it_(begin),
+levels_end_(end),
+compactor_it_((*levels_it_).begin())
+{}
+
+template<typename T, typename C, typename S, typename A>
+auto req_sketch<T, C, S, A>::const_iterator::operator++() -> const_iterator& {
+  ++compactor_it_;
+  if (compactor_it_ == (*levels_it_).end()) {
+    ++levels_it_;
+    if (levels_it_ != levels_end_) compactor_it_ = (*levels_it_).begin();
+  }
+  return *this;
+}
+
+template<typename T, typename C, typename S, typename A>
+auto req_sketch<T, C, S, A>::const_iterator::operator++(int) -> const_iterator& {
+  const_iterator tmp(*this);
+  operator++();
+  return tmp;
+}
+
+template<typename T, typename C, typename S, typename A>
+bool req_sketch<T, C, S, A>::const_iterator::operator==(const const_iterator& other) const {
+  if (levels_it_ != other.levels_it_) return false;
+  if (levels_it_ == levels_end_) return true;
+  return compactor_it_ == other.compactor_it_;
+}
+
+template<typename T, typename C, typename S, typename A>
+bool req_sketch<T, C, S, A>::const_iterator::operator!=(const const_iterator& other) const {
+  return !operator==(other);
+}
+
+template<typename T, typename C, typename S, typename A>
+std::pair<const T&, const uint64_t> req_sketch<T, C, S, A>::const_iterator::operator*() const {
+  return std::pair<const T&, const uint64_t>(*compactor_it_, 1 << (*levels_it_).get_lg_weight());
+}
+
+} /* namespace datasketches */
+
+#endif
diff --git a/req/test/CMakeLists.txt b/req/test/CMakeLists.txt
new file mode 100755
index 0000000..a509068
--- /dev/null
+++ b/req/test/CMakeLists.txt
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+add_executable(req_test)
+
+target_link_libraries(req_test req common_test)
+
+set_target_properties(req_test PROPERTIES
+  CXX_STANDARD 11
+  CXX_STANDARD_REQUIRED YES
+)
+
+file(TO_CMAKE_PATH "${CMAKE_CURRENT_SOURCE_DIR}" REQ_TEST_BINARY_PATH)
+string(APPEND REQ_TEST_BINARY_PATH "/")
+target_compile_definitions(req_test
+  PRIVATE
+    TEST_BINARY_INPUT_PATH="${REQ_TEST_BINARY_PATH}"
+)
+
+add_test(
+  NAME req_test
+  COMMAND req_test
+)
+
+target_sources(req_test
+  PRIVATE
+    req_sketch_test.cpp
+    req_sketch_custom_type_test.cpp
+)
diff --git a/req/test/req_float_empty_from_java.sk b/req/test/req_float_empty_from_java.sk
new file mode 100644
index 0000000..9b24bcc
--- /dev/null
+++ b/req/test/req_float_empty_from_java.sk
Binary files differ
diff --git a/req/test/req_float_estimation_from_java.sk b/req/test/req_float_estimation_from_java.sk
new file mode 100644
index 0000000..d063b41
--- /dev/null
+++ b/req/test/req_float_estimation_from_java.sk
Binary files differ
diff --git a/req/test/req_float_exact_from_java.sk b/req/test/req_float_exact_from_java.sk
new file mode 100644
index 0000000..d144ac8
--- /dev/null
+++ b/req/test/req_float_exact_from_java.sk
Binary files differ
diff --git a/req/test/req_float_raw_items_from_java.sk b/req/test/req_float_raw_items_from_java.sk
new file mode 100644
index 0000000..0bfe5a9
--- /dev/null
+++ b/req/test/req_float_raw_items_from_java.sk
Binary files differ
diff --git a/req/test/req_float_single_item_from_java.sk b/req/test/req_float_single_item_from_java.sk
new file mode 100644
index 0000000..774db9f
--- /dev/null
+++ b/req/test/req_float_single_item_from_java.sk
Binary files differ
diff --git a/req/test/req_sketch_custom_type_test.cpp b/req/test/req_sketch_custom_type_test.cpp
new file mode 100644
index 0000000..1d76c3c
--- /dev/null
+++ b/req/test/req_sketch_custom_type_test.cpp
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <catch.hpp>
+#include <sstream>
+
+#include <req_sketch.hpp>
+#include <test_allocator.hpp>
+#include <test_type.hpp>
+
+namespace datasketches {
+
+using req_test_type_sketch = req_sketch<test_type, test_type_less, test_type_serde, test_allocator<test_type>>;
+
+TEST_CASE("req sketch custom type", "[req_sketch]") {
+
+  // setup section
+  test_allocator_total_bytes = 0;
+
+  SECTION("compact level zero") {
+    req_test_type_sketch sketch(4);
+    REQUIRE_THROWS_AS(sketch.get_quantile(0), std::runtime_error);
+    REQUIRE_THROWS_AS(sketch.get_min_value(), std::runtime_error);
+    REQUIRE_THROWS_AS(sketch.get_max_value(), std::runtime_error);
+    REQUIRE(sketch.get_serialized_size_bytes() == 8);
+
+    for (int i = 0; i < 24; ++i) sketch.update(i);
+    //std::cout << sketch.to_string(true);
+
+    REQUIRE(sketch.is_estimation_mode());
+    REQUIRE(sketch.get_n() > sketch.get_num_retained());
+    REQUIRE(sketch.get_min_value().get_value() == 0);
+    REQUIRE(sketch.get_max_value().get_value() == 23);
+  }
+
+  SECTION("merge small") {
+    req_test_type_sketch sketch1(4);
+    sketch1.update(1);
+
+    req_test_type_sketch sketch2(4);
+    sketch2.update(2);
+
+    sketch2.merge(sketch1);
+
+    //std::cout << sketch2.to_string(true);
+
+    REQUIRE_FALSE(sketch2.is_estimation_mode());
+    REQUIRE(sketch2.get_num_retained() == sketch2.get_n());
+    REQUIRE(sketch2.get_min_value().get_value() == 1);
+    REQUIRE(sketch2.get_max_value().get_value() == 2);
+  }
+
+  SECTION("merge higher levels") {
+    req_test_type_sketch sketch1(4);
+    for (int i = 0; i < 24; ++i) sketch1.update(i);
+
+    req_test_type_sketch sketch2(4);
+    for (int i = 0; i < 24; ++i) sketch2.update(i);
+
+    sketch2.merge(sketch1);
+
+    //std::cout << sketch2.to_string(true);
+
+    REQUIRE(sketch2.is_estimation_mode());
+    REQUIRE(sketch2.get_n() > sketch2.get_num_retained());
+    REQUIRE(sketch2.get_min_value().get_value() == 0);
+    REQUIRE(sketch2.get_max_value().get_value() == 23);
+  }
+
+  SECTION("serialize deserialize") {
+    req_test_type_sketch sketch1(12);
+
+    const int n = 1000;
+    for (int i = 0; i < n; i++) sketch1.update(i);
+
+    std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
+    sketch1.serialize(s);
+    REQUIRE((size_t) s.tellp() == sketch1.get_serialized_size_bytes());
+    auto sketch2 = req_test_type_sketch::deserialize(s);
+    REQUIRE((size_t) s.tellg() == sketch2.get_serialized_size_bytes());
+    REQUIRE(s.tellg() == s.tellp());
+    REQUIRE(sketch2.is_empty() == sketch1.is_empty());
+    REQUIRE(sketch2.is_estimation_mode() == sketch1.is_estimation_mode());
+    REQUIRE(sketch2.get_n() == sketch1.get_n());
+    REQUIRE(sketch2.get_num_retained() == sketch1.get_num_retained());
+    REQUIRE(sketch2.get_min_value().get_value() == sketch1.get_min_value().get_value());
+    REQUIRE(sketch2.get_max_value().get_value() == sketch1.get_max_value().get_value());
+    REQUIRE(sketch2.get_quantile(0.5).get_value() == sketch1.get_quantile(0.5).get_value());
+    REQUIRE(sketch2.get_rank(0) == sketch1.get_rank(0));
+    REQUIRE(sketch2.get_rank(n) == sketch1.get_rank(n));
+    REQUIRE(sketch2.get_rank(n / 2) == sketch1.get_rank(n / 2));
+  }
+
+  SECTION("moving merge") {
+    req_test_type_sketch sketch1(4);
+    for (int i = 0; i < 10; i++) sketch1.update(i);
+    req_test_type_sketch sketch2(4);
+    sketch2.update(10);
+    sketch2.merge(std::move(sketch1));
+    REQUIRE(sketch2.get_min_value().get_value() == 0);
+    REQUIRE(sketch2.get_max_value().get_value() == 10);
+    REQUIRE(sketch2.get_n() == 11);
+  }
+
+  // cleanup
+  if (test_allocator_total_bytes != 0) {
+    REQUIRE(test_allocator_total_bytes == 0);
+  }
+}
+
+} /* namespace datasketches */
diff --git a/req/test/req_sketch_test.cpp b/req/test/req_sketch_test.cpp
new file mode 100755
index 0000000..0be4aa5
--- /dev/null
+++ b/req/test/req_sketch_test.cpp
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <catch.hpp>
+
+#include <req_sketch.hpp>
+
+#include <fstream>
+#include <sstream>
+#include <limits>
+
+namespace datasketches {
+
+#ifdef TEST_BINARY_INPUT_PATH
+const std::string input_path = TEST_BINARY_INPUT_PATH;
+#else
+const std::string input_path = "test/";
+#endif
+
+TEST_CASE("req sketch: empty", "[req_sketch]") {
+  std::cout << "sizeof(req_float_sketch)=" << sizeof(req_sketch<float>) << "\n";
+  req_sketch<float> sketch(12);
+  REQUIRE(sketch.get_k() == 12);
+  REQUIRE(sketch.is_HRA());
+  REQUIRE(sketch.is_empty());
+  REQUIRE_FALSE(sketch.is_estimation_mode());
+  REQUIRE(sketch.get_n() == 0);
+  REQUIRE(sketch.get_num_retained() == 0);
+  REQUIRE(std::isnan(sketch.get_rank(0)));
+  REQUIRE(std::isnan(sketch.get_rank(std::numeric_limits<float>::infinity())));
+  REQUIRE(std::isnan(sketch.get_min_value()));
+  REQUIRE(std::isnan(sketch.get_max_value()));
+  REQUIRE(std::isnan(sketch.get_quantile(0)));
+  REQUIRE(std::isnan(sketch.get_quantile(0.5)));
+  REQUIRE(std::isnan(sketch.get_quantile(1)));
+  const double ranks[3] {0, 0.5, 1};
+  REQUIRE(sketch.get_quantiles(ranks, 3).size() == 0);
+}
+
+TEST_CASE("req sketch: single value, lra", "[req_sketch]") {
+  req_sketch<float> sketch(12, false);
+  sketch.update(1);
+  REQUIRE_FALSE(sketch.is_HRA());
+  REQUIRE_FALSE(sketch.is_empty());
+  REQUIRE_FALSE(sketch.is_estimation_mode());
+  REQUIRE(sketch.get_n() == 1);
+  REQUIRE(sketch.get_num_retained() == 1);
+  REQUIRE(sketch.get_rank(1) == 0);
+  REQUIRE(sketch.get_rank<true>(1) == 1);
+  REQUIRE(sketch.get_rank(1.1) == 1);
+  REQUIRE(sketch.get_rank(std::numeric_limits<float>::infinity()) == 1);
+  REQUIRE(sketch.get_quantile(0) == 1);
+  REQUIRE(sketch.get_quantile(0.5) == 1);
+  REQUIRE(sketch.get_quantile(1) == 1);
+
+  const double ranks[3] {0, 0.5, 1};
+  auto quantiles = sketch.get_quantiles(ranks, 3);
+  REQUIRE(quantiles.size() == 3);
+  REQUIRE(quantiles[0] == 1);
+  REQUIRE(quantiles[1] == 1);
+  REQUIRE(quantiles[2] == 1);
+
+  unsigned count = 0;
+  for (auto it: sketch) {
+    REQUIRE(it.second == 1);
+    ++count;
+  }
+  REQUIRE(count == 1);
+}
+
+TEST_CASE("req sketch: repeated values", "[req_sketch]") {
+  req_sketch<float> sketch(12);
+  sketch.update(1);
+  sketch.update(1);
+  sketch.update(1);
+  sketch.update(2);
+  sketch.update(2);
+  sketch.update(2);
+  REQUIRE_FALSE(sketch.is_empty());
+  REQUIRE_FALSE(sketch.is_estimation_mode());
+  REQUIRE(sketch.get_n() == 6);
+  REQUIRE(sketch.get_num_retained() == 6);
+  REQUIRE(sketch.get_rank(1) == 0);
+  REQUIRE(sketch.get_rank<true>(1) == 0.5);
+  REQUIRE(sketch.get_rank(2) == 0.5);
+  REQUIRE(sketch.get_rank<true>(2) == 1);
+}
+
+TEST_CASE("req sketch: exact mode", "[req_sketch]") {
+  req_sketch<float> sketch(12);
+  for (size_t i = 1; i <= 10; ++i) sketch.update(i);
+  REQUIRE_FALSE(sketch.is_empty());
+  REQUIRE_FALSE(sketch.is_estimation_mode());
+  REQUIRE(sketch.get_n() == 10);
+  REQUIRE(sketch.get_num_retained() == 10);
+
+  // like KLL
+  REQUIRE(sketch.get_rank(1) == 0);
+  REQUIRE(sketch.get_rank(2) == 0.1);
+  REQUIRE(sketch.get_rank(6) == 0.5);
+  REQUIRE(sketch.get_rank(9) == 0.8);
+  REQUIRE(sketch.get_rank(10) == 0.9);
+
+  // inclusive
+  REQUIRE(sketch.get_rank<true>(1) == 0.1);
+  REQUIRE(sketch.get_rank<true>(2) == 0.2);
+  REQUIRE(sketch.get_rank<true>(5) == 0.5);
+  REQUIRE(sketch.get_rank<true>(9) == 0.9);
+  REQUIRE(sketch.get_rank<true>(10) == 1);
+
+  // like KLL
+  REQUIRE(sketch.get_quantile(0) == 1);
+  REQUIRE(sketch.get_quantile(0.1) == 2);
+  REQUIRE(sketch.get_quantile(0.5) == 6);
+  REQUIRE(sketch.get_quantile(0.9) == 10);
+  REQUIRE(sketch.get_quantile(1) == 10);
+
+  // inclusive
+  REQUIRE(sketch.get_quantile<true>(0) == 1);
+  REQUIRE(sketch.get_quantile<true>(0.1) == 1);
+  REQUIRE(sketch.get_quantile<true>(0.5) == 5);
+  REQUIRE(sketch.get_quantile<true>(0.9) == 9);
+  REQUIRE(sketch.get_quantile<true>(1) == 10);
+
+  const double ranks[3] {0, 0.5, 1};
+  auto quantiles = sketch.get_quantiles(ranks, 3);
+  REQUIRE(quantiles.size() == 3);
+  REQUIRE(quantiles[0] == 1);
+  REQUIRE(quantiles[1] == 6);
+  REQUIRE(quantiles[2] == 10);
+
+  const float splits[3] {2, 6, 9};
+  auto cdf = sketch.get_CDF(splits, 3);
+  REQUIRE(cdf[0] == 0.1);
+  REQUIRE(cdf[1] == 0.5);
+  REQUIRE(cdf[2] == 0.8);
+  REQUIRE(cdf[3] == 1);
+  auto pmf = sketch.get_PMF(splits, 3);
+  REQUIRE(pmf[0] == Approx(0.1).margin(1e-8));
+  REQUIRE(pmf[1] == Approx(0.4).margin(1e-8));
+  REQUIRE(pmf[2] == Approx(0.3).margin(1e-8));
+  REQUIRE(pmf[3] == Approx(0.2).margin(1e-8));
+
+  REQUIRE(sketch.get_rank_lower_bound(0.5, 1) == 0.5);
+  REQUIRE(sketch.get_rank_upper_bound(0.5, 1) == 0.5);
+}
+
+TEST_CASE("req sketch: estimation mode", "[req_sketch]") {
+  req_sketch<float> sketch(12);
+  const size_t n = 100000;
+  for (size_t i = 0; i < n; ++i) sketch.update(i);
+  REQUIRE_FALSE(sketch.is_empty());
+  REQUIRE(sketch.is_estimation_mode());
+  REQUIRE(sketch.get_n() == n);
+//  std::cout << sketch.to_string(true);
+  REQUIRE(sketch.get_num_retained() < n);
+  REQUIRE(sketch.get_rank(0) == 0);
+  REQUIRE(sketch.get_rank(n) == 1);
+  REQUIRE(sketch.get_rank(n / 2) == Approx(0.5).margin(0.01));
+  REQUIRE(sketch.get_rank(n - 1) == Approx(1).margin(0.01));
+  REQUIRE(sketch.get_min_value() == 0);
+  REQUIRE(sketch.get_max_value() == n - 1);
+  REQUIRE(sketch.get_rank_lower_bound(0.5, 1) < 0.5);
+  REQUIRE(sketch.get_rank_upper_bound(0.5, 1) > 0.5);
+
+  unsigned count = 0;
+  for (auto it: sketch) {
+    REQUIRE(it.second >= 1);
+    ++count;
+  }
+  REQUIRE(count == sketch.get_num_retained());
+}
+
+TEST_CASE("req sketch: stream serialize-deserialize empty", "[req_sketch]") {
+  req_sketch<float> sketch(12);
+
+  std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
+  sketch.serialize(s);
+  auto sketch2 = req_sketch<float>::deserialize(s);
+  REQUIRE(s.tellg() == s.tellp());
+  REQUIRE(sketch2.is_empty() == sketch.is_empty());
+  REQUIRE(sketch2.is_estimation_mode() == sketch.is_estimation_mode());
+  REQUIRE(sketch2.get_num_retained() == sketch.get_num_retained());
+  REQUIRE(sketch2.get_n() == sketch.get_n());
+  REQUIRE(std::isnan(sketch2.get_min_value()));
+  REQUIRE(std::isnan(sketch2.get_max_value()));
+}
+
+TEST_CASE("req sketch: byte serialize-deserialize empty", "[req_sketch]") {
+  req_sketch<float> sketch(12);
+
+  auto bytes = sketch.serialize();
+  REQUIRE(bytes.size() == sketch.get_serialized_size_bytes());
+  auto sketch2 = req_sketch<float>::deserialize(bytes.data(), bytes.size());
+  REQUIRE(bytes.size() == sketch2.get_serialized_size_bytes());
+  REQUIRE(sketch2.is_empty() == sketch.is_empty());
+  REQUIRE(sketch2.is_estimation_mode() == sketch.is_estimation_mode());
+  REQUIRE(sketch2.get_num_retained() == sketch.get_num_retained());
+  REQUIRE(sketch2.get_n() == sketch.get_n());
+  REQUIRE(std::isnan(sketch2.get_min_value()));
+  REQUIRE(std::isnan(sketch2.get_max_value()));
+}
+
+TEST_CASE("req sketch: stream serialize-deserialize single item", "[req_sketch]") {
+  req_sketch<float> sketch(12);
+  sketch.update(1);
+
+  std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
+  sketch.serialize(s);
+  auto sketch2 = req_sketch<float>::deserialize(s);
+  REQUIRE(s.tellg() == s.tellp());
+  REQUIRE(sketch2.is_empty() == sketch.is_empty());
+  REQUIRE(sketch2.is_estimation_mode() == sketch.is_estimation_mode());
+  REQUIRE(sketch2.get_num_retained() == sketch.get_num_retained());
+  REQUIRE(sketch2.get_n() == sketch.get_n());
+  REQUIRE(sketch2.get_min_value() == sketch.get_min_value());
+  REQUIRE(sketch2.get_max_value() == sketch.get_max_value());
+}
+
+TEST_CASE("req sketch: byte serialize-deserialize single item", "[req_sketch]") {
+  req_sketch<float> sketch(12);
+  sketch.update(1);
+
+  auto bytes = sketch.serialize();
+  REQUIRE(bytes.size() == sketch.get_serialized_size_bytes());
+  auto sketch2 = req_sketch<float>::deserialize(bytes.data(), bytes.size());
+  std::cout << sketch2.to_string(true);
+  REQUIRE(bytes.size() == sketch2.get_serialized_size_bytes());
+  REQUIRE(sketch2.is_empty() == sketch.is_empty());
+  REQUIRE(sketch2.is_estimation_mode() == sketch.is_estimation_mode());
+  REQUIRE(sketch2.get_num_retained() == sketch.get_num_retained());
+  REQUIRE(sketch2.get_n() == sketch.get_n());
+  REQUIRE(sketch2.get_min_value() == sketch.get_min_value());
+  REQUIRE(sketch2.get_max_value() == sketch.get_max_value());
+}
+
+TEST_CASE("req sketch: stream serialize-deserialize exact mode", "[req_sketch]") {
+  req_sketch<float> sketch(12);
+  const size_t n = 50;
+  for (size_t i = 0; i < n; ++i) sketch.update(i);
+  REQUIRE_FALSE(sketch.is_estimation_mode());
+
+  std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
+  sketch.serialize(s);
+  auto sketch2 = req_sketch<float>::deserialize(s);
+  REQUIRE(s.tellg() == s.tellp());
+  REQUIRE(sketch2.is_empty() == sketch.is_empty());
+  REQUIRE(sketch2.is_estimation_mode() == sketch.is_estimation_mode());
+  REQUIRE(sketch2.get_num_retained() == sketch.get_num_retained());
+  REQUIRE(sketch2.get_n() == sketch.get_n());
+  REQUIRE(sketch2.get_min_value() == sketch.get_min_value());
+  REQUIRE(sketch2.get_max_value() == sketch.get_max_value());
+}
+
+TEST_CASE("req sketch: byte serialize-deserialize exact mode", "[req_sketch]") {
+  req_sketch<float> sketch(12);
+  const size_t n = 50;
+  for (size_t i = 0; i < n; ++i) sketch.update(i);
+  REQUIRE_FALSE(sketch.is_estimation_mode());
+
+  auto bytes = sketch.serialize();
+  REQUIRE(bytes.size() == sketch.get_serialized_size_bytes());
+  auto sketch2 = req_sketch<float>::deserialize(bytes.data(), bytes.size());
+  std::cout << sketch2.to_string(true);
+  REQUIRE(bytes.size() == sketch2.get_serialized_size_bytes());
+  REQUIRE(sketch2.is_empty() == sketch.is_empty());
+  REQUIRE(sketch2.is_estimation_mode() == sketch.is_estimation_mode());
+  REQUIRE(sketch2.get_num_retained() == sketch.get_num_retained());
+  REQUIRE(sketch2.get_n() == sketch.get_n());
+  REQUIRE(sketch2.get_min_value() == sketch.get_min_value());
+  REQUIRE(sketch2.get_max_value() == sketch.get_max_value());
+}
+
+TEST_CASE("req sketch: stream serialize-deserialize estimation mode", "[req_sketch]") {
+  req_sketch<float> sketch(12);
+  const size_t n = 100000;
+  for (size_t i = 0; i < n; ++i) sketch.update(i);
+  REQUIRE(sketch.is_estimation_mode());
+
+  std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
+  sketch.serialize(s);
+  auto sketch2 = req_sketch<float>::deserialize(s);
+  REQUIRE(s.tellg() == s.tellp());
+  REQUIRE(sketch2.is_empty() == sketch.is_empty());
+  REQUIRE(sketch2.is_estimation_mode() == sketch.is_estimation_mode());
+  REQUIRE(sketch2.get_num_retained() == sketch.get_num_retained());
+  REQUIRE(sketch2.get_n() == sketch.get_n());
+  REQUIRE(sketch2.get_min_value() == sketch.get_min_value());
+  REQUIRE(sketch2.get_max_value() == sketch.get_max_value());
+}
+
+TEST_CASE("req sketch: byte serialize-deserialize estimation mode", "[req_sketch]") {
+  req_sketch<float> sketch(12);
+  const size_t n = 100000;
+  for (size_t i = 0; i < n; ++i) sketch.update(i);
+  REQUIRE(sketch.is_estimation_mode());
+
+  auto bytes = sketch.serialize();
+  REQUIRE(bytes.size() == sketch.get_serialized_size_bytes());
+  auto sketch2 = req_sketch<float>::deserialize(bytes.data(), bytes.size());
+  REQUIRE(bytes.size() == sketch2.get_serialized_size_bytes());
+  REQUIRE(sketch2.is_empty() == sketch.is_empty());
+  REQUIRE(sketch2.is_estimation_mode() == sketch.is_estimation_mode());
+  REQUIRE(sketch2.get_num_retained() == sketch.get_num_retained());
+  REQUIRE(sketch2.get_n() == sketch.get_n());
+  REQUIRE(sketch2.get_min_value() == sketch.get_min_value());
+  REQUIRE(sketch2.get_max_value() == sketch.get_max_value());
+}
+
+TEST_CASE("req sketch: serialize deserialize stream and bytes equivalence", "[req_sketch]") {
+  req_sketch<float> sketch(12);
+  const size_t n = 100000;
+  for (size_t i = 0; i < n; ++i) sketch.update(i);
+  REQUIRE(sketch.is_estimation_mode());
+
+  std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
+  sketch.serialize(s);
+  auto bytes = sketch.serialize();
+  REQUIRE(bytes.size() == static_cast<size_t>(s.tellp()));
+  for (size_t i = 0; i < bytes.size(); ++i) {
+    REQUIRE(((char*)bytes.data())[i] == (char)s.get());
+  }
+
+  s.seekg(0); // rewind
+  auto sketch1 = req_sketch<float>::deserialize(s);
+  auto sketch2 = req_sketch<float>::deserialize(bytes.data(), bytes.size());
+  REQUIRE(bytes.size() == static_cast<size_t>(s.tellg()));
+  REQUIRE(sketch2.is_empty() == sketch1.is_empty());
+  REQUIRE(sketch2.is_estimation_mode() == sketch.is_estimation_mode());
+  REQUIRE(sketch2.get_num_retained() == sketch.get_num_retained());
+  REQUIRE(sketch2.get_n() == sketch.get_n());
+  REQUIRE(sketch2.get_min_value() == sketch.get_min_value());
+  REQUIRE(sketch2.get_max_value() == sketch.get_max_value());
+}
+
+TEST_CASE("req sketch: stream deserialize from Java - empty", "[req_sketch]") {
+  std::ifstream is;
+  is.exceptions(std::ios::failbit | std::ios::badbit);
+  is.open(input_path + "req_float_empty_from_java.sk", std::ios::binary);
+  auto sketch = req_sketch<float>::deserialize(is);
+  REQUIRE(sketch.is_empty());
+  REQUIRE_FALSE(sketch.is_estimation_mode());
+  REQUIRE(sketch.get_n() == 0);
+  REQUIRE(sketch.get_num_retained() == 0);
+  REQUIRE(std::isnan(sketch.get_min_value()));
+  REQUIRE(std::isnan(sketch.get_max_value()));
+}
+
+TEST_CASE("req sketch: stream deserialize from Java - single item", "[req_sketch]") {
+  std::ifstream is;
+  is.exceptions(std::ios::failbit | std::ios::badbit);
+  is.open(input_path + "req_float_single_item_from_java.sk", std::ios::binary);
+  auto sketch = req_sketch<float>::deserialize(is);
+  REQUIRE_FALSE(sketch.is_empty());
+  REQUIRE_FALSE(sketch.is_estimation_mode());
+  REQUIRE(sketch.get_n() == 1);
+  REQUIRE(sketch.get_num_retained() == 1);
+  REQUIRE(sketch.get_min_value() == 1);
+  REQUIRE(sketch.get_max_value() == 1);
+  REQUIRE(sketch.get_rank(1) == 0);
+  REQUIRE(sketch.get_rank<true>(1) == 1);
+}
+
+TEST_CASE("req sketch: stream deserialize from Java - raw items", "[req_sketch]") {
+  std::ifstream is;
+  is.exceptions(std::ios::failbit | std::ios::badbit);
+  is.open(input_path + "req_float_raw_items_from_java.sk", std::ios::binary);
+  auto sketch = req_sketch<float>::deserialize(is);
+  REQUIRE_FALSE(sketch.is_empty());
+  REQUIRE_FALSE(sketch.is_estimation_mode());
+  REQUIRE(sketch.get_n() == 4);
+  REQUIRE(sketch.get_num_retained() == 4);
+  REQUIRE(sketch.get_min_value() == 0);
+  REQUIRE(sketch.get_max_value() == 3);
+  REQUIRE(sketch.get_rank(2) == 0.5);
+}
+
+TEST_CASE("req sketch: stream deserialize from Java - exact mode", "[req_sketch]") {
+  std::ifstream is;
+  is.exceptions(std::ios::failbit | std::ios::badbit);
+  is.open(input_path + "req_float_exact_from_java.sk", std::ios::binary);
+  auto sketch = req_sketch<float>::deserialize(is);
+  REQUIRE_FALSE(sketch.is_empty());
+  REQUIRE_FALSE(sketch.is_estimation_mode());
+  REQUIRE(sketch.get_n() == 100);
+  REQUIRE(sketch.get_num_retained() == 100);
+  REQUIRE(sketch.get_min_value() == 0);
+  REQUIRE(sketch.get_max_value() == 99);
+  REQUIRE(sketch.get_rank(50) == 0.5);
+}
+
+TEST_CASE("req sketch: stream deserialize from Java - estimation mode", "[req_sketch]") {
+  std::ifstream is;
+  is.exceptions(std::ios::failbit | std::ios::badbit);
+  is.open(input_path + "req_float_estimation_from_java.sk", std::ios::binary);
+  auto sketch = req_sketch<float>::deserialize(is);
+  REQUIRE_FALSE(sketch.is_empty());
+  REQUIRE(sketch.is_estimation_mode());
+  REQUIRE(sketch.get_n() == 10000);
+  REQUIRE(sketch.get_num_retained() == 2942);
+  REQUIRE(sketch.get_min_value() == 0);
+  REQUIRE(sketch.get_max_value() == 9999);
+  REQUIRE(sketch.get_rank(5000) == 0.5);
+}
+
+TEST_CASE("req sketch: merge into empty", "[req_sketch]") {
+  req_sketch<float> sketch1(40);
+
+  req_sketch<float> sketch2(40);
+  for (size_t i = 0; i < 1000; ++i) sketch2.update(i);
+
+  sketch1.merge(sketch2);
+  REQUIRE(sketch1.get_min_value() == 0);
+  REQUIRE(sketch1.get_max_value() == 999);
+  REQUIRE(sketch1.get_quantile(0.25) == Approx(250).margin(3));
+  REQUIRE(sketch1.get_quantile(0.5) == Approx(500).margin(3));
+  REQUIRE(sketch1.get_quantile(0.75) == Approx(750).margin(3));
+  REQUIRE(sketch1.get_rank(500) == Approx(0.5).margin(0.01));
+}
+
+TEST_CASE("req sketch: merge", "[req_sketch]") {
+  req_sketch<float> sketch1(100);
+  for (size_t i = 0; i < 1000; ++i) sketch1.update(i);
+
+  req_sketch<float> sketch2(100);
+  for (size_t i = 1000; i < 2000; ++i) sketch2.update(i);
+
+  sketch1.merge(sketch2);
+  REQUIRE(sketch1.get_min_value() == 0);
+  REQUIRE(sketch1.get_max_value() == 1999);
+  REQUIRE(sketch1.get_quantile(0.25) == Approx(500).margin(3));
+  REQUIRE(sketch1.get_quantile(0.5) == Approx(1000).margin(1));
+  REQUIRE(sketch1.get_quantile(0.75) == Approx(1500).margin(1));
+  REQUIRE(sketch1.get_rank(1000) == Approx(0.5).margin(0.01));
+}
+
+TEST_CASE("req sketch: merge multiple", "[req_sketch]") {
+  req_sketch<float> sketch1(12);
+  for (size_t i = 0; i < 40; ++i) sketch1.update(i);
+
+  req_sketch<float> sketch2(12);
+  for (size_t i = 40; i < 80; ++i) sketch2.update(i);
+
+  req_sketch<float> sketch3(12);
+  for (size_t i = 80; i < 120; ++i) sketch3.update(i);
+
+  req_sketch<float> sketch(12);
+  sketch.merge(sketch1);
+  sketch.merge(sketch2);
+  sketch.merge(sketch3);
+  REQUIRE(sketch.get_min_value() == 0);
+  REQUIRE(sketch.get_max_value() == 119);
+  REQUIRE(sketch.get_quantile(0.5) == Approx(60).margin(3));
+  REQUIRE(sketch.get_rank(60) == Approx(0.5).margin(0.01));
+}
+
+TEST_CASE("req sketch: merge incompatible HRA and LRA", "[req_sketch]") {
+  req_sketch<float> sketch1(12);
+  sketch1.update(1);
+
+  req_sketch<float> sketch2(12, false);
+  sketch2.update(1);
+
+  REQUIRE_THROWS_AS(sketch1.merge(sketch2), std::invalid_argument);
+}
+
+//TEST_CASE("for manual comparison with Java") {
+//  req_sketch<float> sketch(12, false);
+//  for (size_t i = 0; i < 100000; ++i) sketch.update(i);
+//  sketch.merge(sketch);
+//  std::ofstream os;
+//  os.exceptions(std::ios::failbit | std::ios::badbit);
+//  os.open("req_float_lra_12_100000_merged.sk", std::ios::binary);
+//  sketch.get_quantile(0.5); // force sorting level 0
+//  sketch.serialize(os);
+//}
+
+} /* namespace datasketches */
diff --git a/tuple/include/theta_update_sketch_base_impl.hpp b/tuple/include/theta_update_sketch_base_impl.hpp
index 8fdb3d8..bbf845b 100644
--- a/tuple/include/theta_update_sketch_base_impl.hpp
+++ b/tuple/include/theta_update_sketch_base_impl.hpp
@@ -69,7 +69,7 @@
 
 template<typename EN, typename EK, typename A>
 theta_update_sketch_base<EN, EK, A>::theta_update_sketch_base(theta_update_sketch_base&& other) noexcept:
-allocator_(other.allocator_),
+allocator_(std::move(other.allocator_)),
 is_empty_(other.is_empty_),
 lg_cur_size_(other.lg_cur_size_),
 lg_nom_size_(other.lg_nom_size_),
diff --git a/tuple/include/tuple_sketch.hpp b/tuple/include/tuple_sketch.hpp
index 2292937..4966b74 100644
--- a/tuple/include/tuple_sketch.hpp
+++ b/tuple/include/tuple_sketch.hpp
@@ -444,19 +444,21 @@
   // for deserialize
   class deleter_of_summaries {
   public:
-    deleter_of_summaries(uint32_t num, bool destroy): num(num), destroy(destroy) {}
-    void set_destroy(bool destroy) { this->destroy = destroy; }
-    void operator() (Summary* ptr) const {
+    deleter_of_summaries(uint32_t num, bool destroy, const Allocator& allocator):
+      allocator_(allocator), num_(num), destroy_(destroy) {}
+    void set_destroy(bool destroy) { destroy_ = destroy; }
+    void operator() (Summary* ptr) {
       if (ptr != nullptr) {
-        if (destroy) {
-          for (uint32_t i = 0; i < num; ++i) ptr[i].~Summary();
+        if (destroy_) {
+          for (uint32_t i = 0; i < num_; ++i) ptr[i].~Summary();
         }
-        Allocator().deallocate(ptr, num);
+        allocator_.deallocate(ptr, num_);
       }
     }
   private:
-    uint32_t num;
-    bool destroy;
+    Allocator allocator_;
+    uint32_t num_;
+    bool destroy_;
   };
 
   virtual void print_specifics(std::basic_ostream<char>& os) const;
diff --git a/tuple/include/tuple_sketch_impl.hpp b/tuple/include/tuple_sketch_impl.hpp
index 63552d7..52e2ebf 100644
--- a/tuple/include/tuple_sketch_impl.hpp
+++ b/tuple/include/tuple_sketch_impl.hpp
@@ -470,7 +470,7 @@
   std::vector<Entry, AllocEntry> entries(alloc);
   if (!is_empty) {
     entries.reserve(num_entries);
-    std::unique_ptr<S, deleter_of_summaries> summary(alloc.allocate(1), deleter_of_summaries(1, false));
+    std::unique_ptr<S, deleter_of_summaries> summary(alloc.allocate(1), deleter_of_summaries(1, false, allocator));
     for (size_t i = 0; i < num_entries; ++i) {
       uint64_t key;
       is.read(reinterpret_cast<char*>(&key), sizeof(uint64_t));
@@ -533,7 +533,7 @@
   std::vector<Entry, AllocEntry> entries(alloc);
   if (!is_empty) {
     entries.reserve(num_entries);
-    std::unique_ptr<S, deleter_of_summaries> summary(alloc.allocate(1), deleter_of_summaries(1, false));
+    std::unique_ptr<S, deleter_of_summaries> summary(alloc.allocate(1), deleter_of_summaries(1, false, allocator));
     for (size_t i = 0; i < num_entries; ++i) {
       uint64_t key;
       ptr += copy_from_mem(ptr, &key, sizeof(key));