Merge pull request #190 from apache/sampling_allocator

allocator instance support in sampling
diff --git a/sampling/include/var_opt_sketch.hpp b/sampling/include/var_opt_sketch.hpp
index 4572ea0..3f5110c 100644
--- a/sampling/include/var_opt_sketch.hpp
+++ b/sampling/include/var_opt_sketch.hpp
@@ -62,7 +62,7 @@
     static const resize_factor DEFAULT_RESIZE_FACTOR = X8;
     static const uint32_t MAX_K = ((uint32_t) 1 << 31) - 2;
 
-    explicit var_opt_sketch(uint32_t k, resize_factor rf = DEFAULT_RESIZE_FACTOR);
+    explicit var_opt_sketch(uint32_t k, resize_factor rf = DEFAULT_RESIZE_FACTOR, const A& allocator = A());
     var_opt_sketch(const var_opt_sketch& other);
     var_opt_sketch(var_opt_sketch&& other) noexcept;
 
@@ -167,7 +167,7 @@
      * @param is input stream
      * @return an instance of a sketch
      */
-    static var_opt_sketch deserialize(std::istream& is);
+    static var_opt_sketch deserialize(std::istream& is, const A& allocator = A());
 
     /**
      * This method deserializes a sketch from a given array of bytes.
@@ -175,7 +175,7 @@
      * @param size the size of the array
      * @return an instance of a sketch
      */
-    static var_opt_sketch deserialize(const void* bytes, size_t size);
+    static var_opt_sketch deserialize(const void* bytes, size_t size, const A& allocator = A());
 
     /**
      * Prints a summary of the sketch.
@@ -226,8 +226,9 @@
     resize_factor rf_;              // resize factor
 
     uint32_t curr_items_alloc_;     // currently allocated array size
-    bool filled_data_;              // true if we've explciitly set all entries in data_
+    bool filled_data_;              // true if we've explicitly set all entries in data_
 
+    A allocator_;
     T* data_;                       // stored sampled items
     double* weights_;               // weights for sampled items
 
@@ -249,20 +250,20 @@
     // occurs and is properly tracked.
     bool* marks_;
 
-    // used during deserialization to avoid memork leaks upon errors
+    // used during deserialization to avoid memory leaks upon errors
     class items_deleter;
     class weights_deleter;
     class marks_deleter;
 
-    var_opt_sketch(uint32_t k, resize_factor rf, bool is_gadget);
+    var_opt_sketch(uint32_t k, resize_factor rf, bool is_gadget, const A& allocator);
     var_opt_sketch(uint32_t k, uint32_t h, uint32_t m, uint32_t r, uint64_t n, double total_wt_r, resize_factor rf,
                    uint32_t curr_items_alloc, bool filled_data, std::unique_ptr<T, items_deleter> items,
                    std::unique_ptr<double, weights_deleter> weights, uint32_t num_marks_in_h,
-                   std::unique_ptr<bool, marks_deleter> marks);
+                   std::unique_ptr<bool, marks_deleter> marks, const A& allocator);
 
     friend class var_opt_union<T,S,A>;
     var_opt_sketch(const var_opt_sketch& other, bool as_sketch, uint64_t adjusted_n);
-    var_opt_sketch(T* data, double* weights, size_t len, uint32_t k, uint64_t n, uint32_t h_count, uint32_t r_count, double total_wt_r);
+    var_opt_sketch(T* data, double* weights, size_t len, uint32_t k, uint64_t n, uint32_t h_count, uint32_t r_count, double total_wt_r, const A& allocator);
 
     string<A> items_to_string(bool print_gap) const;
 
@@ -353,7 +354,7 @@
   double r_item_wt_;
   size_t idx_;
   const size_t final_idx_;
-  bool weight_correction_;
+//  bool weight_correction_;
 };
 
 // non-const iterator for internal use
diff --git a/sampling/include/var_opt_sketch_impl.hpp b/sampling/include/var_opt_sketch_impl.hpp
index 413e932..edc695c 100644
--- a/sampling/include/var_opt_sketch_impl.hpp
+++ b/sampling/include/var_opt_sketch_impl.hpp
@@ -42,8 +42,8 @@
  * author Jon Malkin
  */
 template<typename T, typename S, typename A>
