blob: 279b0e4a006837af254f9c221e8a32ad46e6c68e [file] [log] [blame]
/*!
* \file countmin.c
*
* \brief CountMin sketch implementation
*
* \implementation
* The basic CountMin sketch is a set of DEPTH arrays, each with NUMCOUNTERS counters.
* The idea is that each of those arrays is used as an independent random trial of the
* same process: for all the values x in a set, each holds counts of h_i(x) mod NUMCOUNTERS for a different random hash function h_i.
* Estimates of the count of some value x are based on the <i>minimum</i> counter h_i(x) across
* the DEPTH arrays (hence the name CountMin.)
*
* Let's call the process described above "sketching" the x's. To support range
* lookups, we repeat the basic CountMin sketching process INT64BITS times as follows.
* (This is the "dyadic range" trick mentioned in Cormode/Muthu.)
*
* Every value x/(2^i) is sketched
* at a different power-of-2 (dyadic) "range" i. So we sketch x in range 0, then sketch x/2
* in range 1, then sketch x/4 in range 2, etc.
* This allows us to count up ranges (like 14-48) by doing CountMin equality lookups on constituent
* dyadic ranges ({[14-15] as 7 in range 1, [16-31] as 1 in range 4, [32-47] as 2 in range 4, [48-48] as 48 in range 0}).
* Dyadic ranges are similarly useful for histogramming, order stats, etc.
*
* The results of the estimators below generally have guarantees of the form
* "the answer is within \epsilon of the true answer with probability 1-\delta."
*/
#include "postgres.h"
#include "utils/array.h"
#include "utils/elog.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/numeric.h"
#include "nodes/execnodes.h"
#include "fmgr.h"
#include "sketch_support.h"
#include "catalog/pg_type.h"
#include "countmin.h"
#include <ctype.h>
PG_FUNCTION_INFO_V1(__cmsketch_trans);
/*
* This is the UDF interface. It does sanity checks and preps values
* for the interesting logic in countmin_dyadic_trans_c
*/
Datum __cmsketch_trans(PG_FUNCTION_ARGS)
{
bytea * transblob = NULL;
cmtransval *transval;
/*
* This function makes destructive updates to its arguments.
* Make sure it's being called in an agg context.
*/
if (!(fcinfo->context &&
(IsA(fcinfo->context, AggState)
#ifdef NOTGP
|| IsA(fcinfo->context, WindowAggState)
#endif
)))
elog(ERROR,
"destructive pass by reference outside agg");
/* get the provided element, being careful in case it's NULL */
if (!PG_ARGISNULL(1)) {
transblob = cmsketch_check_transval(fcinfo, true);
transval = (cmtransval *)(VARDATA(transblob));
/* the following line modifies the contents of transval, and hence transblob */
countmin_dyadic_trans_c(transval, PG_GETARG_INT64(1));
PG_RETURN_DATUM(PointerGetDatum(transblob));
}
else PG_RETURN_DATUM(PointerGetDatum(PG_GETARG_BYTEA_P(0)));
}
/*!
 * Fetch the transition value from argument 0, replacing it with a freshly
 * initialized one if CM_TRANSVAL_INITIALIZED reports it is not set up yet.
 * \param initargs when a new transval is created: if true, copy every
 *        argument after the first two into it (erroring out on NULLs or on
 *        more than MAXARGS of them); if false, record nargs = -1
 * \return the transval packed in a bytea (either the incoming blob or a
 *         newly allocated one)
 */
bytea *cmsketch_check_transval(PG_FUNCTION_ARGS, bool initargs)
{
    bytea      *blob = PG_GETARG_BYTEA_P(0);
    cmtransval *tv;
    int         argcount;
    int         argno;

    /* already initialized: return it as is */
    if (CM_TRANSVAL_INITIALIZED(blob))
        return(blob);

    /* XXX would be nice to pfree the existing blob, but pfree complains. */
    blob = cmsketch_init_transval();
    tv = (cmtransval *)VARDATA(blob);

    if (!initargs) {
        tv->nargs = -1;
        return(blob);
    }

    /* carry along any additional args */
    argcount = PG_NARGS();
    if (argcount - 2 > MAXARGS)
        elog(
            ERROR,
            "no more than %d additional arguments should be passed to __cmsketch_trans",
            MAXARGS);
    tv->nargs = argcount - 2;

    argno = 2;
    while (argno < argcount) {
        if (PG_ARGISNULL(argno))
            elog(ERROR,
                 "NULL parameter %d passed to __cmsketch_trans",
                 argno);
        tv->args[argno - 2] = PG_GETARG_DATUM(argno);
        argno++;
    }

    return(blob);
}
bytea *cmsketch_init_transval()
{
bool typIsVarlena;
cmtransval *transval;
/* allocate and zero out a transval via palloc0 */
bytea * transblob = (bytea *)palloc0(CM_TRANSVAL_SZ);
SET_VARSIZE(transblob, CM_TRANSVAL_SZ);
/* set up for stringifying INT8's according to PG type rules */
transval = (cmtransval *)VARDATA(transblob);
transval->typOid = INT8OID; /* as of now we only support INT8 */
getTypeOutputInfo(transval->typOid,
&(transval->outFuncOid),
&typIsVarlena);
return(transblob);
}
/*!
 * Insert a value into every dyadic range of the sketch: range 0 receives the
 * value itself, and each following range receives the previous value
 * shifted right by one bit.
 * \param transval the cmsketch transval
 * \param inputi the value to be inserted
 */
