blob: bb1ec4e17f51a3d801e8f9aa20a41148b939e477 [file] [log] [blame]
/*!
* \file countmin.c
*
* \brief CountMin sketch implementation
*
* \implementation
* The basic CountMin sketch is a set of DEPTH arrays, each with NUMCOUNTERS counters.
* The idea is that each of those arrays is used as an independent random trial of the
* same process: for all the values x in a set, each holds counts of h_i(x) mod NUMCOUNTERS for a different random hash function h_i.
* Estimates of the count of some value x are based on the <i>minimum</i> counter h_i(x) across
* the DEPTH arrays (hence the name CountMin.)
*
* Let's call the process described above "sketching" the x's. To support range
* lookups, we repeat the basic CountMin sketching process INT64BITS times as follows.
* (This is the "dyadic range" trick mentioned in Cormode/Muthu.)
*
* Every value x/(2^i) is sketched
* at a different power-of-2 (dyadic) "range" i. So we sketch x in range 0, then sketch x/2
* in range 1, then sketch x/4 in range 2, etc.
* This allows us to count up ranges (like 14-48) by doing CountMin equality lookups on constituent
* dyadic ranges ({[14-15] as 7 in range 1, [16-31] as 1 in range 4, [32-47] as 2 in range 4, [48-48] as 48 in range 0}).
* Dyadic ranges are similarly useful for histogramming, order stats, etc.
*
* The results of the estimators below generally have guarantees of the form
* "the answer is within \epsilon of the true answer with probability 1-\delta."
*/
#include "postgres.h"
#include "utils/array.h"
#include "utils/elog.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/numeric.h"
#include "nodes/execnodes.h"
#include "fmgr.h"
#include "sketch_support.h"
#include "catalog/pg_type.h"
#include "countmin.h"
#include <ctype.h>
PG_FUNCTION_INFO_V1(__cmsketch_int8_trans);
/*
* This is the UDF interface. It does sanity checks and preps values
* for the interesting logic in countmin_dyadic_trans_c
*/
Datum __cmsketch_int8_trans(PG_FUNCTION_ARGS)
{
bytea * transblob = NULL;
cmtransval *transval;
/*
* This function makes destructive updates to its arguments.
* Make sure it's being called in an agg context.
*/
if (!(fcinfo->context &&
(IsA(fcinfo->context, AggState)
#ifdef NOTGP
|| IsA(fcinfo->context, WindowAggState)
#endif
)))
elog(ERROR,
"destructive pass by reference outside agg");
/* get the provided element, being careful in case it's NULL */
if (!PG_ARGISNULL(1)) {
transblob = cmsketch_check_transval(fcinfo, true);
transval = (cmtransval *)(VARDATA(transblob));
/* the following line modifies the contents of transval, and hence transblob */
countmin_dyadic_trans_c(transval, PG_GETARG_DATUM(1));
PG_RETURN_DATUM(PointerGetDatum(transblob));
}
else PG_RETURN_DATUM(PointerGetDatum(PG_GETARG_BYTEA_P(0)));
}
/*!
 * Make sure the transition state passed as argument 0 is initialized.
 * A state that is not yet initialized (per CM_TRANSVAL_INITIALIZED, i.e. a
 * datum smaller than a full cmtransval) is replaced by a fresh transval typed
 * after argument 1; an already-initialized state is returned unchanged.
 * \param fcinfo (via PG_FUNCTION_ARGS) the caller's call info: argument 0 is
 *        the transval bytea, argument 1 the value being sketched, and any
 *        arguments from 2 on are extra args to carry along
 * \param initargs if true, copy arguments 2.. into the new transval (error on
 *        a NULL one or on more than MAXARGS of them); if false, set the new
 *        transval's nargs to -1
 * \return the transval bytea, newly allocated if it had to be initialized
 */