-var_opt_sketch<T,S,A>::var_opt_sketch(uint32_t k, resize_factor rf) :
-  var_opt_sketch<T,S,A>(k, rf, false) {}
+var_opt_sketch<T,S,A>::var_opt_sketch(uint32_t k, resize_factor rf, const A& allocator) :
+  var_opt_sketch<T,S,A>(k, rf, false, allocator) {} // delegates to the private constructor with is_gadget = false
 
 template<typename T, typename S, typename A>
 var_opt_sketch<T,S,A>::var_opt_sketch(const var_opt_sketch& other) :
@@ -56,12 +56,13 @@
   rf_(other.rf_),
   curr_items_alloc_(other.curr_items_alloc_),
   filled_data_(other.filled_data_),
+  allocator_(other.allocator_),
   data_(nullptr),
   weights_(nullptr),
   num_marks_in_h_(other.num_marks_in_h_),
   marks_(nullptr)
   {
-    data_ = A().allocate(curr_items_alloc_);
+    data_ = allocator_.allocate(curr_items_alloc_);
     // skip gap or anything unused at the end
     for (size_t i = 0; i < h_; ++i)
       new (&data_[i]) T(other.data_[i]);
@@ -71,13 +72,13 @@
     // we skipped the gap
     filled_data_ = false;
 
-    weights_ = AllocDouble().allocate(curr_items_alloc_);
+    weights_ = AllocDouble(allocator_).allocate(curr_items_alloc_);
     // doubles so can successfully copy regardless of the internal state
-    std::copy(&other.weights_[0], &other.weights_[curr_items_alloc_], weights_);
-    
+    std::copy(other.weights_, other.weights_ + curr_items_alloc_, weights_);
+
     if (other.marks_ != nullptr) {
-      marks_ = AllocBool().allocate(curr_items_alloc_);
-      std::copy(&other.marks_[0], &other.marks_[curr_items_alloc_], marks_);
+      marks_ = AllocBool(allocator_).allocate(curr_items_alloc_);
+      std::copy(other.marks_, other.marks_ + curr_items_alloc_, marks_);
     }
   }
 
@@ -92,12 +93,13 @@
   rf_(other.rf_),
   curr_items_alloc_(other.curr_items_alloc_),
   filled_data_(other.filled_data_),
+  allocator_(other.allocator_),
   data_(nullptr),
   weights_(nullptr),
   num_marks_in_h_(other.num_marks_in_h_),
   marks_(nullptr)
   {
-    data_ = A().allocate(curr_items_alloc_);
+    data_ = allocator_.allocate(curr_items_alloc_);
     // skip gap or anything unused at the end
     for (size_t i = 0; i < h_; ++i)
       new (&data_[i]) T(other.data_[i]);
@@ -107,19 +109,19 @@
     // we skipped the gap
     filled_data_ = false;
 
-    weights_ = AllocDouble().allocate(curr_items_alloc_);
+    weights_ = AllocDouble(allocator_).allocate(curr_items_alloc_);
     // doubles so can successfully copy regardless of the internal state
-    std::copy(&other.weights_[0], &other.weights_[curr_items_alloc_], weights_);
+    std::copy(other.weights_, other.weights_ + curr_items_alloc_, weights_);
 
     if (!as_sketch && other.marks_ != nullptr) {
-      marks_ = AllocBool().allocate(curr_items_alloc_);
-      std::copy(&other.marks_[0], &other.marks_[curr_items_alloc_], marks_);
+      marks_ = AllocBool(allocator_).allocate(curr_items_alloc_);
+      std::copy(other.marks_, other.marks_ + curr_items_alloc_, marks_);
     }
   }
 
 template<typename T, typename S, typename A>
 var_opt_sketch<T,S,A>::var_opt_sketch(T* data, double* weights, size_t len,
-                                      uint32_t k, uint64_t n, uint32_t h_count, uint32_t r_count, double total_wt_r) :
+    uint32_t k, uint64_t n, uint32_t h_count, uint32_t r_count, double total_wt_r, const A& allocator) :
   k_(k),
   h_(h_count),
   m_(0),