void countmin_dyadic_trans_c(cmtransval *transval, int64 inputi)
{
    uint32 level = 0;

    while (level < RANGES) {
        countmin_trans_c(transval->sketches[level],
                         Int64GetDatum(inputi),
                         transval->outFuncOid);
        /* halve the value for the next dyadic range */
        inputi >>= 1;
        level++;
    }
}
/*!
 * Insert one datum into the sketch of a single dyadic range.  The datum is
 * converted to a string, the string is hashed with md5_datum, and
 * hash_counters_iterate then bumps one counter per sketch row using
 * increment_counter.
 * \param sketch the current countmin sketch
 * \param dat the datum to be inserted
 * \param outFuncOid Oid of the PostgreSQL function to convert dat to a string
 * \return the string form of dat that was hashed
 */
char *countmin_trans_c(countmin sketch, Datum dat, Oid outFuncOid)
{
    /* md5 works on the textual representation of the input */
    char *text_form = OidOutputFunctionCall(outFuncOid, dat);
    Datum digest = md5_datum(text_form);

    /*
     * The result of the iteration is not needed when incrementing, so the
     * initial value passed in (0) is arbitrary.
     */
    (void)hash_counters_iterate(digest, sketch, 0, &increment_counter);

    return(text_form);
}
/*
* FINAL functions for various UDAs built on countmin sketches
*/
/*!
* simply returns its input
*/
PG_FUNCTION_INFO_V1(__cmsketch_final);
Datum __cmsketch_final(PG_FUNCTION_ARGS)
{
PG_RETURN_BYTEA_P(PG_GETARG_BYTEA_P(0));
}
/*!
* use sketch to get the count of the 1st arg
*/
PG_FUNCTION_INFO_V1(__cmsketch_count_final);
Datum __cmsketch_count_final(PG_FUNCTION_ARGS)
{
bytea * blob = PG_GETARG_BYTEA_P(0);
cmtransval *sketch = (cmtransval *)VARDATA(blob);
if (!CM_TRANSVAL_INITIALIZED(blob))
PG_RETURN_NULL();
PG_RETURN_INT64(cmsketch_count_c(sketch->sketches[0],
sketch->args[0], sketch->outFuncOid));
}
/*!
* use sketch to get the count in range of the 1st 2 args
*/
PG_FUNCTION_INFO_V1(__cmsketch_rangecount_final);
Datum __cmsketch_rangecount_final(PG_FUNCTION_ARGS)
{
bytea * blob = PG_GETARG_BYTEA_P(0);
cmtransval *sketch = (cmtransval *)VARDATA(blob);
if (!CM_TRANSVAL_INITIALIZED(blob))
PG_RETURN_NULL();
PG_RETURN_INT64(cmsketch_rangecount_c(sketch, DatumGetInt64(sketch->args[0]),
DatumGetInt64(sketch->args[1])));
}
/*!
 * final function: estimate the centile whose number is stored as the first
 * extra argument of the transval.
 * Returns NULL when the transval was never initialized or when the sketch
 * reports a total count of zero.
 */
PG_FUNCTION_INFO_V1(__cmsketch_centile_final);
Datum __cmsketch_centile_final(PG_FUNCTION_ARGS)
{
    bytea      *state = PG_GETARG_BYTEA_P(0);
    cmtransval *tv;
    int64       rowcount;

    if (!CM_TRANSVAL_INITIALIZED(state))
        PG_RETURN_NULL();

    tv = (cmtransval *)VARDATA(state);

    /* the count over the whole domain stands in for count(*) */
    rowcount = cmsketch_rangecount_c(tv, MIN_INT64, MAX_INT64);
    if (rowcount == 0)
        PG_RETURN_NULL();

    PG_RETURN_INT64(cmsketch_centile_c(tv,
                                       DatumGetInt64(tv->args[0]),
                                       rowcount));
}
/*!
 * final function: estimate the median, i.e. the 50th centile.
 * Returns NULL when the transval was never initialized or when the sketch
 * reports a total count of zero.
 */
