Merge pull request #223 from apache/wrapped_compact_theta

wrapped compact theta sketch
diff --git a/theta/include/compact_theta_sketch_parser.hpp b/theta/include/compact_theta_sketch_parser.hpp
new file mode 100644
index 0000000..a06924e
--- /dev/null
+++ b/theta/include/compact_theta_sketch_parser.hpp
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef COMPACT_THETA_SKETCH_PARSER_HPP_
+#define COMPACT_THETA_SKETCH_PARSER_HPP_
+
+#include <stdint.h>
+
+namespace datasketches {
+
+template<bool dummy>
+class compact_theta_sketch_parser {
+public:
+  struct compact_theta_sketch_data {
+    bool is_empty;
+    bool is_ordered;
+    uint16_t seed_hash;
+    uint32_t num_entries;
+    uint64_t theta;
+    const uint64_t* entries;
+  };
+
+  static compact_theta_sketch_data parse(const void* ptr, size_t size, uint64_t seed, bool dump_on_error = false);
+
+private:
+  // offsets are in sizeof(type)
+  static const size_t COMPACT_SKETCH_PRE_LONGS_BYTE = 0;
+  static const size_t COMPACT_SKETCH_SERIAL_VERSION_BYTE = 1;
+  static const size_t COMPACT_SKETCH_TYPE_BYTE = 2;
+  static const size_t COMPACT_SKETCH_FLAGS_BYTE = 5;
+  static const size_t COMPACT_SKETCH_SEED_HASH_U16 = 3;
+  static const size_t COMPACT_SKETCH_NUM_ENTRIES_U32 = 2;
+  static const size_t COMPACT_SKETCH_SINGLE_ENTRY_U64 = 1;
+  static const size_t COMPACT_SKETCH_ENTRIES_EXACT_U64 = 2;
+  static const size_t COMPACT_SKETCH_THETA_U64 = 2;
+  static const size_t COMPACT_SKETCH_ENTRIES_ESTIMATION_U64 = 3;
+
+  static const uint8_t COMPACT_SKETCH_IS_EMPTY_FLAG = 2;
+  static const uint8_t COMPACT_SKETCH_IS_ORDERED_FLAG = 4;
+
+  static const uint8_t COMPACT_SKETCH_SERIAL_VERSION = 3;
+  static const uint8_t COMPACT_SKETCH_TYPE = 3;
+
+  static std::string hex_dump(const uint8_t* ptr, size_t size);
+};
+
+} /* namespace datasketches */
+
+#include "compact_theta_sketch_parser_impl.hpp"
+
+#endif
diff --git a/theta/include/compact_theta_sketch_parser_impl.hpp b/theta/include/compact_theta_sketch_parser_impl.hpp
new file mode 100644
index 0000000..c546ba7
--- /dev/null
+++ b/theta/include/compact_theta_sketch_parser_impl.hpp
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef COMPACT_THETA_SKETCH_PARSER_IMPL_HPP_
+#define COMPACT_THETA_SKETCH_PARSER_IMPL_HPP_
+
+#include <iostream>
+#include <iomanip>
+
+namespace datasketches {
+
+template<bool dummy>
+auto compact_theta_sketch_parser<dummy>::parse(const void* ptr, size_t size, uint64_t seed, bool dump_on_error) -> compact_theta_sketch_data {
+  if (size < 8) throw std::invalid_argument("at least 8 bytes expected, actual " + std::to_string(size)
+      + (dump_on_error ? (", sketch dump: " + hex_dump(reinterpret_cast<const uint8_t*>(ptr), size)) : ""));
+  checker<true>::check_serial_version(reinterpret_cast<const uint8_t*>(ptr)[COMPACT_SKETCH_SERIAL_VERSION_BYTE], COMPACT_SKETCH_SERIAL_VERSION);
+  checker<true>::check_sketch_type(reinterpret_cast<const uint8_t*>(ptr)[COMPACT_SKETCH_TYPE_BYTE], COMPACT_SKETCH_TYPE);
+  uint64_t theta = theta_constants::MAX_THETA;
+  const uint16_t seed_hash = reinterpret_cast<const uint16_t*>(ptr)[COMPACT_SKETCH_SEED_HASH_U16];
+  checker<true>::check_seed_hash(seed_hash, compute_seed_hash(seed));
+  if (reinterpret_cast<const uint8_t*>(ptr)[COMPACT_SKETCH_FLAGS_BYTE] & (1 << COMPACT_SKETCH_IS_EMPTY_FLAG)) {
+    return {true, true, seed_hash, 0, theta, nullptr};
+  }
+  const bool has_theta = reinterpret_cast<const uint8_t*>(ptr)[COMPACT_SKETCH_PRE_LONGS_BYTE] > 2;
+  if (has_theta) {
+    if (size < 16) throw std::invalid_argument("at least 16 bytes expected, actual " + std::to_string(size));
+    theta = reinterpret_cast<const uint64_t*>(ptr)[COMPACT_SKETCH_THETA_U64];
+  }
+  if (reinterpret_cast<const uint8_t*>(ptr)[COMPACT_SKETCH_PRE_LONGS_BYTE] == 1) {
+    return {false, true, seed_hash, 1, theta, reinterpret_cast<const uint64_t*>(ptr) + COMPACT_SKETCH_SINGLE_ENTRY_U64};
+  }
+  const uint32_t num_entries = reinterpret_cast<const uint32_t*>(ptr)[COMPACT_SKETCH_NUM_ENTRIES_U32];
+  const size_t entries_start_u64 = has_theta ? COMPACT_SKETCH_ENTRIES_ESTIMATION_U64 : COMPACT_SKETCH_ENTRIES_EXACT_U64;
+  const uint64_t* entries = reinterpret_cast<const uint64_t*>(ptr) + entries_start_u64;
+  const size_t expected_size_bytes = (entries_start_u64 + num_entries) * sizeof(uint64_t);
+  if (size < expected_size_bytes) {
+    throw std::invalid_argument(std::to_string(expected_size_bytes) + " bytes expected, actual " + std::to_string(size)
+        + (dump_on_error ? (", sketch dump: " + hex_dump(reinterpret_cast<const uint8_t*>(ptr), size)) : ""));
+  }
+  const bool is_ordered = reinterpret_cast<const uint8_t*>(ptr)[COMPACT_SKETCH_FLAGS_BYTE] & (1 << COMPACT_SKETCH_IS_ORDERED_FLAG);
+  return {false, is_ordered, seed_hash, num_entries, theta, entries};
+}
+
+template<bool dummy>
+std::string compact_theta_sketch_parser<dummy>::hex_dump(const uint8_t* ptr, size_t size) {
+  std::stringstream s;
+  s << std::hex << std::setfill('0') << std::uppercase;
+  for (size_t i = 0; i < size; ++i) s << std::setw(2) << (ptr[i] & 0xff);
+  return s.str();
+}
+
+} /* namespace datasketches */
+
+#endif
diff --git a/theta/include/theta_sketch.hpp b/theta/include/theta_sketch.hpp
index 2e24168..1bca459 100644
--- a/theta/include/theta_sketch.hpp
+++ b/theta/include/theta_sketch.hpp
@@ -311,7 +311,8 @@
   // - as a result of a set operation
   // - by deserializing a previously serialized compact sketch
 
