Merge pull request #215 from apache/kolmogorov_smirnov

Kolmogorov-Smirnov test
diff --git a/kll/include/kll_quantile_calculator.hpp b/kll/include/kll_quantile_calculator.hpp
index 86dfe98..a329ff2 100644
--- a/kll/include/kll_quantile_calculator.hpp
+++ b/kll/include/kll_quantile_calculator.hpp
@@ -24,19 +24,27 @@
 
 namespace datasketches {
 
+// forward declaration
+template<typename T, typename C, typename S, typename A> class kll_sketch;
+
 template <typename T, typename C, typename A>
 class kll_quantile_calculator {
   public:
-    // assumes that all levels are sorted including level 0
-    kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n, const A& allocator);
+    using Entry = std::pair<T, uint64_t>;
+    using AllocEntry = typename std::allocator_traits<A>::template rebind_alloc<Entry>;
+    using Container = std::vector<Entry, AllocEntry>;
+    using const_iterator = typename Container::const_iterator;
+
+    template<typename S>
+    kll_quantile_calculator(const kll_sketch<T, C, S, A>& sketch);
+
     T get_quantile(double fraction) const;
+    const_iterator begin() const;
+    const_iterator end() const;
 
   private:
     using AllocU32 = typename std::allocator_traits<A>::template rebind_alloc<uint32_t>;
     using vector_u32 = std::vector<uint32_t, AllocU32>;
-    using Entry = std::pair<T, uint64_t>;
-    using AllocEntry = typename std::allocator_traits<A>::template rebind_alloc<Entry>;
-    using Container = std::vector<Entry, AllocEntry>;
     uint64_t n_;
     vector_u32 levels_;
     Container entries_;
diff --git a/kll/include/kll_quantile_calculator_impl.hpp b/kll/include/kll_quantile_calculator_impl.hpp
index 5bd4294..4d3d118 100644
--- a/kll/include/kll_quantile_calculator_impl.hpp
+++ b/kll/include/kll_quantile_calculator_impl.hpp
@@ -28,24 +28,38 @@
 
 namespace datasketches {
 
-template <typename T, typename C, typename A>
-kll_quantile_calculator<T, C, A>::kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n, const A& allocator):
-n_(n), levels_(num_levels + 1, 0, allocator), entries_(allocator)
+template<typename T, typename C, typename A>
+template<typename S>
+kll_quantile_calculator<T, C, A>::kll_quantile_calculator(const kll_sketch<T, C, S, A>& sketch):
+n_(sketch.n_), levels_(sketch.num_levels_ + 1, 0, sketch.allocator_), entries_(sketch.allocator_)
 {
-  const uint32_t num_items = levels[num_levels] - levels[0];
-  entries_.reserve(num_items);
-  populate_from_sketch(items, levels, num_levels);
-  merge_sorted_blocks(entries_, levels_.data(), static_cast<uint8_t>(levels_.size()) - 1, num_items);
-  if (!is_sorted(entries_.begin(), entries_.end(), compare_pair_by_first<C>())) throw std::logic_error("entries must be sorted");
-  convert_to_preceding_cummulative();
+  const uint32_t num_items = sketch.levels_[sketch.num_levels_] - sketch.levels_[0];
+  if (num_items > 0) {
+    entries_.reserve(num_items);
+    populate_from_sketch(sketch.items_, sketch.levels_.data(), sketch.num_levels_);
+    if (!sketch.is_level_zero_sorted_) std::sort(entries_.begin(), entries_.begin() + levels_[1], compare_pair_by_first<C>());
+    merge_sorted_blocks(entries_, levels_.data(), static_cast<uint8_t>(levels_.size()) - 1, num_items);
+    if (!is_sorted(entries_.begin(), entries_.end(), compare_pair_by_first<C>())) throw std::logic_error("entries must be sorted");
+    convert_to_preceding_cummulative();
+  }
 }
 
-template <typename T, typename C, typename A>
+template<typename T, typename C, typename A>
 T kll_quantile_calculator<T, C, A>::get_quantile(double fraction) const {
   return approximately_answer_positional_query(pos_of_phi(fraction, n_));
 }
 
-template <typename T, typename C, typename A>
+template<typename T, typename C, typename A>
+auto kll_quantile_calculator<T, C, A>::begin() const -> const_iterator {
+  return entries_.begin();
+}
+
+template<typename T, typename C, typename A>
+auto kll_quantile_calculator<T, C, A>::end() const -> const_iterator {
+  return entries_.end();
+}
+
+template<typename T, typename C, typename A>
 void kll_quantile_calculator<T, C, A>::populate_from_sketch(const T* items, const uint32_t* levels, uint8_t num_levels) {
   size_t src_level = 0;
   size_t dst_level = 0;
@@ -68,7 +82,7 @@
   if (levels_.size() > static_cast<size_t>(dst_level + 1)) levels_.resize(dst_level + 1);
 }
 
-template <typename T, typename C, typename A>
+template<typename T, typename C, typename A>
 T kll_quantile_calculator<T, C, A>::approximately_answer_positional_query(uint64_t pos) const {
   if (pos >= n_) throw std::logic_error("position out of range");
   const uint32_t num_items = levels_[levels_.size() - 1];
@@ -77,7 +91,7 @@
   return entries_[index].first;
 }
 
-template <typename T, typename C, typename A>
+template<typename T, typename C, typename A>
 void kll_quantile_calculator<T, C, A>::convert_to_preceding_cummulative() {
   uint64_t subtotal = 0;
   for (auto& entry: entries_) {
@@ -87,13 +101,13 @@
   }
 }
 
-template <typename T, typename C, typename A>
+template<typename T, typename C, typename A>
 uint64_t kll_quantile_calculator<T, C, A>::pos_of_phi(double phi, uint64_t n) {
   const uint64_t pos = static_cast<uint64_t>(std::floor(phi * n));
   return (pos == n) ? n - 1 : pos;
 }
 
-template <typename T, typename C, typename A>
+template<typename T, typename C, typename A>
 uint32_t kll_quantile_calculator<T, C, A>::chunk_containing_pos(uint64_t pos) const {
   if (entries_.size() < 1) throw std::logic_error("array too short");
   if (pos < entries_[0].second) throw std::logic_error("position too small");
@@ -101,7 +115,7 @@
   return search_for_chunk_containing_pos(pos, 0, entries_.size());
 }
 
-template <typename T, typename C, typename A>
+template<typename T, typename C, typename A>
 uint32_t kll_quantile_calculator<T, C, A>::search_for_chunk_containing_pos(uint64_t pos, uint64_t l, uint64_t r) const {
   if (l + 1 == r) {
     return static_cast<uint32_t>(l);
@@ -113,7 +127,7 @@
   return search_for_chunk_containing_pos(pos, l, m);
 }
 
-template <typename T, typename C, typename A>
+template<typename T, typename C, typename A>
 void kll_quantile_calculator<T, C, A>::merge_sorted_blocks(Container& entries, const uint32_t* levels, uint8_t num_levels, uint32_t num_items) {
   if (num_levels == 1) return;
   Container temporary(entries.get_allocator());
@@ -121,7 +135,7 @@
   merge_sorted_blocks_direct(entries, temporary, levels, 0, num_levels);
 }
 
-template <typename T, typename C, typename A>
+template<typename T, typename C, typename A>
 void kll_quantile_calculator<T, C, A>::merge_sorted_blocks_direct(Container& orig, Container& temp, const uint32_t* levels,
     uint8_t starting_level, uint8_t num_levels) {
   if (num_levels == 1) return;
@@ -142,7 +156,7 @@
   temp.erase(chunk_begin, temp.end());
 }
 
-template <typename T, typename C, typename A>
+template<typename T, typename C, typename A>
 void kll_quantile_calculator<T, C, A>::merge_sorted_blocks_reversed(Container& orig, Container& temp, const uint32_t* levels,
     uint8_t starting_level, uint8_t num_levels) {
   if (num_levels == 1) {
diff --git a/kll/include/kll_sketch.hpp b/kll/include/kll_sketch.hpp
index 6447a26..b7f076e 100644
--- a/kll/include/kll_sketch.hpp
+++ b/kll/include/kll_sketch.hpp
@@ -156,6 +156,9 @@
 template <typename T, typename C = std::less<T>, typename S = serde<T>, typename A = std::allocator<T>>
 class kll_sketch {
   public:
+    using value_type = T;
+    using comparator = C;
+
     static const uint8_t DEFAULT_M = 8;
     static const uint16_t DEFAULT_K = 200;
     static const uint16_t MIN_K = DEFAULT_M;
@@ -391,7 +394,7 @@
 
     // This is a convenience alias for users
     // The type returned by the following serialize method
-    typedef vector_u8<A> vector_bytes;
+    using vector_bytes = vector_u8<A>;
 
     /**
      * This method serializes the sketch as a vector of bytes.
@@ -480,6 +483,8 @@
     T* max_value_;
     bool is_level_zero_sorted_;
 
+    friend class kll_quantile_calculator<T, C, A>;
+
     // for deserialization
     class item_deleter;
     class items_deleter;
diff --git a/kll/include/kll_sketch_impl.hpp b/kll/include/kll_sketch_impl.hpp
index 0115d75..dc30d55 100644
--- a/kll/include/kll_sketch_impl.hpp
+++ b/kll/include/kll_sketch_impl.hpp
@@ -771,7 +771,7 @@
   using AllocCalc = typename std::allocator_traits<A>::template rebind_alloc<kll_quantile_calculator<T, C, A>>;
   AllocCalc alloc(allocator_);
   std::unique_ptr<kll_quantile_calculator<T, C, A>, std::function<void(kll_quantile_calculator<T, C, A>*)>> quantile_calculator(
-    new (alloc.allocate(1)) kll_quantile_calculator<T, C, A>(items_, levels_.data(), num_levels_, n_, allocator_),
+    new (alloc.allocate(1)) kll_quantile_calculator<T, C, A>(*this),
     [&alloc](kll_quantile_calculator<T, C, A>* ptr){ ptr->~kll_quantile_calculator<T, C, A>(); alloc.deallocate(ptr, 1); }
   );
   return quantile_calculator;
diff --git a/kll/include/kolmogorov_smirnov.hpp b/kll/include/kolmogorov_smirnov.hpp
new file mode 100644
index 0000000..9068522
--- /dev/null
+++ b/kll/include/kolmogorov_smirnov.hpp
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef KOLMOGOROV_SMIRNOV_HPP_
+#define KOLMOGOROV_SMIRNOV_HPP_
+
+namespace datasketches {
+
+class kolmogorov_smirnov {
+public:
+  /**
+   * Computes the raw delta area between two KLL quantile sketches for the Kolmogorov-Smirnov Test.
+   * @param sketch1 KLL sketch 1
+   * @param sketch2 KLL sketch 2
+   * @return the raw delta between two KLL quantile sketches
+   */
+  template<typename Sketch>
+  static double delta(const Sketch& sketch1, const Sketch& sketch2);
+
+  /**
+   * Computes the adjusted delta area threshold for the Kolmogorov-Smirnov Test.
+   * Adjusts the computed threshold by the error epsilons of the two given sketches.
+   * See <a href="https://en.wikipedia.org/wiki/Kolmogorov-Smirnov_test">Kolmogorov–Smirnov Test</a>
+   * @param sketch1 KLL sketch 1
+   * @param sketch2 KLL sketch 2
+   * @param p Target p-value. Typically .001 to .1, e.g., .05.
+   * @return the adjusted threshold to be compared with the raw delta
+   */
+  template<typename Sketch>
+  static double threshold(const Sketch& sketch1, const Sketch& sketch2, double p);
+
+  /**
+   * Performs the Kolmogorov-Smirnov Test between two KLL quantiles sketches.
+   * Note: if the given sketches have insufficient data or if the sketch sizes are too small,
+   * this will return false.
+   * @param sketch1 KLL sketch 1
+   * @param sketch2 KLL sketch 2
+   * @param p Target p-value. Typically .001 to .1, e.g., .05.
+   * @return Boolean indicating whether we can reject the null hypothesis (that the sketches
+   * reflect the same underlying distribution) using the provided p-value.
+   */
+  template<typename Sketch>
+  static bool test(const Sketch& sketch1, const Sketch& sketch2, double p);
+
+};
+
+} /* namespace datasketches */
+
+#include "kolmogorov_smirnov_impl.hpp"
+
+#endif
diff --git a/kll/include/kolmogorov_smirnov_impl.hpp b/kll/include/kolmogorov_smirnov_impl.hpp
new file mode 100644
index 0000000..17eaf0d
--- /dev/null
+++ b/kll/include/kolmogorov_smirnov_impl.hpp
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef KOLMOGOROV_SMIRNOV_IMPL_HPP_
+#define KOLMOGOROV_SMIRNOV_IMPL_HPP_
+
+namespace datasketches {
+
+// type resolver
+template<typename T, typename C, typename S, typename A>
+kll_quantile_calculator<T, C, A> make_quantile_calculator(const kll_sketch<T, C, S, A>& sketch) {
+  return kll_quantile_calculator<T, C, A>(sketch);
+}
+
+template<typename Sketch>
+double kolmogorov_smirnov::delta(const Sketch& sketch1, const Sketch& sketch2) {
+  using Comparator = typename Sketch::comparator;
+  auto calc1 = make_quantile_calculator(sketch1);
+  auto calc2 = make_quantile_calculator(sketch2);
+  auto it1 = calc1.begin();
+  auto it2 = calc2.begin();
+  const auto n1 = sketch1.get_n();
+  const auto n2 = sketch2.get_n();
+  double delta = 0;
+  while (it1 != calc1.end() && it2 != calc2.end()) {
+    const double norm_cum_wt1 = static_cast<double>((*it1).second) / n1;
+    const double norm_cum_wt2 = static_cast<double>((*it2).second) / n2;
+    delta = std::max(delta, std::abs(norm_cum_wt1 - norm_cum_wt2));
+    if (Comparator()((*it1).first, (*it2).first)) {
+      ++it1;
+    } else if (Comparator()((*it2).first, (*it1).first)) {
+      ++it2;
+    } else {
+      ++it1;
+      ++it2;
+    }
+  }
+  const double norm_cum_wt1 = it1 == calc1.end() ? 1 : static_cast<double>((*it1).second) / n1;
+  const double norm_cum_wt2 = it2 == calc2.end() ? 1 : static_cast<double>((*it2).second) / n2;
+  delta = std::max(delta, std::abs(norm_cum_wt1 - norm_cum_wt2));
+  return delta;
+}
+
+template<typename Sketch>
+double kolmogorov_smirnov::threshold(const Sketch& sketch1, const Sketch& sketch2, double p) {
+  const double r1 = sketch1.get_num_retained();
+  const double r2 = sketch2.get_num_retained();
+  const double alpha_factor = sqrt(-0.5 * log(0.5 * p));
+  const double delta_area_threshold = alpha_factor * sqrt((r1 + r2) / (r1 * r2));
+  const double eps1 = sketch1.get_normalized_rank_error(false);
+  const double eps2 = sketch2.get_normalized_rank_error(false);
+  return delta_area_threshold + eps1 + eps2;
+}
+
+template<typename Sketch>
+bool kolmogorov_smirnov::test(const Sketch& sketch1, const Sketch& sketch2, double p) {
+  return delta(sketch1, sketch2) > threshold(sketch1, sketch2, p);
+}
+
+} /* namespace datasketches */
+
+#endif
diff --git a/kll/test/CMakeLists.txt b/kll/test/CMakeLists.txt
index c48d1b2..e5f8325 100644
--- a/kll/test/CMakeLists.txt
+++ b/kll/test/CMakeLists.txt
@@ -41,4 +41,5 @@
     kll_sketch_test.cpp
     kll_sketch_custom_type_test.cpp
     kll_sketch_validation.cpp
+    kolmogorov_smirnov_test.cpp
 )
diff --git a/kll/test/kolmogorov_smirnov_test.cpp b/kll/test/kolmogorov_smirnov_test.cpp
new file mode 100644
index 0000000..4c1d7b2
--- /dev/null
+++ b/kll/test/kolmogorov_smirnov_test.cpp
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <catch.hpp>
+
+#include <random>
+
+#include <kll_sketch.hpp>
+#include <kolmogorov_smirnov.hpp>
+
+namespace datasketches {
+
+TEST_CASE("kolmogorov-smirnov empty", "[kll_sketch]") {
+  const uint16_t k = 200;
+  kll_sketch<double> sketch1(k);
+  kll_sketch<double> sketch2(k);
+  REQUIRE(kolmogorov_smirnov::delta(sketch1, sketch2) == 0);
+  REQUIRE_FALSE(kolmogorov_smirnov::test(sketch1, sketch2, 0.01));
+}
+
+TEST_CASE("kolmogorov-smirnov same distribution", "[kll_sketch]") {
+  const uint16_t k = 200;
+  kll_sketch<double> sketch1(k);
+  kll_sketch<double> sketch2(k);
+  std::default_random_engine rand;
+  std::normal_distribution<double> distr;
+  const int n = k * 3 - 1;
+  for (int i = 0; i < n; ++i) {
+    const double x = distr(rand);
+    sketch1.update(x);
+    sketch2.update(x);
+  }
+  REQUIRE(kolmogorov_smirnov::delta(sketch1, sketch2) == Approx(0).margin(0.01));
+  REQUIRE_FALSE(kolmogorov_smirnov::test(sketch1, sketch2, 0.01));
+}
+
+TEST_CASE("kolmogorov-smirnov very different distributions", "[kll_sketch]") {
+  const uint16_t k = 200;
+  kll_sketch<double> sketch1(k);
+  kll_sketch<double> sketch2(k);
+  std::default_random_engine rand;
+  std::normal_distribution<double> distr;
+  const int n = k * 3 - 1;
+  for (int i = 0; i < n; ++i) {
+    const double x = distr(rand);
+    sketch1.update(x + 100.0);
+    sketch2.update(x);
+  }
+  const auto delta = kolmogorov_smirnov::delta(sketch1, sketch2);
+  REQUIRE(delta == Approx(1.0).margin(1e-6));
+  REQUIRE(delta <= 1);
+  REQUIRE(kolmogorov_smirnov::test(sketch1, sketch2, 0.05));
+}
+
+TEST_CASE("kolmogorov-smirnov slightly different distributions", "[kll_sketch]") {
+  const uint16_t k = 2000;
+  kll_sketch<double> sketch1(k);
+  kll_sketch<double> sketch2(k);
+  std::default_random_engine rand;
+  std::normal_distribution<double> distr;
+  const int n = k * 3 - 1;
+  for (int i = 0; i < n; ++i) {
+    const double x = distr(rand);
+    sketch1.update(x + 0.05);
+    sketch2.update(x);
+  }
+  const double delta = kolmogorov_smirnov::delta(sketch1, sketch2);
+  REQUIRE(delta == Approx(0.02).margin(0.01));
+  const double threshold = kolmogorov_smirnov::threshold(sketch1, sketch2, 0.05);
+  //std::cout << "delta=" << delta << ", threshold=" << threshold << "\n";
+  REQUIRE_FALSE(delta > threshold);
+  REQUIRE_FALSE(kolmogorov_smirnov::test(sketch1, sketch2, 0.05));
+}
+
+TEST_CASE("kolmogorov-smirnov slightly different distributions high resolution", "[kll_sketch]") {
+  const uint16_t k = 8000;
+  kll_sketch<double> sketch1(k);
+  kll_sketch<double> sketch2(k);
+  std::default_random_engine rand;
+  std::normal_distribution<double> distr;
+  const int n = k * 3 - 1;
+  for (int i = 0; i < n; ++i) {
+    const double x = distr(rand);
+    sketch1.update(x + 0.05);
+    sketch2.update(x);
+  }
+  const double delta = kolmogorov_smirnov::delta(sketch1, sketch2);
+  REQUIRE(delta == Approx(0.02).margin(0.01));
+  const double threshold = kolmogorov_smirnov::threshold(sketch1, sketch2, 0.05);
+  //std::cout << "delta=" << delta << ", threshold=" << threshold << "\n";
+  REQUIRE(delta > threshold);
+  REQUIRE(kolmogorov_smirnov::test(sketch1, sketch2, 0.05));
+}
+
+} /* namespace datasketches */