PG_FUNCTION_INFO_V1(__cmsketch_median_final);
Datum __cmsketch_median_final(PG_FUNCTION_ARGS)
{
    bytea      *state = PG_GETARG_BYTEA_P(0);
    cmtransval *tv;
    int64       rowcount;

    if (!CM_TRANSVAL_INITIALIZED(state))
        PG_RETURN_NULL();

    tv = (cmtransval *)VARDATA(state);

    /* the count over the whole domain stands in for count(*) */
    rowcount = cmsketch_rangecount_c(tv, MIN_INT64, MAX_INT64);
    if (rowcount == 0)
        PG_RETURN_NULL();

    PG_RETURN_INT64(cmsketch_centile_c(tv, 50, rowcount));
}
/*!
 * final function: build the equi-depth histogram of the sketch.  The number
 * of buckets is taken from the first extra argument stored in the transval.
 * Returns NULL when the transval was never initialized.
 */
PG_FUNCTION_INFO_V1(__cmsketch_dhist_final);
Datum __cmsketch_dhist_final(PG_FUNCTION_ARGS)
{
    bytea      *blob = PG_GETARG_BYTEA_P(0);
    cmtransval *sketch = (cmtransval *)VARDATA(blob);

    if (!CM_TRANSVAL_INITIALIZED(blob))
        PG_RETURN_NULL();

    /*
     * cmsketch_depth_histogram_c already yields an array Datum (it ends in
     * PG_RETURN_ARRAYTYPE_P), so it must be passed through as a Datum.
     * Wrapping it in PG_RETURN_INT64 would reinterpret the array pointer as
     * an int8 value, which is only harmless where int8 is pass-by-value.
     */
    PG_RETURN_DATUM(cmsketch_depth_histogram_c(sketch,
                                               DatumGetInt64(sketch->args[0])));
}
/*!
 * Greenplum "prefunc" to combine sketches from multiple machines.
 * Produces a new transval whose counters are the element-wise sum of the two
 * inputs.  Either input may be uninitialized, in which case it is treated as
 * an empty sketch.  The stored extra arguments are taken from the first
 * input, or from the second input when the first carries none.
 */
PG_FUNCTION_INFO_V1(__cmsketch_merge);
Datum __cmsketch_merge(PG_FUNCTION_ARGS)
{
    bytea      *counterblob1 = PG_GETARG_BYTEA_P(0);
    bytea      *counterblob2 = PG_GETARG_BYTEA_P(1);
    cmtransval *newtrans;
    cmtransval *other;
    countmin   *sketches2;
    countmin   *newsketches;
    bytea      *newblob;
    uint32      i, j, k;
    int         sz;

    /* make sure they're initialized! */
    if (!CM_TRANSVAL_INITIALIZED(counterblob1)) {
        counterblob1 = cmsketch_init_transval();
        /*
         * A transval created here has no stored arguments of its own.  Mark
         * it the same way cmsketch_check_transval marks an argument-less
         * transval (nargs = -1) so the arguments of the other input are
         * adopted below instead of being dropped.
         */
        ((cmtransval *)VARDATA(counterblob1))->nargs = -1;
    }
    if (!CM_TRANSVAL_INITIALIZED(counterblob2))
        counterblob2 = cmsketch_init_transval();

    /*
     * Only look inside counterblob2 after the check above: an uninitialized
     * blob is too small to hold a cmtransval, so taking its sketches earlier
     * would make the summing loop read past the end of the blob.
     */
    other = (cmtransval *)VARDATA(counterblob2);
    sketches2 = (countmin *)other->sketches;

    /* allocate a new transval as a copy of counterblob1 */
    sz = VARSIZE(counterblob1);
    newblob = (bytea *)palloc(sz);
    memcpy(newblob, counterblob1, sz);
    newtrans = (cmtransval *)(VARDATA(newblob));
    newsketches = (countmin *)(newtrans)->sketches;

    /* add in values from counterblob2 */
    for (i = 0; i < RANGES; i++)
        for (j = 0; j < DEPTH; j++)
            for (k = 0; k < NUMCOUNTERS; k++)
                newsketches[i][j][k] += sketches2[i][j][k];

    if (newtrans->nargs == -1) {
        /* transfer in the args from the other input */
        newtrans->nargs = other->nargs;
        for (i = 0; (int)i < other->nargs; i++)
            newtrans->args[i] = other->args[i];
    }

    PG_RETURN_DATUM(PointerGetDatum(newblob));
}
/*
******* Below are scalar methods to manipulate completed sketches. ******
*/
/*!
 * match sketch type to scalar arg type
 *
 * Makes the enclosing function return SQL NULL when argument arg_offset is
 * NULL, and raises an ERROR when the declared type of that argument differs
 * from the type the sketch was computed over.
 * Must be used inside a function declared with PG_FUNCTION_ARGS (it refers
 * to fcinfo), as a statement terminated by a semicolon.  The body is wrapped
 * in do { } while (0) so that it behaves as a single statement, e.g. when
 * used as the body of an unbraced if.
 */