-  compact_theta_sketch_alloc(const Base& other, bool ordered);
+  template<typename Other>
+  compact_theta_sketch_alloc(const Other& other, bool ordered);
   compact_theta_sketch_alloc(const compact_theta_sketch_alloc&) = default;
   compact_theta_sketch_alloc(compact_theta_sketch_alloc&&) noexcept = default;
   virtual ~compact_theta_sketch_alloc() = default;
@@ -387,10 +388,50 @@
     update_theta_sketch_alloc build() const;
 };
 
+// This is to wrap a buffer containing a serialized compact sketch and use it in a set operation avoiding some cost of deserialization.
+// It does not take the ownership of the buffer.
+
+template<typename Allocator = std::allocator<uint64_t>>
+class wrapped_compact_theta_sketch_alloc {
+public:
+  using const_iterator = const uint64_t*;
+
+  Allocator get_allocator() const;
+  bool is_empty() const;
+  bool is_ordered() const;
+  uint64_t get_theta64() const;
+  uint32_t get_num_retained() const;
+  uint16_t get_seed_hash() const;
+
+  const_iterator begin() const;
+  const_iterator end() const;
+
+  /**
+   * This method wraps a serialized compact sketch as an array of bytes.
+   * @param bytes pointer to the array of bytes
+   * @param size the size of the array
+   * @param seed the seed for the hash function that was used to create the sketch
+   * @return an instance of the sketch
+   */
+  static const wrapped_compact_theta_sketch_alloc wrap(const void* bytes, size_t size, uint64_t seed = DEFAULT_SEED, bool dump_on_error = false);
+
+private:
+  bool is_empty_;
+  bool is_ordered_;
+  uint16_t seed_hash_;
+  uint32_t num_entries_;
+  uint64_t theta_;
+  const uint64_t* entries_;
+
+  wrapped_compact_theta_sketch_alloc(bool is_empty, bool is_ordered, uint16_t seed_hash, uint32_t num_entries,
+      uint64_t theta, const uint64_t* entries);
+};
+
 // aliases with default allocator for convenience
 using theta_sketch = theta_sketch_alloc<std::allocator<uint64_t>>;
 using update_theta_sketch = update_theta_sketch_alloc<std::allocator<uint64_t>>;
 using compact_theta_sketch = compact_theta_sketch_alloc<std::allocator<uint64_t>>;
