blob: 4e12029e794b492f0f81d0e92e1a812eab89a908 [file] [log] [blame]
/*!
* \file fm.c
*
* \brief Flajolet-Martin sketch implementation
*/
/*!
* \implementation
* In a nutshell, the FM sketch
* is based on the idea of a bitmap whose bits are "turned on" by hashes of
* values in the domain. It is arranged so that
* as you move left-to-right in that bitmap, the expected number of domain values
* that can turn on the bit decreases exponentially.
* After hashing all the values this way, the location of the first 0 from the
* left of the bitmap is correlated with the
* number of distinct values. This idea is smoothed across a number of
* trials using multiple independent hash functions on multiple bitmaps.
*
* The FM sketch technique works poorly with small inputs, so we
* explicitly count the first 12K distinct values in a main-memory
* data structure before switching over to sketching.
*
* See the paper mentioned below
* for detailed explanation, formulae, and pseudocode.
*/
/* THIS CODE MAY NEED TO BE REVISITED TO ENSURE ALIGNMENT! */
#include <math.h>
#include <postgres.h>
#include <utils/array.h>
#include <utils/elog.h>
#include <utils/builtins.h>
#include <utils/lsyscache.h>
#include <pg_config.h>
#if PG_VERSION_NUM >= 100000
#include <common/md5.h>
#else
#include <libpq/md5.h>
#endif
#include <nodes/execnodes.h>
#include <fmgr.h>
#include <ctype.h>
#include "sketch_support.h"
#include "sortasort.h"
#ifndef NO_PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif
#define NMAP 256
#define FMSKETCH_SZ (VARHDRSZ + NMAP*(MD5_HASHLEN_BITS)/CHAR_BIT)
/*!
* For FM, empirically, estimates seem to fall below 1% error around 12k
* distinct vals
*/
#define MINVALS 1024*12
/*!
* initial size for a sortasort: we'll guess at 8 bytes per datum.
* sortasort will grow dynamically if we guessed too low
*/
#define SORTASORT_INITIAL_STORAGE sizeof(sortasort) + MINVALS*sizeof(uint32) + \
8*MINVALS
typedef enum {SMALL, BIG} fmstatus;
/*!
* \internal
* \brief transition value struct for FM sketches
*
* because FM sketches work poorly on small numbers of values,
* our transval can be in one of two modes.
* for "SMALL" numbers of values (<=MINVALS), the storage array
* is a "sortasort" data structure containing an array of input values.
* for "BIG" datasets (>MINVAL), it is an array of FM sketch bitmaps.
* \endinternal
*/
typedef struct {
fmstatus status;
Oid typOid;
Oid funcOid;
int16 typLen;
bool typByVal;
/*
* We'd better make sure that this struct is 8bytes aligned,
* If the there is no the reserved field, (char*)&fmtransval.storage - (char*)&fmtransval
* is not equal to sizeof(fmtransval). This is not coincident with one's intuition
* and is also error prone when coding.
*
* Another reason to make the address of storage 8bytes aligned is that when we
* store a structure in storage, 8bytes aligned address leads high performance when
* cpu access main memory.
*
* If there is any chance someone may change the compiler option -fpack-struct,
* for struct like this, we'd better make every field well aligned and packed.
*
* The default option makes sure that the first byte address of 8bytes types like int64
* are 8bytes aligned. And 4bytes types like int32 are 4bytes aligned. So we only
* need to make sure the first byte address of storage 8bytes aligned.
*/
char reserved;
char storage[];
} fmtransval;
/* check whter the contents in fmtransval::storage is safe for sortasort */
void check_sortasort(sortasort *st, size_t st_size) {
size_t left_len = st_size;
if (left_len < sizeof(sortasort)) {
elog(ERROR, "invalid transition state for fmsketch");
}
left_len -= sizeof(sortasort);
if (st->num_vals > st->capacity || st->storage_cur > st->storage_sz) {
elog(ERROR, "invalid transition state for fmsketch");
}
if (left_len < st->capacity*sizeof(st->storage_cur) + st->storage_sz) {
elog(ERROR, "invalid transition state for fmsketch");
}
}
/* check whether the contents in the bytea is safe for a fmtransval */
void check_fmtransval(bytea * storage) {
fmtransval * fmt = NULL;
sortasort *st = NULL;
int16 typLen = 0;
bool typByVal = false;
if (VARSIZE(storage) < VARHDRSZ + sizeof(fmtransval)) {
elog(ERROR, "invalid transition state for fmsketch");
}
fmt = (fmtransval*)VARDATA(storage);
if (fmt->status != SMALL && fmt->status != BIG) {
elog(ERROR, "invalid transition state for fmsketch");
}
if (fmt->reserved != 0) {
elog(ERROR, "invalid transition state for fmsketch");
}
if (InvalidOid == fmt->typOid) {
elog(ERROR, "invalid transition state for fmsketch");
}
get_typlenbyval(fmt->typOid, &typLen, &typByVal);
if (fmt->typByVal != typByVal || fmt->typLen != typLen) {
elog(ERROR, "invalid transition state for fmsketch");
}
if (fmt->typLen < -2 || fmt->typLen == 0) {
elog(ERROR, "invalid transition state for fmsketch");
}
if (SMALL == fmt->status) {
if (VARSIZE(storage) < VARHDRSZ + sizeof(fmtransval) + sizeof(sortasort)) {
elog(ERROR, "invalid transition state for fmsketch");
}
st = (sortasort *)fmt->storage;
if (fmt->typLen != st->typLen || fmt->typByVal != (bool)st->typByVal) {
elog(ERROR, "invalid transition state for fmsketch");
}
check_sortasort(st, VARSIZE(storage) - VARHDRSZ - sizeof(fmtransval));
}
else {
if (VARSIZE(storage) < 2*VARHDRSZ + sizeof(fmtransval)) {
elog(ERROR, "invalid transition state for fmsketch");
}
if (VARSIZE(storage) < VARHDRSZ + sizeof(fmtransval) + VARSIZE(&(fmt->storage))) {
elog(ERROR, "invalid transition state for fmsketch");
}
}
}
Datum __fmsketch_trans_c(bytea *, Datum);
Datum __fmsketch_count_distinct_c(bytea *);
Datum __fmsketch_trans(PG_FUNCTION_ARGS);
Datum __fmsketch_count_distinct(PG_FUNCTION_ARGS);
Datum __fmsketch_merge(PG_FUNCTION_ARGS);
void big_or_internal(bytea *bitmap1, bytea *bitmap2, bytea *out);
bytea *fmsketch_sortasort_insert(bytea *, Datum, size_t);
bytea *fm_new(fmtransval *);
PG_FUNCTION_INFO_V1(__fmsketch_trans);
/*! UDA transition function for the fmsketch aggregate. */
Datum __fmsketch_trans(PG_FUNCTION_ARGS)
{
bytea * transblob = (bytea *)PG_GETARG_BYTEA_P(0);
fmtransval *transval;
Oid element_type = get_fn_expr_argtype(fcinfo->flinfo, 1);
Oid funcOid;
bool typIsVarlena;
Datum retval;
Datum inval;
/*
* This is Postgres boilerplate for UDFs that modify the data in their own context.
* Such UDFs can only be correctly called in an agg context since regular scalar
* UDFs are essentially stateless across invocations.
*/
if (!(fcinfo->context &&
(IsA(fcinfo->context, AggState)
#ifdef NOTGP
|| IsA(fcinfo->context, WindowAggState)
#endif
)))
elog(
ERROR,
"UDF call to a function that only works for aggs (destructive pass by reference)");
/* get the provided element, being careful in case it's NULL */
if (!PG_ARGISNULL(1)) {
if (!OidIsValid(element_type))
elog(ERROR, "could not determine data type of input");
inval = PG_GETARG_DATUM(1);
/*
* if this is the first call, initialize transval to hold a sortasort
* on the first call, we should have the empty string (if the agg was declared properly!)
*/
if (VARSIZE(transblob) <= VARHDRSZ) {
size_t blobsz = VARHDRSZ + sizeof(fmtransval) +
SORTASORT_INITIAL_STORAGE;
transblob = (bytea *)palloc0(blobsz);
SET_VARSIZE(transblob, blobsz);
transval = (fmtransval *)VARDATA(transblob);
transval->typOid = element_type;
/* figure out the outfunc for this type */
getTypeOutputInfo(element_type, &funcOid, &typIsVarlena);
get_typlenbyval(element_type, &(transval->typLen), &(transval->typByVal));
transval->status = SMALL;
sortasort_init((sortasort *)transval->storage,
MINVALS,
SORTASORT_INITIAL_STORAGE,
transval->typLen,
transval->typByVal);
}
else {
// check_fmtransval(transblob);
/* extract the existing transval from the transblob */
transval = (fmtransval *)VARDATA(transblob);
// if (transval->typOid != element_type) {
// elog(ERROR, "cannot aggregate on elements with different types");
// }
}
/*
* if we've seen < MINVALS distinct values, place datum into the sortasort
* XXXX Would be cleaner to try the sortasort insert and if it fails, then continue.
*/
if (transval->status == SMALL
&& ((sortasort *)(transval->storage))->num_vals <
MINVALS) {
int len = ExtractDatumLen(inval, transval->typLen, transval->typByVal, -1);
retval =
PointerGetDatum(fmsketch_sortasort_insert(
transblob,
inval, len));
PG_RETURN_DATUM(retval);
}
/*
* if we've seen exactly MINVALS distinct values, create FM bitmaps
* and load the contents of the sortasort into the FM sketch
*/
else if (transval->status == SMALL
&& ((sortasort *)(transval->storage))->num_vals ==
MINVALS) {
int i;
sortasort *s = (sortasort *)(transval->storage);
bytea *newblob = fm_new(transval);
transval = (fmtransval *)VARDATA(newblob);
/*
* "catch up" on the past as if we were doing FM from the beginning:
* apply the FM sketching algorithm to each value previously stored in the sortasort
*/
for (i = 0; i < MINVALS; i++)
__fmsketch_trans_c(newblob,
PointerExtractDatum(sortasort_getval(s,i), s->typByVal));
/*
* XXXX would like to pfree the old transblob, but the memory allocator doesn't like it
* XXXX Meanwhile we know that this memory "leak" is of fixed size and will get
* XXXX deallocated "soon" when the memory context is destroyed.
*/
/* drop through to insert the current datum in "BIG" mode */
transblob = newblob;
}
/*
* if we're here we've seen >=MINVALS distinct values and are in BIG mode.
* Just for sanity, let's check.
*/
if (transval->status != BIG)
elog(
ERROR,
"FM sketch failed internal sanity check");
/* Apply FM algorithm to this datum */
retval = __fmsketch_trans_c(transblob, inval);
PG_RETURN_DATUM(retval);
}
else PG_RETURN_NULL();
}
/*!
* generate a bytea holding a transval in BIG mode, with the right amount of
* zero bits for an empty FM sketch.
* \param template an optional pre-existing transval whose fields we can copy in
*/
bytea *fm_new(fmtransval *template)
{
int fmsize = VARHDRSZ + sizeof(fmtransval) + FMSKETCH_SZ;
/* use palloc0 to make sure it's initialized to 0 */
bytea * newblob = (bytea *)palloc0(fmsize);
fmtransval *transval;
SET_VARSIZE(newblob, fmsize);
transval = (fmtransval *)VARDATA(newblob);
/* copy over the struct values */
if (template != NULL)
memcpy(transval, template, sizeof(fmtransval));
/* set status to BIG, possibly overwriting what was in template */
transval->status = BIG;
SET_VARSIZE((bytea *)transval->storage, FMSKETCH_SZ);
return(newblob);
}
/*!
* Main logic of Flajolet and Martin's sketching algorithm.
* For each call, we get an md5 hash of the value passed in.
* First we use the hash as a random number to choose one of
* the NMAP bitmaps at random to update.
* Then we find the position "rmost" of the rightmost 1 bit in the hashed value.
* We then turn on the "rmost"-th bit FROM THE LEFT in the chosen bitmap.
* \param transblob the transition value packed into a bytea
* \param input a textual representation of the value to hash
*/
Datum __fmsketch_trans_c(bytea *transblob, Datum indat)
{
fmtransval * transval = (fmtransval *) VARDATA(transblob);
bytea * bitmaps = (bytea *)transval->storage;
uint64 index;
uint8 * c;
int rmost;
// char *hex;
bytea *hashed;
hashed = sketch_md5_bytea(indat, transval->typOid);
c = (uint8 *)VARDATA(hashed);
// hex = text_to_cstring((bytea *)DatumGetPointer(DirectFunctionCall2(binary_encode, PointerGetDatum(hashed),
// CStringGetTextDatum("hex"))));
// elog(NOTICE, "md5(%s), len %d: %s", text_to_cstring((bytea *)PointerGetDatum(indat)),
// VARSIZE((bytea *)PointerGetDatum(indat)), hex);
/*
* During the insertion we insert each element
* in one bitmap only (a la Flajolet pseudocode, page 16).
* Choose the bitmap by taking the 64 high-order bits worth of hash value mod NMAP
*/
index = (*(uint64 *)c) % NMAP;
/*
* Find index of the rightmost non-0 bit. Turn on that bit (from left!) in the sketch.
*/
rmost = rightmost_one(c, 1, MD5_HASHLEN_BITS, 0);
/*
* last argument must be the index of the bit position from the right.
* i.e. position 0 is the rightmost.
* so to set the bit at rmost from the left, we subtract from the total number of bits.
*/
array_set_bit_in_place(bitmaps, NMAP, MD5_HASHLEN_BITS, index,
(MD5_HASHLEN_BITS - 1) - rmost);
return PointerGetDatum(transblob);
}
PG_FUNCTION_INFO_V1(__fmsketch_count_distinct);
/*! UDA final function to get count(distinct) out of an FM sketch */
Datum __fmsketch_count_distinct(PG_FUNCTION_ARGS)
{
fmtransval *transval = NULL;
if (VARSIZE((PG_GETARG_BYTEA_P(0))) == VARHDRSZ)
/* nothing was ever aggregated! */
return (0);
check_fmtransval(PG_GETARG_BYTEA_P(0));
transval = (fmtransval *)VARDATA((PG_GETARG_BYTEA_P(0)));
/* if status is not BIG then get count from sortasort */
if (transval->status == SMALL)
return ((sortasort *)(transval->storage))->num_vals;
/* else get count via fm */
else if (transval->status != BIG) {
elog(ERROR, "FM transval neither SMALL nor BIG");
return(0);
}
else /* transval->status == BIG */
return __fmsketch_count_distinct_c((bytea *)transval->storage);
}
/*!
* Finish up the Flajolet-Martin approximation.
* We sum up the number of leading 1 bits across all bitmaps in the sketch.
* Then we use the FM magic formula to estimate the distinct count.
* \params bitmaps the FM Sketch
*/
Datum __fmsketch_count_distinct_c(bytea *bitmaps)
{
/* int R = 0; // Flajolet/Martin's R is handled by leftmost_zero */
uint32 S = 0;
static double phi = 0.77351; /*
* the magic constant
* char out[NMAP*MD5_HASHLEN_BITS];
*/
int i;
uint32 lz;
for (i = 0; i < NMAP; i++)
{
lz = leftmost_zero((uint8 *)VARDATA(
bitmaps), NMAP, MD5_HASHLEN_BITS, i);
S = S + lz;
}
PG_RETURN_INT64(ceil( ((double)NMAP/
phi) * pow(2.0, (double)S/(double)NMAP) ));
}
PG_FUNCTION_INFO_V1(__fmsketch_merge);
/*!
* Greenplum "prefunc": a function to merge 2 transvals computed at different machines.
* For simple FM, this is trivial: just OR together the two arrays of bitmaps.
* But we have to deal with cases where one or both transval is SMALL: i.e. it
* holds a sortasort, not an FM sketch.
*
* XXX TESTING: Ensure we exercise all branches!
*/
Datum __fmsketch_merge(PG_FUNCTION_ARGS)
{
bytea * transblob1 = (bytea *)PG_GETARG_BYTEA_P(0);
bytea * transblob2 = (bytea *)PG_GETARG_BYTEA_P(1);
fmtransval *transval1, *transval2;
sortasort * s1, *s2;
sortasort * sortashort, *sortabig;
bytea * tblob_big, *tblob_small;
uint32 i;
/* deal with the case where one or both items is the initial value of '' */
if (VARSIZE(transblob1) == VARHDRSZ) {
PG_RETURN_DATUM(PointerGetDatum(transblob2));
}
if (VARSIZE(transblob2) == VARHDRSZ) {
PG_RETURN_DATUM(PointerGetDatum(transblob1));
}
check_fmtransval(transblob1);
check_fmtransval(transblob2);
transval1 = (fmtransval *)VARDATA(transblob1);
transval2 = (fmtransval *)VARDATA(transblob2);
if (transval1->typOid != transval2->typOid) {
elog(ERROR, "cannot merge two transition state with different element types");
}
if (transval1->status == BIG && transval2->status == BIG) {
/* easy case: merge two FM sketches via bitwise OR. */
fmtransval *newval;
tblob_big = fm_new(transval1);
newval = (fmtransval *)VARDATA(tblob_big);
big_or_internal((bytea *)transval1->storage, (bytea *)transval2->storage,
(bytea *)newval->storage);
PG_RETURN_DATUM(PointerGetDatum(tblob_big));
}
else if (transval1->status == SMALL && transval2->status == SMALL) {
s1 = (sortasort *)(transval1->storage);
s2 = (sortasort *)(transval2->storage);
tblob_big =
(s1->num_vals > s2->num_vals) ? transblob1 : transblob2;
tblob_small =
(s1->num_vals > s2->num_vals) ? transblob2 : transblob1;
sortashort =
(sortasort *)(((fmtransval *)((fmtransval *)VARDATA(tblob_small)))->storage);
sortabig = (sortasort *)(((fmtransval *)((fmtransval *)VARDATA(tblob_big)))->storage);
if (sortabig->num_vals + sortashort->num_vals <=
sortabig->capacity) {
/*
* we have room in sortabig
* one could imagine a more efficient (merge-based) sortasort merge,
* but for now we just copy the values from the smaller sortasort into
* the bigger one.
*/
for (i = 0; i < sortashort->num_vals; i++) {
Datum the_val = PointerExtractDatum(sortasort_getval(sortashort,i),
transval1->typByVal);
int the_len = ExtractDatumLen(the_val, transval1->typLen,
transval1->typByVal, -1);
tblob_big = fmsketch_sortasort_insert(tblob_big,
the_val,
the_len);
}
PG_RETURN_DATUM(PointerGetDatum(tblob_big));
}
/* else drop through. */
}
/*
* if we got here, then at most one transval is BIG, i.e. one or both transvals is SMALL.
* need to form an FM sketch and populate with the SMALL transval(s)
*/
if (transval1->status == SMALL && transval2->status == SMALL)
tblob_big = fm_new(transval1);
else
tblob_big =
(transval1->status == BIG) ? transblob1 : transblob2;
if (transval1->status == SMALL) {
s1 = (sortasort *)(transval1->storage);
for(i = 0; i < s1->num_vals; i++) {
__fmsketch_trans_c(tblob_big, PointerExtractDatum(sortasort_getval(s1,i),
transval1->typByVal));
}
}
if (transval2->status == SMALL) {
s2 = (sortasort *)(transval2->storage);
for(i = 0; i < s2->num_vals; i++) {
__fmsketch_trans_c(tblob_big, PointerExtractDatum(sortasort_getval(s2,i),
transval1->typByVal));
}
}
PG_RETURN_DATUM(PointerGetDatum(tblob_big));
}
PG_FUNCTION_INFO_V1(big_or);
Datum
big_or(PG_FUNCTION_ARGS)
{
big_or_internal(PG_GETARG_BYTEA_PP(0), PG_GETARG_BYTEA_PP(1), PG_GETARG_BYTEA_PP(2));
PG_RETURN_VOID();
}
/*! OR of two big bitmaps, for gathering sketches computed in parallel. */
void big_or_internal(bytea *bitmap1, bytea *bitmap2, bytea *out)
{
uint32 i;
if (VARSIZE(bitmap1) != VARSIZE(bitmap2))
elog(ERROR,
"attempting to OR two different-sized bitmaps: %d, %d",
VARSIZE(bitmap1),
VARSIZE(bitmap2));
if (VARSIZE(bitmap1) != VARSIZE(out))
elog(ERROR,
"target bitmap is of a different size from the source. "
"target bitmap size: %d, source bitmap size: %d",
VARSIZE(out),
VARSIZE(bitmap1));
/* could probably be more efficient doing this 32 or 64 bits at a time */
for (i=0; i < VARSIZE(bitmap1) - VARHDRSZ; i++)
((char *)(VARDATA(out)))[i] = ((char *)(VARDATA(bitmap1)))[i] |
((char *)(VARDATA(bitmap2)))[i];
}
/*!
* wrapper for insertion into a sortasort. calls sortasort_try_insert and if that fails it
* makes more space for insertion (double or more the size) and tries again.
* \param transblob the current transition value packed into a bytea
* \param dat the Datum to be inserted
*/
bytea *fmsketch_sortasort_insert(bytea *transblob, Datum dat, size_t len)
{
fmtransval *transval = (fmtransval *)VARDATA(transblob);
sortasort *s_in =
(sortasort *)(transval->storage);
bytea * newblob;
bool success = FALSE;
size_t new_storage_sz;
size_t newsize;
if (s_in->num_vals >= s_in->capacity)
elog(ERROR, "attempt to insert into full sortasort");
success = sortasort_try_insert(s_in, dat, transval->typLen);
if (success < 0)
elog(ERROR, "insufficient directory capacity in sortasort");
if (success == TRUE) return (transblob);
/* XXX THIS WHILE LOOP WILL SUCCEED THE FIRST TRY ... REMOVE IT. */
while (!success) {
/*
* else insufficient space
* allocate a fmtransval with double-big storage area plus room for v
* should work 2nd time around the loop.
* we can't use repalloc because it fails trying to free the old transblob
*/
new_storage_sz = s_in->storage_sz*2 + len;
/* XXX THIS POINTER ARITHMETIC SHOULD BE HIDDEN BY A MACRO IN THE SORTASORT LIBRARY! */
newsize = VARHDRSZ + sizeof(fmtransval) + sizeof(sortasort)
+ s_in->capacity*sizeof(s_in->dir[0]) +
new_storage_sz;
newblob = (bytea *)palloc(newsize);
memcpy(newblob, transblob, VARSIZE(transblob));
SET_VARSIZE(newblob, newsize);
s_in = (sortasort *)((fmtransval *)VARDATA(newblob))->storage;
s_in->storage_sz = new_storage_sz;
/*
* Can't figure out how to make pfree happy with transblob
* pfree(transblob);
*/
transblob = newblob;
success = sortasort_try_insert(s_in, dat, transval->typLen);
}
return(transblob);
}