| /*! |
| * \file countmin.c |
| * |
| * \brief CountMin sketch implementation |
| * |
| * \implementation |
| * The basic CountMin sketch is a set of DEPTH arrays, each with NUMCOUNTERS counters. |
| * The idea is that each of those arrays is used as an independent random trial of the |
| * same process: for all the values x in a set, each holds counts of h_i(x) mod NUMCOUNTERS for a different random hash function h_i. |
| * Estimates of the count of some value x are based on the <i>minimum</i> counter h_i(x) across |
| * the DEPTH arrays (hence the name CountMin.) |
| * |
| * Let's call the process described above "sketching" the x's. To support range |
| * lookups, we repeat the basic CountMin sketching process INT64BITS times as follows. |
| * (This is the "dyadic range" trick mentioned in Cormode/Muthu.) |
| * |
| * Every value x/(2^i) is sketched |
| * at a different power-of-2 (dyadic) "range" i. So we sketch x in range 0, then sketch x/2 |
| * in range 1, then sketch x/4 in range 2, etc. |
| * This allows us to count up ranges (like 14-48) by doing CountMin equality lookups on constituent |
| * dyadic ranges ({[14-15] as 7 in range 2, [16-31] as 1 in range 16, [32-47] as 2 in range 16, [48-48] as 48 in range 1}). |
| * Dyadic ranges are similarly useful for histogramming, order stats, etc. |
| * |
| * The results of the estimators below generally have guarantees of the form |
| * "the answer is within \epsilon of the true answer with probability 1-\delta." |
| */ |
| |
| #include <postgres.h> |
| #include <utils/fmgroids.h> |
| #include <utils/array.h> |
| #include <utils/elog.h> |
| #include <utils/builtins.h> |
| #include <utils/lsyscache.h> |
| #include <utils/numeric.h> |
| #include <nodes/execnodes.h> |
| #include <fmgr.h> |
| #include <catalog/pg_type.h> |
| #include <ctype.h> |
| #include "sketch_support.h" |
| #include "countmin.h" |
| |
| PG_FUNCTION_INFO_V1(__cmsketch_int8_trans); |
| |
| /* |
| * This is the UDF interface. It does sanity checks and preps values |
| * for the interesting logic in countmin_dyadic_trans_c |
| */ |
| Datum __cmsketch_int8_trans(PG_FUNCTION_ARGS) |
| { |
| bytea * transblob = NULL; |
| cmtransval *transval; |
| |
| /* |
| * This function makes destructive updates to its arguments. |
| * Make sure it's being called in an agg context. |
| */ |
| |
| if (!(fcinfo->context && |
| (IsA(fcinfo->context, AggState) |
| #ifdef NOTGP |
| || IsA(fcinfo->context, WindowAggState) |
| #endif |
| ))) |
| elog(ERROR, |
| "destructive pass by reference outside agg"); |
| |
| /* get the provided element, being careful in case it's NULL */ |
| if (!PG_ARGISNULL(1)) { |
| transblob = cmsketch_check_transval(fcinfo, true); |
| transval = (cmtransval *)(VARDATA(transblob)); |
| |
| /* the following line modifies the contents of transval, and hence transblob */ |
| countmin_dyadic_trans_c(transval, PG_GETARG_DATUM(1)); |
| PG_RETURN_DATUM(PointerGetDatum(transblob)); |
| } |
| else PG_RETURN_DATUM(PointerGetDatum(PG_GETARG_BYTEA_P(0))); |
| } |
| |
| /*! |
| * check if the transblob is not initialized, and do so if not |
| * \param transblob a cmsketch transval packed in a bytea |
| */ |
| bytea *cmsketch_check_transval(PG_FUNCTION_ARGS, bool initargs) |
| { |
| bytea * transblob = PG_GETARG_BYTEA_P(0); |
| cmtransval *transval; |
| |
| /* |
| * an uninitialized transval should be a datum smaller than sizeof(cmtransval). |
| * if this one is small, initialize it now, else return it. |
| */ |
| if (!CM_TRANSVAL_INITIALIZED(transblob)) { |
| /* XXX would be nice to pfree the existing transblob, but pfree complains. */ |
| transblob = cmsketch_init_transval(); |
| transval = (cmtransval *)VARDATA(transblob); |
| |
| if (initargs) { |
| int nargs = PG_NARGS(); |
| int i; |
| /* carry along any additional args */ |
| if (nargs - 2 > MAXARGS) |
| elog( |
| ERROR, |
| "no more than %d additional arguments should be passed to __cmsketch_int8_trans", |
| MAXARGS); |
| transval->nargs = nargs - 2; |
| for (i = 2; i < nargs; i++) { |
| if (PG_ARGISNULL(i)) |
| elog(ERROR, |
| "NULL parameter %d passed to __cmsketch_int8_trans", |
| i); |
| transval->args[i-2] = PG_GETARG_INT64(i); |
| } |
| } |
| else transval->nargs = -1; |
| } |
| return(transblob); |
| } |
| |
| bytea *cmsketch_init_transval() |
| { |
| /* allocate and zero out a transval via palloc0 */ |
| bytea * transblob = (bytea *)palloc0(CM_TRANSVAL_SZ); |
| SET_VARSIZE(transblob, CM_TRANSVAL_SZ); |
| |
| return(transblob); |
| } |
| |
| /*! |
| * perform multiple sketch insertions, one for each dyadic range (from 0 up to RANGES-1). |
| * * \param transval the cmsketch transval |
| * * \param inputi the value to be inserted |
| */ |
| void countmin_dyadic_trans_c(cmtransval *transval, Datum input) |
| { |
| uint32 j; |
| |
| for (j = 0; j < RANGES; j++) { |
| countmin_trans_c(transval->sketches[j], input, |
| F_INT8OUT, INT8OID); |
| /* now divide by 2 for the next dyadic range */ |
| input = Int64GetDatum(DatumGetInt64(input) >> 1); |
| } |
| } |
| |
| /*! |
| * Main loop of Cormode and Muthukrishnan's sketching algorithm, for setting counters in |
| * sketches at a single "dyadic range". For each call, we want to use DEPTH independent |
| * hash functions. We do this by using a single md5 hash function, and taking |
| * successive 16-bit runs of the result as independent hash outputs. |
| * \param sketch the current countmin sketch |
| * \param dat the datum to be inserted |
| * \param outFuncOid Oid of the PostgreSQL function to convert dat to a string |
| * \param typOid Oid of the Postgres type for dat |
| */ |
| Datum countmin_trans_c(countmin sketch, Datum dat, Oid outFuncOid, Oid typOid) |
| { |
| (void) outFuncOid; /* avoid warning about unused parameter */ |
| bytea *nhash; |
| |
| nhash = sketch_md5_bytea(dat, typOid); |
| |
| /* |
| * iterate through all sketches, incrementing the counters indicated by the hash |
| * we don't care about return value here, so 3rd (initialization) argument is arbitrary. |
| */ |
| (void)hash_counters_iterate(nhash, sketch, 0, &increment_counter); |
| return(PointerGetDatum(nhash)); |
| } |
| |
| /* |
| * FINAL functions for various UDAs built on countmin sketches |
| */ |
| |
| /* |
| * pasted code from utils/encode.c, facilitating __cmsketch_base64_final |
| */ |
| static const char _base64[] = |
| "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; |
| |
| static unsigned |
| b64_encode(const char *src, unsigned len, char *dst) |
| { |
| char *p, |
| *lend = dst + 76; |
| const char *s, |
| *end = src + len; |
| int pos = 2; |
| uint32 buf = 0; |
| |
| s = src; |
| p = dst; |
| |
| while (s < end) |
| { |
| buf |= (unsigned char) *s << (pos << 3); |
| pos--; |
| s++; |
| |
| /* write it out */ |
| if (pos < 0) |
| { |
| *p++ = _base64[(buf >> 18) & 0x3f]; |
| *p++ = _base64[(buf >> 12) & 0x3f]; |
| *p++ = _base64[(buf >> 6) & 0x3f]; |
| *p++ = _base64[buf & 0x3f]; |
| |
| pos = 2; |
| buf = 0; |
| } |
| if (p >= lend) |
| { |
| *p++ = '\n'; |
| lend = p + 76; |
| } |
| } |
| if (pos != 2) |
| { |
| *p++ = _base64[(buf >> 18) & 0x3f]; |
| *p++ = _base64[(buf >> 12) & 0x3f]; |
| *p++ = (pos == 0) ? _base64[(buf >> 6) & 0x3f] : '='; |
| *p++ = '='; |
| } |
| |
| return p - dst; |
| } |
| |
| static unsigned |
| b64_enc_len(unsigned srclen) |
| { |
| /* 3 bytes will be converted to 4, linefeed after 76 chars */ |
| return (srclen + 2) * 4 / 3 + srclen / (76 * 3 / 4); |
| } |
| // done pasted code |
| |
| /*! |
| * return the array of sketch counters as a int8 |
| */ |
| PG_FUNCTION_INFO_V1(__cmsketch_base64_final); |
| Datum __cmsketch_base64_final(PG_FUNCTION_ARGS) |
| { |
| bytea * blob = PG_GETARG_BYTEA_P(0); |
| cmtransval *sketch = NULL; |
| int len = RANGES*sizeof(countmin) + VARHDRSZ; |
| bytea *out = NULL; |
| |
| if (VARSIZE(blob) > VARHDRSZ && !CM_TRANSVAL_INITIALIZED(blob)) { |
| elog(ERROR, "invalid transition state for cmsketch"); |
| } |
| |
| out = palloc0(len); |
| if (VARSIZE(blob) > VARHDRSZ) { |
| sketch = (cmtransval *)VARDATA(blob); |
| memcpy((uint8 *)VARDATA(out), sketch->sketches, len - VARHDRSZ); |
| } |
| SET_VARSIZE(out, len); |
| |
| // pasted code from utils/builtins.h:binary_encode() |
| bytea *data = out; |
| text *result; |
| int datalen, |
| resultlen, |
| res; |
| datalen = VARSIZE(data) - VARHDRSZ; |
| resultlen = b64_enc_len(datalen); |
| result = palloc(VARHDRSZ + resultlen); |
| res = b64_encode(VARDATA(data), datalen, VARDATA(result)); |
| /* Make this FATAL 'cause we've trodden on memory ... */ |
| if (res > resultlen) |
| elog(FATAL, "overflow - encode estimate too small"); |
| SET_VARSIZE(result, VARHDRSZ + res); |
| |
| PG_RETURN_TEXT_P(result); |
| } |
| |
| /*! |
| * Greenplum "prefunc" to combine sketches from multiple machines |
| */ |
| PG_FUNCTION_INFO_V1(__cmsketch_merge); |
| Datum __cmsketch_merge(PG_FUNCTION_ARGS) |
| { |
| bytea * counterblob1 = PG_GETARG_BYTEA_P(0); |
| bytea * counterblob2 = PG_GETARG_BYTEA_P(1); |
| cmtransval *transval2 = (cmtransval *)VARDATA(counterblob2); |
| cmtransval *newtrans; |
| countmin * sketches2 = NULL; |
| bytea * newblob; |
| countmin * newsketches; |
| uint32 i, j, k; |
| int sz; |
| |
| /* make sure they're initialized! */ |
| if (!CM_TRANSVAL_INITIALIZED(counterblob1) |
| && !CM_TRANSVAL_INITIALIZED(counterblob2)) |
| /* if both are empty can return one of them */ |
| PG_RETURN_DATUM(PointerGetDatum(counterblob1)); |
| else if (!CM_TRANSVAL_INITIALIZED(counterblob1)) { |
| counterblob1 = cmsketch_init_transval(); |
| } |
| else if (!CM_TRANSVAL_INITIALIZED(counterblob2)) { |
| counterblob2 = cmsketch_init_transval(); |
| transval2 = (cmtransval *)VARDATA(counterblob2); |
| } |
| |
| sketches2 = transval2 ->sketches; |
| |
| sz = VARSIZE(counterblob1); |
| /* allocate a new transval as a copy of counterblob1 */ |
| newblob = (bytea *)palloc(sz); |
| memcpy(newblob, counterblob1, sz); |
| newtrans = (cmtransval *)(VARDATA(newblob)); |
| newsketches = (countmin *)(newtrans)->sketches; |
| |
| /* add in values from counterblob2 */ |
| for (i = 0; i < RANGES; i++) |
| for (j = 0; j < DEPTH; j++) |
| for (k = 0; k < NUMCOUNTERS; k++) |
| newsketches[i][j][k] += sketches2[i][j][k]; |
| |
| if (newtrans->nargs == -1) { |
| /* transfer in the args from the other input */ |
| newtrans->nargs = transval2->nargs; |
| for (i = 0; (int)i < transval2->nargs; i++) |
| newtrans->args[i] = transval2->args[i]; |
| } |
| |
| PG_RETURN_DATUM(PointerGetDatum(newblob)); |
| } |
| |
| |
| |
| /* |
| ******* Below are scalar methods to manipulate completed sketches. ****** |
| */ |
| |
| /*! |
| * get the approximate count of objects with value arg |
| * \param sketch a countmin sketch |
| * \param arg the Datum we want to find the count of |
| * \param funcOid the Postgres function that converts arg to a string |
| */ |
| int64 cmsketch_count_c(countmin sketch, Datum arg, Oid funcOid, Oid typOid) |
| { |
| bytea *nhash; |
| |
| /* get the md5 hash of the argument. */ |
| nhash = sketch_md5_bytea(arg, typOid); |
| return(cmsketch_count_md5_datum(sketch, nhash, funcOid)); |
| } |
| |
| int64 cmsketch_count_md5_datum(countmin sketch, bytea *md5_bytea, Oid funcOid) |
| { |
| (void) funcOid; /* avoid warning about unused parameter */ |
| /* iterate through the sketches, finding the min counter associated with this hash */ |
| return(hash_counters_iterate(md5_bytea, sketch, INT64_MAX, |
| &min_counter)); |
| } |
| |
| |
| #if 0 |
| /****** SUPPORT ROUTINES *******/ |
| PG_FUNCTION_INFO_V1(cmsketch_dump); |
| |
| /*! |
| * dump sketch contents |
| */ |
| Datum cmsketch_dump(PG_FUNCTION_ARGS) |
| { |
| bytea * transblob = (bytea *)PG_GETARG_BYTEA_P(0); |
| countmin *sketches; |
| char * newblob = (char *)palloc(10240); |
| uint32 i, j, k, c; |
| |
| if (VARSIZE(transblob) > VARHDRSZ && !CM_TRANSVAL_INITIALIZED(transblob)) { |
| elog(ERROR, "invalid transition state for cmsketch"); |
| } |
| |
| if (VARSIZE(transblob) > VARHDRSZ) { |
| sketches = ((cmtransval *)VARDATA(transblob))->sketches; |
| } |
| else { |
| sketches = palloc0(RANGES*sizeof(countmin) + VARHDRSZ); |
| SET_VARSIZE(sketches, RANGES*sizeof(countmin) + VARHDRSZ); |
| } |
| for (i=0, c=0; i < RANGES; i++) |
| for (j=0; j < DEPTH; j++) |
| for(k=0; k < NUMCOUNTERS; k++) { |
| if (sketches[i][j][k] != 0) |
| c += sprintf(&newblob[c], "[(%d,%d,%d):" INT64_FORMAT |
| "], ", i, j, k, sketches[i][j][k]); |
| if (c > 10000) break; |
| } |
| newblob[c] = '\0'; |
| if (VARSIZE(transblob) <= VARHDRSZ) { |
| pfree(sketches); |
| } |
| PG_RETURN_NULL(); |
| } |
| #endif |
| |
| |
| /*! |
| * for each row of the sketch, use the 16 bits starting at 2^i mod NUMCOUNTERS, |
| * and invoke the lambda on those 16 bits (which may destructively modify counters). |
| * \param hashval the MD5 hashed value that we take 16 bits at a time |
| * \param sketch the cmsketch |
| * \param initial the initialized return value |
| * \param lambdaptr the function to invoke on each 16 bits |
| */ |
| int64 hash_counters_iterate(bytea *hashval, |
| countmin sketch, /* width is DEPTH*NUMCOUNTERS */ |
| int64 initial, |
| int64 (*lambdaptr)(uint32, |
| uint32, |
| countmin, |
| int64)) |
| { |
| uint32 i, col; |
| char *c; |
| unsigned short twobytes; |
| int64 retval = initial; |
| |
| /* |
| * XXX WARNING: are there problems with unaligned access here? |
| * XXX I was using memmove rather than casting, which was inefficient, |
| * XXX but I was hoping memmove would deal with unaligned access in a portable way. |
| * XXX However the deref of 2 bytes seems to work OK. |
| */ |
| for (i = 0, c = (char *)VARDATA(hashval); |
| i < DEPTH; |
| i++, c += 2) { |
| twobytes = *(unsigned short *)c; |
| col = twobytes % NUMCOUNTERS; |
| retval = (*lambdaptr)(i, col, sketch, retval); |
| } |
| return retval; |
| } |
| |
| /*! |
| * destructive increment lambda for hash_counters_iterate. |
| * transval and return val not of particular interest here. |
| * \param i which row to update |
| * \param col which column to update |
| * \param sketch the sketch |
| * \param transval we don't need transval here, but its part of the |
| * lambda interface for hash_counters_iterate |
| */ |
| |
| int64 increment_counter(uint32 i, |
| uint32 col, |
| countmin sketch, |
| int64 transval) |
| { |
| (void) transval; /* avoid warning about unused parameter */ |
| int64 oldval = sketch[i][col]; |
| if (sketch[i][col] == (INT64_MAX)) |
| elog(ERROR, "maximum count exceeded in sketch"); |
| sketch[i][col] = oldval + 1; |
| |
| /* return the incremented value, though unlikely anyone cares. */ |
| return oldval+1; |
| } |
| |
| /*! |
| * running minimum lambda for hash_counters_iterate |
| * \param i which row to examine |
| * \param col which column to examine |
| * \param sketch the sketch |
| * \param transval smallest counter so far |
| * lambda interface for hash_counters_iterate |
| */ |
| int64 min_counter(uint32 i, |
| uint32 col, |
| countmin sketch, |
| int64 transval) |
| { |
| int64 thisval = sketch[i][col]; |
| return (thisval < transval) ? thisval : transval; |
| } |