#define CM_CHECKARG(sketch, arg_offset) \
    do { \
        if (PG_ARGISNULL(arg_offset)) PG_RETURN_NULL(); \
        if ((sketch)->typOid != get_fn_expr_argtype(fcinfo->flinfo, (arg_offset))) { \
            elog( \
                ERROR, \
                "sketch computed over type " INT64_FORMAT \
                "; argument %d over type " INT64_FORMAT ".", \
                (int64)(sketch)->typOid, \
                (arg_offset), \
                (int64)get_fn_expr_argtype(fcinfo->flinfo, (arg_offset))); \
            PG_RETURN_NULL(); \
        } \
    } while (0)
/*!
 * get the approximate count of objects with value arg
 * \param sketch a countmin sketch
 * \param arg the Datum we want to find the count of
 * \param funcOid the Postgres function that converts arg to a string
 * \return the smallest of the counters selected by the hash of arg, one
 *         counter per sketch row
 */
int64 cmsketch_count_c(countmin sketch, Datum arg, Oid funcOid)
{
    Datum nhash;
    char *txt;

    txt = OidOutputFunctionCall(funcOid, arg);

    /* get the md5 hash of the stringified argument. */
    nhash = md5_datum(txt);

    /*
     * Iterate through the sketch rows, keeping the min counter associated
     * with this hash.  This function returns a plain int64, not a Datum, so
     * the value is returned directly rather than via PG_RETURN_INT64 (which
     * converts to a Datum and is only equivalent where int8 is passed by
     * value).
     */
    return hash_counters_iterate(nhash, sketch, INT64_MAX, &min_counter);
}
/*!
 * Estimate the number of values in [bot, top] by breaking the range into
 * dyadic spans with find_ranges and adding up the CountMin estimate of each
 * span, looked up in the sketch of the matching dyadic range.
 * \param transval a countmin transition value
 * \param bot the bottom of the range (inclusive)
 * \param top the top of the range (inclusive)
 * \return the summed estimate, returned as a Datum
 */
Datum cmsketch_rangecount_c(cmtransval *transval, int64 bot, int64 top)
{
    rangelist pieces;
    int64     total = 0;
    uint32    idx = 0;

    pieces.emptyoffset = 0;
    find_ranges(bot, top, &pieces);

    /*
     * find_ranges will not generate a span larger than 2^63-1, so
     * we only need to consider 2^63 or less.
     */
    while (idx < pieces.emptyoffset) {
        int4  level;    /* which dyadic range (power of 2) this span is */
        int64 key;      /* the value to look up at that range */

        if (pieces.spans[idx][0] == MIN_INT64 && pieces.spans[idx][1] == -1) {
            /* special case to avoid overflow: full range left of 0 */
            level = 63;
            key = -1;
        }
        else if (pieces.spans[idx][0] == 0 &&
                 pieces.spans[idx][1] == MAX_INT64) {
            /* special case to avoid overflow: full range right of 0 */
            level = 63;
            key = 0;
        }
        else {
            int64 span_width =
                pieces.spans[idx][1] - pieces.spans[idx][0] + (int64)1;

            /* spans touching MIN or MAX are treated as one wider */
            if (pieces.spans[idx][0] == MIN_INT64)
                span_width++;
            if (pieces.spans[idx][1] == MAX_INT64)
                span_width++;

            /* divide the bottom of the span by 2^level to get the key */
            level = safe_log2(span_width);
            key = pieces.spans[idx][0] >> level;
        }

        total += cmsketch_count_c(transval->sketches[level],
                                  (Datum) key,
                                  transval->outFuncOid);
        idx++;
    }

    PG_RETURN_DATUM(total);
}
/*!
 * Break an arbitrary range [bot-top] into a list of dyadic ranges.
 * E.g. convert 14-48 into [[14-15], [16-31], [32-47], [48-48]]
 * To manage overflow issues, do not generate a range larger than 2^63-1!
 * This is only the entry point; the work happens in find_ranges_internal.
 * \param bot the bottom of the range (inclusive)
 * \param top the top of the range (inclusive)
 * \param r the list of ranges to be returned
 */
