/*!
 * \file fm.c
 *
 * \brief Flajolet-Martin sketch implementation
 */
/*!
 * \implementation
 * In a nutshell, the FM sketch
 * is based on the idea of a bitmap whose bits are "turned on" by hashes of
 * values in the domain.  It is arranged so that
 * as you move left-to-right in that bitmap, the expected number of domain values
 * that can turn on the bit decreases exponentially.
 * After hashing all the values this way, the location of the first 0 from the
 * left of the bitmap is correlated with the
 * number of distinct values.  This idea is smoothed across a number of
 * trials using multiple independent hash functions on multiple bitmaps.
 *
 * The FM sketch technique works poorly with small inputs, so we
 * explicitly count the first 12K distinct values in a main-memory
 * data structure before switching over to sketching.
 *
 * See the paper mentioned below
 * for detailed explanation, formulae, and pseudocode.
 */

/* THIS CODE MAY NEED TO BE REVISITED TO ENSURE ALIGNMENT! */

#include <math.h>

#include <postgres.h>
#include <utils/array.h>
#include <utils/elog.h>
#include <utils/builtins.h>
#include <utils/lsyscache.h>
#include <pg_config.h>
#if PG_VERSION_NUM >= 100000
#include <common/md5.h>
#else
#include <libpq/md5.h>
#endif
#include <nodes/execnodes.h>
#include <fmgr.h>
#include <ctype.h>
#include "sketch_support.h"
#include "sortasort.h"

#ifndef NO_PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif

#define NMAP 256
#define FMSKETCH_SZ (VARHDRSZ + NMAP*(MD5_HASHLEN_BITS)/CHAR_BIT)

/*!
 * For FM, empirically, estimates seem to fall below 1% error around 12k
 * distinct vals
 */
#define MINVALS 1024*12

/*!
 * initial size for a sortasort: we'll guess at 8 bytes per datum.
 * sortasort will grow dynamically if we guessed too low
 */
#define SORTASORT_INITIAL_STORAGE  sizeof(sortasort) + MINVALS*sizeof(uint32) + \
    8*MINVALS

typedef enum {SMALL, BIG} fmstatus;

/*!
 * \internal
 * \brief transition value struct for FM sketches
 *
 * because FM sketches work poorly on small numbers of values,
 * our transval can be in one of two modes.
 * for "SMALL" numbers of values (<=MINVALS), the storage array
 * is a "sortasort" data structure containing an array of input values.
 * for "BIG" datasets (>MINVAL), it is an array of FM sketch bitmaps.
 * \endinternal
 */
typedef struct {
    fmstatus status;
    Oid      typOid;
    Oid      funcOid;
    int16    typLen;
    bool     typByVal;
    /*
     * We'd better make sure that this struct is 8bytes aligned,
     * If the there is no the reserved field, (char*)&fmtransval.storage - (char*)&fmtransval
     * is not equal to sizeof(fmtransval). This is not coincident with one's intuition
     * and is also error prone when coding.
     *
     * Another reason to make the address of storage 8bytes aligned is that when we
     * store a structure in storage, 8bytes aligned address leads high performance when
     * cpu access main memory.
     *
     * If there is any chance someone may change the compiler option -fpack-struct,
     * for struct like this, we'd better make every field well aligned and packed.
     *
     * The default option makes sure that the first byte address of 8bytes types like int64
     * are 8bytes aligned. And 4bytes types like int32 are 4bytes aligned. So we only
     * need to make sure the first byte address of storage 8bytes aligned.
     */
    char     reserved;
    char storage[];
} fmtransval;

/* check whter the contents in  fmtransval::storage is safe for sortasort */
void check_sortasort(sortasort *st, size_t st_size) {
    size_t left_len = st_size;

    if (left_len < sizeof(sortasort)) {
        elog(ERROR, "invalid transition state for fmsketch");
    }
    left_len -= sizeof(sortasort);

    if (st->num_vals > st->capacity || st->storage_cur > st->storage_sz) {
        elog(ERROR, "invalid transition state for fmsketch");
    }

    if (left_len < st->capacity*sizeof(st->storage_cur) + st->storage_sz) {
        elog(ERROR, "invalid transition state for fmsketch");
    }
}