bytea *cmsketch_check_transval(PG_FUNCTION_ARGS, bool initargs)
{
    bytea      *transblob = PG_GETARG_BYTEA_P(0);
    Oid         element_type = get_fn_expr_argtype(fcinfo->flinfo, 1);
    cmtransval *transval;
    int         total_args;
    int         argno;

    /* already initialized: nothing to do */
    if (CM_TRANSVAL_INITIALIZED(transblob))
        return transblob;

    /* XXX would be nice to pfree the existing transblob, but pfree complains. */
    transblob = cmsketch_init_transval(element_type);
    transval = (cmtransval *)VARDATA(transblob);

    if (!initargs) {
        transval->nargs = -1;
        return transblob;
    }

    /* carry along any additional args (everything after the value itself) */
    total_args = PG_NARGS();
    if (total_args - 2 > MAXARGS)
        elog(
            ERROR,
            "no more than %d additional arguments should be passed to __cmsketch_int8_trans",
            MAXARGS);
    transval->nargs = total_args - 2;
    for (argno = 2; argno < total_args; argno++) {
        if (PG_ARGISNULL(argno))
            elog(ERROR,
                 "NULL parameter %d passed to __cmsketch_int8_trans",
                 argno);
        transval->args[argno - 2] = PG_GETARG_DATUM(argno);
    }
    return transblob;
}
/*!
 * Allocate a zero-filled cmtransval wrapped in a bytea of CM_TRANSVAL_SZ bytes,
 * and record the element type together with its output function.
 * \param typOid Oid of the type of the values that will be sketched
 * \return the new transval bytea (palloc0'd in the current memory context)
 */
bytea *cmsketch_init_transval(Oid typOid)
{
    bytea      *blob;
    cmtransval *tv;
    bool        is_varlena;     /* required out-parameter; value unused */

    blob = (bytea *)palloc0(CM_TRANSVAL_SZ);
    SET_VARSIZE(blob, CM_TRANSVAL_SZ);

    tv = (cmtransval *)VARDATA(blob);
    tv->typOid = typOid;
    getTypeOutputInfo(tv->typOid, &(tv->outFuncOid), &is_varlena);

    return blob;
}
/*!
 * Insert one int8 value into every dyadic range of the sketch: range 0 gets
 * the value itself, and each following range (up to RANGES-1) gets the
 * previous range's value shifted right by one bit.
 * \param transval the cmsketch transval; its typOid must be INT8OID
 * \param input the int8 Datum to be inserted
 */
void countmin_dyadic_trans_c(cmtransval *transval, Datum input)
{
    uint32 range = 0;

    if (transval->typOid != INT8OID)
        elog(ERROR, "cmsketch can only compute ranges for int64");

    while (range < RANGES) {
        countmin_trans_c(transval->sketches[range], input,
                         transval->outFuncOid, transval->typOid);
        /* halve the value for the next (coarser) dyadic range */
        input = Int64GetDatum(DatumGetInt64(input) >> 1);
        range++;
    }
}
/*!
 * Main loop of Cormode and Muthukrishnan's sketching algorithm, for setting
 * counters in the sketch of a single "dyadic range". DEPTH independent hash
 * functions are emulated by hashing the datum once with md5 and letting
 * hash_counters_iterate take successive 16-bit runs of the digest.
 * \param sketch the countmin sketch to update
 * \param dat the datum to be inserted
 * \param outFuncOid Oid of the output function for dat's type (not used here)
 * \param typOid Oid of the Postgres type for dat, used to hash it
 * \return the md5 hash bytea computed for dat (callers may ignore it)
 */
Datum countmin_trans_c(countmin sketch, Datum dat, Oid outFuncOid, Oid typOid)
{
    bytea *digest = sketch_md5_bytea(dat, typOid);

    (void) outFuncOid;  /* avoid warning about unused parameter */

    /*
     * Increment one counter per row, as selected by the hash. The fold value
     * is irrelevant for increments, so the seed (0) is arbitrary.
     */
    (void) hash_counters_iterate(digest, sketch, 0, &increment_counter);
    return PointerGetDatum(digest);
}
/*
* FINAL functions for various UDAs built on countmin sketches
*/
/*!
 * Final function: return the counter arrays of all RANGES dyadic sketches,
 * copied into a new bytea.
 * If no value was ever inserted, the transition state is still the small,
 * uninitialized blob (see CM_TRANSVAL_INITIALIZED). In that case an all-zero
 * set of counters (an empty sketch) is returned instead of copying
 * RANGES*sizeof(countmin) bytes from past the end of that blob.
 */