void find_ranges(int64 bot, int64 top, rangelist *r)
{
    /* the recursion starts at the highest power, RANGES-1 */
    int start_power = RANGES-1;

    find_ranges_internal(bot, top, start_power, r);
}
/*!
* Recursive worker for find_ranges: appends to r the dyadic spans that make
* up [bot, top].  Each call either emits a single-value span, emits one
* power-of-2 wide span anchored at the left or right edge of the range and
* recurses on what is left, or splits the range in two and recurses on
* both halves.
* To manage overflow issues, do not generate a range larger than 2^63-1!
* \param bot the bottom of the range (inclusive)
* \param top the top of the range (inclusive)
* \param power the highest power to start with; it is lowered on the
*        recursive calls, and a call returns without adding anything once
*        it is negative
* \param r the rangelist to be returned; spans are written at
*        r->emptyoffset
*/
void find_ranges_internal(int64 bot, int64 top, int power, rangelist *r)
{
short dyad; /* a number between 0 and 63 */
uint64 pow_dyad;
uint64 width;
/* Sanity check: empty range, or recursion budget used up */
if (top < bot || power < 0)
return;
if (top == bot) {
/* base case of recursion, a range of the form [x-x]. */
r->spans[r->emptyoffset][0] = r->spans[r->emptyoffset][1] = bot;
/* presumably moves emptyoffset on to the next free slot -- see ADVANCE_OFFSET */
ADVANCE_OFFSET(*r);
return;
}
/*
* a range that's 2^63 or larger will overflow int64's in the
* code below, so recurse to the left and right by hand
*/
if (top >= 0 && bot < 0) {
/* don't even try to figure out how wide this range is, just split at 0 */
find_ranges_internal(bot, -1, power-1, r);
find_ranges_internal(0, top, power-1, r);
return;
}
/* from here on bot and top have the same sign, so top - bot fits in int64 */
width = top - bot + (int64)1;
/* account for the fact that MIN and MAX are 1 off the true power of 2 */
if (top == MAX_INT64 || bot == MIN_INT64)
width++;
/* presumably the floor of log2(width) -- confirm against safe_log2 */
dyad = safe_log2(width);
if (dyad > 62) {
/*
* dangerously big, so split. we know that we don't span 0.
* The split point is -2^62 for a negative range and 2^62 otherwise.
*/
int sign = (top < 0) ? -1 : 1;
find_ranges_internal(bot, (((int64)1) << 62)*sign - 1, 62, r);
find_ranges_internal((((int64)1) << 62)*sign, top, 62, r);
return;
}
/*
* if we get here, we have a range of size 2 or greater.
* Find the largest dyadic range width in this range.
*/
pow_dyad = ((uint64)1) << dyad;
/*
* NOTE: pow_dyad is unsigned, so in the % and / expressions below the
* int64 operands bot and top are converted to uint64 before the operation.
*/
if ((bot == MIN_INT64) || (bot % pow_dyad == 0)) {
/* our range is left-aligned on the dyad's min */
r->spans[r->emptyoffset][0] = bot;
if (top == MAX_INT64) {
/*
* special case: no need to recurse on right, and doing so
* could overflow, so handle by hand
*/
r->spans[r->emptyoffset][1] = MAX_INT64;
ADVANCE_OFFSET(*r);
}
else {
int64 newbot;
/*
* -1 on the next line because range to the right will
* start at the power-of-2 boundary.
*/
r->spans[r->emptyoffset][1] = (bot + pow_dyad - 1);
if (bot == MIN_INT64) {
/* account for fact that MIN_INT64 is 1 bigger than -2^63 */
r->spans[r->emptyoffset][1]--;
}
/* the remainder starts right after the span just emitted */
newbot = r->spans[r->emptyoffset][1] + 1;
ADVANCE_OFFSET(*r);
/* recurse on right at finer grain */
find_ranges_internal(newbot, top, power-1, r);
}
}
else if (top == MAX_INT64 || ((top+1) % pow_dyad) == 0) {
/* our range is right-aligned on the dyad's max. */
r->spans[r->emptyoffset][1] = top;
if (bot == MIN_INT64) {
/*
* special case: no need to recurse on left, and doing so
* could overflow, so handle by hand
*/
r->spans[r->emptyoffset][0] = MIN_INT64;
ADVANCE_OFFSET(*r);
}
else {
int64 newtop;
r->spans[r->emptyoffset][0] = (top - pow_dyad + 1);
if (top == MAX_INT64) {
/* account for fact that MAX_INT64 is 1 smaller than 2^63 */
r->spans[r->emptyoffset][0]++;
}
/* the remainder ends right before the span just emitted */
newtop = r->spans[r->emptyoffset][0] - 1;
ADVANCE_OFFSET(*r);
/* recurse on left at finer grain. */
find_ranges_internal(bot, newtop, power-1, r);
}
}
else {
/* we straddle a power of 2: split at the multiple of pow_dyad at or below top */
int64 power_of_2 = pow_dyad*(top/pow_dyad);
/* recurse on left part, [bot, power_of_2 - 1], at finer grain */
find_ranges_internal(bot, power_of_2 - 1, power-1, r);
/* recurse on right part, [power_of_2, top], at finer grain */
find_ranges_internal(power_of_2, top, power-1, r);
}
}
/*!
 * Approximate a centile of the sketched values by binary search over the
 * int64 domain: each probe asks cmsketch_rangecount_c how many values lie
 * in [MIN_INT64, guess] and moves the guess up or down accordingly.
 * Raises an ERROR unless intcentile is between 1 and 99.
 * \param transval the current transition value
 * \param intcentile the centile to return
 * \param total the total count of items
 * \return the final guess, returned as a Datum
 */