+using wrapped_compact_theta_sketch = wrapped_compact_theta_sketch_alloc<std::allocator<uint64_t>>;
 
 } /* namespace datasketches */
 
diff --git a/theta/include/theta_sketch_impl.hpp b/theta/include/theta_sketch_impl.hpp
index 0653a70..1bee921 100644
--- a/theta/include/theta_sketch_impl.hpp
+++ b/theta/include/theta_sketch_impl.hpp
@@ -26,6 +26,7 @@
 #include "serde.hpp"
 #include "binomial_bounds.hpp"
 #include "theta_helpers.hpp"
+#include "compact_theta_sketch_parser.hpp"
 
 namespace datasketches {
 
@@ -246,7 +247,8 @@
 // compact sketch
 
 template<typename A>
-compact_theta_sketch_alloc<A>::compact_theta_sketch_alloc(const Base& other, bool ordered):
+template<typename Other>
+compact_theta_sketch_alloc<A>::compact_theta_sketch_alloc(const Other& other, bool ordered):
 is_empty_(other.is_empty()),
 is_ordered_(other.is_ordered() || ordered),
 seed_hash_(other.get_seed_hash()),
@@ -472,7 +474,65 @@
   return compact_theta_sketch_alloc(is_empty, is_ordered, seed_hash, theta, std::move(entries));
 }
 
+// wrapped compact sketch
+
+template<typename A>
+wrapped_compact_theta_sketch_alloc<A>::wrapped_compact_theta_sketch_alloc(bool is_empty, bool is_ordered, uint16_t seed_hash, uint32_t num_entries,
+    uint64_t theta, const uint64_t* entries):
+is_empty_(is_empty),
+is_ordered_(is_ordered),
+seed_hash_(seed_hash),
+num_entries_(num_entries),
+theta_(theta),
+entries_(entries)
+{}
+
+template<typename A>
+const wrapped_compact_theta_sketch_alloc<A> wrapped_compact_theta_sketch_alloc<A>::wrap(const void* bytes, size_t size, uint64_t seed, bool dump_on_error) {
+  auto data = compact_theta_sketch_parser<true>::parse(bytes, size, seed, dump_on_error);
+  return wrapped_compact_theta_sketch_alloc(data.is_empty, data.is_ordered, data.seed_hash, data.num_entries, data.theta, data.entries);
+}
+
+template<typename A>
+A wrapped_compact_theta_sketch_alloc<A>::get_allocator() const {
+  return A();
+}
+
+template<typename A>
+bool wrapped_compact_theta_sketch_alloc<A>::is_empty() const {
+  return is_empty_;
+}
+
+template<typename A>
+bool wrapped_compact_theta_sketch_alloc<A>::is_ordered() const {
+  return is_ordered_;
+}
+
+template<typename A>
+uint64_t wrapped_compact_theta_sketch_alloc<A>::get_theta64() const {
+  return theta_;
+}
+
+template<typename A>
+uint32_t wrapped_compact_theta_sketch_alloc<A>::get_num_retained() const {
+  return static_cast<uint32_t>(num_entries_);
+}
+
+template<typename A>
+uint16_t wrapped_compact_theta_sketch_alloc<A>::get_seed_hash() const {
+  return seed_hash_;
+}
+
+template<typename A>
+auto wrapped_compact_theta_sketch_alloc<A>::begin() const -> const_iterator {
+  return entries_;
+}
+
+template<typename A>
+auto wrapped_compact_theta_sketch_alloc<A>::end() const -> const_iterator {
+  return entries_ + num_entries_;
+}
+
 } /* namespace datasketches */
 
 #endif
