| /*! |
| * \file countmin.c |
| * |
| * \brief CountMin sketch implementation |
| * |
| * \implementation |
| * The basic CountMin sketch is a set of DEPTH arrays, each with NUMCOUNTERS counters. |
| * The idea is that each of those arrays is used as an independent random trial of the |
| * same process: for all the values x in a set, each holds counts of h_i(x) mod NUMCOUNTERS for a different random hash function h_i. |
| * Estimates of the count of some value x are based on the <i>minimum</i> counter h_i(x) across |
| * the DEPTH arrays (hence the name CountMin.) |
| * |
| * Let's call the process described above "sketching" the x's. To support range |
| * lookups, we repeat the basic CountMin sketching process INT64BITS times as follows. |
| * (This is the "dyadic range" trick mentioned in Cormode/Muthu.) |
| * |
| * Every value x/(2^i) is sketched |
| * at a different power-of-2 (dyadic) "range" i. So we sketch x in range 0, then sketch x/2 |
| * in range 1, then sketch x/4 in range 2, etc. |
| * This allows us to count up ranges (like 14-48) by doing CountMin equality lookups on constituent |
| * dyadic ranges ({[14-15] as 7 in range 2, [16-31] as 1 in range 16, [32-47] as 2 in range 16, [48-48] as 48 in range 1}). |
| * Dyadic ranges are similarly useful for histogramming, order stats, etc. |
| * |
| * The results of the estimators below generally have guarantees of the form |
| * "the answer is within \epsilon of the true answer with probability 1-\delta." |
| */ |
| |
| #include "postgres.h" |
| #include "utils/array.h" |
| #include "utils/elog.h" |
| #include "utils/builtins.h" |
| #include "utils/lsyscache.h" |
| #include "utils/numeric.h" |
| #include "nodes/execnodes.h" |
| #include "fmgr.h" |
| #include "sketch_support.h" |
| #include "catalog/pg_type.h" |
| #include "countmin.h" |
| |
| #include <ctype.h> |
| |
| PG_FUNCTION_INFO_V1(__cmsketch_trans); |
| |
| /* |
| * This is the UDF interface. It does sanity checks and preps values |
| * for the interesting logic in countmin_dyadic_trans_c |
| */ |
| Datum __cmsketch_trans(PG_FUNCTION_ARGS) |
| { |
| bytea * transblob = NULL; |
| cmtransval *transval; |
| |
| /* |
| * This function makes destructive updates to its arguments. |
| * Make sure it's being called in an agg context. |
| */ |
| |
| if (!(fcinfo->context && |
| (IsA(fcinfo->context, AggState) |
| #ifdef NOTGP |
| || IsA(fcinfo->context, WindowAggState) |
| #endif |
| ))) |
| elog(ERROR, |
| "destructive pass by reference outside agg"); |
| |
| /* get the provided element, being careful in case it's NULL */ |
| if (!PG_ARGISNULL(1)) { |
| transblob = cmsketch_check_transval(fcinfo, true); |
| transval = (cmtransval *)(VARDATA(transblob)); |
| |
| /* the following line modifies the contents of transval, and hence transblob */ |
| countmin_dyadic_trans_c(transval, PG_GETARG_INT64(1)); |
| PG_RETURN_DATUM(PointerGetDatum(transblob)); |
| } |
| else PG_RETURN_DATUM(PointerGetDatum(PG_GETARG_BYTEA_P(0))); |
| } |
| |
| /*! |
| * check if the transblob is not initialized, and do so if not |
| * \param transblob a cmsketch transval packed in a bytea |
| */ |
| bytea *cmsketch_check_transval(PG_FUNCTION_ARGS, bool initargs) |
| { |
| bytea * transblob = PG_GETARG_BYTEA_P(0); |
| cmtransval *transval; |
| |
| /* |
| * an uninitialized transval should be a datum smaller than sizeof(cmtransval). |
| * if this one is small, initialize it now, else return it. |
| */ |
| if (!CM_TRANSVAL_INITIALIZED(transblob)) { |
| /* XXX would be nice to pfree the existing transblob, but pfree complains. */ |
| transblob = cmsketch_init_transval(); |
| transval = (cmtransval *)VARDATA(transblob); |
| |
| if (initargs) { |
| int nargs = PG_NARGS(); |
| int i; |
| /* carry along any additional args */ |
| if (nargs - 2 > MAXARGS) |
| elog( |
| ERROR, |
| "no more than %d additional arguments should be passed to __cmsketch_trans", |
| MAXARGS); |
| transval->nargs = nargs - 2; |
| for (i = 2; i < nargs; i++) { |
| if (PG_ARGISNULL(i)) |
| elog(ERROR, |
| "NULL parameter %d passed to __cmsketch_trans", |
| i); |
| transval->args[i-2] = PG_GETARG_DATUM(i); |
| } |
| } |
| else transval->nargs = -1; |
| } |
| return(transblob); |
| } |
| |
| bytea *cmsketch_init_transval() |
| { |
| bool typIsVarlena; |
| cmtransval *transval; |
| |
| /* allocate and zero out a transval via palloc0 */ |
| bytea * transblob = (bytea *)palloc0(CM_TRANSVAL_SZ); |
| SET_VARSIZE(transblob, CM_TRANSVAL_SZ); |
| |
| /* set up for stringifying INT8's according to PG type rules */ |
| transval = (cmtransval *)VARDATA(transblob); |
| transval->typOid = INT8OID; /* as of now we only support INT8 */ |
| getTypeOutputInfo(transval->typOid, |
| &(transval->outFuncOid), |
| &typIsVarlena); |
| return(transblob); |
| } |
| |
| /*! |
| * perform multiple sketch insertions, one for each dyadic range (from 0 up to RANGES-1). |
| * * \param transval the cmsketch transval |
| * * \param inputi the value to be inserted |
| */ |
| void countmin_dyadic_trans_c(cmtransval *transval, int64 inputi) |
| { |
| uint32 j; |
| |
| for (j = 0; j < RANGES; j++) { |
| countmin_trans_c(transval->sketches[j], Int64GetDatum( |
| inputi), transval->outFuncOid); |
| /* now divide by 2 for the next dyadic range */ |
| inputi >>= 1; |
| } |
| } |
| |
| /*! |
| * Main loop of Cormode and Muthukrishnan's sketching algorithm, for setting counters in |
| * sketches at a single "dyadic range". For each call, we want to use DEPTH independent |
| * hash functions. We do this by using a single md5 hash function, and taking |
| * successive 16-bit runs of the result as independent hash outputs. |
| * \param sketch the current countmin sketch |
| * \param dat the datum to be inserted |
| * \param outFuncOid Oid of the PostgreSQL function to convert dat to a string |
| */ |
| char *countmin_trans_c(countmin sketch, Datum dat, Oid outFuncOid) |
| { |
| Datum nhash; |
| char *input; |
| |
| /* stringify input for the md5 function */ |
| input = OidOutputFunctionCall(outFuncOid, dat); |
| |
| /* get the md5 hash of the input. */ |
| nhash = md5_datum(input); |
| |
| /* |
| * iterate through all sketches, incrementing the counters indicated by the hash |
| * we don't care about return value here, so 3rd (initialization) argument is arbitrary. |
| */ |
| (void)hash_counters_iterate(nhash, sketch, 0, &increment_counter); |
| return(input); |
| } |
| |
| /* |
| * FINAL functions for various UDAs built on countmin sketches |
| */ |
| |
| /*! |
| * simply returns its input |
| */ |
| PG_FUNCTION_INFO_V1(__cmsketch_final); |
| Datum __cmsketch_final(PG_FUNCTION_ARGS) |
| { |
| PG_RETURN_BYTEA_P(PG_GETARG_BYTEA_P(0)); |
| } |
| |
| /*! |
| * use sketch to get the count of the 1st arg |
| */ |
| PG_FUNCTION_INFO_V1(__cmsketch_count_final); |
| Datum __cmsketch_count_final(PG_FUNCTION_ARGS) |
| { |
| bytea * blob = PG_GETARG_BYTEA_P(0); |
| cmtransval *sketch = (cmtransval *)VARDATA(blob); |
| if (!CM_TRANSVAL_INITIALIZED(blob)) |
| PG_RETURN_NULL(); |
| PG_RETURN_INT64(cmsketch_count_c(sketch->sketches[0], |
| sketch->args[0], sketch->outFuncOid)); |
| } |
| |
| /*! |
| * use sketch to get the count in range of the 1st 2 args |
| */ |
| PG_FUNCTION_INFO_V1(__cmsketch_rangecount_final); |
| Datum __cmsketch_rangecount_final(PG_FUNCTION_ARGS) |
| { |
| bytea * blob = PG_GETARG_BYTEA_P(0); |
| cmtransval *sketch = (cmtransval *)VARDATA(blob); |
| if (!CM_TRANSVAL_INITIALIZED(blob)) |
| PG_RETURN_NULL(); |
| PG_RETURN_INT64(cmsketch_rangecount_c(sketch, DatumGetInt64(sketch->args[0]), |
| DatumGetInt64(sketch->args[1]))); |
| } |
| |
| /*! |
| * use sketch to get the centile stored in the 1st arg |
| */ |
| PG_FUNCTION_INFO_V1(__cmsketch_centile_final); |
| Datum __cmsketch_centile_final(PG_FUNCTION_ARGS) |
| { |
| bytea * blob = PG_GETARG_BYTEA_P(0); |
| int64 total; |
| |
| cmtransval *sketch = (cmtransval *)VARDATA(blob); |
| if (!CM_TRANSVAL_INITIALIZED(blob)) |
| PG_RETURN_NULL(); |
| total = cmsketch_rangecount_c(sketch, MIN_INT64, MAX_INT64); /* count(*) */ |
| if (total == 0) { |
| PG_RETURN_NULL(); |
| } |
| PG_RETURN_INT64(cmsketch_centile_c(sketch, DatumGetInt64(sketch->args[0]), |
| total)); |
| } |
| |
| /*! |
| * use sketch to get the median |
| */ |
| PG_FUNCTION_INFO_V1(__cmsketch_median_final); |
| Datum __cmsketch_median_final(PG_FUNCTION_ARGS) |
| { |
| bytea * blob = PG_GETARG_BYTEA_P(0); |
| int64 total; |
| |
| cmtransval *sketch = (cmtransval *)VARDATA(blob); |
| if (!CM_TRANSVAL_INITIALIZED(blob)) |
| PG_RETURN_NULL(); |
| total = cmsketch_rangecount_c(sketch, MIN_INT64, MAX_INT64); /* count(*) */ |
| if (total == 0) { |
| PG_RETURN_NULL(); |
| } |
| PG_RETURN_INT64(cmsketch_centile_c(sketch, 50, total)); |
| } |
| |
| /*! |
| * use sketch get the equi-depth histogram |
| */ |
| PG_FUNCTION_INFO_V1(__cmsketch_dhist_final); |
| Datum __cmsketch_dhist_final(PG_FUNCTION_ARGS) |
| { |
| bytea * blob = PG_GETARG_BYTEA_P(0); |
| cmtransval *sketch = (cmtransval *)VARDATA(blob); |
| if (!CM_TRANSVAL_INITIALIZED(blob)) |
| PG_RETURN_NULL(); |
| PG_RETURN_INT64(cmsketch_depth_histogram_c(sketch, |
| DatumGetInt64(sketch->args[0]))); |
| } |
| |
| /*! |
| * Greenplum "prefunc" to combine sketches from multiple machines |
| */ |
| PG_FUNCTION_INFO_V1(__cmsketch_merge); |
| Datum __cmsketch_merge(PG_FUNCTION_ARGS) |
| { |
| bytea * counterblob1 = PG_GETARG_BYTEA_P(0); |
| bytea * counterblob2 = PG_GETARG_BYTEA_P(1); |
| cmtransval *newtrans; |
| countmin * sketches2 = (countmin *) |
| ((cmtransval *)(VARDATA(counterblob2)))->sketches; |
| bytea * newblob; |
| countmin * newsketches; |
| uint32 i, j, k; |
| int sz; |
| |
| /* make sure they're initialized! */ |
| if (!CM_TRANSVAL_INITIALIZED(counterblob1)) |
| counterblob1 = cmsketch_init_transval(); |
| if (!CM_TRANSVAL_INITIALIZED(counterblob2)) |
| counterblob2 = cmsketch_init_transval(); |
| |
| sz = VARSIZE(counterblob1); |
| /* allocate a new transval as a copy of counterblob1 */ |
| newblob = (bytea *)palloc(sz); |
| memcpy(newblob, counterblob1, sz); |
| newtrans = (cmtransval *)(VARDATA(newblob)); |
| newsketches = (countmin *)(newtrans)->sketches; |
| |
| /* add in values from counterblob2 */ |
| for (i = 0; i < RANGES; i++) |
| for (j = 0; j < DEPTH; j++) |
| for (k = 0; k < NUMCOUNTERS; k++) |
| newsketches[i][j][k] += sketches2[i][j][k]; |
| |
| if (newtrans->nargs == -1) { |
| /* transfer in the args from the other input */ |
| cmtransval *other = (cmtransval *)VARDATA(counterblob2); |
| newtrans->nargs = other->nargs; |
| for (i = 0; (int)i < other->nargs; i++) |
| newtrans->args[i] = other->args[i]; |
| } |
| |
| PG_RETURN_DATUM(PointerGetDatum(newblob)); |
| } |
| |
| |
| |
| /* |
| ******* Below are scalar methods to manipulate completed sketches. ****** |
| */ |
| |
| /*! |
| * match sketch type to scalar arg type |
| */ |
| #define CM_CHECKARG(sketch, arg_offset) \ |
| if (PG_ARGISNULL(arg_offset)) PG_RETURN_NULL(); \ |
| if (sketch->typOid != get_fn_expr_argtype(fcinfo->flinfo, arg_offset)) { \ |
| elog( \ |
| ERROR, \ |
| "sketch computed over type " INT64_FORMAT \ |
| "; argument %d over type " INT64_FORMAT ".", \ |
| (int64)sketch->typOid, \ |
| arg_offset, \ |
| (int64)get_fn_expr_argtype(fcinfo->flinfo, arg_offset)); \ |
| PG_RETURN_NULL(); \ |
| } |
| |
| /*! |
| * get the approximate count of objects with value arg |
| * \param sketch a countmin sketch |
| * \param arg the Datum we want to find the count of |
| * \param funcOid the Postgres function that converts arg to a string |
| */ |
| int64 cmsketch_count_c(countmin sketch, Datum arg, Oid funcOid) |
| { |
| Datum nhash; |
| char *txt; |
| |
| txt = OidOutputFunctionCall(funcOid, arg); |
| |
| /* get the md5 hash of the stringified argument. */ |
| nhash = md5_datum(txt); |
| |
| /* iterate through the sketches, finding the min counter associated with this hash */ |
| PG_RETURN_INT64(hash_counters_iterate(nhash, sketch, INT64_MAX, |
| &min_counter)); |
| } |
| |
| /*! |
| * Compute count of a range by summing counts of its dyadic ranges |
| * \param transval a countmin transition value |
| * \param bot the bottom of the range (inclusive) |
| * \param top the top of the range (inclusive) |
| */ |
| Datum cmsketch_rangecount_c(cmtransval *transval, int64 bot, int64 top) |
| { |
| int64 cursum = 0; |
| uint32 i; |
| rangelist r; |
| int4 dyad; |
| int64 val; |
| int64 countval; |
| |
| r.emptyoffset = 0; |
| find_ranges(bot, top, &r); |
| |
| /* |
| * find_ranges will not generate a span larger than 2^63-1, so |
| * we only need to consider 2^63 or less. |
| */ |
| for (i = 0; i < r.emptyoffset; i++) { |
| /* What power of 2 is this range? */ |
| if (r.spans[i][0] == MIN_INT64 && r.spans[i][1] == -1) { |
| /* special case to avoid overflow: full range left of 0 */ |
| dyad = 63; |
| countval = -1; |
| } |
| else if (r.spans[i][0] == 0 && r.spans[i][1] == MAX_INT64) { |
| /* special case to avoid overflow: full range right of 0 */ |
| dyad = 63; |
| countval = 0; |
| } |
| else { |
| int64 width = r.spans[i][1] - r.spans[i][0] + (int64)1; |
| if (r.spans[i][0] == MIN_INT64) |
| width++; |
| if (r.spans[i][1] == MAX_INT64) { |
| width++; |
| } |
| /* Divide min of range by 2^dyad and get count */ |
| dyad = safe_log2(width); |
| countval = r.spans[i][0] >> dyad; |
| } |
| val = cmsketch_count_c(transval->sketches[dyad], |
| (Datum) countval, |
| transval->outFuncOid); |
| cursum += val; |
| } |
| PG_RETURN_DATUM(cursum); |
| } |
| |
| |
| /*! |
| * convert an arbitrary range [bot-top] into a rangelist of dyadic ranges. |
| * E.g. convert 14-48 into [[14-15], [16-31], [32-47], [48-48]] |
| * To manage overflow issues, do not generate a range larger than 2^63-1! |
| * \param bot the bottom of the range (inclusive) |
| * \param top the top of the range (inclusive) |
| * \param r the list of ranges to be returned |
| */ |
| void find_ranges(int64 bot, int64 top, rangelist *r) |
| { |
| /* kick off the recursion at power RANGES-1 */ |
| find_ranges_internal(bot, top, RANGES-1, r); |
| } |
| |
| /*! |
| * find the ranges via recursive calls to this routine, pulling out smaller and |
| * smaller powers of 2 |
| * To manage overflow issues, do not generate a range larger than 2^63-1! |
| * \param bot the bottom of the range (inclusive) |
| * \param top the top of the range (inclusive) |
| * \param power the highest power to start with |
| * \param r the rangelist to be returned |
| */ |
| void find_ranges_internal(int64 bot, int64 top, int power, rangelist *r) |
| { |
| short dyad; /* a number between 0 and 63 */ |
| uint64 pow_dyad; |
| uint64 width; |
| |
| /* Sanity check */ |
| if (top < bot || power < 0) |
| return; |
| |
| if (top == bot) { |
| /* base case of recursion, a range of the form [x-x]. */ |
| r->spans[r->emptyoffset][0] = r->spans[r->emptyoffset][1] = bot; |
| ADVANCE_OFFSET(*r); |
| return; |
| } |
| |
| /* |
| * a range that's 2^63 or larger will overflow int64's in the |
| * code below, so recurse to the left and right by hand |
| */ |
| if (top >= 0 && bot < 0) { |
| /* don't even try to figure out how wide this range is, just split */ |
| find_ranges_internal(bot, -1, power-1, r); |
| find_ranges_internal(0, top, power-1, r); |
| return; |
| } |
| |
| width = top - bot + (int64)1; |
| |
| /* account for the fact that MIN and MAX are 1 off the true power of 2 */ |
| if (top == MAX_INT64 || bot == MIN_INT64) |
| width++; |
| |
| dyad = safe_log2(width); |
| if (dyad > 62) { |
| /* dangerously big, so split. we know that we don't span 0. */ |
| int sign = (top < 0) ? -1 : 1; |
| find_ranges_internal(bot, (((int64)1) << 62)*sign - 1, 62, r); |
| find_ranges_internal((((int64)1) << 62)*sign, top, 62, r); |
| return; |
| } |
| |
| /* |
| * if we get here, we have a range of size 2 or greater. |
| * Find the largest dyadic range width in this range. |
| */ |
| pow_dyad = ((uint64)1) << dyad; |
| |
| if ((bot == MIN_INT64) || (bot % pow_dyad == 0)) { |
| /* our range is left-aligned on the dyad's min */ |
| r->spans[r->emptyoffset][0] = bot; |
| if (top == MAX_INT64) { |
| /* |
| * special case: no need to recurse on right, and doing so |
| * could overflow, so handle by hand |
| */ |
| r->spans[r->emptyoffset][1] = MAX_INT64; |
| ADVANCE_OFFSET(*r); |
| } |
| else { |
| int64 newbot; |
| /* |
| * -1 on the next line because range to the right will |
| * start at the power-of-2 boundary. |
| */ |
| r->spans[r->emptyoffset][1] = (bot + pow_dyad - 1); |
| if (bot == MIN_INT64) { |
| /* account for fact that MIN_INT64 is 1 bigger than -2^63 */ |
| r->spans[r->emptyoffset][1]--; |
| } |
| newbot = r->spans[r->emptyoffset][1] + 1; |
| ADVANCE_OFFSET(*r); |
| /* recurse on right at finer grain */ |
| find_ranges_internal(newbot, top, power-1, r); |
| } |
| } |
| |
| else if (top == MAX_INT64 || ((top+1) % pow_dyad) == 0) { |
| /* our range is right-aligned on the dyad's max. */ |
| r->spans[r->emptyoffset][1] = top; |
| if (bot == MIN_INT64) { |
| /* |
| * special case: no need to recurse on left, and doing so |
| * could overflow, so handle by hand |
| */ |
| r->spans[r->emptyoffset][0] = MIN_INT64; |
| ADVANCE_OFFSET(*r); |
| } |
| else { |
| int64 newtop; |
| r->spans[r->emptyoffset][0] = (top - pow_dyad + 1); |
| if (top == MAX_INT64) { |
| /* account for fact that MAX_INT64 is 1 smaller than 2^63 */ |
| r->spans[r->emptyoffset][0]++; |
| } |
| newtop = r->spans[r->emptyoffset][0] - 1; |
| ADVANCE_OFFSET(*r); |
| /* recurse on left at finer grain. */ |
| find_ranges_internal(bot, newtop, power-1, r); |
| } |
| } |
| else { |
| /* we straddle a power of 2 */ |
| int64 power_of_2 = pow_dyad*(top/pow_dyad); |
| |
| /* recurse on right at finer grain */ |
| find_ranges_internal(bot, power_of_2 - 1, power-1, r); |
| /* recurse on left at finer grain */ |
| find_ranges_internal(power_of_2, top, power-1, r); |
| } |
| } |
| |
| /*! |
| * find the approximate centile in a cm sketch by binary search in the domain of values |
| * \param transval the current transition value |
| * \param intcentile the centile to return |
| * \param total the total count of items |
| */ |
| Datum cmsketch_centile_c(cmtransval *transval, int intcentile, int64 total) |
| { |
| uint i; |
| int64 higuess,loguess,curguess,curcount; |
| int64 centile_cnt; |
| |
| |
| if (intcentile <= 0 || intcentile >= 100) |
| elog(ERROR, |
| "centiles must be between 1-99 inclusive, was %d", |
| intcentile); |
| |
| |
| centile_cnt = (int64)(total * (float8)intcentile/100.0); |
| |
| for (i = 0, loguess = MIN_INT64, higuess = MAX_INT64, curguess = 0; |
| (i < INT64BITS - 1) && ((uint64)higuess-(uint64)loguess > 1); |
| i++) { |
| curcount = cmsketch_rangecount_c(transval, MIN_INT64, curguess); |
| if (curcount == centile_cnt) |
| break; |
| if (curcount > centile_cnt) { |
| /* overshot */ |
| higuess = curguess; |
| curguess = loguess + (curguess - loguess) / 2; |
| } |
| else { |
| /* undershot */ |
| loguess = curguess; |
| curguess = higuess - (higuess - curguess) / 2; |
| } |
| } |
| return(curguess); |
| } |
| |
| |
| |
| PG_FUNCTION_INFO_V1(cmsketch_width_histogram); |
| /*! |
| * scalar function taking a sketch, min, max, and number of buckets. |
| * produces an equi-width histogram of that many buckets |
| */ |
| /* UDF wrapper. All the interesting stuff is in __fmsketch_count_distinct_c */ |
| Datum cmsketch_width_histogram(PG_FUNCTION_ARGS) |
| { |
| bytea * transblob = cmsketch_check_transval(fcinfo, false); |
| cmtransval *transval = (cmtransval *)VARDATA(transblob); |
| int64 min = PG_GETARG_INT64(1); |
| int64 max = PG_GETARG_INT64(2); |
| int buckets = PG_GETARG_INT32(3); |
| Datum retval; |
| |
| CM_CHECKARG(transval, 1); |
| CM_CHECKARG(transval, 2); |
| if (PG_ARGISNULL(3)) PG_RETURN_NULL(); |
| |
| retval = cmsketch_width_histogram_c(transval, min, max, buckets); |
| PG_RETURN_DATUM(retval); |
| } |
| |
| /*! |
| * produces an equi-width histogram |
| * \param transval the transition value of the agg |
| * \param min minimum value in the sketch |
| * \param max maximum value in the sketch |
| * \param buckets the number of buckets in the histogram |
| */ |
| Datum cmsketch_width_histogram_c(cmtransval *transval, |
| int64 min, |
| int64 max, |
| int64 buckets) |
| { |
| int64 step; |
| int i; |
| ArrayType *retval; |
| int64 binlo, binhi, binval; |
| Datum histo[buckets][3]; |
| int dims[2], lbs[2]; |
| int16 typlen; |
| bool typbyval; |
| char typalign; |
| char typdelim; |
| Oid typioparam; |
| Oid typiofunc; |
| |
| step = Max(trunc((float8)(max-min+1) / (float8)buckets), 1); |
| for (i = 0; i < buckets; i++) { |
| binlo = min + i*step; |
| if (binlo > max) break; |
| histo[i][0] = Int64GetDatum(binlo); |
| binhi = (i == buckets-1) ? max : (min + (i+1)*step - 1); |
| histo[i][1] = Int64GetDatum(binhi); |
| binval = |
| cmsketch_rangecount_c(transval, |
| DatumGetInt64(histo[i][0]), |
| DatumGetInt64(histo[i][1])); |
| histo[i][2] = Int64GetDatum(binval); |
| } |
| |
| |
| /* |
| * Get info about element type |
| */ |
| get_type_io_data(INT8OID, IOFunc_output, |
| &typlen, &typbyval, |
| &typalign, &typdelim, |
| &typioparam, &typiofunc); |
| |
| dims[0] = i; /* may be less than requested buckets if too few values */ |
| dims[1] = 3; /* lo, hi, and val */ |
| lbs[0] = 0; |
| lbs[1] = 0; |
| |
| retval = construct_md_array((Datum *)histo, |
| NULL, |
| 2, |
| dims, |
| lbs, |
| INT8OID, |
| typlen, |
| typbyval, |
| typalign); |
| PG_RETURN_ARRAYTYPE_P(retval); |
| } |
| |
| /*! |
| * produces an equi-depth histogram of a cm sketch |
| * \param transval the transition value of the agg |
| * \param buckets the number of buckets in the histogram |
| */ |
| Datum cmsketch_depth_histogram_c(cmtransval *transval, int64 buckets) |
| { |
| int64 step; |
| uint i, nextbucket; |
| ArrayType *retval; |
| int64 binlo; |
| Datum histo[buckets][3]; |
| int dims[2], lbs[2]; |
| int64 total = cmsketch_rangecount_c(transval, MIN_INT64, MAX_INT64); |
| int16 typlen; |
| bool typbyval; |
| char typalign; |
| char typdelim; |
| Oid typioparam; |
| Oid typiofunc; |
| |
| step = Max(trunc(100 / (float8)buckets), 1); |
| for (i = nextbucket = 0, binlo = MIN_INT64; i < buckets; i++) { |
| uint64 centile; |
| if (i < buckets - 1) { |
| centile = cmsketch_centile_c(transval, (i+1)*step, total); |
| if (i > 0 && centile <= histo[nextbucket-1][1]) |
| /* next centile is lower than previous; skip */ |
| continue; |
| histo[nextbucket][1] = centile; |
| } |
| else { |
| /* this is the top bucket */ |
| histo[nextbucket][1] = MAX_INT64; |
| } |
| histo[nextbucket][0] = Int64GetDatum(binlo); |
| histo[nextbucket][2] = cmsketch_rangecount_c(transval, |
| DatumGetInt64(histo[ |
| nextbucket |
| ][0]), |
| DatumGetInt64(histo[ |
| nextbucket |
| ][1])); |
| binlo = histo[nextbucket][1] + 1; |
| nextbucket++; |
| } |
| |
| /* |
| * Get info about element type |
| */ |
| get_type_io_data(INT8OID, IOFunc_output, |
| &typlen, &typbyval, |
| &typalign, &typdelim, |
| &typioparam, &typiofunc); |
| |
| dims[0] = nextbucket; /* may be less than requested buckets if too few values */ |
| dims[1] = 3; /* lo, hi, and val */ |
| lbs[0] = 0; |
| lbs[1] = 0; |
| |
| retval = construct_md_array((Datum *)histo, |
| NULL, |
| 2, |
| dims, |
| lbs, |
| INT8OID, |
| typlen, |
| typbyval, |
| typalign); |
| PG_RETURN_ARRAYTYPE_P(retval); |
| } |
| |
| /****** SUPPORT ROUTINES *******/ |
| PG_FUNCTION_INFO_V1(cmsketch_dump); |
| |
| /*! |
| * dump sketch contents |
| */ |
| Datum cmsketch_dump(PG_FUNCTION_ARGS) |
| { |
| bytea * transblob = (bytea *)PG_GETARG_BYTEA_P(0); |
| countmin *sketches; |
| char * newblob = (char *)palloc(10240); |
| uint32 i, j, k, c; |
| |
| sketches = ((cmtransval *)VARDATA(transblob))->sketches; |
| for (i=0, c=0; i < RANGES; i++) |
| for (j=0; j < DEPTH; j++) |
| for(k=0; k < NUMCOUNTERS; k++) { |
| if (sketches[i][j][k] != 0) |
| c += sprintf(&newblob[c], "[(%d,%d,%d):" INT64_FORMAT |
| "], ", i, j, k, sketches[i][j][k]); |
| if (c > 10000) break; |
| } |
| newblob[c] = '\0'; |
| PG_RETURN_NULL(); |
| } |
| |
| |
| /*! |
| * for each row of the sketch, use the 16 bits starting at 2^i mod NUMCOUNTERS, |
| * and invoke the lambda on those 16 bits (which may destructively modify counters). |
| * \param hashval the MD5 hashe value that we take 16 bits at a time |
| * \param sketch the cmsketch |
| * \param initial the initialized return value |
| * \param lambdaptr the function to invoke on each 16 bits |
| */ |
| int64 hash_counters_iterate(Datum hashval, |
| countmin sketch, /* width is DEPTH*NUMCOUNTERS */ |
| int64 initial, |
| int64 (*lambdaptr)(uint32, |
| uint32, |
| countmin, |
| int64)) |
| { |
| uint32 i, col; |
| unsigned short twobytes; |
| int64 retval = initial; |
| |
| /* |
| * XXX WARNING: are there problems with unaligned access here? |
| * XXX I'm currently using copies rather than casting, which is inefficient, |
| * XXX but I was hoping memmove would deal with unaligned access in a portable way. |
| */ |
| for (i = 0; i < DEPTH; i++) { |
| memmove((void *)&twobytes, |
| (void *)((char *)VARDATA(DatumGetByteaP(hashval)) + 2*i), 2); |
| col = twobytes % NUMCOUNTERS; |
| retval = (*lambdaptr)(i, col, sketch, retval); |
| } |
| return retval; |
| } |
| |
| /*! |
| * destructive increment lambda for hash_counters_iterate. |
| * transval and return val not of particular interest here. |
| * \param i which row to update |
| * \param col which column to update |
| * \param sketch the sketch |
| * \param transval we don't need transval here, but its part of the |
| * lambda interface for hash_counters_iterate |
| */ |
| |
| int64 increment_counter(uint32 i, |
| uint32 col, |
| countmin sketch, |
| int64 transval) |
| { |
| int64 oldval = sketch[i][col]; |
| if (sketch[i][col] == (INT64_MAX)) |
| elog(ERROR, "maximum count exceeded in sketch"); |
| sketch[i][col] = oldval + 1; |
| |
| /* return the incremented value, though unlikely anyone cares. */ |
| return oldval+1; |
| } |
| |
| /*! |
| * running minimum lambda for hash_counters_iterate |
| * \param i which row to examine |
| * \param col which column to examine |
| * \param sketch the sketch |
| * \param transval smallest counter so far |
| * lambda interface for hash_counters_iterate |
| */ |
| int64 min_counter(uint32 i, |
| uint32 col, |
| countmin sketch, |
| int64 transval) |
| { |
| int64 thisval = sketch[i][col]; |
| return (thisval < transval) ? thisval : transval; |
| } |