/* check whether the contents in the bytea is safe for a fmtransval */
void check_fmtransval(bytea * storage) {
    fmtransval * fmt = NULL;
    sortasort *st = NULL;
    int16 typLen = 0;
    bool typByVal = false;
    if (VARSIZE(storage) < VARHDRSZ + sizeof(fmtransval)) {
        elog(ERROR, "invalid transition state for fmsketch");
    }

    fmt = (fmtransval*)VARDATA(storage);
    if (fmt->status != SMALL && fmt->status != BIG) {
        elog(ERROR, "invalid transition state for fmsketch");
    }

    if (fmt->reserved != 0) {
        elog(ERROR, "invalid transition state for fmsketch");
    }

    if (InvalidOid == fmt->typOid) {
        elog(ERROR, "invalid transition state for fmsketch");
    }

    get_typlenbyval(fmt->typOid, &typLen, &typByVal);
    if (fmt->typByVal != typByVal || fmt->typLen != typLen) {
        elog(ERROR, "invalid transition state for fmsketch");
    }

    if (fmt->typLen < -2 || fmt->typLen == 0) {
        elog(ERROR, "invalid transition state for fmsketch");
    }

    if (SMALL == fmt->status) {
        if (VARSIZE(storage) < VARHDRSZ + sizeof(fmtransval) + sizeof(sortasort)) {
            elog(ERROR, "invalid transition state for fmsketch");
        }
        st = (sortasort *)fmt->storage;
        if (fmt->typLen != st->typLen || fmt->typByVal != (bool)st->typByVal) {
            elog(ERROR, "invalid transition state for fmsketch");
        }
        check_sortasort(st, VARSIZE(storage) - VARHDRSZ - sizeof(fmtransval));
    }
    else {
        if (VARSIZE(storage) < 2*VARHDRSZ + sizeof(fmtransval)) {
            elog(ERROR, "invalid transition state for fmsketch");
        }
        if (VARSIZE(storage) < VARHDRSZ + sizeof(fmtransval) + VARSIZE(&(fmt->storage))) {
            elog(ERROR, "invalid transition state for fmsketch");
        }
    }
}

Datum __fmsketch_trans_c(bytea *, Datum);
Datum __fmsketch_count_distinct_c(bytea *);
Datum __fmsketch_trans(PG_FUNCTION_ARGS);
Datum __fmsketch_count_distinct(PG_FUNCTION_ARGS);
Datum __fmsketch_merge(PG_FUNCTION_ARGS);
void big_or_internal(bytea *bitmap1, bytea *bitmap2, bytea *out);
bytea *fmsketch_sortasort_insert(bytea *, Datum, size_t);
bytea *fm_new(fmtransval *);

PG_FUNCTION_INFO_V1(__fmsketch_trans);