Datum cmsketch_centile_c(cmtransval *transval, int intcentile, int64 total)
{
    uint  probes = 0;
    int64 lo = MIN_INT64;
    int64 hi = MAX_INT64;
    int64 guess = 0;
    int64 seen;
    int64 target;

    if (!(intcentile > 0 && intcentile < 100))
        elog(ERROR,
             "centiles must be between 1-99 inclusive, was %d",
             intcentile);

    /* number of items that should lie at or below the answer */
    target = (int64)(total * (float8)intcentile/100.0);

    /* stop after INT64BITS-1 probes or once lo and hi are adjacent */
    while ((probes < INT64BITS - 1) && ((uint64)hi-(uint64)lo > 1)) {
        seen = cmsketch_rangecount_c(transval, MIN_INT64, guess);
        if (seen == target)
            break;

        if (seen > target) {
            /* overshot: continue in the lower half */
            hi = guess;
            guess = lo + (guess - lo) / 2;
        }
        else {
            /* undershot: continue in the upper half */
            lo = guess;
            guess = hi - (hi - guess) / 2;
        }
        probes++;
    }

    return(guess);
}
PG_FUNCTION_INFO_V1(cmsketch_width_histogram);
/*!
 * scalar function taking a sketch, min, max, and number of buckets.
 * produces an equi-width histogram of that many buckets.
 * Returns NULL if any of the four arguments is NULL, and raises an ERROR
 * (via CM_CHECKARG) if min or max is not of the type the sketch was
 * computed over.
 */
/* UDF wrapper. All the interesting stuff is in cmsketch_width_histogram_c */
Datum cmsketch_width_histogram(PG_FUNCTION_ARGS)
{
    bytea      *transblob;
    cmtransval *transval;
    int64       min;
    int64       max;
    int         buckets;
    Datum       retval;

    /* a NULL sketch cannot be detoasted, so bail out before touching it */
    if (PG_ARGISNULL(0)) PG_RETURN_NULL();

    transblob = cmsketch_check_transval(fcinfo, false);
    transval = (cmtransval *)VARDATA(transblob);

    CM_CHECKARG(transval, 1);
    CM_CHECKARG(transval, 2);
    if (PG_ARGISNULL(3)) PG_RETURN_NULL();

    /*
     * Fetch the scalar arguments only once they are known to be non-NULL;
     * where int8 is passed by reference, fetching a NULL argument would
     * dereference an invalid pointer.
     */
    min = PG_GETARG_INT64(1);
    max = PG_GETARG_INT64(2);
    buckets = PG_GETARG_INT32(3);

    retval = cmsketch_width_histogram_c(transval, min, max, buckets);
    PG_RETURN_DATUM(retval);
}
/*!
 * produces an equi-width histogram
 *
 * The result is a 2-dimensional int8 array with one row per bucket and three
 * columns: bucket low bound, bucket high bound (both inclusive), and the
 * estimated count from cmsketch_rangecount_c.  Fewer rows than requested are
 * produced when the buckets run past max.  Raises an ERROR when buckets is
 * not positive or is too large to allocate.
 * \param transval the transition value of the agg
 * \param min minimum value in the sketch
 * \param max maximum value in the sketch
 * \param buckets the number of buckets in the histogram
 */
