Merge pull request #16 from apache/cpc_sketch_header_only
Use the header-only version of the CPC sketch
diff --git a/Makefile b/Makefile
index 2b6e81b..61ddebf 100644
--- a/Makefile
+++ b/Makefile
@@ -35,9 +35,6 @@
# assume a copy or link datasketches-cpp in the current dir
CORE = datasketches-cpp
-CPC = $(CORE)/cpc/src
-OBJS += $(CPC)/cpc_sketch.o $(CPC)/fm85.o $(CPC)/fm85Compression.o $(CPC)/fm85Confidence.o $(CPC)/fm85Merging.o $(CPC)/fm85Util.o $(CPC)/iconEstimator.o $(CPC)/u32Table.o
-
PG_CPPFLAGS = -fPIC -I/usr/local/include -I$(CORE)/kll/include -I$(CORE)/common/include -I$(CORE)/cpc/include -I$(CORE)/theta/include -I$(CORE)/fi/include -I$(CORE)/hll/include
PG_CXXFLAGS = -std=c++11
SHLIB_LINK = -lstdc++ -L/usr/local/lib
diff --git a/src/allocator.h b/src/allocator.h
index c66a0a9..1a49f4b 100644
--- a/src/allocator.h
+++ b/src/allocator.h
@@ -50,7 +50,7 @@
pointer address(reference x) const { return &x; }
const_pointer address(const_reference x) const {
- return x;
+ return &x;
}
pointer allocate(size_type n, const_pointer = 0) {
diff --git a/src/cpc_sketch_c_adapter.cpp b/src/cpc_sketch_c_adapter.cpp
index 35de712..3d18828 100644
--- a/src/cpc_sketch_c_adapter.cpp
+++ b/src/cpc_sketch_c_adapter.cpp
@@ -18,126 +18,135 @@
*/
#include "cpc_sketch_c_adapter.h"
+#include "allocator.h"
+#include "postgres_h_substitute.h"
#include <sstream>
#include <cpc_sketch.hpp>
#include <cpc_union.hpp>
+typedef datasketches::cpc_sketch_alloc<palloc_allocator<char>> cpc_sketch_pg;
+typedef datasketches::cpc_union_alloc<palloc_allocator<char>> cpc_union_pg;
+
void cpc_init() {
- datasketches::cpc_init(&palloc, &pfree);
+ datasketches::cpc_init<palloc_allocator<char>>();
}
void cpc_cleanup() {
- datasketches::cpc_cleanup();
}
void* cpc_sketch_new(unsigned lg_k) {
try {
- return new (palloc(sizeof(datasketches::cpc_sketch))) datasketches::cpc_sketch(lg_k, datasketches::DEFAULT_SEED);
+ return new (palloc(sizeof(cpc_sketch_pg))) cpc_sketch_pg(lg_k, datasketches::DEFAULT_SEED);
} catch (std::exception& e) {
- elog(ERROR, "%s", e.what());
+ pg_error(e.what());
}
+ pg_unreachable();
}
void cpc_sketch_delete(void* sketchptr) {
try {
- static_cast<datasketches::cpc_sketch*>(sketchptr)->~cpc_sketch();
+ static_cast<cpc_sketch_pg*>(sketchptr)->~cpc_sketch_pg();
pfree(sketchptr);
} catch (std::exception& e) {
- elog(ERROR, "%s", e.what());
+ pg_error(e.what());
}
}
void cpc_sketch_update(void* sketchptr, const void* data, unsigned length) {
try {
- static_cast<datasketches::cpc_sketch*>(sketchptr)->update(data, length);
+ static_cast<cpc_sketch_pg*>(sketchptr)->update(data, length);
} catch (std::exception& e) {
- elog(ERROR, "%s", e.what());
+ pg_error(e.what());
}
}
double cpc_sketch_get_estimate(const void* sketchptr) {
try {
- return static_cast<const datasketches::cpc_sketch*>(sketchptr)->get_estimate();
+ return static_cast<const cpc_sketch_pg*>(sketchptr)->get_estimate();
} catch (std::exception& e) {
- elog(ERROR, "%s", e.what());
+ pg_error(e.what());
}
+ pg_unreachable();
}
Datum* cpc_sketch_get_estimate_and_bounds(const void* sketchptr, unsigned num_std_devs) {
try {
Datum* est_and_bounds = (Datum*) palloc(sizeof(Datum) * 3);
- est_and_bounds[0] = Float8GetDatum(static_cast<const datasketches::cpc_sketch*>(sketchptr)->get_estimate());
- est_and_bounds[1] = Float8GetDatum(static_cast<const datasketches::cpc_sketch*>(sketchptr)->get_lower_bound(num_std_devs));
- est_and_bounds[2] = Float8GetDatum(static_cast<const datasketches::cpc_sketch*>(sketchptr)->get_upper_bound(num_std_devs));
+ est_and_bounds[0] = pg_float8_get_datum(static_cast<const cpc_sketch_pg*>(sketchptr)->get_estimate());
+ est_and_bounds[1] = pg_float8_get_datum(static_cast<const cpc_sketch_pg*>(sketchptr)->get_lower_bound(num_std_devs));
+ est_and_bounds[2] = pg_float8_get_datum(static_cast<const cpc_sketch_pg*>(sketchptr)->get_upper_bound(num_std_devs));
return est_and_bounds;
} catch (std::exception& e) {
- elog(ERROR, "%s", e.what());
+ pg_error(e.what());
}
+ pg_unreachable();
}
void cpc_sketch_to_string(const void* sketchptr, char* buffer, unsigned length) {
try {
std::stringstream s;
- s << *(static_cast<const datasketches::cpc_sketch*>(sketchptr));
+ static_cast<const cpc_sketch_pg*>(sketchptr)->to_stream(s);;
snprintf(buffer, length, "%s", s.str().c_str());
} catch (std::exception& e) {
- elog(ERROR, "%s", e.what());
+ pg_error(e.what());
}
}
-void* cpc_sketch_serialize(const void* sketchptr) {
+struct ptr_with_size cpc_sketch_serialize(const void* sketchptr, unsigned header_size) {
try {
- auto data = static_cast<const datasketches::cpc_sketch*>(sketchptr)->serialize(VARHDRSZ);
- bytea* buffer = (bytea*) data.first.release();
- const size_t length = data.second;
- SET_VARSIZE(buffer, length);
- return buffer;
+ ptr_with_size p;
+ auto data = static_cast<const cpc_sketch_pg*>(sketchptr)->serialize(header_size);
+ p.ptr = data.first.release();
+ p.size = data.second;
+ return p;
} catch (std::exception& e) {
- elog(ERROR, "%s", e.what());
+ pg_error(e.what());
}
+ pg_unreachable();
}
void* cpc_sketch_deserialize(const char* buffer, unsigned length) {
try {
- auto ptr = datasketches::cpc_sketch::deserialize(buffer, length, datasketches::DEFAULT_SEED);
- return ptr.release();
+ return new (palloc(sizeof(cpc_sketch_pg))) cpc_sketch_pg(cpc_sketch_pg::deserialize(buffer, length, datasketches::DEFAULT_SEED));
} catch (std::exception& e) {
- elog(ERROR, "%s", e.what());
+ pg_error(e.what());
}
+ pg_unreachable();
}
void* cpc_union_new(unsigned lg_k) {
try {
- return new (palloc(sizeof(datasketches::cpc_union))) datasketches::cpc_union(lg_k, datasketches::DEFAULT_SEED);
+ return new (palloc(sizeof(cpc_union_pg))) cpc_union_pg(lg_k, datasketches::DEFAULT_SEED);
} catch (std::exception& e) {
- elog(ERROR, "%s", e.what());
+ pg_error(e.what());
}
+ pg_unreachable();
}
void cpc_union_delete(void* unionptr) {
try {
- static_cast<datasketches::cpc_union*>(unionptr)->~cpc_union();
+ static_cast<cpc_union_pg*>(unionptr)->~cpc_union_pg();
pfree(unionptr);
} catch (std::exception& e) {
- elog(ERROR, "%s", e.what());
+ pg_error(e.what());
}
}
void cpc_union_update(void* unionptr, const void* sketchptr) {
try {
- static_cast<datasketches::cpc_union*>(unionptr)->update(*static_cast<const datasketches::cpc_sketch*>(sketchptr));
+ static_cast<cpc_union_pg*>(unionptr)->update(*static_cast<const cpc_sketch_pg*>(sketchptr));
} catch (std::exception& e) {
- elog(ERROR, "%s", e.what());
+ pg_error(e.what());
}
}
void* cpc_union_get_result(void* unionptr) {
try {
- auto ptr = static_cast<datasketches::cpc_union*>(unionptr)->get_result();
- return ptr.release();
+ return new (palloc(sizeof(cpc_sketch_pg))) cpc_sketch_pg(static_cast<cpc_union_pg*>(unionptr)->get_result());
} catch (std::exception& e) {
- elog(ERROR, "%s", e.what());
+ pg_error(e.what());
}
+ pg_unreachable();
}
diff --git a/src/cpc_sketch_c_adapter.h b/src/cpc_sketch_c_adapter.h
index 91683ef..338f391 100644
--- a/src/cpc_sketch_c_adapter.h
+++ b/src/cpc_sketch_c_adapter.h
@@ -24,8 +24,6 @@
extern "C" {
#endif
-#include <postgres.h>
-
void cpc_init();
void cpc_cleanup();
@@ -35,10 +33,15 @@
void cpc_sketch_update(void* sketchptr, const void* data, unsigned length);
void cpc_sketch_merge(void* sketchptr1, const void* sketchptr2);
double cpc_sketch_get_estimate(const void* sketchptr);
-Datum* cpc_sketch_get_estimate_and_bounds(const void* sketchptr, unsigned num_std_devs);
+void** cpc_sketch_get_estimate_and_bounds(const void* sketchptr, unsigned num_std_devs);
void cpc_sketch_to_string(const void* sketchptr, char* buffer, unsigned length);
-void* cpc_sketch_serialize(const void* sketchptr);
+struct ptr_with_size {
+ void* ptr;
+ unsigned long long size;
+};
+
+struct ptr_with_size cpc_sketch_serialize(const void* sketchptr, unsigned header_size);
void* cpc_sketch_deserialize(const char* buffer, unsigned length);
void* cpc_union_new(unsigned lg_k);
diff --git a/src/cpc_sketch_pg_functions.c b/src/cpc_sketch_pg_functions.c
index c8b6aa9..98000c5 100644
--- a/src/cpc_sketch_pg_functions.c
+++ b/src/cpc_sketch_pg_functions.c
@@ -131,7 +131,7 @@
sketchptr = cpc_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ);
num_std_devs = PG_GETARG_INT32(1);
if (num_std_devs == 0) num_std_devs = 1; // default
- est_and_bounds = cpc_sketch_get_estimate_and_bounds(sketchptr, num_std_devs);
+ est_and_bounds = (Datum*) cpc_sketch_get_estimate_and_bounds(sketchptr, num_std_devs);
cpc_sketch_delete(sketchptr);
// construct output array
@@ -190,7 +190,7 @@
Datum pg_cpc_sketch_from_internal(PG_FUNCTION_ARGS) {
void* sketchptr;
- bytea* bytes_out;
+ struct ptr_with_size bytes_out;
MemoryContext oldcontext;
MemoryContext aggcontext;
@@ -203,12 +203,13 @@
oldcontext = MemoryContextSwitchTo(aggcontext);
sketchptr = PG_GETARG_POINTER(0);
- bytes_out = cpc_sketch_serialize(sketchptr);
+ bytes_out = cpc_sketch_serialize(sketchptr, VARHDRSZ);
cpc_sketch_delete(sketchptr);
+ SET_VARSIZE(bytes_out.ptr, bytes_out.size);
MemoryContextSwitchTo(oldcontext);
- PG_RETURN_BYTEA_P(bytes_out);
+ PG_RETURN_BYTEA_P(bytes_out.ptr);
}
Datum pg_cpc_sketch_get_estimate_from_internal(PG_FUNCTION_ARGS) {
@@ -237,7 +238,7 @@
Datum pg_cpc_union_get_result(PG_FUNCTION_ARGS) {
void* unionptr;
void* sketchptr;
- bytea* bytes_out;
+ struct ptr_with_size bytes_out;
MemoryContext oldcontext;
MemoryContext aggcontext;
@@ -251,13 +252,14 @@
unionptr = PG_GETARG_POINTER(0);
sketchptr = cpc_union_get_result(unionptr);
- bytes_out = cpc_sketch_serialize(sketchptr);
+ bytes_out = cpc_sketch_serialize(sketchptr, VARHDRSZ);
cpc_sketch_delete(sketchptr);
cpc_union_delete(unionptr);
+ SET_VARSIZE(bytes_out.ptr, bytes_out.size);
MemoryContextSwitchTo(oldcontext);
- PG_RETURN_BYTEA_P(bytes_out);
+ PG_RETURN_BYTEA_P(bytes_out.ptr);
}
Datum pg_cpc_sketch_union(PG_FUNCTION_ARGS) {
@@ -267,7 +269,7 @@
void* sketchptr2;
void* unionptr;
void* sketchptr;
- bytea* bytes_out;
+ struct ptr_with_size bytes_out;
int lg_k;
lg_k = PG_GETARG_INT32(2);
@@ -286,7 +288,8 @@
}
sketchptr = cpc_union_get_result(unionptr);
cpc_union_delete(unionptr);
- bytes_out = cpc_sketch_serialize(sketchptr);
+ bytes_out = cpc_sketch_serialize(sketchptr, VARHDRSZ);
cpc_sketch_delete(sketchptr);
- PG_RETURN_BYTEA_P(bytes_out);
+ SET_VARSIZE(bytes_out.ptr, bytes_out.size);
+ PG_RETURN_BYTEA_P(bytes_out.ptr);
}
diff --git a/src/global_hooks.c b/src/global_hooks.c
index 7f8d529..f1fdfdd 100644
--- a/src/global_hooks.c
+++ b/src/global_hooks.c
@@ -22,6 +22,9 @@
#include "cpc_sketch_c_adapter.h"
+void _PG_init(void);
+void _PG_fini(void);
+
void _PG_init() {
cpc_init();
}