/*! UDA transition function for the fmsketch aggregate. */
Datum __fmsketch_trans(PG_FUNCTION_ARGS)
{
    bytea *     transblob = (bytea *)PG_GETARG_BYTEA_P(0);
    fmtransval *transval;
    Oid         element_type = get_fn_expr_argtype(fcinfo->flinfo, 1);
    Oid         funcOid;
    bool        typIsVarlena;
    Datum       retval;
    Datum       inval;

    /*
     * This is Postgres boilerplate for UDFs that modify the data in their own context.
     * Such UDFs can only be correctly called in an agg context since regular scalar
     * UDFs are essentially stateless across invocations.
     */
    if (!(fcinfo->context &&
          (IsA(fcinfo->context, AggState)
    #ifdef NOTGP
           || IsA(fcinfo->context, WindowAggState)
    #endif
          )))
        elog(
            ERROR,
            "UDF call to a function that only works for aggs (destructive pass by reference)");


    /* get the provided element, being careful in case it's NULL */
    if (!PG_ARGISNULL(1)) {
        if (!OidIsValid(element_type))
            elog(ERROR, "could not determine data type of input");

        inval = PG_GETARG_DATUM(1);
        /*
         * if this is the first call, initialize transval to hold a sortasort
         * on the first call, we should have the empty string (if the agg was declared properly!)
         */
        if (VARSIZE(transblob) <= VARHDRSZ) {
            size_t blobsz = VARHDRSZ + sizeof(fmtransval) +
                            SORTASORT_INITIAL_STORAGE;

            transblob = (bytea *)palloc0(blobsz);
            SET_VARSIZE(transblob, blobsz);
            transval = (fmtransval *)VARDATA(transblob);

            transval->typOid = element_type;
            /* figure out the outfunc for this type */
            getTypeOutputInfo(element_type, &funcOid, &typIsVarlena);
            get_typlenbyval(element_type, &(transval->typLen), &(transval->typByVal));
            transval->status = SMALL;
            sortasort_init((sortasort *)transval->storage,
                           MINVALS,
                           SORTASORT_INITIAL_STORAGE,
                           transval->typLen,
                           transval->typByVal);
        }
        else {
            // check_fmtransval(transblob);
            /* extract the existing transval from the transblob */
            transval = (fmtransval *)VARDATA(transblob);
            // if (transval->typOid != element_type) {
            //     elog(ERROR, "cannot aggregate on elements with different types");
            // }
        }

        /*
         * if we've seen < MINVALS distinct values, place datum into the sortasort
         * XXXX Would be cleaner to try the sortasort insert and if it fails, then continue.
         */
        if (transval->status == SMALL
            && ((sortasort *)(transval->storage))->num_vals <
            MINVALS) {
            int len = ExtractDatumLen(inval, transval->typLen, transval->typByVal, -1);

            retval =
                PointerGetDatum(fmsketch_sortasort_insert(
                                    transblob,
                                    inval, len));
            PG_RETURN_DATUM(retval);
        }

        /*
         * if we've seen exactly MINVALS distinct values, create FM bitmaps
         * and load the contents of the sortasort into the FM sketch
         */
        else if (transval->status == SMALL
                 && ((sortasort *)(transval->storage))->num_vals ==
                 MINVALS) {
            int        i;
            sortasort  *s = (sortasort *)(transval->storage);
            bytea      *newblob = fm_new(transval);

            transval = (fmtransval *)VARDATA(newblob);

            /*
             * "catch up" on the past as if we were doing FM from the beginning:
             * apply the FM sketching algorithm to each value previously stored in the sortasort
             */
            for (i = 0; i < MINVALS; i++)
                __fmsketch_trans_c(newblob,
                                   PointerExtractDatum(sortasort_getval(s,i), s->typByVal));

            /*
             * XXXX would like to pfree the old transblob, but the memory allocator doesn't like it
             * XXXX Meanwhile we know that this memory "leak" is of fixed size and will get
             * XXXX deallocated "soon" when the memory context is destroyed.
             */
            /* drop through to insert the current datum in "BIG" mode */
            transblob = newblob;
        }

        /*
         * if we're here we've seen >=MINVALS distinct values and are in BIG mode.
         * Just for sanity, let's check.
         */
        if (transval->status != BIG)
            elog(
                ERROR,
                "FM sketch failed internal sanity check");

        /* Apply FM algorithm to this datum */
        retval = __fmsketch_trans_c(transblob, inval);
        PG_RETURN_DATUM(retval);
    }
    else PG_RETURN_NULL();
}

/*!
 * generate a bytea holding a transval in BIG mode, with the right amount of
 * zero bits for an empty FM sketch.
 * \param template an optional pre-existing transval whose fields we can copy in
 */
bytea *fm_new(fmtransval *template)
{
    int         fmsize = VARHDRSZ + sizeof(fmtransval) + FMSKETCH_SZ;
    /* use palloc0 to make sure it's initialized to 0 */
    bytea *     newblob = (bytea *)palloc0(fmsize);
    fmtransval *transval;

    SET_VARSIZE(newblob, fmsize);
    transval = (fmtransval *)VARDATA(newblob);
    /* copy over the struct values */
    if (template != NULL)
        memcpy(transval, template, sizeof(fmtransval));

    /* set status to BIG, possibly overwriting what was in template */
    transval->status = BIG;

    SET_VARSIZE((bytea *)transval->storage, FMSKETCH_SZ);
    return(newblob);
}