@@ -129,6 +131,7 @@
   rf_(DEFAULT_RESIZE_FACTOR),
   curr_items_alloc_(len),
   filled_data_(n > k),
+  allocator_(allocator),
   data_(data),
   weights_(weights),
   num_marks_in_h_(0),
@@ -146,6 +149,7 @@
   rf_(other.rf_),
   curr_items_alloc_(other.curr_items_alloc_),
   filled_data_(other.filled_data_),
+  allocator_(other.allocator_),
   data_(other.data_),
   weights_(other.weights_),
   num_marks_in_h_(other.num_marks_in_h_),
@@ -157,8 +161,8 @@
   }
 
 template<typename T, typename S, typename A>
-var_opt_sketch<T,S,A>::var_opt_sketch(uint32_t k, resize_factor rf, bool is_gadget) :
-  k_(k), h_(0), m_(0), r_(0), n_(0), total_wt_r_(0.0), rf_(rf) {
+var_opt_sketch<T,S,A>::var_opt_sketch(uint32_t k, resize_factor rf, bool is_gadget, const A& allocator) :
+  k_(k), h_(0), m_(0), r_(0), n_(0), total_wt_r_(0.0), rf_(rf), allocator_(allocator) {
   if (k == 0 || k_ > MAX_K) {
     throw std::invalid_argument("k must be at least 1 and less than 2^31 - 1");
   }
@@ -178,7 +182,7 @@
 var_opt_sketch<T,S,A>::var_opt_sketch(uint32_t k, uint32_t h, uint32_t m, uint32_t r, uint64_t n, double total_wt_r, resize_factor rf,
                                       uint32_t curr_items_alloc, bool filled_data, std::unique_ptr<T, items_deleter> items,
                                       std::unique_ptr<double, weights_deleter> weights, uint32_t num_marks_in_h,
-                                      std::unique_ptr<bool, marks_deleter> marks) :
+                                      std::unique_ptr<bool, marks_deleter> marks, const A& allocator) :
   k_(k),
   h_(h),
   m_(m),
@@ -188,6 +192,7 @@
   rf_(rf),
   curr_items_alloc_(curr_items_alloc),
   filled_data_(filled_data),
+  allocator_(allocator),
   data_(items.release()),
   weights_(weights.release()),
   num_marks_in_h_(num_marks_in_h),
@@ -202,27 +207,27 @@
       // destroy everything
       const size_t num_to_destroy = std::min(k_ + 1, curr_items_alloc_);
       for (size_t i = 0; i < num_to_destroy; ++i) {
-        A().destroy(data_ + i);
+        allocator_.destroy(data_ + i);
       }
     } else {
       // skip gap or anything unused at the end
       for (size_t i = 0; i < h_; ++i) {
-        A().destroy(data_+ i);
+        allocator_.destroy(data_+ i);
       }
     
       for (size_t i = h_ + 1; i < h_ + r_ + 1; ++i) {
-        A().destroy(data_ + i);
+        allocator_.destroy(data_ + i);
       }
     }
-    A().deallocate(data_, curr_items_alloc_);
+    allocator_.deallocate(data_, curr_items_alloc_);
   }
 
   if (weights_ != nullptr) {
-    AllocDouble().deallocate(weights_, curr_items_alloc_);
+    AllocDouble(allocator_).deallocate(weights_, curr_items_alloc_);
   }
   
   if (marks_ != nullptr) {
-    AllocBool().deallocate(marks_, curr_items_alloc_);
+    AllocBool(allocator_).deallocate(marks_, curr_items_alloc_);
   }
 }
 
@@ -238,6 +243,7 @@
   std::swap(rf_, sk_copy.rf_);
   std::swap(curr_items_alloc_, sk_copy.curr_items_alloc_);
   std::swap(filled_data_, sk_copy.filled_data_);