-
diff --git a/theta/test/theta_a_not_b_test.cpp b/theta/test/theta_a_not_b_test.cpp
index 4e6ff26..ec2aa54 100644
--- a/theta/test/theta_a_not_b_test.cpp
+++ b/theta/test/theta_a_not_b_test.cpp
@@ -167,6 +167,28 @@
   REQUIRE(result.get_estimate() == Approx(5000).margin(5000 * 0.02));
 }
 
+TEST_CASE("theta a-not-b: estimation mode half overlap wrapped compact", "[theta_a_not_b]") {
+  update_theta_sketch a = update_theta_sketch::builder().build();
+  int value = 0;
+  for (int i = 0; i < 10000; i++) a.update(value++);
+  auto bytes_a = a.compact().serialize();
+
+  update_theta_sketch b = update_theta_sketch::builder().build();
+  value = 5000;
+  for (int i = 0; i < 10000; i++) b.update(value++);
+  auto bytes_b = b.compact().serialize();
+
+  theta_a_not_b a_not_b;
+
+  auto result = a_not_b.compute(
+    wrapped_compact_theta_sketch::wrap(bytes_a.data(), bytes_a.size()),
+    wrapped_compact_theta_sketch::wrap(bytes_b.data(), bytes_b.size())
+  );
+  REQUIRE_FALSE(result.is_empty());
+  REQUIRE(result.is_estimation_mode());
+  REQUIRE(result.get_estimate() == Approx(5000).margin(5000 * 0.02));
+}
+
 TEST_CASE("theta a-not-b: estimation mode disjoint", "[theta_a_not_b]") {
   update_theta_sketch a = update_theta_sketch::builder().build();
   int value = 0;
diff --git a/theta/test/theta_intersection_test.cpp b/theta/test/theta_intersection_test.cpp
index c8fb6e6..34ff667 100644
--- a/theta/test/theta_intersection_test.cpp
+++ b/theta/test/theta_intersection_test.cpp
@@ -174,6 +174,26 @@
   REQUIRE(result.get_estimate() == Approx(5000).margin(5000 * 0.02));
 }
 
+TEST_CASE("theta intersection: estimation mode half overlap ordered wrapped compact", "[theta_intersection]") {
+  update_theta_sketch sketch1 = update_theta_sketch::builder().build();
+  int value = 0;
+  for (int i = 0; i < 10000; i++) sketch1.update(value++);
+  auto bytes1 = sketch1.compact().serialize();
+
+  update_theta_sketch sketch2 = update_theta_sketch::builder().build();
+  value = 5000;
+  for (int i = 0; i < 10000; i++) sketch2.update(value++);
+  auto bytes2 = sketch2.compact().serialize();
+
+  theta_intersection intersection;
+  intersection.update(wrapped_compact_theta_sketch::wrap(bytes1.data(), bytes1.size()));
+  intersection.update(wrapped_compact_theta_sketch::wrap(bytes2.data(), bytes2.size()));
+  compact_theta_sketch result = intersection.get_result();
+  REQUIRE_FALSE(result.is_empty());
+  REQUIRE(result.is_estimation_mode());
+  REQUIRE(result.get_estimate() == Approx(5000).margin(5000 * 0.02));
+}
+
 TEST_CASE("theta intersection: estimation mode disjoint unordered", "[theta_intersection]") {
   update_theta_sketch sketch1 = update_theta_sketch::builder().build();
   int value = 0;
diff --git a/theta/test/theta_sketch_test.cpp b/theta/test/theta_sketch_test.cpp
index eeb2a73..730b904 100644
--- a/theta/test/theta_sketch_test.cpp
+++ b/theta/test/theta_sketch_test.cpp
@@ -238,4 +238,40 @@
   REQUIRE_THROWS_AS(compact_theta_sketch::deserialize(bytes.data(), bytes.size() - 1), std::out_of_range);
 }
 