/*!
 * Main logic of Flajolet and Martin's sketching algorithm.
 * For each call, we get an md5 hash of the value passed in.
 * First we use the hash as a random number to choose one of
 * the NMAP bitmaps at random to update.
 * Then we find the position "rmost" of the rightmost 1 bit in the hashed value.
 * We then turn on the "rmost"-th bit FROM THE LEFT in the chosen bitmap.
 * \param transblob the transition value packed into a bytea
 * \param input a textual representation of the value to hash
 */
Datum __fmsketch_trans_c(bytea *transblob, Datum indat)
{
    fmtransval * transval = (fmtransval *) VARDATA(transblob);
    bytea *      bitmaps = (bytea *)transval->storage;
    uint64       index;
    uint8 *      c;
    int          rmost;
    // char        *hex;
    bytea       *hashed;

    hashed = sketch_md5_bytea(indat, transval->typOid);
    c = (uint8 *)VARDATA(hashed);
    // hex = text_to_cstring((bytea *)DatumGetPointer(DirectFunctionCall2(binary_encode, PointerGetDatum(hashed),
    //                       CStringGetTextDatum("hex"))));
    // elog(NOTICE, "md5(%s), len %d: %s", text_to_cstring((bytea *)PointerGetDatum(indat)),
    //      VARSIZE((bytea *)PointerGetDatum(indat)), hex);

    /*
     * During the insertion we insert each element
     * in one bitmap only (a la Flajolet pseudocode, page 16).
     * Choose the bitmap by taking the 64 high-order bits worth of hash value mod NMAP
     */
    index = (*(uint64 *)c) % NMAP;

    /*
     * Find index of the rightmost non-0 bit.  Turn on that bit (from left!) in the sketch.
     */
    rmost = rightmost_one(c, 1, MD5_HASHLEN_BITS, 0);

    /*
     * last argument must be the index of the bit position from the right.
     * i.e. position 0 is the rightmost.
     * so to set the bit at rmost from the left, we subtract from the total number of bits.
     */
    array_set_bit_in_place(bitmaps, NMAP, MD5_HASHLEN_BITS, index,
                           (MD5_HASHLEN_BITS - 1) - rmost);
    return PointerGetDatum(transblob);
}

PG_FUNCTION_INFO_V1(__fmsketch_count_distinct);

/*! UDA final function to get count(distinct) out of an FM sketch */
Datum __fmsketch_count_distinct(PG_FUNCTION_ARGS)
{
    fmtransval *transval = NULL;

    if (VARSIZE((PG_GETARG_BYTEA_P(0))) == VARHDRSZ)
        /* nothing was ever aggregated! */
        return (0);

    check_fmtransval(PG_GETARG_BYTEA_P(0));
    transval = (fmtransval *)VARDATA((PG_GETARG_BYTEA_P(0)));

    /* if status is not BIG then get count from sortasort */
    if (transval->status == SMALL)
        return ((sortasort *)(transval->storage))->num_vals;
    /* else get count via fm */
    else if (transval->status != BIG) {
        elog(ERROR, "FM transval neither SMALL nor BIG");
        return(0);
    }
    else     /* transval->status == BIG */
        return __fmsketch_count_distinct_c((bytea *)transval->storage);
}

/*!
 * Finish up the Flajolet-Martin approximation.
 * We sum up the number of leading 1 bits across all bitmaps in the sketch.
 * Then we use the FM magic formula to estimate the distinct count.
 * \params bitmaps the FM Sketch
 */
Datum __fmsketch_count_distinct_c(bytea *bitmaps)
{
/*  int R = 0; // Flajolet/Martin's R is handled by leftmost_zero */
    uint32        S = 0;
    static double phi = 0.77351;     /*
                                      * the magic constant
                                      * char out[NMAP*MD5_HASHLEN_BITS];
                                      */
    int    i;
    uint32 lz;

    for (i = 0; i < NMAP; i++)
    {
        lz = leftmost_zero((uint8 *)VARDATA(
                               bitmaps), NMAP, MD5_HASHLEN_BITS, i);
        S = S + lz;
    }

    PG_RETURN_INT64(ceil( ((double)NMAP/
                           phi) * pow(2.0, (double)S/(double)NMAP) ));
}