PG_FUNCTION_INFO_V1(__cmsketch_final);
Datum __cmsketch_final(PG_FUNCTION_ARGS)
{
    bytea      *blob = PG_GETARG_BYTEA_P(0);
    int         len = RANGES*sizeof(countmin) + VARHDRSZ;
    /* palloc0: the zeroed payload doubles as the empty-sketch result */
    bytea      *out = palloc0(len);

    SET_VARSIZE(out, len);
    if (CM_TRANSVAL_INITIALIZED(blob)) {
        cmtransval *sketch = (cmtransval *)VARDATA(blob);

        memcpy((uint8 *)VARDATA(out), sketch->sketches, len - VARHDRSZ);
    }
    PG_RETURN_BYTEA_P(out);
}
/*!
* Greenplum "prefunc" to combine sketches from multiple machines
*/
PG_FUNCTION_INFO_V1(__cmsketch_merge);
Datum __cmsketch_merge(PG_FUNCTION_ARGS)
{
bytea * counterblob1 = PG_GETARG_BYTEA_P(0);
bytea * counterblob2 = PG_GETARG_BYTEA_P(1);
cmtransval *transval1 = (cmtransval *)VARDATA(counterblob1);
cmtransval *transval2 = (cmtransval *)VARDATA(counterblob2);
cmtransval *newtrans;
countmin * sketches2 = (countmin *)
((cmtransval *)(VARDATA(counterblob2)))->sketches;
bytea * newblob;
countmin * newsketches;
uint32 i, j, k;
int sz;
/* make sure they're initialized! */
if (!CM_TRANSVAL_INITIALIZED(counterblob1)
&& !CM_TRANSVAL_INITIALIZED(counterblob2))
/* if both are empty can return one of them */
PG_RETURN_DATUM(PointerGetDatum(counterblob1));
else if (!CM_TRANSVAL_INITIALIZED(counterblob1)) {
counterblob1 = cmsketch_init_transval(transval2->typOid);
transval1 = (cmtransval *)VARDATA(counterblob1);
}
else if (!CM_TRANSVAL_INITIALIZED(counterblob2)) {
counterblob2 = cmsketch_init_transval(transval1->typOid);
transval2 = (cmtransval *)VARDATA(counterblob2);
}
sz = VARSIZE(counterblob1);
/* allocate a new transval as a copy of counterblob1 */
newblob = (bytea *)palloc(sz);
memcpy(newblob, counterblob1, sz);
newtrans = (cmtransval *)(VARDATA(newblob));
newsketches = (countmin *)(newtrans)->sketches;
/* add in values from counterblob2 */
for (i = 0; i < RANGES; i++)
for (j = 0; j < DEPTH; j++)
for (k = 0; k < NUMCOUNTERS; k++)
newsketches[i][j][k] += sketches2[i][j][k];
if (newtrans->nargs == -1) {
/* transfer in the args from the other input */
newtrans->nargs = transval2->nargs;
for (i = 0; (int)i < transval2->nargs; i++)
newtrans->args[i] = transval2->args[i];
}
PG_RETURN_DATUM(PointerGetDatum(newblob));
}
/*
******* Below are scalar methods to manipulate completed sketches. ******
*/
/*!
 * Get the approximate count of objects with value arg in one countmin sketch.
 * \param sketch a countmin sketch
 * \param arg the Datum we want to find the count of
 * \param funcOid the Postgres function that converts arg to a string; passed
 *        through to cmsketch_count_md5_datum, which does not use it
 * \param typOid Oid of the Postgres type of arg, used to hash it
 * \return the estimated count (minimum counter selected by arg's hash)
 */
int64 cmsketch_count_c(countmin sketch, Datum arg, Oid funcOid, Oid typOid)
{
    /* hash the argument the same way countmin_trans_c does on insertion */
    bytea *digest = sketch_md5_bytea(arg, typOid);

    return cmsketch_count_md5_datum(sketch, digest, funcOid);
}
/*!
 * Estimate a count from an already-computed md5 hash: the minimum, over the
 * DEPTH rows, of the counter the hash selects in each row.
 * \param sketch a countmin sketch
 * \param md5_bytea md5 hash of the value (as from sketch_md5_bytea)
 * \param funcOid unused; kept for interface compatibility
 * \return the smallest selected counter
 */
