array of doubles sketch
diff --git a/Makefile b/Makefile
index 61ddebf..fe2c471 100644
--- a/Makefile
+++ b/Makefile
@@ -19,7 +19,12 @@
EXTVERSION = $(shell grep default_version $(EXTENSION).control | sed -e "s/default_version[[:space:]]*=[[:space:]]*'\([^']*\)'/\1/")
MODULE_big = datasketches
-SQL_MODULES = sql/datasketches_cpc_sketch.sql sql/datasketches_kll_float_sketch.sql sql/datasketches_theta_sketch.sql sql/datasketches_frequent_strings_sketch.sql sql/datasketches_hll_sketch.sql
+SQL_MODULES = sql/datasketches_cpc_sketch.sql \
+ sql/datasketches_kll_float_sketch.sql \
+ sql/datasketches_theta_sketch.sql \
+ sql/datasketches_frequent_strings_sketch.sql \
+ sql/datasketches_hll_sketch.sql \
+ sql/datasketches_aod_sketch.sql
SQL_INSTALL = sql/$(EXTENSION)--$(EXTVERSION).sql
DATA = $(SQL_INSTALL)
@@ -30,12 +35,13 @@
src/cpc_sketch_pg_functions.o src/cpc_sketch_c_adapter.o \
src/theta_sketch_pg_functions.o src/theta_sketch_c_adapter.o \
src/frequent_strings_sketch_pg_functions.o src/frequent_strings_sketch_c_adapter.o \
- src/hll_sketch_pg_functions.o src/hll_sketch_c_adapter.o
+ src/hll_sketch_pg_functions.o src/hll_sketch_c_adapter.o \
+ src/aod_sketch_pg_functions.o src/aod_sketch_c_adapter.o
# assume a copy or link datasketches-cpp in the current dir
CORE = datasketches-cpp
-PG_CPPFLAGS = -fPIC -I/usr/local/include -I$(CORE)/kll/include -I$(CORE)/common/include -I$(CORE)/cpc/include -I$(CORE)/theta/include -I$(CORE)/fi/include -I$(CORE)/hll/include
+PG_CPPFLAGS = -fPIC -I/usr/local/include -I$(CORE)/kll/include -I$(CORE)/common/include -I$(CORE)/cpc/include -I$(CORE)/theta/include -I$(CORE)/fi/include -I$(CORE)/hll/include -I$(CORE)/tuple/include
PG_CXXFLAGS = -std=c++11
SHLIB_LINK = -lstdc++ -L/usr/local/lib
diff --git a/sql/datasketches_aod_sketch.sql b/sql/datasketches_aod_sketch.sql
new file mode 100644
index 0000000..8f26cd0
--- /dev/null
+++ b/sql/datasketches_aod_sketch.sql
@@ -0,0 +1,171 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+CREATE TYPE aod_sketch;
+
+CREATE OR REPLACE FUNCTION aod_sketch_in(cstring) RETURNS aod_sketch
+ AS '$libdir/datasketches', 'pg_sketch_in'
+ LANGUAGE C STRICT IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION aod_sketch_out(aod_sketch) RETURNS cstring
+ AS '$libdir/datasketches', 'pg_sketch_out'
+ LANGUAGE C STRICT IMMUTABLE;
+
+CREATE TYPE aod_sketch (
+ INPUT = aod_sketch_in,
+ OUTPUT = aod_sketch_out,
+ STORAGE = EXTERNAL
+);
+
+CREATE CAST (bytea as aod_sketch) WITHOUT FUNCTION AS ASSIGNMENT;
+CREATE CAST (aod_sketch as bytea) WITHOUT FUNCTION AS ASSIGNMENT;
+
+CREATE OR REPLACE FUNCTION aod_sketch_add_item(internal, anyelement, double precision[]) RETURNS internal
+ AS '$libdir/datasketches', 'pg_aod_sketch_add_item'
+ LANGUAGE C IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION aod_sketch_add_item(internal, anyelement, double precision[], int) RETURNS internal
+ AS '$libdir/datasketches', 'pg_aod_sketch_add_item'
+ LANGUAGE C IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION aod_sketch_add_item(internal, anyelement, double precision[], int, real) RETURNS internal
+ AS '$libdir/datasketches', 'pg_aod_sketch_add_item'
+ LANGUAGE C IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION aod_sketch_get_estimate(aod_sketch) RETURNS double precision
+ AS '$libdir/datasketches', 'pg_aod_sketch_get_estimate'
+ LANGUAGE C STRICT IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION aod_sketch_get_estimate_and_bounds(aod_sketch) RETURNS double precision[]
+ AS '$libdir/datasketches', 'pg_aod_sketch_get_estimate_and_bounds'
+ LANGUAGE C STRICT IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION aod_sketch_get_estimate_and_bounds(aod_sketch, int) RETURNS double precision[]
+ AS '$libdir/datasketches', 'pg_aod_sketch_get_estimate_and_bounds'
+ LANGUAGE C STRICT IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION aod_sketch_from_internal(internal) RETURNS aod_sketch
+ AS '$libdir/datasketches', 'pg_aod_sketch_from_internal'
+ LANGUAGE C STRICT IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION aod_sketch_to_string(aod_sketch) RETURNS TEXT
+ AS '$libdir/datasketches', 'pg_aod_sketch_to_string'
+ LANGUAGE C STRICT IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION aod_sketch_to_string(aod_sketch, boolean) RETURNS TEXT
+ AS '$libdir/datasketches', 'pg_aod_sketch_to_string'
+ LANGUAGE C STRICT IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION aod_sketch_union_agg(internal, aod_sketch) RETURNS internal
+ AS '$libdir/datasketches', 'pg_aod_sketch_union_agg'
+ LANGUAGE C IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION aod_sketch_union_agg(internal, aod_sketch, int) RETURNS internal
+ AS '$libdir/datasketches', 'pg_aod_sketch_union_agg'
+ LANGUAGE C IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION aod_sketch_union_agg(internal, aod_sketch, int, int) RETURNS internal
+ AS '$libdir/datasketches', 'pg_aod_sketch_union_agg'
+ LANGUAGE C IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION aod_sketch_intersection_agg(internal, aod_sketch) RETURNS internal
+ AS '$libdir/datasketches', 'pg_aod_sketch_intersection_agg'
+ LANGUAGE C IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION aod_sketch_intersection_agg(internal, aod_sketch, int) RETURNS internal
+ AS '$libdir/datasketches', 'pg_aod_sketch_intersection_agg'
+ LANGUAGE C IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION aod_union_get_result(internal) RETURNS aod_sketch
+ AS '$libdir/datasketches', 'pg_aod_union_get_result'
+ LANGUAGE C STRICT IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION aod_intersection_get_result(internal) RETURNS aod_sketch
+ AS '$libdir/datasketches', 'pg_aod_intersection_get_result'
+ LANGUAGE C STRICT IMMUTABLE;
+
+CREATE AGGREGATE aod_sketch_build(anyelement, double precision[]) (
+ sfunc = aod_sketch_add_item,
+ stype = internal,
+ finalfunc = aod_sketch_from_internal
+);
+
+CREATE AGGREGATE aod_sketch_build(anyelement, double precision[], int) (
+ sfunc = aod_sketch_add_item,
+ stype = internal,
+ finalfunc = aod_sketch_from_internal
+);
+
+CREATE AGGREGATE aod_sketch_build(anyelement, double precision[], int, real) (
+ sfunc = aod_sketch_add_item,
+ stype = internal,
+ finalfunc = aod_sketch_from_internal
+);
+
+CREATE AGGREGATE aod_sketch_union(aod_sketch) (
+ sfunc = aod_sketch_union_agg,
+ stype = internal,
+ finalfunc = aod_union_get_result
+);
+
+CREATE AGGREGATE aod_sketch_union(aod_sketch, int) (
+ sfunc = aod_sketch_union_agg,
+ stype = internal,
+ finalfunc = aod_union_get_result
+);
+
+CREATE AGGREGATE aod_sketch_union(aod_sketch, int, int) (
+ sfunc = aod_sketch_union_agg,
+ stype = internal,
+ finalfunc = aod_union_get_result
+);
+
+CREATE AGGREGATE aod_sketch_intersection(aod_sketch) (
+ sfunc = aod_sketch_intersection_agg,
+ stype = internal,
+ finalfunc = aod_intersection_get_result
+);
+
+CREATE AGGREGATE aod_sketch_intersection(aod_sketch, int) (
+ sfunc = aod_sketch_intersection_agg,
+ stype = internal,
+ finalfunc = aod_intersection_get_result
+);
+
+CREATE OR REPLACE FUNCTION aod_sketch_union(aod_sketch, aod_sketch) RETURNS aod_sketch
+ AS '$libdir/datasketches', 'pg_aod_sketch_union'
+ LANGUAGE C STRICT IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION aod_sketch_union(aod_sketch, aod_sketch, int) RETURNS aod_sketch
+ AS '$libdir/datasketches', 'pg_aod_sketch_union'
+ LANGUAGE C STRICT IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION aod_sketch_intersection(aod_sketch, aod_sketch) RETURNS aod_sketch
+ AS '$libdir/datasketches', 'pg_aod_sketch_intersection'
+ LANGUAGE C STRICT IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION aod_sketch_intersection(aod_sketch, aod_sketch, int) RETURNS aod_sketch
+ AS '$libdir/datasketches', 'pg_aod_sketch_intersection'
+ LANGUAGE C STRICT IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION aod_sketch_a_not_b(aod_sketch, aod_sketch) RETURNS aod_sketch
+ AS '$libdir/datasketches', 'pg_aod_sketch_a_not_b'
+ LANGUAGE C STRICT IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION aod_sketch_a_not_b(aod_sketch, aod_sketch, int) RETURNS aod_sketch
+ AS '$libdir/datasketches', 'pg_aod_sketch_a_not_b'
+ LANGUAGE C STRICT IMMUTABLE;
diff --git a/src/allocator.h b/src/allocator.h
index 1a49f4b..06b810c 100644
--- a/src/allocator.h
+++ b/src/allocator.h
@@ -42,11 +42,9 @@
template <class U>
struct rebind { typedef palloc_allocator<U> other; };
- palloc_allocator() {}
- palloc_allocator(const palloc_allocator&) {}
+ palloc_allocator() = default;
template <class U>
palloc_allocator(const palloc_allocator<U>&) {}
- ~palloc_allocator() {}
pointer address(reference x) const { return &x; }
const_pointer address(const_reference x) const {
@@ -70,9 +68,6 @@
new(p) value_type(std::forward<Args>(args)...);
}
void destroy(pointer p) { p->~value_type(); }
-
-private:
- void operator=(const palloc_allocator&);
};
template<> class palloc_allocator<void> {
diff --git a/src/aod_sketch_c_adapter.cpp b/src/aod_sketch_c_adapter.cpp
new file mode 100644
index 0000000..ef8e9f6
--- /dev/null
+++ b/src/aod_sketch_c_adapter.cpp
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "aod_sketch_c_adapter.h"
+#include "allocator.h"
+#include "postgres_h_substitute.h"
+
+#include <array_of_doubles_sketch.hpp>
+#include <array_of_doubles_union.hpp>
+#include <array_of_doubles_intersection.hpp>
+#include <array_of_doubles_a_not_b.hpp>
+
+using vector_double = std::vector<double, palloc_allocator<double>>;
+
+using update_aod_sketch_pg = datasketches::update_array_of_doubles_sketch_alloc<palloc_allocator<double>>;
+using compact_aod_sketch_pg = datasketches::compact_array_of_doubles_sketch_alloc<palloc_allocator<double>>;
+using aod_union_pg = datasketches::array_of_doubles_union_alloc<palloc_allocator<double>>;
+// using the union policy in the intersection since this is how it is done in Druid
+using aod_intersection_pg = datasketches::array_of_doubles_intersection<datasketches::array_of_doubles_union_policy_alloc<palloc_allocator<double>>, palloc_allocator<double>>;
+using aod_a_not_b_pg = datasketches::array_of_doubles_a_not_b_alloc<palloc_allocator<double>>;
+
+std::ostream& operator<<(std::ostream& os, const vector_double& v) {
+ os << "(";
+ for (size_t i = 0; i < v.size(); ++i) {
+ if (i != 0) os << ", ";
+ os << v[i];
+ }
+ os << ")";
+ return os;
+}
+
+void* aod_sketch_new(unsigned num_values) {
+ try {
+ return new (palloc(sizeof(update_aod_sketch_pg))) update_aod_sketch_pg(update_aod_sketch_pg::builder(num_values).build());
+ } catch (std::exception& e) {
+ pg_error(e.what());
+ }
+ pg_unreachable();
+}
+
+void* aod_sketch_new_lgk(unsigned num_values, unsigned lg_k) {
+ try {
+ return new (palloc(sizeof(update_aod_sketch_pg))) update_aod_sketch_pg(update_aod_sketch_pg::builder(num_values).set_lg_k(lg_k).build());
+ } catch (std::exception& e) {
+ pg_error(e.what());
+ }
+ pg_unreachable();
+}
+
+void* aod_sketch_new_lgk_p(unsigned num_values, unsigned lg_k, float p) {
+ try {
+ return new (palloc(sizeof(update_aod_sketch_pg))) update_aod_sketch_pg(update_aod_sketch_pg::builder(num_values).set_lg_k(lg_k).set_p(p).build());
+ } catch (std::exception& e) {
+ pg_error(e.what());
+ }
+ pg_unreachable();
+}
+
+void update_aod_sketch_delete(void* sketchptr) {
+ try {
+ static_cast<update_aod_sketch_pg*>(sketchptr)->~update_aod_sketch_pg();
+ pfree(sketchptr);
+ } catch (std::exception& e) {
+ pg_error(e.what());
+ }
+}
+
+void compact_aod_sketch_delete(void* sketchptr) {
+ try {
+ static_cast<compact_aod_sketch_pg*>(sketchptr)->~compact_aod_sketch_pg();
+ pfree(sketchptr);
+ } catch (std::exception& e) {
+ pg_error(e.what());
+ }
+}
+
+void aod_sketch_update(void* sketchptr, const void* data, unsigned length, const double* values) {
+ try {
+ static_cast<update_aod_sketch_pg*>(sketchptr)->update(data, length, values);
+ } catch (std::exception& e) {
+ pg_error(e.what());
+ }
+}
+
+void* aod_sketch_compact(void* sketchptr) {
+ try {
+ auto newptr = new (palloc(sizeof(compact_aod_sketch_pg))) compact_aod_sketch_pg(static_cast<update_aod_sketch_pg*>(sketchptr)->compact());
+ static_cast<update_aod_sketch_pg*>(sketchptr)->~update_aod_sketch_pg();
+ pfree(sketchptr);
+ return newptr;
+ } catch (std::exception& e) {
+ pg_error(e.what());
+ }
+ pg_unreachable();
+}
+
+double update_aod_sketch_get_estimate(const void* sketchptr) {
+ try {
+ return static_cast<const update_aod_sketch_pg*>(sketchptr)->get_estimate();
+ } catch (std::exception& e) {
+ pg_error(e.what());
+ }
+ pg_unreachable();
+}
+
+double compact_aod_sketch_get_estimate(const void* sketchptr) {
+ try {
+ return static_cast<const compact_aod_sketch_pg*>(sketchptr)->get_estimate();
+ } catch (std::exception& e) {
+ pg_error(e.what());
+ }
+ pg_unreachable();
+}
+
+Datum* aod_sketch_get_estimate_and_bounds(const void* sketchptr, unsigned num_std_devs) {
+ try {
+ Datum* est_and_bounds = (Datum*) palloc(sizeof(Datum) * 3);
+ est_and_bounds[0] = pg_float8_get_datum(static_cast<const compact_aod_sketch_pg*>(sketchptr)->get_estimate());
+ est_and_bounds[1] = pg_float8_get_datum(static_cast<const compact_aod_sketch_pg*>(sketchptr)->get_lower_bound(num_std_devs));
+ est_and_bounds[2] = pg_float8_get_datum(static_cast<const compact_aod_sketch_pg*>(sketchptr)->get_upper_bound(num_std_devs));
+ return est_and_bounds;
+ } catch (std::exception& e) {
+ pg_error(e.what());
+ }
+ pg_unreachable();
+}
+
+char* aod_sketch_to_string(const void* sketchptr, bool print_entries) {
+ try {
+ auto str = static_cast<const compact_aod_sketch_pg*>(sketchptr)->to_string(print_entries);
+ const size_t len = str.length() + 1;
+ char* buffer = (char*) palloc(len);
+ strncpy(buffer, str.c_str(), len);
+ return buffer;
+ } catch (std::exception& e) {
+ pg_error(e.what());
+ }
+ pg_unreachable();
+}
+
+ptr_with_size aod_sketch_serialize(const void* sketchptr, unsigned header_size) {
+ try {
+ ptr_with_size p;
+ auto bytes = new (palloc(sizeof(compact_aod_sketch_pg::vector_bytes))) compact_aod_sketch_pg::vector_bytes(
+ static_cast<const compact_aod_sketch_pg*>(sketchptr)->serialize(header_size)
+ );
+ p.ptr = bytes->data();
+ p.size = bytes->size();
+ return p;
+ } catch (std::exception& e) {
+ pg_error(e.what());
+ }
+ pg_unreachable();
+}
+
+void* aod_sketch_deserialize(const char* buffer, unsigned length) {
+ try {
+ return new (palloc(sizeof(compact_aod_sketch_pg))) compact_aod_sketch_pg(compact_aod_sketch_pg::deserialize(buffer, length));
+ } catch (std::exception& e) {
+ pg_error(e.what());
+ }
+ pg_unreachable();
+}
+
+void* aod_union_new(unsigned num_values) {
+ try {
+ return new (palloc(sizeof(aod_union_pg))) aod_union_pg(aod_union_pg::builder(num_values).build());
+ } catch (std::exception& e) {
+ pg_error(e.what());
+ }
+ pg_unreachable();
+}
+
+void* aod_union_new_lgk(unsigned num_values, unsigned lg_k) {
+ try {
+ return new (palloc(sizeof(aod_union_pg))) aod_union_pg(aod_union_pg::builder(num_values).set_lg_k(lg_k).build());
+ } catch (std::exception& e) {
+ pg_error(e.what());
+ }
+ pg_unreachable();
+}
+
+void aod_union_delete(void* unionptr) {
+ try {
+ static_cast<aod_union_pg*>(unionptr)->~aod_union_pg();
+ pfree(unionptr);
+ } catch (std::exception& e) {
+ pg_error(e.what());
+ }
+}
+
+void aod_union_update(void* unionptr, const void* sketchptr) {
+ try {
+ static_cast<aod_union_pg*>(unionptr)->update(std::move(*static_cast<const compact_aod_sketch_pg*>(sketchptr)));
+ } catch (std::exception& e) {
+ pg_error(e.what());
+ }
+}
+
+void* aod_union_get_result(const void* unionptr) {
+ try {
+ return new (palloc(sizeof(compact_aod_sketch_pg))) compact_aod_sketch_pg(static_cast<const aod_union_pg*>(unionptr)->get_result());
+ } catch (std::exception& e) {
+ pg_error(e.what());
+ }
+ pg_unreachable();
+}
+
+void* aod_intersection_new(unsigned num_values) {
+ try {
+ return new (palloc(sizeof(aod_intersection_pg))) aod_intersection_pg(datasketches::DEFAULT_SEED, num_values);
+ } catch (std::exception& e) {
+ pg_error(e.what());
+ }
+ pg_unreachable();
+}
+
+void aod_intersection_delete(void* interptr) {
+ try {
+ static_cast<aod_intersection_pg*>(interptr)->~aod_intersection_pg();
+ pfree(interptr);
+ } catch (std::exception& e) {
+ pg_error(e.what());
+ }
+}
+
+void aod_intersection_update(void* interptr, const void* sketchptr) {
+ try {
+ static_cast<aod_intersection_pg*>(interptr)->update(*static_cast<const compact_aod_sketch_pg*>(sketchptr));
+ } catch (std::exception& e) {
+ pg_error(e.what());
+ }
+}
+
+void* aod_intersection_get_result(const void* interptr) {
+ try {
+ return new (palloc(sizeof(compact_aod_sketch_pg))) compact_aod_sketch_pg(static_cast<const aod_intersection_pg*>(interptr)->get_result());
+ } catch (std::exception& e) {
+ pg_error(e.what());
+ }
+ pg_unreachable();
+}
+
+void* aod_a_not_b(const void* sketchptr1, const void* sketchptr2) {
+ try {
+ aod_a_not_b_pg a_not_b;
+ return new (palloc(sizeof(compact_aod_sketch_pg))) compact_aod_sketch_pg(a_not_b.compute(
+ *static_cast<const compact_aod_sketch_pg*>(sketchptr1),
+ *static_cast<const compact_aod_sketch_pg*>(sketchptr2)
+ ));
+ } catch (std::exception& e) {
+ pg_error(e.what());
+ }
+ pg_unreachable();
+}
diff --git a/src/aod_sketch_c_adapter.h b/src/aod_sketch_c_adapter.h
new file mode 100644
index 0000000..d3ed775
--- /dev/null
+++ b/src/aod_sketch_c_adapter.h
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef AOD_SKETCH_C_ADAPTER_H
+#define AOD_SKETCH_C_ADAPTER_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+void* aod_sketch_new(unsigned num_values);
+void* aod_sketch_new_lgk(unsigned num_values, unsigned lg_k);
+void* aod_sketch_new_lgk_p(unsigned num_values, unsigned lg_k, float p);
+void update_aod_sketch_delete(void* sketchptr);
+void compact_aod_sketch_delete(void* sketchptr);
+
+void aod_sketch_update(void* sketchptr, const void* data, unsigned length, const double* values);
+void* aod_sketch_compact(void* sketchptr);
+void aod_sketch_union(void* sketchptr1, const void* sketchptr2);
+double update_aod_sketch_get_estimate(const void* sketchptr);
+double compact_aod_sketch_get_estimate(const void* sketchptr);
+void** aod_sketch_get_estimate_and_bounds(const void* sketchptr, unsigned num_std_devs);
+char* aod_sketch_to_string(const void* sketchptr, bool print_entries);
+
+struct ptr_with_size {
+ void* ptr;
+ unsigned long long size;
+};
+
+struct ptr_with_size aod_sketch_serialize(const void* sketchptr, unsigned header_size);
+void* aod_sketch_deserialize(const char* buffer, unsigned length);
+
+void* aod_union_new(unsigned num_values);
+void* aod_union_new_lgk(unsigned num_values, unsigned lg_k);
+void aod_union_delete(void* unionptr);
+void aod_union_update(void* unionptr, const void* sketchptr);
+void* aod_union_get_result(const void* unionptr);
+
+void* aod_intersection_new(unsigned num_values);
+void aod_intersection_delete(void* interptr);
+void aod_intersection_update(void* interptr, const void* sketchptr);
+void* aod_intersection_get_result(const void* interptr);
+
+void* aod_a_not_b(const void* sketchptr1, const void* sketchptr2);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/src/aod_sketch_pg_functions.c b/src/aod_sketch_pg_functions.c
new file mode 100644
index 0000000..04feec7
--- /dev/null
+++ b/src/aod_sketch_pg_functions.c
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <postgres.h>
+#include <fmgr.h>
+#include <utils/lsyscache.h>
+#include <utils/builtins.h>
+#include <utils/array.h>
+#include <catalog/pg_type.h>
+
+#include "aod_sketch_c_adapter.h"
+#include "base64.h"
+
+/* PG_FUNCTION_INFO_V1 macro to pass functions to postgres */
+PG_FUNCTION_INFO_V1(pg_aod_sketch_add_item);
+PG_FUNCTION_INFO_V1(pg_aod_sketch_get_estimate);
+PG_FUNCTION_INFO_V1(pg_aod_sketch_get_estimate_and_bounds);
+PG_FUNCTION_INFO_V1(pg_aod_sketch_to_string);
+PG_FUNCTION_INFO_V1(pg_aod_sketch_union_agg);
+PG_FUNCTION_INFO_V1(pg_aod_sketch_intersection_agg);
+PG_FUNCTION_INFO_V1(pg_aod_sketch_from_internal);
+PG_FUNCTION_INFO_V1(pg_aod_sketch_get_estimate_from_internal);
+PG_FUNCTION_INFO_V1(pg_aod_union_get_result);
+PG_FUNCTION_INFO_V1(pg_aod_intersection_get_result);
+PG_FUNCTION_INFO_V1(pg_aod_sketch_union);
+PG_FUNCTION_INFO_V1(pg_aod_sketch_intersection);
+PG_FUNCTION_INFO_V1(pg_aod_sketch_a_not_b);
+
+/* function declarations */
+Datum pg_aod_sketch_recv(PG_FUNCTION_ARGS);
+Datum pg_aod_sketch_send(PG_FUNCTION_ARGS);
+Datum pg_aod_sketch_add_item(PG_FUNCTION_ARGS);
+Datum pg_aod_sketch_get_estimate(PG_FUNCTION_ARGS);
+Datum pg_aod_sketch_get_estimate_and_bounds(PG_FUNCTION_ARGS);
+Datum pg_aod_sketch_to_string(PG_FUNCTION_ARGS);
+Datum pg_aod_sketch_union_agg(PG_FUNCTION_ARGS);
+Datum pg_aod_sketch_intersection_agg(PG_FUNCTION_ARGS);
+Datum pg_aod_sketch_from_internal(PG_FUNCTION_ARGS);
+Datum pg_aod_sketch_get_estimate_from_internal(PG_FUNCTION_ARGS);
+Datum pg_aod_union_get_result(PG_FUNCTION_ARGS);
+Datum pg_aod_intersection_get_result(PG_FUNCTION_ARGS);
+Datum pg_aod_sketch_union(PG_FUNCTION_ARGS);
+Datum pg_aod_sketch_intersection(PG_FUNCTION_ARGS);
+Datum pg_aod_sketch_a_not_b(PG_FUNCTION_ARGS);
+
+Datum pg_aod_sketch_add_item(PG_FUNCTION_ARGS) {
+ void* sketchptr;
+ int lg_k;
+ float p;
+
+ // anyelement
+ Oid element_type;
+ Datum element;
+ int16 typlen;
+ bool typbyval;
+ char typalign;
+
+ // input array of doubles
+ ArrayType* arr_in;
+ Oid elmtype_in;
+ int16 elmlen_in;
+ bool elmbyval_in;
+ char elmalign_in;
+ Datum* data_in;
+ bool* nulls_in;
+ int arr_len;
+ double* values;
+ int i;
+
+ MemoryContext oldcontext;
+ MemoryContext aggcontext;
+
+ if (PG_ARGISNULL(0) && PG_ARGISNULL(1)) {
+ PG_RETURN_NULL();
+ } else if (PG_ARGISNULL(1)) {
+ PG_RETURN_POINTER(PG_GETARG_POINTER(0)); // no update value. return unmodified state
+ }
+
+ if (!AggCheckCallContext(fcinfo, &aggcontext)) {
+ elog(ERROR, "aod_sketch_add_item called in non-aggregate context");
+ }
+ oldcontext = MemoryContextSwitchTo(aggcontext);
+
+ // look at the array of values first to know the array length in case we need to create a new sketch
+ arr_in = PG_GETARG_ARRAYTYPE_P(2);
+ elmtype_in = ARR_ELEMTYPE(arr_in);
+ get_typlenbyvalalign(elmtype_in, &elmlen_in, &elmbyval_in, &elmalign_in);
+ deconstruct_array(arr_in, elmtype_in, elmlen_in, elmbyval_in, elmalign_in, &data_in, &nulls_in, &arr_len);
+
+ values = palloc(sizeof(double) * arr_len);
+ for (i = 0; i < arr_len; i++) {
+ values[i] = DatumGetFloat8(data_in[i]);
+ }
+
+ if (PG_ARGISNULL(0)) {
+ lg_k = PG_GETARG_INT32(3);
+ p = PG_GETARG_FLOAT4(4);
+ if (lg_k) {
+ sketchptr = p ? aod_sketch_new_lgk_p(arr_len, lg_k, p) : aod_sketch_new_lgk(arr_len, lg_k);
+ } else {
+ sketchptr = aod_sketch_new(arr_len);
+ }
+ } else {
+ sketchptr = PG_GETARG_POINTER(0);
+ }
+
+ element_type = get_fn_expr_argtype(fcinfo->flinfo, 1);
+ element = PG_GETARG_DATUM(1);
+ get_typlenbyvalalign(element_type, &typlen, &typbyval, &typalign);
+ if (typlen == -1) {
+ // varlena
+ aod_sketch_update(sketchptr, VARDATA_ANY(element), VARSIZE_ANY_EXHDR(element), values);
+ } else if (typbyval) {
+ // fixed-length passed by value
+ aod_sketch_update(sketchptr, &element, typlen, values);
+ } else {
+ // fixed-length passed by reference
+ aod_sketch_update(sketchptr, (void*)element, typlen, values);
+ }
+ pfree(values);
+ MemoryContextSwitchTo(oldcontext);
+
+ PG_RETURN_POINTER(sketchptr);
+}
+
+Datum pg_aod_sketch_get_estimate(PG_FUNCTION_ARGS) {
+ const bytea* bytes_in;
+ void* sketchptr;
+ double estimate;
+ bytes_in = PG_GETARG_BYTEA_P(0);
+ sketchptr = aod_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ);
+ estimate = compact_aod_sketch_get_estimate(sketchptr);
+ compact_aod_sketch_delete(sketchptr);
+ PG_RETURN_FLOAT8(estimate);
+}
+
+Datum pg_aod_sketch_get_estimate_and_bounds(PG_FUNCTION_ARGS) {
+ const bytea* bytes_in;
+ void* sketchptr;
+ int num_std_devs;
+
+ // output array
+ Datum* est_and_bounds;
+ ArrayType* arr_out;
+ int16 elmlen_out;
+ bool elmbyval_out;
+ char elmalign_out;
+
+ bytes_in = PG_GETARG_BYTEA_P(0);
+ sketchptr = aod_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ);
+ num_std_devs = PG_GETARG_INT32(1);
+ if (num_std_devs == 0) num_std_devs = 1; // default
+ est_and_bounds = (Datum*) aod_sketch_get_estimate_and_bounds(sketchptr, num_std_devs);
+ compact_aod_sketch_delete(sketchptr);
+
+ // construct output array
+ get_typlenbyvalalign(FLOAT8OID, &elmlen_out, &elmbyval_out, &elmalign_out);
+ arr_out = construct_array(est_and_bounds, 3, FLOAT8OID, elmlen_out, elmbyval_out, elmalign_out);
+ PG_RETURN_ARRAYTYPE_P(arr_out);
+}
+
+Datum pg_aod_sketch_to_string(PG_FUNCTION_ARGS) {
+ const bytea* bytes_in;
+ void* sketchptr;
+ bool print_entries;
+ char* str;
+ bytes_in = PG_GETARG_BYTEA_P(0);
+ if (PG_NARGS() > 1) {
+ print_entries = PG_GETARG_BOOL(1);
+ } else {
+ print_entries = false;
+ }
+ sketchptr = aod_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ);
+ str = aod_sketch_to_string(sketchptr, print_entries);
+ compact_aod_sketch_delete(sketchptr);
+ PG_RETURN_TEXT_P(cstring_to_text(str));
+}
+
+Datum pg_aod_sketch_intersection_agg(PG_FUNCTION_ARGS) {
+ void* interptr;
+ bytea* sketch_bytes;
+ void* sketchptr;
+ int num_values;
+
+ MemoryContext oldcontext;
+ MemoryContext aggcontext;
+
+ if (PG_ARGISNULL(0) && PG_ARGISNULL(1)) {
+ PG_RETURN_NULL();
+ } else if (PG_ARGISNULL(1)) {
+ PG_RETURN_POINTER(PG_GETARG_POINTER(0)); // no update value. return unmodified state
+ }
+
+ if (!AggCheckCallContext(fcinfo, &aggcontext)) {
+ elog(ERROR, "aod_sketch_intersect called in non-aggregate context");
+ }
+ oldcontext = MemoryContextSwitchTo(aggcontext);
+
+ if (PG_ARGISNULL(0)) {
+ num_values = PG_GETARG_INT32(2);
+ if (num_values == 0) num_values = 1;
+ interptr = aod_intersection_new(num_values);
+ } else {
+ interptr = PG_GETARG_POINTER(0);
+ }
+
+ sketch_bytes = PG_GETARG_BYTEA_P(1);
+ sketchptr = aod_sketch_deserialize(VARDATA(sketch_bytes), VARSIZE(sketch_bytes) - VARHDRSZ);
+ aod_intersection_update(interptr, sketchptr);
+ compact_aod_sketch_delete(sketchptr);
+
+ MemoryContextSwitchTo(oldcontext);
+
+ PG_RETURN_POINTER(interptr);
+}
+
+Datum pg_aod_sketch_union_agg(PG_FUNCTION_ARGS) {
+ void* unionptr;
+ bytea* sketch_bytes;
+ void* sketchptr;
+ int num_values;
+ int lg_k;
+
+ MemoryContext oldcontext;
+ MemoryContext aggcontext;
+
+ if (PG_ARGISNULL(0) && PG_ARGISNULL(1)) {
+ PG_RETURN_NULL();
+ } else if (PG_ARGISNULL(1)) {
+ PG_RETURN_POINTER(PG_GETARG_POINTER(0)); // no update value. return unmodified state
+ }
+
+ if (!AggCheckCallContext(fcinfo, &aggcontext)) {
+ elog(ERROR, "aod_sketch_merge called in non-aggregate context");
+ }
+ oldcontext = MemoryContextSwitchTo(aggcontext);
+
+ if (PG_ARGISNULL(0)) {
+ num_values = PG_GETARG_INT32(2);
+ if (num_values == 0) num_values = 1;
+ lg_k = PG_GETARG_INT32(3);
+ unionptr = lg_k ? aod_union_new_lgk(num_values, lg_k) : aod_union_new(num_values);
+ } else {
+ unionptr = PG_GETARG_POINTER(0);
+ }
+
+ sketch_bytes = PG_GETARG_BYTEA_P(1);
+ sketchptr = aod_sketch_deserialize(VARDATA(sketch_bytes), VARSIZE(sketch_bytes) - VARHDRSZ);
+ aod_union_update(unionptr, sketchptr);
+ compact_aod_sketch_delete(sketchptr);
+
+ MemoryContextSwitchTo(oldcontext);
+
+ PG_RETURN_POINTER(unionptr);
+}
+
+Datum pg_aod_sketch_from_internal(PG_FUNCTION_ARGS) {
+ void* sketchptr;
+ struct ptr_with_size bytes_out;
+
+ MemoryContext oldcontext;
+ MemoryContext aggcontext;
+
+ if (PG_ARGISNULL(0)) PG_RETURN_NULL();
+
+ if (!AggCheckCallContext(fcinfo, &aggcontext)) {
+ elog(ERROR, "aod_sketch_from_internal called in non-aggregate context");
+ }
+ oldcontext = MemoryContextSwitchTo(aggcontext);
+
+ sketchptr = PG_GETARG_POINTER(0);
+ sketchptr = aod_sketch_compact(sketchptr);
+ bytes_out = aod_sketch_serialize(sketchptr, VARHDRSZ);
+ compact_aod_sketch_delete(sketchptr);
+ SET_VARSIZE(bytes_out.ptr, bytes_out.size);
+
+ MemoryContextSwitchTo(oldcontext);
+
+ PG_RETURN_BYTEA_P(bytes_out.ptr);
+}
+
+Datum pg_aod_sketch_get_estimate_from_internal(PG_FUNCTION_ARGS) {
+ void* sketchptr;
+ double estimate;
+
+ MemoryContext oldcontext;
+ MemoryContext aggcontext;
+
+ if (PG_ARGISNULL(0)) PG_RETURN_NULL();
+
+ if (!AggCheckCallContext(fcinfo, &aggcontext)) {
+ elog(ERROR, "aod_sketch_from_internal called in non-aggregate context");
+ }
+ oldcontext = MemoryContextSwitchTo(aggcontext);
+
+ sketchptr = PG_GETARG_POINTER(0);
+ estimate = update_aod_sketch_get_estimate(sketchptr);
+ update_aod_sketch_delete(sketchptr);
+
+ MemoryContextSwitchTo(oldcontext);
+
+ PG_RETURN_FLOAT8(estimate);
+}
+
+Datum pg_aod_union_get_result(PG_FUNCTION_ARGS) {
+ void* unionptr;
+ void* sketchptr;
+ struct ptr_with_size bytes_out;
+
+ MemoryContext oldcontext;
+ MemoryContext aggcontext;
+
+ if (PG_ARGISNULL(0)) PG_RETURN_NULL();
+
+ if (!AggCheckCallContext(fcinfo, &aggcontext)) {
+ elog(ERROR, "aod_union_get_result called in non-aggregate context");
+ }
+ oldcontext = MemoryContextSwitchTo(aggcontext);
+
+ unionptr = PG_GETARG_POINTER(0);
+ sketchptr = aod_union_get_result(unionptr);
+ bytes_out = aod_sketch_serialize(sketchptr, VARHDRSZ);
+ compact_aod_sketch_delete(sketchptr);
+ aod_union_delete(unionptr);
+ SET_VARSIZE(bytes_out.ptr, bytes_out.size);
+
+ MemoryContextSwitchTo(oldcontext);
+
+ PG_RETURN_BYTEA_P(bytes_out.ptr);
+}
+
+Datum pg_aod_intersection_get_result(PG_FUNCTION_ARGS) {
+ void* interptr;
+ void* sketchptr;
+ struct ptr_with_size bytes_out;
+
+ MemoryContext oldcontext;
+ MemoryContext aggcontext;
+
+ if (PG_ARGISNULL(0)) PG_RETURN_NULL();
+
+ if (!AggCheckCallContext(fcinfo, &aggcontext)) {
+ elog(ERROR, "aod_intersection_get_result called in non-aggregate context");
+ }
+ oldcontext = MemoryContextSwitchTo(aggcontext);
+
+ interptr = PG_GETARG_POINTER(0);
+ sketchptr = aod_intersection_get_result(interptr);
+ bytes_out = aod_sketch_serialize(sketchptr, VARHDRSZ);
+ compact_aod_sketch_delete(sketchptr);
+ aod_intersection_delete(interptr);
+ SET_VARSIZE(bytes_out.ptr, bytes_out.size);
+
+ MemoryContextSwitchTo(oldcontext);
+
+ PG_RETURN_BYTEA_P(bytes_out.ptr);
+}
+
+Datum pg_aod_sketch_union(PG_FUNCTION_ARGS) {
+ const bytea* bytes_in1;
+ const bytea* bytes_in2;
+ void* sketchptr1;
+ void* sketchptr2;
+ void* unionptr;
+ void* sketchptr;
+ struct ptr_with_size bytes_out;
+ int num_values;
+ int lg_k;
+
+ num_values = PG_GETARG_INT32(2);
+ if (num_values == 0) num_values = 1;
+ lg_k = PG_GETARG_INT32(3);
+ unionptr = lg_k ? aod_union_new_lgk(num_values, lg_k) : aod_union_new(num_values);
+ if (!PG_ARGISNULL(0)) {
+ bytes_in1 = PG_GETARG_BYTEA_P(0);
+ sketchptr1 = aod_sketch_deserialize(VARDATA(bytes_in1), VARSIZE(bytes_in1) - VARHDRSZ);
+ aod_union_update(unionptr, sketchptr1);
+ compact_aod_sketch_delete(sketchptr1);
+ }
+ if (!PG_ARGISNULL(1)) {
+ bytes_in2 = PG_GETARG_BYTEA_P(1);
+ sketchptr2 = aod_sketch_deserialize(VARDATA(bytes_in2), VARSIZE(bytes_in2) - VARHDRSZ);
+ aod_union_update(unionptr, sketchptr2);
+ compact_aod_sketch_delete(sketchptr2);
+ }
+ sketchptr = aod_union_get_result(unionptr);
+ aod_union_delete(unionptr);
+ bytes_out = aod_sketch_serialize(sketchptr, VARHDRSZ);
+ compact_aod_sketch_delete(sketchptr);
+ SET_VARSIZE(bytes_out.ptr, bytes_out.size);
+ PG_RETURN_BYTEA_P(bytes_out.ptr);
+}
+
+Datum pg_aod_sketch_intersection(PG_FUNCTION_ARGS) {
+ const bytea* bytes_in1;
+ const bytea* bytes_in2;
+ void* sketchptr1;
+ void* sketchptr2;
+ void* interptr;
+ void* sketchptr;
+ struct ptr_with_size bytes_out;
+ int num_values;
+
+ num_values = PG_GETARG_INT32(2);
+ if (num_values == 0) num_values = 1;
+ interptr = aod_intersection_new(num_values);
+ if (!PG_ARGISNULL(0)) {
+ bytes_in1 = PG_GETARG_BYTEA_P(0);
+ sketchptr1 = aod_sketch_deserialize(VARDATA(bytes_in1), VARSIZE(bytes_in1) - VARHDRSZ);
+ aod_intersection_update(interptr, sketchptr1);
+ compact_aod_sketch_delete(sketchptr1);
+ }
+ if (!PG_ARGISNULL(1)) {
+ bytes_in2 = PG_GETARG_BYTEA_P(1);
+ sketchptr2 = aod_sketch_deserialize(VARDATA(bytes_in2), VARSIZE(bytes_in2) - VARHDRSZ);
+ aod_intersection_update(interptr, sketchptr2);
+ compact_aod_sketch_delete(sketchptr2);
+ }
+ sketchptr = aod_intersection_get_result(interptr);
+ aod_intersection_delete(interptr);
+ bytes_out = aod_sketch_serialize(sketchptr, VARHDRSZ);
+ compact_aod_sketch_delete(sketchptr);
+ SET_VARSIZE(bytes_out.ptr, bytes_out.size);
+ PG_RETURN_BYTEA_P(bytes_out.ptr);
+}
+
+Datum pg_aod_sketch_a_not_b(PG_FUNCTION_ARGS) {
+ const bytea* bytes_in1;
+ const bytea* bytes_in2;
+ void* sketchptr1;
+ void* sketchptr2;
+ void* sketchptr;
+ struct ptr_with_size bytes_out;
+
+ if (PG_ARGISNULL(0) || PG_ARGISNULL(1)) {
+ elog(ERROR, "aod_a_not_b expects two valid aod sketches");
+ }
+
+ bytes_in1 = PG_GETARG_BYTEA_P(0);
+ sketchptr1 = aod_sketch_deserialize(VARDATA(bytes_in1), VARSIZE(bytes_in1) - VARHDRSZ);
+ bytes_in2 = PG_GETARG_BYTEA_P(1);
+ sketchptr2 = aod_sketch_deserialize(VARDATA(bytes_in2), VARSIZE(bytes_in2) - VARHDRSZ);
+ sketchptr = aod_a_not_b(sketchptr1, sketchptr2);
+ compact_aod_sketch_delete(sketchptr1);
+ compact_aod_sketch_delete(sketchptr2);
+ bytes_out = aod_sketch_serialize(sketchptr, VARHDRSZ);
+ compact_aod_sketch_delete(sketchptr);
+ SET_VARSIZE(bytes_out.ptr, bytes_out.size);
+ PG_RETURN_BYTEA_P(bytes_out.ptr);
+}