PG_FUNCTION_INFO_V1(__fmsketch_merge);

/*!
 * Greenplum "prefunc": a function to merge 2 transvals computed at different machines.
 * For simple FM, this is trivial: just OR together the two arrays of bitmaps.
 * But we have to deal with cases where one or both transval is SMALL: i.e. it
 * holds a sortasort, not an FM sketch.
 *
 * XXX  TESTING: Ensure we exercise all branches!
 */
Datum __fmsketch_merge(PG_FUNCTION_ARGS)
{
    bytea *     transblob1 = (bytea *)PG_GETARG_BYTEA_P(0);
    bytea *     transblob2 = (bytea *)PG_GETARG_BYTEA_P(1);
    fmtransval *transval1, *transval2;
    sortasort * s1, *s2;
    sortasort * sortashort, *sortabig;
    bytea *     tblob_big, *tblob_small;
    uint32      i;

    /* deal with the case where one or both items is the initial value of '' */
    if (VARSIZE(transblob1) == VARHDRSZ) {
        PG_RETURN_DATUM(PointerGetDatum(transblob2));
    }
    if (VARSIZE(transblob2) == VARHDRSZ) {
        PG_RETURN_DATUM(PointerGetDatum(transblob1));
    }

    check_fmtransval(transblob1);
    check_fmtransval(transblob2);
    transval1 = (fmtransval *)VARDATA(transblob1);
    transval2 = (fmtransval *)VARDATA(transblob2);
    if (transval1->typOid != transval2->typOid) {
        elog(ERROR, "cannot merge two transition state with different element types");
    }

    if (transval1->status == BIG && transval2->status == BIG) {
        /* easy case: merge two FM sketches via bitwise OR. */
        fmtransval *newval;
        tblob_big = fm_new(transval1);
        newval = (fmtransval *)VARDATA(tblob_big);

        big_or_internal((bytea *)transval1->storage, (bytea *)transval2->storage,
                        (bytea *)newval->storage);

        PG_RETURN_DATUM(PointerGetDatum(tblob_big));
    }
    else if (transval1->status == SMALL && transval2->status == SMALL) {
        s1 = (sortasort *)(transval1->storage);
        s2 = (sortasort *)(transval2->storage);
        tblob_big =
            (s1->num_vals > s2->num_vals) ? transblob1 : transblob2;
        tblob_small =
            (s1->num_vals > s2->num_vals) ? transblob2 : transblob1;
        sortashort =
            (sortasort *)(((fmtransval *)((fmtransval *)VARDATA(tblob_small)))->storage);
        sortabig = (sortasort *)(((fmtransval *)((fmtransval *)VARDATA(tblob_big)))->storage);
        if (sortabig->num_vals + sortashort->num_vals <=
            sortabig->capacity) {
            /*
             * we have room in sortabig
             * one could imagine a more efficient (merge-based) sortasort merge,
             * but for now we just copy the values from the smaller sortasort into
             * the bigger one.
             */
            for (i = 0; i < sortashort->num_vals; i++) {
                Datum the_val = PointerExtractDatum(sortasort_getval(sortashort,i),
                                                    transval1->typByVal);
                int the_len = ExtractDatumLen(the_val, transval1->typLen,
                                              transval1->typByVal, -1);
                tblob_big = fmsketch_sortasort_insert(tblob_big,
                                                      the_val,
                                                      the_len);
            }
            PG_RETURN_DATUM(PointerGetDatum(tblob_big));
        }
        /* else drop through. */
    }

    /*
     * if we got here, then at most one transval is BIG, i.e. one or both transvals is SMALL.
     * need to form an FM sketch and populate with the SMALL transval(s)
     */
    if (transval1->status == SMALL && transval2->status == SMALL)
        tblob_big = fm_new(transval1);
    else
        tblob_big =
            (transval1->status == BIG) ? transblob1 : transblob2;

    if (transval1->status == SMALL) {
        s1 = (sortasort *)(transval1->storage);
        for(i = 0; i < s1->num_vals; i++) {
            __fmsketch_trans_c(tblob_big, PointerExtractDatum(sortasort_getval(s1,i),
                                                              transval1->typByVal));
        }
    }
    if (transval2->status == SMALL) {
        s2 = (sortasort *)(transval2->storage);
        for(i = 0; i < s2->num_vals; i++) {
            __fmsketch_trans_c(tblob_big, PointerExtractDatum(sortasort_getval(s2,i),
                                                              transval1->typByVal));
        }
    }
    PG_RETURN_DATUM(PointerGetDatum(tblob_big));
}