+  std::swap(allocator_, sk_copy.allocator_);
   std::swap(data_, sk_copy.data_);
   std::swap(weights_, sk_copy.weights_);
   std::swap(num_marks_in_h_, sk_copy.num_marks_in_h_);
@@ -256,6 +262,7 @@
   std::swap(rf_, other.rf_);
   std::swap(curr_items_alloc_, other.curr_items_alloc_);
   std::swap(filled_data_, other.filled_data_);
+  std::swap(allocator_, other.allocator_);
   std::swap(data_, other.data_);
   std::swap(weights_, other.weights_);
   std::swap(num_marks_in_h_, other.num_marks_in_h_);
@@ -335,7 +342,7 @@
 template<typename T, typename S, typename A>
 std::vector<uint8_t, AllocU8<A>> var_opt_sketch<T,S,A>::serialize(unsigned header_size_bytes) const {
   const size_t size = header_size_bytes + get_serialized_size_bytes();
-  std::vector<uint8_t, AllocU8<A>> bytes(size);
+  std::vector<uint8_t, AllocU8<A>> bytes(size, 0, allocator_);
   uint8_t* ptr = bytes.data() + header_size_bytes;
   uint8_t* end_ptr = ptr + size;
 
@@ -468,7 +475,7 @@
 }
 
 template<typename T, typename S, typename A>
-var_opt_sketch<T,S,A> var_opt_sketch<T,S,A>::deserialize(const void* bytes, size_t size) {
+var_opt_sketch<T,S,A> var_opt_sketch<T,S,A>::deserialize(const void* bytes, size_t size, const A& allocator) {
   ensure_minimum_memory(size, 8);
   const char* ptr = static_cast<const char*>(bytes);
   const char* base = ptr;
@@ -494,7 +501,7 @@
   const bool is_gadget = flags & GADGET_FLAG_MASK;
 
   if (is_empty) {
-    return var_opt_sketch<T,S,A>(k, rf, is_gadget);
+    return var_opt_sketch<T,S,A>(k, rf, is_gadget, allocator);
   }
 
   // second and third prelongs
@@ -520,7 +527,8 @@
 
   // read the first h_ weights, fill in rest of array with -1.0
   check_memory_size(ptr - base + (h * sizeof(double)), size);
-  std::unique_ptr<double, weights_deleter> weights(AllocDouble().allocate(array_size), weights_deleter(array_size));
+  std::unique_ptr<double, weights_deleter> weights(AllocDouble(allocator).allocate(array_size),
+      weights_deleter(array_size, allocator));
   double* wts = weights.get(); // to avoid lots of .get() calls -- do not delete
   ptr += copy_from_mem(ptr, wts, h * sizeof(double));
   for (size_t i = 0; i < h; ++i) {
@@ -528,14 +536,14 @@
       throw std::invalid_argument("Possible corruption: Non-positive weight when deserializing: " + std::to_string(wts[i]));
     }
   }
-  std::fill(&wts[h], &wts[array_size], -1.0);
+  std::fill(wts + h, wts + array_size, -1.0);
   
   // read the first h_ marks as packed bytes iff we have a gadget
   uint32_t num_marks_in_h = 0;
-  std::unique_ptr<bool, marks_deleter> marks(nullptr, marks_deleter(array_size));
+  std::unique_ptr<bool, marks_deleter> marks(nullptr, marks_deleter(array_size, allocator));
   if (is_gadget) {
     uint8_t val = 0;
-    marks = std::unique_ptr<bool, marks_deleter>(AllocBool().allocate(array_size), marks_deleter(array_size));
+    marks = std::unique_ptr<bool, marks_deleter>(AllocBool(allocator).allocate(array_size), marks_deleter(array_size, allocator));
     const size_t size_marks = (h / 8) + (h % 8 > 0 ? 1 : 0);
     check_memory_size(ptr - base + size_marks, size);
     for (uint32_t i = 0; i < h; ++i) {
@@ -548,8 +556,8 @@
   }
 
   // read the sample items, skipping the gap. Either h_ or r_ may be 0
-  items_deleter deleter(array_size);
-  std::unique_ptr<T, items_deleter> items(A().allocate(array_size), deleter);
+  items_deleter deleter(array_size, allocator);
+  std::unique_ptr<T, items_deleter> items(A(allocator).allocate(array_size), deleter);
   
   ptr += S().deserialize(ptr, end_ptr - ptr, items.get(), h);
   items.get_deleter().set_h(h); // serde didn't throw, so the items are now valid
@@ -558,11 +566,11 @@
   items.get_deleter().set_r(r); // serde didn't throw, so the items are now valid
 
   return var_opt_sketch(k, h, (r > 0 ? 1 : 0), r, n, total_wt_r, rf, array_size, false,
-                        std::move(items), std::move(weights), num_marks_in_h, std::move(marks));
+                        std::move(items), std::move(weights), num_marks_in_h, std::move(marks), allocator);
 }
 
 template<typename T, typename S, typename A>
