Merge pull request #428 from apache/tdigest

added get_serialized_size_bytes()
diff --git a/tdigest/include/tdigest.hpp b/tdigest/include/tdigest.hpp
index d0c32d6..38f3b1a 100644
--- a/tdigest/include/tdigest.hpp
+++ b/tdigest/include/tdigest.hpp
@@ -20,8 +20,10 @@
 #ifndef _TDIGEST_HPP_
 #define _TDIGEST_HPP_
 
-#include <type_traits>
+#include <cstddef>
 #include <limits>
+#include <type_traits>
+#include <vector>
 
 #include "common_defs.hpp"
 
@@ -84,6 +86,7 @@
     T mean_;
     W weight_;
   };
+  using vector_t = std::vector<T, Allocator>;
   using vector_centroid = std::vector<centroid, typename std::allocator_traits<Allocator>::template rebind_alloc<centroid>>;
   using vector_bytes = std::vector<uint8_t, typename std::allocator_traits<Allocator>::template rebind_alloc<uint8_t>>;
 
@@ -166,19 +169,28 @@
   string<Allocator> to_string(bool print_centroids = false) const;
 
   /**
+   * Computes size needed to serialize the current state.
+   * @param with_buffer optionally serialize buffered values avoiding compression
+   * @return size in bytes needed to serialize this tdigest
+   */
+  size_t get_serialized_size_bytes(bool with_buffer = false) const;
+
+  /**
    * This method serializes t-Digest into a given stream in a binary form
    * @param os output stream
+   * @param with_buffer optionally serialize buffered values avoiding compression
    */
-  void serialize(std::ostream& os) const;
+  void serialize(std::ostream& os, bool with_buffer = false) const;
 
   /**
    * This method serializes t-Digest as a vector of bytes.
    * An optional header can be reserved in front of the sketch.
    * It is an uninitialized space of a given size.
    * @param header_size_bytes space to reserve in front of the sketch
+   * @param with_buffer optionally serialize buffered values avoiding compression
    * @return serialized sketch as a vector of bytes
    */