PG_FUNCTION_INFO_V1(big_or);
Datum
big_or(PG_FUNCTION_ARGS)
{

  big_or_internal(PG_GETARG_BYTEA_PP(0), PG_GETARG_BYTEA_PP(1), PG_GETARG_BYTEA_PP(2));

  PG_RETURN_VOID();
}
  
  
/*! OR of two big bitmaps, for gathering sketches computed in parallel. */
void big_or_internal(bytea *bitmap1, bytea *bitmap2, bytea *out)
{
    uint32  i;

    if (VARSIZE(bitmap1) != VARSIZE(bitmap2))
        elog(ERROR,
             "attempting to OR two different-sized bitmaps: %d, %d",
             VARSIZE(bitmap1),
             VARSIZE(bitmap2));

    if (VARSIZE(bitmap1) != VARSIZE(out))
        elog(ERROR,
             "target bitmap is of a different size from the source. "
             "target bitmap size: %d, source bitmap size: %d",
             VARSIZE(out),
             VARSIZE(bitmap1));

    /* could probably be more efficient doing this 32 or 64 bits at a time */
    for (i=0; i < VARSIZE(bitmap1) - VARHDRSZ; i++)
        ((char *)(VARDATA(out)))[i] = ((char *)(VARDATA(bitmap1)))[i] |
                                      ((char *)(VARDATA(bitmap2)))[i];
}

/*!
 * wrapper for insertion into a sortasort. calls sortasort_try_insert and if that fails it
 * makes more space for insertion (double or more the size) and tries again.
 * \param transblob the current transition value packed into a bytea
 * \param dat the Datum to be inserted
 */
bytea *fmsketch_sortasort_insert(bytea *transblob, Datum dat, size_t len)
{
    fmtransval *transval = (fmtransval *)VARDATA(transblob);
    sortasort *s_in =
        (sortasort *)(transval->storage);
    bytea *    newblob;
    bool       success = FALSE;
    size_t     new_storage_sz;
    size_t     newsize;

    if (s_in->num_vals >= s_in->capacity)
        elog(ERROR, "attempt to insert into full sortasort");

    success = sortasort_try_insert(s_in, dat, transval->typLen);
    if (success < 0)
        elog(ERROR, "insufficient directory capacity in sortasort");

    if (success == TRUE) return (transblob);

    /* XXX  THIS WHILE LOOP WILL SUCCEED THE FIRST TRY ... REMOVE IT. */
    while (!success) {
        /*
         * else insufficient space
         * allocate a fmtransval with double-big storage area plus room for v
         * should work 2nd time around the loop.
         * we can't use repalloc because it fails trying to free the old transblob
         */

        new_storage_sz = s_in->storage_sz*2 + len;
        /* XXX THIS POINTER ARITHMETIC SHOULD BE HIDDEN BY A MACRO IN THE SORTASORT LIBRARY! */
        newsize = VARHDRSZ + sizeof(fmtransval) + sizeof(sortasort)
                  + s_in->capacity*sizeof(s_in->dir[0]) +
                  new_storage_sz;
        newblob = (bytea *)palloc(newsize);
        memcpy(newblob, transblob, VARSIZE(transblob));
        SET_VARSIZE(newblob, newsize);
        s_in = (sortasort *)((fmtransval *)VARDATA(newblob))->storage;
        s_in->storage_sz = new_storage_sz;
        /*
         * Can't figure out how to make pfree happy with transblob
         * pfree(transblob);
         */
        transblob = newblob;
        success = sortasort_try_insert(s_in, dat, transval->typLen);
    }
    return(transblob);
}
