| /*! |
| * \file countmin.c |
| * |
| * \brief CountMin sketch implementation |
| * |
| * \implementation |
| * The basic CountMin sketch is a set of DEPTH arrays, each with NUMCOUNTERS counters. |
| * The idea is that each of those arrays is used as an independent random trial of the |
| * same process: for all the values x in a set, each holds counts of h_i(x) mod NUMCOUNTERS for a different random hash function h_i. |
| * Estimates of the count of some value x are based on the <i>minimum</i> counter h_i(x) across |
| * the DEPTH arrays (hence the name CountMin.) |
| * |
| * Let's call the process described above "sketching" the x's. To support range |
| * lookups, we repeat the basic CountMin sketching process INT64BITS times as follows. |
| * (This is the "dyadic range" trick mentioned in Cormode/Muthu.) |
| * |
| * Every value x/(2^i) is sketched |
| * at a different power-of-2 (dyadic) "range" i. So we sketch x in range 0, then sketch x/2 |
| * in range 1, then sketch x/4 in range 2, etc. |
| * This allows us to count up ranges (like 14-48) by doing CountMin equality lookups on the constituent |
| * dyadic ranges ({[14-15] as 7 at range width 2, [16-31] as 1 at range width 16, [32-47] as 2 at range width 16, [48-48] as 48 at range width 1}). |
| * Dyadic ranges are similarly useful for histogramming, order stats, etc. |
| * |
| * The results of the estimators below generally have guarantees of the form |
| * "the answer is within \epsilon of the true answer with probability 1-\delta." |
| */ |
| |
| #include "postgres.h" |
| #include "utils/array.h" |
| #include "utils/elog.h" |
| #include "utils/builtins.h" |
| #include "utils/lsyscache.h" |
| #include "utils/numeric.h" |
| #include "nodes/execnodes.h" |
| #include "fmgr.h" |
| #include "sketch_support.h" |
| #include "catalog/pg_type.h" |
| #include "countmin.h" |
| |
| #include <ctype.h> |
| |
| PG_FUNCTION_INFO_V1(__cmsketch_int8_trans); |
| |
| /* |
| * This is the UDF interface. It does sanity checks and preps values |
| * for the interesting logic in countmin_dyadic_trans_c |
| */ |
| Datum __cmsketch_int8_trans(PG_FUNCTION_ARGS) |
| { |
| bytea * transblob = NULL; |
| cmtransval *transval; |
| |
| /* |
| * This function makes destructive updates to its arguments. |
| * Make sure it's being called in an agg context. |
| */ |
| |
| if (!(fcinfo->context && |
| (IsA(fcinfo->context, AggState) |
| #ifdef NOTGP |
| || IsA(fcinfo->context, WindowAggState) |
| #endif |
| ))) |
| elog(ERROR, |
| "destructive pass by reference outside agg"); |
| |
| /* get the provided element, being careful in case it's NULL */ |
| if (!PG_ARGISNULL(1)) { |
| transblob = cmsketch_check_transval(fcinfo, true); |
| transval = (cmtransval *)(VARDATA(transblob)); |
| |
| /* the following line modifies the contents of transval, and hence transblob */ |
| countmin_dyadic_trans_c(transval, PG_GETARG_DATUM(1)); |
| PG_RETURN_DATUM(PointerGetDatum(transblob)); |
| } |
| else PG_RETURN_DATUM(PointerGetDatum(PG_GETARG_BYTEA_P(0))); |
| } |
| |
| /*! |
| * check if the transblob is not initialized, and do so if not |
| * \param transblob a cmsketch transval packed in a bytea |
| */ |
| bytea *cmsketch_check_transval(PG_FUNCTION_ARGS, bool initargs) |
| { |
| bytea * transblob = PG_GETARG_BYTEA_P(0); |
| cmtransval *transval; |
| Oid element_type = get_fn_expr_argtype(fcinfo->flinfo, 1); |
| |
| /* |
| * an uninitialized transval should be a datum smaller than sizeof(cmtransval). |
| * if this one is small, initialize it now, else return it. |
| */ |
| if (!CM_TRANSVAL_INITIALIZED(transblob)) { |
| /* XXX would be nice to pfree the existing transblob, but pfree complains. */ |
| transblob = cmsketch_init_transval(element_type); |
| transval = (cmtransval *)VARDATA(transblob); |
| |
| if (initargs) { |
| int nargs = PG_NARGS(); |
| int i; |
| /* carry along any additional args */ |
| if (nargs - 2 > MAXARGS) |
| elog( |
| ERROR, |
| "no more than %d additional arguments should be passed to __cmsketch_int8_trans", |
| MAXARGS); |
| transval->nargs = nargs - 2; |
| for (i = 2; i < nargs; i++) { |
| if (PG_ARGISNULL(i)) |
| elog(ERROR, |
| "NULL parameter %d passed to __cmsketch_int8_trans", |
| i); |
| transval->args[i-2] = PG_GETARG_DATUM(i); |
| } |
| } |
| else transval->nargs = -1; |
| } |
| return(transblob); |
| } |
| |
| bytea *cmsketch_init_transval(Oid typOid) |
| { |
| bool typIsVarlena; |
| cmtransval *transval; |
| |
| /* allocate and zero out a transval via palloc0 */ |
| bytea * transblob = (bytea *)palloc0(CM_TRANSVAL_SZ); |
| SET_VARSIZE(transblob, CM_TRANSVAL_SZ); |
| |
| transval = (cmtransval *)VARDATA(transblob); |
| transval->typOid = typOid; |
| getTypeOutputInfo(transval->typOid, |
| &(transval->outFuncOid), |
| &typIsVarlena); |
| return(transblob); |
| } |
| |
| /*! |
| * perform multiple sketch insertions, one for each dyadic range (from 0 up to RANGES-1). |
| * * \param transval the cmsketch transval |
| * * \param inputi the value to be inserted |
| */ |
| void countmin_dyadic_trans_c(cmtransval *transval, Datum input) |
| { |
| uint32 j; |
| |
| if (transval->typOid != INT8OID) |
| elog(ERROR, "cmsketch can only compute ranges for int64"); |
| |
| for (j = 0; j < RANGES; j++) { |
| countmin_trans_c(transval->sketches[j], input, |
| transval->outFuncOid, transval->typOid); |
| /* now divide by 2 for the next dyadic range */ |
| input = Int64GetDatum(DatumGetInt64(input) >> 1); |
| } |
| } |
| |
| /*! |
| * Main loop of Cormode and Muthukrishnan's sketching algorithm, for setting counters in |
| * sketches at a single "dyadic range". For each call, we want to use DEPTH independent |
| * hash functions. We do this by using a single md5 hash function, and taking |
| * successive 16-bit runs of the result as independent hash outputs. |
| * \param sketch the current countmin sketch |
| * \param dat the datum to be inserted |
| * \param outFuncOid Oid of the PostgreSQL function to convert dat to a string |
| * \param typOid Oid of the Postgres type for dat |
| */ |
| Datum countmin_trans_c(countmin sketch, Datum dat, Oid outFuncOid, Oid typOid) |
| { |
| (void) outFuncOid; /* avoid warning about unused parameter */ |
| bytea *nhash; |
| |
| nhash = sketch_md5_bytea(dat, typOid); |
| |
| /* |
| * iterate through all sketches, incrementing the counters indicated by the hash |
| * we don't care about return value here, so 3rd (initialization) argument is arbitrary. |
| */ |
| (void)hash_counters_iterate(nhash, sketch, 0, &increment_counter); |
| return(PointerGetDatum(nhash)); |
| } |
| |
| /* |
| * FINAL functions for various UDAs built on countmin sketches |
| */ |
| |
| /*! |
| * return the array of sketch counters as a bytea |
| */ |
| PG_FUNCTION_INFO_V1(__cmsketch_final); |
| Datum __cmsketch_final(PG_FUNCTION_ARGS) |
| { |
| bytea * blob = PG_GETARG_BYTEA_P(0); |
| cmtransval *sketch = (cmtransval *)VARDATA(blob); |
| int len = RANGES*sizeof(countmin) + VARHDRSZ; |
| bytea *out = palloc0(len); |
| |
| memcpy((uint8 *)VARDATA(out), sketch->sketches, len - VARHDRSZ); |
| SET_VARSIZE(out, len); |
| |
| PG_RETURN_BYTEA_P(out); |
| } |
| |
| /*! |
| * Greenplum "prefunc" to combine sketches from multiple machines |
| */ |
| PG_FUNCTION_INFO_V1(__cmsketch_merge); |
| Datum __cmsketch_merge(PG_FUNCTION_ARGS) |
| { |
| bytea * counterblob1 = PG_GETARG_BYTEA_P(0); |
| bytea * counterblob2 = PG_GETARG_BYTEA_P(1); |
| cmtransval *transval1 = (cmtransval *)VARDATA(counterblob1); |
| cmtransval *transval2 = (cmtransval *)VARDATA(counterblob2); |
| cmtransval *newtrans; |
| countmin * sketches2 = (countmin *) |
| ((cmtransval *)(VARDATA(counterblob2)))->sketches; |
| bytea * newblob; |
| countmin * newsketches; |
| uint32 i, j, k; |
| int sz; |
| |
| /* make sure they're initialized! */ |
| if (!CM_TRANSVAL_INITIALIZED(counterblob1) |
| && !CM_TRANSVAL_INITIALIZED(counterblob2)) |
| /* if both are empty can return one of them */ |
| PG_RETURN_DATUM(PointerGetDatum(counterblob1)); |
| else if (!CM_TRANSVAL_INITIALIZED(counterblob1)) { |
| counterblob1 = cmsketch_init_transval(transval2->typOid); |
| transval1 = (cmtransval *)VARDATA(counterblob1); |
| } |
| else if (!CM_TRANSVAL_INITIALIZED(counterblob2)) { |
| counterblob2 = cmsketch_init_transval(transval1->typOid); |
| transval2 = (cmtransval *)VARDATA(counterblob2); |
| } |
| |
| sz = VARSIZE(counterblob1); |
| /* allocate a new transval as a copy of counterblob1 */ |
| newblob = (bytea *)palloc(sz); |
| memcpy(newblob, counterblob1, sz); |
| newtrans = (cmtransval *)(VARDATA(newblob)); |
| newsketches = (countmin *)(newtrans)->sketches; |
| |
| /* add in values from counterblob2 */ |
| for (i = 0; i < RANGES; i++) |
| for (j = 0; j < DEPTH; j++) |
| for (k = 0; k < NUMCOUNTERS; k++) |
| newsketches[i][j][k] += sketches2[i][j][k]; |
| |
| if (newtrans->nargs == -1) { |
| /* transfer in the args from the other input */ |
| newtrans->nargs = transval2->nargs; |
| for (i = 0; (int)i < transval2->nargs; i++) |
| newtrans->args[i] = transval2->args[i]; |
| } |
| |
| PG_RETURN_DATUM(PointerGetDatum(newblob)); |
| } |
| |
| |
| |
| /* |
| ******* Below are scalar methods to manipulate completed sketches. ****** |
| */ |
| |
| /*! |
| * get the approximate count of objects with value arg |
| * \param sketch a countmin sketch |
| * \param arg the Datum we want to find the count of |
| * \param funcOid the Postgres function that converts arg to a string |
| */ |
| int64 cmsketch_count_c(countmin sketch, Datum arg, Oid funcOid, Oid typOid) |
| { |
| bytea *nhash; |
| |
| /* get the md5 hash of the argument. */ |
| nhash = sketch_md5_bytea(arg, typOid); |
| return(cmsketch_count_md5_datum(sketch, nhash, funcOid)); |
| } |
| |
| int64 cmsketch_count_md5_datum(countmin sketch, bytea *md5_bytea, Oid funcOid) |
| { |
| (void) funcOid; /* avoid warning about unused parameter */ |
| /* iterate through the sketches, finding the min counter associated with this hash */ |
| return(hash_counters_iterate(md5_bytea, sketch, INT64_MAX, |
| &min_counter)); |
| } |
| |
| |
| /****** SUPPORT ROUTINES *******/ |
| PG_FUNCTION_INFO_V1(cmsketch_dump); |
| |
| /*! |
| * dump sketch contents |
| */ |
| Datum cmsketch_dump(PG_FUNCTION_ARGS) |
| { |
| bytea * transblob = (bytea *)PG_GETARG_BYTEA_P(0); |
| countmin *sketches; |
| char * newblob = (char *)palloc(10240); |
| uint32 i, j, k, c; |
| |
| sketches = ((cmtransval *)VARDATA(transblob))->sketches; |
| for (i=0, c=0; i < RANGES; i++) |
| for (j=0; j < DEPTH; j++) |
| for(k=0; k < NUMCOUNTERS; k++) { |
| if (sketches[i][j][k] != 0) |
| c += sprintf(&newblob[c], "[(%d,%d,%d):" INT64_FORMAT |
| "], ", i, j, k, sketches[i][j][k]); |
| if (c > 10000) break; |
| } |
| newblob[c] = '\0'; |
| PG_RETURN_NULL(); |
| } |
| |
| |
| /*! |
| * for each row of the sketch, use the 16 bits starting at 2^i mod NUMCOUNTERS, |
| * and invoke the lambda on those 16 bits (which may destructively modify counters). |
| * \param hashval the MD5 hashed value that we take 16 bits at a time |
| * \param sketch the cmsketch |
| * \param initial the initialized return value |
| * \param lambdaptr the function to invoke on each 16 bits |
| */ |
| int64 hash_counters_iterate(bytea *hashval, |
| countmin sketch, /* width is DEPTH*NUMCOUNTERS */ |
| int64 initial, |
| int64 (*lambdaptr)(uint32, |
| uint32, |
| countmin, |
| int64)) |
| { |
| uint32 i, col; |
| char *c; |
| unsigned short twobytes; |
| int64 retval = initial; |
| |
| /* |
| * XXX WARNING: are there problems with unaligned access here? |
| * XXX I was using memmove rather than casting, which was inefficient, |
| * XXX but I was hoping memmove would deal with unaligned access in a portable way. |
| * XXX However the deref of 2 bytes seems to work OK. |
| */ |
| for (i = 0, c = (char *)VARDATA(hashval); |
| i < DEPTH; |
| i++, c += 2) { |
| twobytes = *(unsigned short *)c; |
| col = twobytes % NUMCOUNTERS; |
| retval = (*lambdaptr)(i, col, sketch, retval); |
| } |
| return retval; |
| } |
| |
| /*! |
| * destructive increment lambda for hash_counters_iterate. |
| * transval and return val not of particular interest here. |
| * \param i which row to update |
| * \param col which column to update |
| * \param sketch the sketch |
| * \param transval we don't need transval here, but its part of the |
| * lambda interface for hash_counters_iterate |
| */ |
| |
| int64 increment_counter(uint32 i, |
| uint32 col, |
| countmin sketch, |
| int64 transval) |
| { |
| (void) transval; /* avoid warning about unused parameter */ |
| int64 oldval = sketch[i][col]; |
| if (sketch[i][col] == (INT64_MAX)) |
| elog(ERROR, "maximum count exceeded in sketch"); |
| sketch[i][col] = oldval + 1; |
| |
| /* return the incremented value, though unlikely anyone cares. */ |
| return oldval+1; |
| } |
| |
| /*! |
| * running minimum lambda for hash_counters_iterate |
| * \param i which row to examine |
| * \param col which column to examine |
| * \param sketch the sketch |
| * \param transval smallest counter so far |
| * lambda interface for hash_counters_iterate |
| */ |
| int64 min_counter(uint32 i, |
| uint32 col, |
| countmin sketch, |
| int64 transval) |
| { |
| int64 thisval = sketch[i][col]; |
| return (thisval < transval) ? thisval : transval; |
| } |