-  vector_bytes serialize(unsigned header_size_bytes = 0) const;
+  vector_bytes serialize(unsigned header_size_bytes = 0, bool with_buffer = false) const;
 
   /**
    * This method deserializes t-Digest from a given stream.
@@ -198,7 +210,6 @@
   static tdigest deserialize(const void* bytes, size_t size, const Allocator& allocator = Allocator());
 
 private:
-  Allocator allocator_;
   bool reverse_merge_;
   uint16_t k_;
   uint16_t internal_k_;
@@ -208,8 +219,9 @@
   vector_centroid centroids_;
   uint64_t centroids_weight_;
   size_t buffer_capacity_;
-  vector_centroid buffer_;
-  uint64_t buffered_weight_;
+  vector_t buffer_;
+
+  static const size_t BUFFER_MULTIPLIER = 4;
 
   static const uint8_t PREAMBLE_LONGS_EMPTY_OR_SINGLE = 1;
   static const uint8_t PREAMBLE_LONGS_MULTIPLE = 2;
@@ -222,11 +234,11 @@
   enum flags { IS_EMPTY, IS_SINGLE_VALUE, REVERSE_MERGE };
 
   bool is_single_value() const;
+  uint8_t get_preamble_longs() const;
+  void merge(vector_centroid& buffer, W weight);
 
   // for deserialize
-  tdigest(bool reverse_merge, uint16_t k, T min, T max, vector_centroid&& centroids, uint64_t total_weight_, const Allocator& allocator);
-
-  void merge_buffered();
+  tdigest(bool reverse_merge, uint16_t k, T min, T max, vector_centroid&& centroids, uint64_t total_weight_, vector_t&& buffer);
 
   static double weighted_average(double x1, double w1, double x2, double w2);
 
diff --git a/tdigest/include/tdigest_impl.hpp b/tdigest/include/tdigest_impl.hpp
index 1a48f88..f8af532 100644
--- a/tdigest/include/tdigest_impl.hpp
+++ b/tdigest/include/tdigest_impl.hpp
@@ -30,15 +30,14 @@
 
 template<typename T, typename A>
 tdigest<T, A>::tdigest(uint16_t k, const A& allocator):
-tdigest(false, k, std::numeric_limits<T>::infinity(), -std::numeric_limits<T>::infinity(), vector_centroid(allocator), 0, allocator)
+tdigest(false, k, std::numeric_limits<T>::infinity(), -std::numeric_limits<T>::infinity(), vector_centroid(allocator), 0, vector_t(allocator))
 {}
 
 template<typename T, typename A>
 void tdigest<T, A>::update(T value) {
   if (std::isnan(value)) return;
-  if (buffer_.size() >= buffer_capacity_ - centroids_.size()) merge_buffered();
-  buffer_.push_back(centroid(value, 1));
-  ++buffered_weight_;
+  if (buffer_.size() == centroids_capacity_ * BUFFER_MULTIPLIER) compress();
+  buffer_.push_back(value);
   min_ = std::min(min_, value);
   max_ = std::max(max_, value);
 }
@@ -46,22 +45,21 @@
 template<typename T, typename A>
 void tdigest<T, A>::merge(tdigest& other) {
   if (other.is_empty()) return;
-  size_t num = buffer_.size() + centroids_.size() + other.buffer_.size() + other.centroids_.size();
-  buffer_.reserve(num);
-  std::copy(other.buffer_.begin(), other.buffer_.end(), std::back_inserter(buffer_));
-  std::copy(other.centroids_.begin(), other.centroids_.end(), std::back_inserter(buffer_));
-  buffered_weight_ += other.get_total_weight();
-  if (num > buffer_capacity_) {
-    merge_buffered();
-  } else {
-    min_ = std::min(min_, other.get_min_value());
-    max_ = std::max(max_, other.get_max_value());
-  }
+  vector_centroid tmp(buffer_.get_allocator());
+  tmp.reserve(buffer_.size() + centroids_.size() + other.buffer_.size() + other.centroids_.size());
+  for (const T value: buffer_) tmp.push_back(centroid(value, 1));
+  for (const T value: other.buffer_) tmp.push_back(centroid(value, 1));
+  std::copy(other.centroids_.begin(), other.centroids_.end(), std::back_inserter(tmp));
+  merge(tmp, buffer_.size() + other.get_total_weight());
 }
 
 template<typename T, typename A>
 void tdigest<T, A>::compress() {
-  merge_buffered();
+  if (buffer_.size() == 0) return;
+  vector_centroid tmp(buffer_.get_allocator());
+  tmp.reserve(buffer_.size() + centroids_.size());
+  for (const T value: buffer_) tmp.push_back(centroid(value, 1));
+  merge(tmp, buffer_.size());
 }
 
 template<typename T, typename A>
@@ -83,7 +81,7 @@
 
 template<typename T, typename A>
 uint64_t tdigest<T, A>::get_total_weight() const {
-  return centroids_weight_ + buffered_weight_;
+  return centroids_weight_ + buffer_.size();
 }
 
 template<typename T, typename A>
@@ -95,7 +93,7 @@
   // one centroid and value == min_ == max_
   if ((centroids_.size() + buffer_.size()) == 1) return 0.5;
 
-  const_cast<tdigest*>(this)->merge_buffered(); // side effect
+  const_cast<tdigest*>(this)->compress(); // side effect
 
   // left tail
   const T first_mean = centroids_.front().get_mean();
@@ -149,7 +147,7 @@
   if ((rank < 0.0) || (rank > 1.0)) {
     throw std::invalid_argument("Normalized rank cannot be less than 0 or greater than 1");
   }
-  const_cast<tdigest*>(this)->merge_buffered(); // side effect
+  const_cast<tdigest*>(this)->compress(); // side effect
   if (centroids_.size() == 1) return centroids_.front().get_mean();
 
   // at least 2 centroids
@@ -204,13 +202,11 @@
   std::ostringstream os;
   os << "### t-Digest summary:" << std::endl;
   os << "   Nominal k          : " << k_ << std::endl;
-  os << "   Internal k         : " << internal_k_ << std::endl;
   os << "   Centroids          : " << centroids_.size() << std::endl;
   os << "   Buffered           : " << buffer_.size() << std::endl;
   os << "   Centroids capacity : " << centroids_capacity_ << std::endl;
-  os << "   Buffer capacity    : " << buffer_capacity_ << std::endl;
+  os << "   Buffer capacity    : " << centroids_capacity_ * BUFFER_MULTIPLIER << std::endl;
   os << "   Centroids Weight   : " << centroids_weight_ << std::endl;
-  os << "   Buffered Weight    : " << buffered_weight_ << std::endl;
   os << "   Total Weight       : " << get_total_weight() << std::endl;
   os << "   Reverse Merge      : " << (reverse_merge_ ? "true" : "false") << std::endl;
   if (!is_empty()) {
@@ -229,33 +225,33 @@
     if (buffer_.size() > 0) {
       os << "Buffer:" << std::endl;
       int i = 0;
-      for (const auto& b: buffer_) {
-        os << i++ << ": " << b.get_mean() << ", " << b.get_weight() << std::endl;
+      for (const T value: buffer_) {
+        os << i++ << ": " << value << std::endl;
       }
     }
   }
-  return string<A>(os.str().c_str(), allocator_);
+  return string<A>(os.str().c_str(), buffer_.get_allocator());
 }
 
+// assumes that there is enough room in the input buffer to add centroids from this tdigest
 template<typename T, typename A>
-void tdigest<T, A>::merge_buffered() {
-  if (buffered_weight_ == 0) return;
-  std::copy(centroids_.begin(), centroids_.end(), std::back_inserter(buffer_));
+void tdigest<T, A>::merge(vector_centroid& buffer, W weight) {
+  std::copy(centroids_.begin(), centroids_.end(), std::back_inserter(buffer));
   centroids_.clear();
-  std::stable_sort(buffer_.begin(), buffer_.end(), centroid_cmp());
-  if (reverse_merge_) std::reverse(buffer_.begin(), buffer_.end());
-  centroids_weight_ += buffered_weight_;
-  auto it = buffer_.begin();
+  std::stable_sort(buffer.begin(), buffer.end(), centroid_cmp());
+  if (reverse_merge_) std::reverse(buffer.begin(), buffer.end());
+  centroids_weight_ += weight;
+  auto it = buffer.begin();
   centroids_.push_back(*it);
   ++it;
   double weight_so_far = 0;
-  while (it != buffer_.end()) {
+  while (it != buffer.end()) {
     const double proposed_weight = centroids_.back().get_weight() + it->get_weight();
     bool add_this = false;
-    if (std::distance(buffer_.begin(), it) != 1 && std::distance(buffer_.end(), it) != 1) {
+    if (std::distance(buffer.begin(), it) != 1 && std::distance(buffer.end(), it) != 1) {
       const double q0 = weight_so_far / centroids_weight_;
       const double q2 = (weight_so_far + proposed_weight) / centroids_weight_;
-      const double normalizer = scale_function().normalizer(internal_k_, centroids_weight_);
+      const double normalizer = scale_function().normalizer(2 * k_, centroids_weight_);
       add_this = proposed_weight <= centroids_weight_ * std::min(scale_function().max(q0, normalizer), scale_function().max(q2, normalizer));
     }
     if (add_this) {
@@ -267,13 +263,10 @@
     ++it;
   }
   if (reverse_merge_) std::reverse(centroids_.begin(), centroids_.end());
-  if (centroids_weight_ > 0) {
-    min_ = std::min(min_, centroids_.front().get_mean());
-    max_ = std::max(max_, centroids_.back().get_mean());
-  }
+  min_ = std::min(min_, centroids_.front().get_mean());
+  max_ = std::max(max_, centroids_.back().get_mean());
   reverse_merge_ = !reverse_merge_;
   buffer_.clear();
-  buffered_weight_ = 0;
 }
 
 template<typename T, typename A>
@@ -282,68 +275,76 @@
 }
 
 template<typename T, typename A>
-void tdigest<T, A>::serialize(std::ostream& os) const {
-  const_cast<tdigest*>(this)->merge_buffered(); // side effect
-  write(os, is_empty() || is_single_value() ? PREAMBLE_LONGS_EMPTY_OR_SINGLE : PREAMBLE_LONGS_MULTIPLE);
+void tdigest<T, A>::serialize(std::ostream& os, bool with_buffer) const {
+  if (!with_buffer) const_cast<tdigest*>(this)->compress(); // side effect
+  write(os, get_preamble_longs());
   write(os, SERIAL_VERSION);
   write(os, SKETCH_TYPE);
   write(os, k_);
   const uint8_t flags_byte(
-    (is_empty() ? 1 << flags::IS_EMPTY : 0) |
-    (is_single_value() ? 1 << flags::IS_SINGLE_VALUE : 0) |
-    (reverse_merge_ ? 1 << flags::REVERSE_MERGE : 0)
+     (is_empty() ? 1 << flags::IS_EMPTY : 0)
+   | (is_single_value() ? 1 << flags::IS_SINGLE_VALUE : 0)
+   | (reverse_merge_ ? 1 << flags::REVERSE_MERGE : 0)
   );
   write(os, flags_byte);
   write<uint16_t>(os, 0); // unused
-
   if (is_empty()) return;
-
   if (is_single_value()) {
     write(os, min_);
     return;
   }
-
   write(os, static_cast<uint32_t>(centroids_.size()));
-  write<uint32_t>(os, 0); // unused
-
+  write(os, static_cast<uint32_t>(buffer_.size()));
   write(os, min_);
   write(os, max_);
-  write(os, centroids_.data(), centroids_.size() * sizeof(centroid));
+  if (centroids_.size() > 0) write(os, centroids_.data(), centroids_.size() * sizeof(centroid));
+  if (buffer_.size() > 0) write(os, buffer_.data(), buffer_.size() * sizeof(T));
 }
 
 template<typename T, typename A>
-auto tdigest<T, A>::serialize(unsigned header_size_bytes) const -> vector_bytes {
-  const_cast<tdigest*>(this)->merge_buffered(); // side effect
-  const uint8_t preamble_longs = is_empty() || is_single_value() ? PREAMBLE_LONGS_EMPTY_OR_SINGLE : PREAMBLE_LONGS_MULTIPLE;
-  const size_t size_bytes = preamble_longs * sizeof(uint64_t) +
-      (is_empty() ? 0 : (is_single_value() ? sizeof(T) : sizeof(T) * 2 + sizeof(centroid) * centroids_.size()));
-  vector_bytes bytes(size_bytes, 0, allocator_);
-  uint8_t* ptr = bytes.data() + header_size_bytes;
+uint8_t tdigest<T, A>::get_preamble_longs() const {
+  return is_empty() || is_single_value() ? PREAMBLE_LONGS_EMPTY_OR_SINGLE : PREAMBLE_LONGS_MULTIPLE;
+}
 
-  *ptr++ = preamble_longs;
+template<typename T, typename A>
+size_t tdigest<T, A>::get_serialized_size_bytes(bool with_buffer) const {
+  if (!with_buffer) const_cast<tdigest*>(this)->compress(); // side effect
+  size_t size_bytes = get_preamble_longs() * sizeof(uint64_t);
+  if (is_empty()) return size_bytes;
+  if (is_single_value()) return size_bytes + sizeof(T);
+  size_bytes += sizeof(T) * 2 // min and max
+      + sizeof(centroid) * centroids_.size();
+  if (with_buffer) size_bytes += sizeof(T) * buffer_.size(); // count is a part of preamble
+  return size_bytes;
+}
+
+template<typename T, typename A>
+auto tdigest<T, A>::serialize(unsigned header_size_bytes, bool with_buffer) const -> vector_bytes {
+  if (!with_buffer) const_cast<tdigest*>(this)->compress(); // side effect
+  vector_bytes bytes(get_serialized_size_bytes(with_buffer), 0, buffer_.get_allocator());
+  uint8_t* ptr = bytes.data() + header_size_bytes;
+  *ptr++ = get_preamble_longs();
   *ptr++ = SERIAL_VERSION;
   *ptr++ = SKETCH_TYPE;
   ptr += copy_to_mem(k_, ptr);
   const uint8_t flags_byte(
-    (is_empty() ? 1 << flags::IS_EMPTY : 0) |
-    (is_single_value() ? 1 << flags::IS_SINGLE_VALUE : 0) |
-    (reverse_merge_ ? 1 << flags::REVERSE_MERGE : 0)
+      (is_empty() ? 1 << flags::IS_EMPTY : 0)
+    | (is_single_value() ? 1 << flags::IS_SINGLE_VALUE : 0)
+    | (reverse_merge_ ? 1 << flags::REVERSE_MERGE : 0)
   );
   *ptr++ = flags_byte;
   ptr += 2; // unused
   if (is_empty()) return bytes;
-
   if (is_single_value()) {
     copy_to_mem(min_, ptr);
     return bytes;
   }
-
   ptr += copy_to_mem(static_cast<uint32_t>(centroids_.size()), ptr);
-  ptr += 4; // unused
-
+  ptr += copy_to_mem(static_cast<uint32_t>(buffer_.size()), ptr);
   ptr += copy_to_mem(min_, ptr);
   ptr += copy_to_mem(max_, ptr);
-  copy_to_mem(centroids_.data(), ptr, centroids_.size() * sizeof(centroid));
+  if (centroids_.size() > 0) ptr += copy_to_mem(centroids_.data(), ptr, centroids_.size() * sizeof(centroid));
+  if (buffer_.size() > 0) copy_to_mem(buffer_.data(), ptr, buffer_.size() * sizeof(T));
   return bytes;
 }
 
@@ -374,19 +375,21 @@
   const bool reverse_merge = flags_byte & (1 << flags::REVERSE_MERGE);
   if (is_single_value) {
     const T value = read<T>(is);
-    return tdigest(reverse_merge, k, value, value, vector_centroid(1, centroid(value, 1), allocator), 1, allocator);
+    return tdigest(reverse_merge, k, value, value, vector_centroid(1, centroid(value, 1), allocator), 1, vector_t(allocator));
   }
 
   const auto num_centroids = read<uint32_t>(is);
-  read<uint32_t>(is); // unused
+  const auto num_buffered = read<uint32_t>(is);
 
   const T min = read<T>(is);
   const T max = read<T>(is);
   vector_centroid centroids(num_centroids, centroid(0, 0), allocator);
-  read(is, centroids.data(), num_centroids * sizeof(centroid));
-  uint64_t total_weight = 0;
-  for (const auto& c: centroids) total_weight += c.get_weight();
-  return tdigest(reverse_merge, k, min, max, std::move(centroids), total_weight, allocator);
+  if (num_centroids > 0) read(is, centroids.data(), num_centroids * sizeof(centroid));
+  vector_t buffer(num_buffered, 0, allocator);
+  if (num_buffered > 0) read(is, buffer.data(), num_buffered * sizeof(T));
+  uint64_t weight = 0;
+  for (const auto& c: centroids) weight += c.get_weight();
+  return tdigest(reverse_merge, k, min, max, std::move(centroids), weight, std::move(buffer));
 }
 
 template<typename T, typename A>
@@ -423,24 +426,27 @@
     ensure_minimum_memory(end_ptr - ptr, sizeof(T));
     T value;
     ptr += copy_from_mem(ptr, value);
-    return tdigest(reverse_merge, k, value, value, vector_centroid(1, centroid(value, 1), allocator), 1, allocator);
+    return tdigest(reverse_merge, k, value, value, vector_centroid(1, centroid(value, 1), allocator), 1, vector_t(allocator));
   }
 
   ensure_minimum_memory(end_ptr - ptr, 8);
   uint32_t num_centroids;
   ptr += copy_from_mem(ptr, num_centroids);
-  ptr += 4; // unused
+  uint32_t num_buffered;
+  ptr += copy_from_mem(ptr, num_buffered);
 
-  ensure_minimum_memory(end_ptr - ptr, sizeof(T) * 2 + sizeof(centroid) * num_centroids);
+  ensure_minimum_memory(end_ptr - ptr, sizeof(T) * 2 + sizeof(centroid) * num_centroids + sizeof(T) * num_buffered);
   T min;
   ptr += copy_from_mem(ptr, min);
   T max;
   ptr += copy_from_mem(ptr, max);
   vector_centroid centroids(num_centroids, centroid(0, 0), allocator);
-  copy_from_mem(ptr, centroids.data(), sizeof(centroid) * num_centroids);
-  uint64_t total_weight = 0;
-  for (const auto& c: centroids) total_weight += c.get_weight();
-  return tdigest(reverse_merge, k, min, max, std::move(centroids), total_weight, allocator);
+  if (num_centroids > 0) ptr += copy_from_mem(ptr, centroids.data(), num_centroids * sizeof(centroid));
+  vector_t buffer(num_buffered, 0, allocator);
+  if (num_buffered > 0) copy_from_mem(ptr, buffer.data(), num_buffered * sizeof(T));
+  uint64_t weight = 0;
+  for (const auto& c: centroids) weight += c.get_weight();
+  return tdigest(reverse_merge, k, min, max, std::move(centroids), weight, std::move(buffer));
 }
 
 // compatibility with the format of the reference implementation
@@ -466,7 +472,7 @@
       c = centroid(mean, weight);
       total_weight += weight;
     }
-    return tdigest(false, k, min, max, std::move(centroids), total_weight, allocator);
+    return tdigest(false, k, min, max, std::move(centroids), total_weight, vector_t(allocator));
   }
   // COMPAT_FLOAT: compatibility with asSmallBytes()
   const auto min = read_big_endian<double>(is); // reference implementation uses doubles for min and max
@@ -484,7 +490,7 @@
     c = centroid(mean, weight);
     total_weight += weight;
   }
-  return tdigest(false, k, min, max, std::move(centroids), total_weight, allocator);
+  return tdigest(false, k, min, max, std::move(centroids), total_weight, vector_t(allocator));
 }
 
 // compatibility with the format of the reference implementation
@@ -526,7 +532,7 @@
       c = centroid(mean, static_cast<W>(weight));
       total_weight += static_cast<uint64_t>(weight);
     }
-    return tdigest(false, k, min, max, std::move(centroids), total_weight, allocator);
+    return tdigest(false, k, min, max, std::move(centroids), total_weight, vector_t(allocator));
   }
   // COMPAT_FLOAT: compatibility with asSmallBytes()
   ensure_minimum_memory(end_ptr - ptr, sizeof(double) * 2 + sizeof(float) + sizeof(uint16_t) * 3);
@@ -558,7 +564,7 @@
     c = centroid(mean, static_cast<W>(weight));
     total_weight += static_cast<uint64_t>(weight);
   }
-  return tdigest(false, k, min, max, std::move(centroids), total_weight, allocator);
+  return tdigest(false, k, min, max, std::move(centroids), total_weight, vector_t(allocator));
 }
 
 template<typename T, typename A>
@@ -567,30 +573,21 @@
 }
 
 template<typename T, typename A>
-tdigest<T, A>::tdigest(bool reverse_merge, uint16_t k, T min, T max, vector_centroid&& centroids, uint64_t total_weight, const A& allocator):
-allocator_(allocator),
+tdigest<T, A>::tdigest(bool reverse_merge, uint16_t k, T min, T max, vector_centroid&& centroids, uint64_t weight, vector_t&& buffer):
 reverse_merge_(reverse_merge),
 k_(k),
-internal_k_(k),
 min_(min),
 max_(max),
 centroids_capacity_(0),
 centroids_(std::move(centroids)),
-centroids_weight_(total_weight),
-buffer_capacity_(0),
-buffer_(allocator),
-buffered_weight_(0)
+centroids_weight_(weight),
+buffer_(std::move(buffer))
 {
   if (k < 10) throw std::invalid_argument("k must be at least 10");
   const size_t fudge = k < 30 ? 30 : 10;
   centroids_capacity_ = 2 * k_ + fudge;
-  buffer_capacity_ = 5 * centroids_capacity_;
-  const double scale = std::max(1.0, static_cast<double>(buffer_capacity_) / centroids_capacity_ - 1.0);
-  internal_k_ = std::ceil(std::sqrt(scale) * k_);
-  centroids_capacity_ = std::max(centroids_capacity_, internal_k_ + fudge);
-  buffer_capacity_ = std::max(buffer_capacity_, 2 * centroids_capacity_);
   centroids_.reserve(centroids_capacity_);
-  buffer_.reserve(buffer_capacity_);
+  buffer_.reserve(centroids_capacity_ * BUFFER_MULTIPLIER);
 }
 
 } /* namespace datasketches */