int64 cmsketch_count_md5_datum(countmin sketch, bytea *md5_bytea, Oid funcOid)
{
    int64 estimate;

    (void) funcOid;     /* avoid warning about unused parameter */
    /* INT64_MAX seeds the running minimum so any real counter replaces it */
    estimate = hash_counters_iterate(md5_bytea, sketch, INT64_MAX,
                                     &min_counter);
    return estimate;
}
/****** SUPPORT ROUTINES *******/
PG_FUNCTION_INFO_V1(cmsketch_dump);
/*!
* dump sketch contents
*/
Datum cmsketch_dump(PG_FUNCTION_ARGS)
{
bytea * transblob = (bytea *)PG_GETARG_BYTEA_P(0);
countmin *sketches;
char * newblob = (char *)palloc(10240);
uint32 i, j, k, c;
sketches = ((cmtransval *)VARDATA(transblob))->sketches;
for (i=0, c=0; i < RANGES; i++)
for (j=0; j < DEPTH; j++)
for(k=0; k < NUMCOUNTERS; k++) {
if (sketches[i][j][k] != 0)
c += sprintf(&newblob[c], "[(%d,%d,%d):" INT64_FORMAT
"], ", i, j, k, sketches[i][j][k]);
if (c > 10000) break;
}
newblob[c] = '\0';
PG_RETURN_NULL();
}
/*!
 * For each row i of the sketch (0 <= i < DEPTH), read the 16 bits at byte
 * offset 2*i of the hash, reduce them mod NUMCOUNTERS to pick a column, and
 * invoke the lambda on (row, column). The lambda may destructively modify
 * counters; its return value is threaded through the calls as a running fold.
 * \param hashval the MD5 hashed value that we take 16 bits at a time; its data
 *        must hold at least 2*DEPTH bytes (NOTE(review): an md5 digest is 16
 *        bytes, so this presumes DEPTH <= 8 - confirm in countmin.h)
 * \param sketch the cmsketch
 * \param initial the initial fold value
 * \param lambdaptr the function to invoke on each (row, column) pair
 * \return the value returned by the last lambda call (initial if DEPTH is 0)
 */
int64 hash_counters_iterate(bytea *hashval,
                            countmin sketch, /* width is DEPTH*NUMCOUNTERS */
                            int64 initial,
                            int64 (*lambdaptr)(uint32,
                                               uint32,
                                               countmin,
                                               int64))
{
    uint32          i, col;
    const char     *c;
    unsigned short  twobytes;
    int64           retval = initial;

    for (i = 0, c = (const char *)VARDATA(hashval);
         i < DEPTH;
         i++, c += 2) {
        /*
         * memcpy rather than dereferencing an (unsigned short *) cast: the
         * digest bytes may not be suitably aligned, and the cast read breaks
         * strict-aliasing rules. memcpy yields the same native-endian value.
         */
        memcpy(&twobytes, c, sizeof(twobytes));
        col = twobytes % NUMCOUNTERS;
        retval = (*lambdaptr)(i, col, sketch, retval);
    }
    return retval;
}
/*!
 * Destructive increment lambda for hash_counters_iterate: adds one to
 * sketch[i][col], raising an error instead of overflowing past INT64_MAX.
 * \param i which row to update
 * \param col which column to update
 * \param sketch the sketch
 * \param transval ignored; present only because it is part of the lambda
 *        interface for hash_counters_iterate
 * \return the counter's new value (unlikely anyone cares)
 */
int64 increment_counter(uint32 i,
                        uint32 col,
                        countmin sketch,
                        int64 transval)
{
    int64 current = sketch[i][col];

    (void) transval;    /* avoid warning about unused parameter */
    if (current == (INT64_MAX))
        elog(ERROR, "maximum count exceeded in sketch");
    current += 1;
    sketch[i][col] = current;
    return current;
}
/*!
 * Running-minimum lambda for hash_counters_iterate.
 * \param i which row to examine
 * \param col which column to examine
 * \param sketch the sketch
 * \param transval smallest counter seen so far
 * \return the smaller of sketch[i][col] and transval
 */
int64 min_counter(uint32 i,
                  uint32 col,
                  countmin sketch,
                  int64 transval)
{
    int64 candidate = sketch[i][col];

    if (candidate < transval)
        return candidate;
    return transval;
}