-var_opt_sketch<T,S,A> var_opt_sketch<T,S,A>::deserialize(std::istream& is) {
+var_opt_sketch<T,S,A> var_opt_sketch<T,S,A>::deserialize(std::istream& is, const A& allocator) {
   uint8_t first_byte;
   is.read((char*)&first_byte, sizeof(first_byte));
   uint8_t preamble_longs = first_byte & 0x3f;
@@ -586,7 +594,7 @@
     if (!is.good())
       throw std::runtime_error("error reading from std::istream"); 
     else
-      return var_opt_sketch<T,S,A>(k, rf, is_gadget);
+      return var_opt_sketch<T,S,A>(k, rf, is_gadget, allocator);
   }
 
   // second and third prelongs
@@ -611,7 +619,8 @@
   }
 
   // read the first h weights, fill remainder with -1.0
-  std::unique_ptr<double, weights_deleter> weights(AllocDouble().allocate(array_size), weights_deleter(array_size));
+  std::unique_ptr<double, weights_deleter> weights(AllocDouble(allocator).allocate(array_size),
+      weights_deleter(array_size, allocator));
   double* wts = weights.get(); // to avoid lots of .get() calls -- do not delete
   is.read((char*)wts, h * sizeof(double));
   for (size_t i = 0; i < h; ++i) {
@@ -619,13 +628,13 @@
       throw std::invalid_argument("Possible corruption: Non-positive weight when deserializing: " + std::to_string(wts[i]));
     }
   }
-  std::fill(&wts[h], &wts[array_size], -1.0);
+  std::fill(wts + h, wts + array_size, -1.0);
 
   // read the first h_ marks as packed bytes iff we have a gadget
   uint32_t num_marks_in_h = 0;
-  std::unique_ptr<bool, marks_deleter> marks(nullptr, marks_deleter(array_size));
+  std::unique_ptr<bool, marks_deleter> marks(nullptr, marks_deleter(array_size, allocator));
   if (is_gadget) {
-    marks = std::unique_ptr<bool, marks_deleter>(AllocBool().allocate(array_size), marks_deleter(array_size));
+    marks = std::unique_ptr<bool, marks_deleter>(AllocBool(allocator).allocate(array_size), marks_deleter(array_size, allocator));
     uint8_t val = 0;
     for (uint32_t i = 0; i < h; ++i) {
       if ((i & 0x7) == 0x0) { // should trigger on first iteration
@@ -637,12 +646,12 @@
   }
 
   // read the sample items, skipping the gap. Either h or r may be 0
-  items_deleter deleter(array_size);
-  std::unique_ptr<T, items_deleter> items(A().allocate(array_size), deleter);
-  
+  items_deleter deleter(array_size, allocator);
+  std::unique_ptr<T, items_deleter> items(A(allocator).allocate(array_size), deleter);
+
   S().deserialize(is, items.get(), h); // aka &data_[0]
   items.get_deleter().set_h(h); // serde didn't throw, so the items are now valid
-  
+
   S().deserialize(is, &(items.get()[h + 1]), r);
   items.get_deleter().set_r(r); // serde didn't throw, so the items are now valid
 
@@ -650,7 +659,7 @@
     throw std::runtime_error("error reading from std::istream"); 
 
   return var_opt_sketch(k, h, (r > 0 ? 1 : 0), r, n, total_wt_r, rf, array_size, false,
-                        std::move(items), std::move(weights), num_marks_in_h, std::move(marks));
+                        std::move(items), std::move(weights), num_marks_in_h, std::move(marks), allocator);
 }
 
 template<typename T, typename S, typename A>