Datum cmsketch_width_histogram_c(cmtransval *transval,
                                 int64 min,
                                 int64 max,
                                 int64 buckets)
{
    /* upper limit on the scratch array, kept below 1GB */
    enum { CM_HISTO_MAX_BYTES = 0x3fffffff };

    int64      step;
    int        i;
    ArrayType *retval;
    int64      binlo, binhi, binval;
    Datum    (*histo)[3];
    int        dims[2], lbs[2];
    int16      typlen;
    bool       typbyval;
    char       typalign;
    char       typdelim;
    Oid        typioparam;
    Oid        typiofunc;

    /*
     * buckets comes from the caller of the SQL function.  It used to size an
     * on-stack variable-length array directly, which is undefined for values
     * <= 0 and can overflow the stack for large ones, so validate it and
     * allocate the scratch space with palloc instead.
     */
    if (buckets <= 0)
        elog(ERROR,
             "number of histogram buckets must be positive, was "
             INT64_FORMAT,
             buckets);
    if ((uint64)buckets > CM_HISTO_MAX_BYTES / sizeof(*histo))
        elog(ERROR,
             "too many histogram buckets requested: " INT64_FORMAT,
             buckets);
    histo = palloc((size_t)buckets * sizeof(*histo));

    /*
     * NOTE(review): max-min+1 and min + i*step are computed in int64 and can
     * overflow for ranges approaching the full int64 domain -- confirm the
     * expected input range with callers.
     */
    step = Max(trunc((float8)(max-min+1) / (float8)buckets), 1);
    for (i = 0; i < buckets; i++) {
        binlo = min + i*step;
        if (binlo > max) break;
        histo[i][0] = Int64GetDatum(binlo);
        /* the last bucket absorbs whatever the truncated step left over */
        binhi = (i == buckets-1) ? max : (min + (i+1)*step - 1);
        histo[i][1] = Int64GetDatum(binhi);
        binval =
            cmsketch_rangecount_c(transval,
                                  DatumGetInt64(histo[i][0]),
                                  DatumGetInt64(histo[i][1]));
        histo[i][2] = Int64GetDatum(binval);
    }

    /*
     * Get info about element type
     */
    get_type_io_data(INT8OID, IOFunc_output,
                     &typlen, &typbyval,
                     &typalign, &typdelim,
                     &typioparam, &typiofunc);

    dims[0] = i;     /* may be less than requested buckets if too few values */
    dims[1] = 3;     /* lo, hi, and val */
    lbs[0] = 0;
    lbs[1] = 0;

    retval = construct_md_array((Datum *)histo,
                                NULL,
                                2,
                                dims,
                                lbs,
                                INT8OID,
                                typlen,
                                typbyval,
                                typalign);
    pfree(histo);
    PG_RETURN_ARRAYTYPE_P(retval);
}
/*!
 * produces an equi-depth histogram of a cm sketch
 *
 * The result is a 2-dimensional int8 array with one row per bucket and three
 * columns: bucket low bound, bucket high bound (both inclusive), and the
 * estimated count from cmsketch_rangecount_c.  Bucket boundaries are the
 * centiles step, 2*step, ... with step = max(trunc(100/buckets), 1); the
 * last bucket always extends to MAX_INT64.  A centile that does not advance
 * past the previous boundary is skipped, so fewer rows than requested may be
 * returned.  Raises an ERROR unless buckets is between 1 and 100.
 * \param transval the transition value of the agg
 * \param buckets the number of buckets in the histogram
 */
