Merge pull request #26 from phstudy/add-intesection-agg-func
add theta-sketch intersection agg function
diff --git a/sql/datasketches_theta_sketch.sql b/sql/datasketches_theta_sketch.sql
index 10d1eab..8be1154 100644
--- a/sql/datasketches_theta_sketch.sql
+++ b/sql/datasketches_theta_sketch.sql
@@ -78,10 +78,18 @@
AS '$libdir/datasketches', 'pg_theta_sketch_union_agg'
LANGUAGE C IMMUTABLE;
+-- Transition function of the theta_sketch_intersection aggregate: takes the
+-- running internal state plus the next theta_sketch and returns the new state.
+-- Not declared STRICT, so it is also invoked when either argument is NULL.
+CREATE OR REPLACE FUNCTION theta_sketch_intersection_agg(internal, theta_sketch)
+    RETURNS internal
+    AS '$libdir/datasketches', 'pg_theta_sketch_intersection_agg'
+    LANGUAGE C
+    IMMUTABLE;
+
CREATE OR REPLACE FUNCTION theta_union_get_result(internal) RETURNS theta_sketch
AS '$libdir/datasketches', 'pg_theta_union_get_result'
LANGUAGE C STRICT IMMUTABLE;
+-- Final function of the theta_sketch_intersection aggregate: converts the
+-- internal intersection state into a theta_sketch value.
+-- Declared STRICT, so a NULL state yields NULL without calling the C code.
+CREATE OR REPLACE FUNCTION theta_intersection_get_result(internal)
+    RETURNS theta_sketch
+    AS '$libdir/datasketches', 'pg_theta_intersection_get_result'
+    LANGUAGE C
+    STRICT
+    IMMUTABLE;
+
CREATE AGGREGATE theta_sketch_distinct(anyelement) (
sfunc = theta_sketch_add_item,
stype = internal,
@@ -124,6 +132,12 @@
finalfunc = theta_union_get_result
);
+-- Aggregate over theta_sketch values that accumulates them with
+-- theta_sketch_intersection_agg and produces a single theta_sketch via
+-- theta_intersection_get_result. No initial condition is given, so the
+-- state starts out NULL.
+CREATE AGGREGATE theta_sketch_intersection(theta_sketch) (
+    SFUNC = theta_sketch_intersection_agg,
+    STYPE = internal,
+    FINALFUNC = theta_intersection_get_result
+);
+
CREATE OR REPLACE FUNCTION theta_sketch_union(theta_sketch, theta_sketch) RETURNS theta_sketch
AS '$libdir/datasketches', 'pg_theta_sketch_union'
LANGUAGE C STRICT IMMUTABLE;
diff --git a/src/theta_sketch_pg_functions.c b/src/theta_sketch_pg_functions.c
index 1294d12..1e9f077 100644
--- a/src/theta_sketch_pg_functions.c
+++ b/src/theta_sketch_pg_functions.c
@@ -33,9 +33,11 @@
PG_FUNCTION_INFO_V1(pg_theta_sketch_get_estimate_and_bounds);
PG_FUNCTION_INFO_V1(pg_theta_sketch_to_string);
PG_FUNCTION_INFO_V1(pg_theta_sketch_union_agg);
+PG_FUNCTION_INFO_V1(pg_theta_sketch_intersection_agg);
PG_FUNCTION_INFO_V1(pg_theta_sketch_from_internal);
PG_FUNCTION_INFO_V1(pg_theta_sketch_get_estimate_from_internal);
PG_FUNCTION_INFO_V1(pg_theta_union_get_result);
+PG_FUNCTION_INFO_V1(pg_theta_intersection_get_result);
PG_FUNCTION_INFO_V1(pg_theta_sketch_union);
PG_FUNCTION_INFO_V1(pg_theta_sketch_intersection);
PG_FUNCTION_INFO_V1(pg_theta_sketch_a_not_b);
@@ -48,9 +50,11 @@
Datum pg_theta_sketch_get_estimate_and_bounds(PG_FUNCTION_ARGS);
Datum pg_theta_sketch_to_string(PG_FUNCTION_ARGS);
Datum pg_theta_sketch_union_agg(PG_FUNCTION_ARGS);
+Datum pg_theta_sketch_intersection_agg(PG_FUNCTION_ARGS);
Datum pg_theta_sketch_from_internal(PG_FUNCTION_ARGS);
Datum pg_theta_sketch_get_estimate_from_internal(PG_FUNCTION_ARGS);
Datum pg_theta_union_get_result(PG_FUNCTION_ARGS);
+Datum pg_theta_intersection_get_result(PG_FUNCTION_ARGS);
Datum pg_theta_sketch_union(PG_FUNCTION_ARGS);
Datum pg_theta_sketch_intersection(PG_FUNCTION_ARGS);
Datum pg_theta_sketch_a_not_b(PG_FUNCTION_ARGS);
@@ -159,6 +163,41 @@
PG_RETURN_TEXT_P(cstring_to_text(str));
}
+/*
+ * Transition function backing the theta_sketch_intersection aggregate.
+ *
+ * Argument 0 is the running intersection state (NULL until a state has been
+ * created); argument 1 is the next serialized theta sketch.
+ *
+ * Returns NULL while both the state and the input are NULL, returns the state
+ * unmodified when only the input is NULL, and otherwise updates the state
+ * (creating a default one on first use) with the input sketch and returns it.
+ * Raises an ERROR when invoked outside of an aggregate call context.
+ */
+Datum pg_theta_sketch_intersection_agg(PG_FUNCTION_ARGS) {
+  void* interptr;
+  bytea* sketch_bytes;
+  void* sketchptr;
+
+  MemoryContext oldcontext;
+  MemoryContext aggcontext;
+
+  if (PG_ARGISNULL(0) && PG_ARGISNULL(1)) {
+    PG_RETURN_NULL();
+  } else if (PG_ARGISNULL(1)) {
+    PG_RETURN_POINTER(PG_GETARG_POINTER(0)); // no update value. return unmodified state
+  }
+
+  if (!AggCheckCallContext(fcinfo, &aggcontext)) {
+    elog(ERROR, "theta_sketch_intersection_agg called in non-aggregate context");
+  }
+
+  // Fetch (and, if necessary, detoast) the input before switching contexts, so
+  // that a detoasted copy is allocated in the caller's current memory context
+  // instead of accumulating in the aggregate context once per input row.
+  sketch_bytes = PG_GETARG_BYTEA_P(1);
+
+  // The intersection state must outlive this call, so everything that touches
+  // it is done with the aggregate context active.
+  oldcontext = MemoryContextSwitchTo(aggcontext);
+
+  if (PG_ARGISNULL(0)) {
+    interptr = theta_intersection_new_default();
+  } else {
+    interptr = PG_GETARG_POINTER(0);
+  }
+
+  // The deserialized sketch is only needed for the update and is released
+  // immediately afterwards.
+  sketchptr = theta_sketch_deserialize(VARDATA(sketch_bytes), VARSIZE(sketch_bytes) - VARHDRSZ);
+  theta_intersection_update(interptr, sketchptr);
+  theta_sketch_delete(sketchptr);
+
+  MemoryContextSwitchTo(oldcontext);
+
+  PG_RETURN_POINTER(interptr);
+}
+
Datum pg_theta_sketch_union_agg(PG_FUNCTION_ARGS) {
void* unionptr;
bytea* sketch_bytes;
@@ -271,6 +310,33 @@
PG_RETURN_BYTEA_P(bytes_out.ptr);
}
+/*
+ * Final function backing the theta_sketch_intersection aggregate.
+ *
+ * Argument 0 is the intersection state built up by the transition function.
+ * Returns NULL for a NULL state; otherwise obtains the result sketch from the
+ * state, serializes it with room for the varlena header, and returns it as a
+ * bytea. Raises an ERROR when invoked outside of an aggregate call context.
+ *
+ * NOTE(review): the state is freed here. Confirm the aggregate is never
+ * evaluated in a mode that calls the final function more than once on the
+ * same state (e.g. as a window aggregate).
+ */
+Datum pg_theta_intersection_get_result(PG_FUNCTION_ARGS) {
+  MemoryContext agg_ctx;
+  MemoryContext caller_ctx;
+  void* state;
+  void* result_sketch;
+  struct ptr_with_size serialized;
+
+  if (PG_ARGISNULL(0)) {
+    PG_RETURN_NULL();
+  }
+
+  if (!AggCheckCallContext(fcinfo, &agg_ctx)) {
+    elog(ERROR, "theta_intersection_get_result called in non-aggregate context");
+  }
+  caller_ctx = MemoryContextSwitchTo(agg_ctx);
+
+  state = PG_GETARG_POINTER(0);
+  result_sketch = theta_intersection_get_result(state);
+
+  // Serialize leaving VARHDRSZ bytes up front, then stamp the total size
+  // into that header.
+  serialized = theta_sketch_serialize(result_sketch, VARHDRSZ);
+  SET_VARSIZE(serialized.ptr, serialized.size);
+
+  // Neither the intermediate sketch nor the state is used past this point.
+  theta_sketch_delete(result_sketch);
+  theta_intersection_delete(state);
+
+  MemoryContextSwitchTo(caller_ctx);
+
+  PG_RETURN_BYTEA_P(serialized.ptr);
+}
+
Datum pg_theta_sketch_union(PG_FUNCTION_ARGS) {
const bytea* bytes_in1;
const bytea* bytes_in2;