@@ -672,24 +681,24 @@
     // destroy everything
     const size_t num_to_destroy = std::min(k_ + 1, prev_alloc);
     for (size_t i = 0; i < num_to_destroy; ++i) 
-      A().destroy(data_ + i);      
+      allocator_.destroy(data_ + i);
   } else {
     // skip gap or anything unused at the end
     for (size_t i = 0; i < h_; ++i)
-      A().destroy(data_+ i);
+      allocator_.destroy(data_+ i);
     
     for (size_t i = h_ + 1; i < h_ + r_ + 1; ++i)
-      A().destroy(data_ + i);
+      allocator_.destroy(data_ + i);
   }
 
   if (curr_items_alloc_ < prev_alloc) {
     const bool is_gadget = (marks_ != nullptr);
   
-    A().deallocate(data_, prev_alloc);
-    AllocDouble().deallocate(weights_, prev_alloc);
+    allocator_.deallocate(data_, prev_alloc);
+    AllocDouble(allocator_).deallocate(weights_, prev_alloc);
   
     if (marks_ != nullptr)
-      AllocBool().deallocate(marks_, prev_alloc);
+      AllocBool(allocator_).deallocate(marks_, prev_alloc);
 
     allocate_data_arrays(curr_items_alloc_, is_gadget);
   }