+TEST_CASE("theta sketch: conversion constructor and wrapped compact", "[theta_sketch]") {
+  update_theta_sketch update_sketch = update_theta_sketch::builder().build();
+  const int n = 8192;
+  for (int i = 0; i < n; i++) update_sketch.update(i);
+
+  // unordered
+  auto unordered_compact1 = update_sketch.compact(false);
+  compact_theta_sketch unordered_compact2(update_sketch, false);
+  auto it = unordered_compact1.begin();
+  for (auto entry: unordered_compact2) {
+    REQUIRE(*it == entry);
+    ++it;
+  }
+
+  // ordered
+  auto ordered_compact1 = update_sketch.compact();
+  compact_theta_sketch ordered_compact2(update_sketch, true);
+  it = ordered_compact1.begin();
+  for (auto entry: ordered_compact2) {
+    REQUIRE(*it == entry);
+    ++it;
+  }
+
+  // wrapped compact
+  auto bytes = ordered_compact1.serialize();
+  auto ordered_compact3 = wrapped_compact_theta_sketch::wrap(bytes.data(), bytes.size());
+  it = ordered_compact1.begin();
+  for (auto entry: ordered_compact3) {
+    REQUIRE(*it == entry);
+    ++it;
+  }
+
+  // seed mismatch
+  REQUIRE_THROWS_AS(wrapped_compact_theta_sketch::wrap(bytes.data(), bytes.size(), 0), std::invalid_argument);
+}
+
 } /* namespace datasketches */
diff --git a/theta/test/theta_union_test.cpp b/theta/test/theta_union_test.cpp
index c170457..a8b3878 100644
--- a/theta/test/theta_union_test.cpp
+++ b/theta/test/theta_union_test.cpp
@@ -65,7 +65,27 @@
   compact_theta_sketch sketch3 = u.get_result();
   REQUIRE_FALSE(sketch3.is_empty());
   REQUIRE_FALSE(sketch3.is_estimation_mode());
-  REQUIRE(sketch3.get_estimate() == Approx(1500).margin(1500 * 0.01));
+  REQUIRE(sketch3.get_estimate() == 1500.0);
+}
+
+TEST_CASE("theta union: exact mode half overlap wrapped compact", "[theta_union]") {
+  update_theta_sketch sketch1 = update_theta_sketch::builder().build();
+  int value = 0;
+  for (int i = 0; i < 1000; i++) sketch1.update(value++);
+  auto bytes1 = sketch1.compact().serialize();
+
+  update_theta_sketch sketch2 = update_theta_sketch::builder().build();
+  value = 500;
+  for (int i = 0; i < 1000; i++) sketch2.update(value++);
+  auto bytes2 = sketch2.compact().serialize();
+
+  theta_union u = theta_union::builder().build();
+  u.update(wrapped_compact_theta_sketch::wrap(bytes1.data(), bytes1.size()));
+  u.update(wrapped_compact_theta_sketch::wrap(bytes2.data(), bytes2.size()));
+  compact_theta_sketch sketch3 = u.get_result();
+  REQUIRE_FALSE(sketch3.is_empty());
+  REQUIRE_FALSE(sketch3.is_estimation_mode());
+  REQUIRE(sketch3.get_estimate() == 1500.0);
 }
 
 TEST_CASE("theta union: estimation mode half overlap", "[theta_union]") {