Datum cmsketch_depth_histogram_c(cmtransval *transval, int64 buckets)
{
    /* more than 100 buckets would ask cmsketch_centile_c for centile 100 */
    enum { CM_MAX_DEPTH_BUCKETS = 100 };

    int64      step;
    uint       i, nextbucket;
    ArrayType *retval;
    int64      binlo;
    Datum      histo[CM_MAX_DEPTH_BUCKETS][3];
    int        dims[2], lbs[2];
    int64      total;
    int16      typlen;
    bool       typbyval;
    char       typalign;
    char       typdelim;
    Oid        typioparam;
    Oid        typiofunc;

    /*
     * buckets used to size an on-stack variable-length array without any
     * validation: undefined for values <= 0 (which also divide by zero
     * below), and values above 100 always ended in the centile range error.
     * Reject both up front and use a fixed-size scratch array.
     */
    if (buckets < 1 || buckets > CM_MAX_DEPTH_BUCKETS)
        elog(ERROR,
             "number of histogram buckets must be between 1 and %d, was "
             INT64_FORMAT,
             CM_MAX_DEPTH_BUCKETS,
             buckets);

    total = cmsketch_rangecount_c(transval, MIN_INT64, MAX_INT64);
    step = Max(trunc(100 / (float8)buckets), 1);

    for (i = nextbucket = 0, binlo = MIN_INT64; i < buckets; i++) {
        int64 binhi;

        if (i < buckets - 1) {
            binhi = (int64)cmsketch_centile_c(transval, (i+1)*step, total);
            /*
             * Compare as signed values: an unsigned comparison treats every
             * negative boundary as larger than any positive one and would
             * skip a positive centile that follows a negative one.
             */
            if (i > 0 && binhi <= (int64)histo[nextbucket-1][1])
                /* next centile is not higher than the previous one; skip */
                continue;
        }
        else {
            /* this is the top bucket */
            binhi = MAX_INT64;
        }

        histo[nextbucket][0] = Int64GetDatum(binlo);
        histo[nextbucket][1] = (Datum)binhi;
        histo[nextbucket][2] = cmsketch_rangecount_c(transval, binlo, binhi);

        /* unsigned add so that MAX_INT64 + 1 on the top bucket cannot trap */
        binlo = (int64)((uint64)binhi + 1);
        nextbucket++;
    }

    /*
     * Get info about element type
     */
    get_type_io_data(INT8OID, IOFunc_output,
                     &typlen, &typbyval,
                     &typalign, &typdelim,
                     &typioparam, &typiofunc);

    dims[0] = nextbucket;     /* may be less than requested buckets if too few values */
    dims[1] = 3;     /* lo, hi, and val */
    lbs[0] = 0;
    lbs[1] = 0;

    retval = construct_md_array((Datum *)histo,
                                NULL,
                                2,
                                dims,
                                lbs,
                                INT8OID,
                                typlen,
                                typbyval,
                                typalign);
    PG_RETURN_ARRAYTYPE_P(retval);
}
/****** SUPPORT ROUTINES *******/
PG_FUNCTION_INFO_V1(cmsketch_dump);
/*!
* dump sketch contents
*/
Datum cmsketch_dump(PG_FUNCTION_ARGS)
{
bytea * transblob = (bytea *)PG_GETARG_BYTEA_P(0);
countmin *sketches;
char * newblob = (char *)palloc(10240);
uint32 i, j, k, c;
sketches = ((cmtransval *)VARDATA(transblob))->sketches;
for (i=0, c=0; i < RANGES; i++)
for (j=0; j < DEPTH; j++)
for(k=0; k < NUMCOUNTERS; k++) {
if (sketches[i][j][k] != 0)
c += sprintf(&newblob[c], "[(%d,%d,%d):" INT64_FORMAT
"], ", i, j, k, sketches[i][j][k]);
if (c > 10000) break;
}
newblob[c] = '\0';
PG_RETURN_NULL();
}
/*!
 * For each row i of the sketch (0 .. DEPTH-1), take the two bytes at offset
 * 2*i of the hash value, reduce them modulo NUMCOUNTERS to select a column,
 * and invoke the lambda on that (row, column) pair.  The lambda may
 * destructively modify counters.  The value returned by each invocation is
 * passed to the next one, starting from initial.
 * Assumes the hash holds at least 2*DEPTH bytes -- TODO confirm against
 * md5_datum.
 * \param hashval the MD5 hash value that we take 16 bits at a time
 * \param sketch the cmsketch
 * \param initial the initialized return value
 * \param lambdaptr the function to invoke on each 16 bits
 * \return the value returned by the last lambda invocation
 */
int64 hash_counters_iterate(Datum hashval,
                            countmin sketch,     /* width is DEPTH*NUMCOUNTERS */
                            int64 initial,
                            int64 (*lambdaptr)(uint32,
                                               uint32,
                                               countmin,
                                               int64))
{
    /* the hash does not change while we iterate, so unpack the Datum once */
    const char *hashbytes = (const char *)VARDATA(DatumGetByteaP(hashval));
    int64       retval = initial;
    uint32      i;

    for (i = 0; i < DEPTH; i++) {
        unsigned short twobytes;
        uint32         col;

        /*
         * Copy the two bytes out rather than casting the pointer, so that
         * an unaligned hash buffer is read portably.
         */
        memcpy(&twobytes, hashbytes + 2*i, 2);
        col = twobytes % NUMCOUNTERS;
        retval = (*lambdaptr)(i, col, sketch, retval);
    }
    return retval;
}
/*!
 * Lambda for hash_counters_iterate that adds one to a single counter of the
 * sketch, raising an ERROR if the counter already holds INT64_MAX.
 * \param i which row to update
 * \param col which column to update
 * \param sketch the sketch
 * \param transval unused; present only to match the lambda interface of
 *        hash_counters_iterate
 * \return the counter value after the increment
 */
int64 increment_counter(uint32 i,
                        uint32 col,
                        countmin sketch,
                        int64 transval)
{
    int64 current = sketch[i][col];
    int64 bumped;

    if (current == (INT64_MAX))
        elog(ERROR, "maximum count exceeded in sketch");

    bumped = current + 1;
    sketch[i][col] = bumped;

    /* callers are unlikely to care, but report the new value anyway */
    return bumped;
}
/*!
 * Lambda for hash_counters_iterate that keeps a running minimum over the
 * counters it is shown.
 * \param i which row to examine
 * \param col which column to examine
 * \param sketch the sketch
 * \param transval smallest counter so far
 * \return the smaller of transval and the counter at (i, col)
 */
int64 min_counter(uint32 i,
                  uint32 col,
                  countmin sketch,
                  int64 transval)
{
    int64 candidate = sketch[i][col];

    if (candidate < transval)
        return candidate;
    return transval;
}