@@ -970,11 +979,11 @@
 void var_opt_sketch<T,S,A>::allocate_data_arrays(uint32_t tgt_size, bool use_marks) {
   filled_data_ = false;
 
-  data_ = A().allocate(tgt_size);
-  weights_ = AllocDouble().allocate(tgt_size);
+  data_ = allocator_.allocate(tgt_size);
+  weights_ = AllocDouble(allocator_).allocate(tgt_size);
 
   if (use_marks) {
-    marks_ = AllocBool().allocate(tgt_size);
+    marks_ = AllocBool(allocator_).allocate(tgt_size);
   } else {
     marks_ = nullptr;
   }
@@ -991,27 +1000,27 @@
   if (prev_size < curr_items_alloc_) {
     filled_data_ = false;
 
-    T* tmp_data = A().allocate(curr_items_alloc_);
-    double* tmp_weights = AllocDouble().allocate(curr_items_alloc_);
+    T* tmp_data = allocator_.allocate(curr_items_alloc_);
+    double* tmp_weights = AllocDouble(allocator_).allocate(curr_items_alloc_);
 
     for (uint32_t i = 0; i < prev_size; ++i) {
       new (&tmp_data[i]) T(std::move(data_[i]));
-      A().destroy(data_ + i);
+      allocator_.destroy(data_ + i);
       tmp_weights[i] = weights_[i];
     }
 
-    A().deallocate(data_, prev_size);
-    AllocDouble().deallocate(weights_, prev_size);
+    allocator_.deallocate(data_, prev_size);
+    AllocDouble(allocator_).deallocate(weights_, prev_size);
 
     data_ = tmp_data;
     weights_ = tmp_weights;
 
     if (marks_ != nullptr) {
-      bool* tmp_marks = AllocBool().allocate(curr_items_alloc_);
+      bool* tmp_marks = AllocBool(allocator_).allocate(curr_items_alloc_);
       for (uint32_t i = 0; i < prev_size; ++i) {
         tmp_marks[i] = marks_[i];
       }
-      AllocBool().deallocate(marks_, prev_size);
+      AllocBool(allocator_).deallocate(marks_, prev_size);
       marks_ = tmp_marks;
     }
   }
@@ -1296,7 +1305,7 @@
 void var_opt_sketch<T,S,A>::strip_marks() {
   if (marks_ == nullptr) throw std::logic_error("request to strip marks from non-gadget");
   num_marks_in_h_ = 0;
-  AllocBool().deallocate(marks_, curr_items_alloc_);
+  AllocBool(allocator_).deallocate(marks_, curr_items_alloc_);
   marks_ = nullptr;
 }
 
@@ -1433,10 +1442,10 @@
 template<typename T, typename S, typename A>
 class var_opt_sketch<T, S, A>::items_deleter {
   public:
-  items_deleter(uint32_t num) : num(num), h_count(0), r_count(0) {}
+  items_deleter(uint32_t num, const A& allocator) : num(num), h_count(0), r_count(0), allocator(allocator) {}
   void set_h(uint32_t h) { h_count = h; }
   void set_r(uint32_t r) { r_count = r; }  
-  void operator() (T* ptr) const {
+  void operator() (T* ptr) {
     if (h_count > 0) {
       for (size_t i = 0; i < h_count; ++i) {
         ptr[i].~T();
@@ -1449,39 +1458,42 @@
       }
     }
     if (ptr != nullptr) {
-      A().deallocate(ptr, num);
+      allocator.deallocate(ptr, num);
     }
   }
   private:
   uint32_t num;
   uint32_t h_count;
   uint32_t r_count;
+  A allocator;
 };
 
 template<typename T, typename S, typename A>
 class var_opt_sketch<T, S, A>::weights_deleter {
   public:
-  weights_deleter(uint32_t num) : num(num) {}
-  void operator() (double* ptr) const {
+  weights_deleter(uint32_t num, const A& allocator) : num(num), allocator(allocator) {} // num is the element count later handed to deallocate()
+  void operator() (double* ptr) { // no longer const; presumably because deallocate() on the stored allocator is non-const
     if (ptr != nullptr) {
-      AllocDouble().deallocate(ptr, num);
+      allocator.deallocate(ptr, num);
     }
   }
   private:
   uint32_t num;
+  AllocDouble allocator; // built from the A instance passed to the constructor
 };
 
 template<typename T, typename S, typename A>
 class var_opt_sketch<T, S, A>::marks_deleter {
   public:
-  marks_deleter(uint32_t num) : num(num) {}
-  void operator() (bool* ptr) const {
+  marks_deleter(uint32_t num, const A& allocator) : num(num), allocator(allocator) {} // num is the element count of the marks array
+  void operator() (bool* ptr) {
     if (ptr != nullptr) {
-      AllocBool().deallocate(ptr, 1);
+      allocator.deallocate(ptr, num); // size must equal the count given to allocate(); the old code passed 1
     }
   }
   private:
   uint32_t num;
+  AllocBool allocator; // built from the A instance passed to the constructor
 };
 
 
diff --git a/sampling/include/var_opt_union.hpp b/sampling/include/var_opt_union.hpp
index 95bf90d..d0e22ed 100644
--- a/sampling/include/var_opt_union.hpp
+++ b/sampling/include/var_opt_union.hpp
@@ -51,7 +51,7 @@
 public:
   static const uint32_t MAX_K = ((uint32_t) 1 << 31) - 2;
 
-  explicit var_opt_union(uint32_t max_k);
+  explicit var_opt_union(uint32_t max_k, const A& allocator = A());
   var_opt_union(const var_opt_union& other);
   var_opt_union(var_opt_union&& other) noexcept;
     
@@ -119,16 +119,16 @@
    * @param is input stream
    * @return an instance of a union
    */
-  static var_opt_union deserialize(std::istream& is);
+  static var_opt_union deserialize(std::istream& is, const A& allocator = A());
 
   /**
    * NOTE: This method may be deprecated in a future version.
-   * This method deserializes a skeuniontch from a given array of bytes.
+   * This method deserializes a union from a given array of bytes.
    * @param bytes pointer to the array of bytes
    * @param size the size of the array
    * @return an instance of a union
    */
-  static var_opt_union deserialize(const void* bytes, size_t size);
+  static var_opt_union deserialize(const void* bytes, size_t size, const A& allocator = A());
 
   /**
    * Prints a summary of the union as a string.
@@ -236,4 +236,4 @@
 
 #include "var_opt_union_impl.hpp"
 
-#endif // _VAR_OPT_UNION_HPP_
\ No newline at end of file
+#endif // _VAR_OPT_UNION_HPP_
diff --git a/sampling/include/var_opt_union_impl.hpp b/sampling/include/var_opt_union_impl.hpp
index 86a2a6e..db26bc0 100644
--- a/sampling/include/var_opt_union_impl.hpp
+++ b/sampling/include/var_opt_union_impl.hpp
@@ -28,12 +28,12 @@
 namespace datasketches {
 
 template<typename T, typename S, typename A>
-var_opt_union<T,S,A>::var_opt_union(uint32_t max_k) :
+var_opt_union<T,S,A>::var_opt_union(uint32_t max_k, const A& allocator) :
   n_(0),
   outer_tau_numer_(0),
   outer_tau_denom_(0.0),
   max_k_(max_k),
-  gadget_(max_k, var_opt_sketch<T,S,A>::DEFAULT_RESIZE_FACTOR, true)
+  gadget_(max_k, var_opt_sketch<T,S,A>::DEFAULT_RESIZE_FACTOR, true, allocator) // is_gadget = true; the allocator is stored in the gadget sketch
 {}
 
 template<typename T, typename S, typename A>
@@ -128,7 +128,7 @@
  */
 
 template<typename T, typename S, typename A>
-var_opt_union<T,S,A> var_opt_union<T,S,A>::deserialize(std::istream& is) {
+var_opt_union<T,S,A> var_opt_union<T,S,A>::deserialize(std::istream& is, const A& allocator) {
   uint8_t preamble_longs;
   is.read((char*)&preamble_longs, sizeof(preamble_longs));
   uint8_t serial_version;
@@ -163,7 +163,7 @@
   uint64_t outer_tau_denom;
   is.read((char*)&outer_tau_denom, sizeof(outer_tau_denom));
 
-  var_opt_sketch<T,S,A> gadget = var_opt_sketch<T,S,A>::deserialize(is);
+  var_opt_sketch<T,S,A> gadget = var_opt_sketch<T,S,A>::deserialize(is, allocator);
 
   if (!is.good())
     throw std::runtime_error("error reading from std::istream"); 
@@ -172,7 +172,7 @@
 }
 
 template<typename T, typename S, typename A>
-var_opt_union<T,S,A> var_opt_union<T,S,A>::deserialize(const void* bytes, size_t size) {
+var_opt_union<T,S,A> var_opt_union<T,S,A>::deserialize(const void* bytes, size_t size, const A& allocator) {
   ensure_minimum_memory(size, 8);
   const char* ptr = static_cast<const char*>(bytes);
   uint8_t preamble_longs;
@@ -207,7 +207,7 @@
   ptr += copy_from_mem(ptr, &outer_tau_denom, sizeof(outer_tau_denom));
 
   const size_t gadget_size = size - (PREAMBLE_LONGS_NON_EMPTY << 3);
-  var_opt_sketch<T,S,A> gadget = var_opt_sketch<T,S,A>::deserialize(ptr, gadget_size);
+  var_opt_sketch<T,S,A> gadget = var_opt_sketch<T,S,A>::deserialize(ptr, gadget_size, allocator);
 
   return var_opt_union<T,S,A>(items_seen, outer_tau_numer, outer_tau_denom, max_k, std::move(gadget));
 }
@@ -255,7 +255,7 @@
 template<typename T, typename S, typename A>
 std::vector<uint8_t, AllocU8<A>> var_opt_union<T,S,A>::serialize(unsigned header_size_bytes) const {
   const size_t size = header_size_bytes + get_serialized_size_bytes();
-  std::vector<uint8_t, AllocU8<A>> bytes(size);
+  std::vector<uint8_t, AllocU8<A>> bytes(size, 0, gadget_.allocator_);
   uint8_t* ptr = bytes.data() + header_size_bytes;
 
   const bool empty = n_ == 0;