diff --git a/tdigest/test/tdigest_serialize_for_java.cpp b/tdigest/test/tdigest_serialize_for_java.cpp
index 1f3c1fb..fe0a9d4 100644
--- a/tdigest/test/tdigest_serialize_for_java.cpp
+++ b/tdigest/test/tdigest_serialize_for_java.cpp
@@ -34,6 +34,16 @@
   }
 }
 
+TEST_CASE("tdigest double generate with buffer", "[serialize_for_java]") {
+  const unsigned n_arr[] = {0, 1, 10, 100, 1000, 10000, 100000, 1000000};
+  for (const unsigned n: n_arr) {
+    tdigest_double td(100);
+    for (unsigned i = 1; i <= n; ++i) td.update(i);
+    std::ofstream os("tdigest_double_buf_n" + std::to_string(n) + "_cpp.sk", std::ios::binary);
+    td.serialize(os, true);
+  }
+}
+
 TEST_CASE("tdigest float generate", "[serialize_for_java]") {
   const unsigned n_arr[] = {0, 1, 10, 100, 1000, 10000, 100000, 1000000};
   for (const unsigned n: n_arr) {
@@ -44,4 +54,14 @@
   }
 }
 
+TEST_CASE("tdigest float generate with buffer", "[serialize_for_java]") {
+  const unsigned n_arr[] = {0, 1, 10, 100, 1000, 10000, 100000, 1000000};
+  for (const unsigned n: n_arr) {
+    tdigest_float td(100);
+    for (unsigned i = 1; i <= n; ++i) td.update(i);
+    std::ofstream os("tdigest_float_buf_n" + std::to_string(n) + "_cpp.sk", std::ios::binary);
+    td.serialize(os, true);
+  }
+}
+
 } /* namespace datasketches */
diff --git a/tdigest/test/tdigest_test.cpp b/tdigest/test/tdigest_test.cpp
index ac52f55..bf64dbb 100644
--- a/tdigest/test/tdigest_test.cpp
+++ b/tdigest/test/tdigest_test.cpp
@@ -54,8 +54,9 @@
 
 TEST_CASE("many values", "[tdigest]") {
   const size_t n = 10000;
-  tdigest_double td(100);
+  tdigest_double td;
   for (size_t i = 0; i < n; ++i) td.update(i);
+//  std::cout << td.to_string(true);
 //  td.compress();
 //  std::cout << td.to_string(true);
   REQUIRE_FALSE(td.is_empty());
@@ -68,7 +69,7 @@
   REQUIRE(td.get_rank(n * 3 / 4) == Approx(0.75).margin(0.0001));
   REQUIRE(td.get_rank(n) == 1);
   REQUIRE(td.get_quantile(0) == 0);
-  REQUIRE(td.get_quantile(0.5) == Approx(n / 2).epsilon(0.01));
+  REQUIRE(td.get_quantile(0.5) == Approx(n / 2).epsilon(0.03));
   REQUIRE(td.get_quantile(0.9) == Approx(n * 0.9).epsilon(0.01));
   REQUIRE(td.get_quantile(0.95) == Approx(n * 0.95).epsilon(0.01));
   REQUIRE(td.get_quantile(1) == n - 1);
@@ -137,12 +138,14 @@
 
 TEST_CASE("merge large", "[tdigest]") {
   const size_t n = 10000;
-  tdigest_double td1(100);
-  tdigest_double td2(100);
+  tdigest_double td1;
+  tdigest_double td2;
   for (size_t i = 0; i < n / 2; ++i) {
     td1.update(i);
     td2.update(n / 2 + i);
   }
+//  std::cout << td1.to_string();
+//  std::cout << td2.to_string();
   td1.merge(td2);
 //  td1.compress();
 //  std::cout << td1.to_string(true);
@@ -179,6 +182,19 @@
   REQUIRE(deserialized_td.get_max_value() == 123);
 }
 
+TEST_CASE("serialize deserialize stream single value buffered", "[tdigest]") {
+  tdigest<double> td;
+  td.update(123);
+  std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
+  td.serialize(s, true);
+  auto deserialized_td = tdigest<double>::deserialize(s);
+  REQUIRE(deserialized_td.get_k() == 200);
+  REQUIRE(deserialized_td.get_total_weight() == 1);
+  REQUIRE_FALSE(deserialized_td.is_empty());
+  REQUIRE(deserialized_td.get_min_value() == 123);
+  REQUIRE(deserialized_td.get_max_value() == 123);
+}
+
 TEST_CASE("serialize deserialize stream many values", "[tdigest]") {
   tdigest<double> td(100);
   for (int i = 0; i < 1000; ++i) td.update(i);
@@ -194,6 +210,21 @@
   REQUIRE(td.get_quantile(0.5) == deserialized_td.get_quantile(0.5));
 }
 
+TEST_CASE("serialize deserialize stream many values with buffer", "[tdigest]") {
+  tdigest<double> td(100);
+  for (int i = 0; i < 10000; ++i) td.update(i);
+  std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
+  td.serialize(s, true);
+  auto deserialized_td = tdigest<double>::deserialize(s);
+  REQUIRE(td.get_k() == deserialized_td.get_k());
+  REQUIRE(td.get_total_weight() == deserialized_td.get_total_weight());
+  REQUIRE(td.is_empty() == deserialized_td.is_empty());
+  REQUIRE(td.get_min_value() == deserialized_td.get_min_value());
+  REQUIRE(td.get_max_value() == deserialized_td.get_max_value());
+  REQUIRE(td.get_rank(500) == deserialized_td.get_rank(500));
+  REQUIRE(td.get_quantile(0.5) == deserialized_td.get_quantile(0.5));
+}
+
 TEST_CASE("serialize deserialize bytes empty", "[tdigest]") {
   tdigest<double> td(100);
   auto bytes = td.serialize();
@@ -215,6 +246,18 @@
   REQUIRE(deserialized_td.get_max_value() == 123);
 }
 
+TEST_CASE("serialize deserialize bytes single value buffered", "[tdigest]") {
+  tdigest<double> td(200);
+  td.update(123);
+  auto bytes = td.serialize(0, true);
+  auto deserialized_td = tdigest<double>::deserialize(bytes.data(), bytes.size());
+  REQUIRE(deserialized_td.get_k() == 200);
+  REQUIRE(deserialized_td.get_total_weight() == 1);
+  REQUIRE_FALSE(deserialized_td.is_empty());
+  REQUIRE(deserialized_td.get_min_value() == 123);
+  REQUIRE(deserialized_td.get_max_value() == 123);
+}
+
 TEST_CASE("serialize deserialize bytes many values", "[tdigest]") {
   tdigest<double> td(100);
   for (int i = 0; i < 1000; ++i) td.update(i);
@@ -229,6 +272,20 @@
   REQUIRE(td.get_quantile(0.5) == deserialized_td.get_quantile(0.5));
 }
 
+TEST_CASE("serialize deserialize bytes many values with buffer", "[tdigest]") {
+  tdigest<double> td(100);
+  for (int i = 0; i < 10000; ++i) td.update(i);
+  auto bytes = td.serialize();
+  auto deserialized_td = tdigest<double>::deserialize(bytes.data(), bytes.size());
+  REQUIRE(td.get_k() == deserialized_td.get_k());
+  REQUIRE(td.get_total_weight() == deserialized_td.get_total_weight());
+  REQUIRE(td.is_empty() == deserialized_td.is_empty());
+  REQUIRE(td.get_min_value() == deserialized_td.get_min_value());
+  REQUIRE(td.get_max_value() == deserialized_td.get_max_value());
+  REQUIRE(td.get_rank(500) == deserialized_td.get_rank(500));
+  REQUIRE(td.get_quantile(0.5) == deserialized_td.get_quantile(0.5));
+}
+
 TEST_CASE("serialize deserialize steam and bytes equivalence empty", "[tdigest]") {
   tdigest<double> td(100);
   std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
@@ -287,6 +344,40 @@
   REQUIRE(deserialized_td1.get_quantile(0.5) == deserialized_td2.get_quantile(0.5));
 }
 
+TEST_CASE("serialize deserialize steam and bytes equivalence with buffer", "[tdigest]") {
+  tdigest<double> td(100);
+  const int n = 10000;
+  for (int i = 0; i < n; ++i) td.update(i);
+  std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
+  td.serialize(s, true);
+  auto bytes = td.serialize(0, true);
+
+  REQUIRE(bytes.size() == static_cast<size_t>(s.tellp()));
+  for (size_t i = 0; i < bytes.size(); ++i) {
+    REQUIRE(((char*)bytes.data())[i] == (char)s.get());
+  }
+
+  s.seekg(0); // rewind
+  auto deserialized_td1 = tdigest<double>::deserialize(s);
+  auto deserialized_td2 = tdigest<double>::deserialize(bytes.data(), bytes.size());
+  REQUIRE(bytes.size() == static_cast<size_t>(s.tellg()));
+
+  REQUIRE_FALSE(deserialized_td1.is_empty());
+  REQUIRE(deserialized_td1.get_k() == 100);
+  REQUIRE(deserialized_td1.get_total_weight() == n);
+  REQUIRE(deserialized_td1.get_min_value() == 0);
+  REQUIRE(deserialized_td1.get_max_value() == n - 1);
+
+  REQUIRE_FALSE(deserialized_td2.is_empty());
+  REQUIRE(deserialized_td2.get_k() == 100);
+  REQUIRE(deserialized_td2.get_total_weight() == n);
+  REQUIRE(deserialized_td2.get_min_value() == 0);
+  REQUIRE(deserialized_td2.get_max_value() == n - 1);
+
+  REQUIRE(deserialized_td1.get_rank(n / 2) == deserialized_td2.get_rank(n / 2));
+  REQUIRE(deserialized_td1.get_quantile(0.5) == deserialized_td2.get_quantile(0.5));
+}
+
 TEST_CASE("deserialize from reference implementation stream double", "[tdigest]") {
   std::ifstream is;
   is.exceptions(std::ios::failbit | std::ios::badbit);