/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include <postgres.h>
#include <fmgr.h>
#include <utils/lsyscache.h>
#include <utils/builtins.h>
#include <utils/array.h>
#include <catalog/pg_type.h>
#include <funcapi.h>

#include "frequent_strings_sketch_c_adapter.h"
#include "base64.h"

/* PG_FUNCTION_INFO_V1 macro to pass functions to postgres */
PG_FUNCTION_INFO_V1(pg_frequent_strings_sketch_add_item);
PG_FUNCTION_INFO_V1(pg_frequent_strings_sketch_merge);
PG_FUNCTION_INFO_V1(pg_frequent_strings_sketch_from_internal);
PG_FUNCTION_INFO_V1(pg_frequent_strings_sketch_to_string);
PG_FUNCTION_INFO_V1(pg_frequent_strings_sketch_result_no_false_positives);
PG_FUNCTION_INFO_V1(pg_frequent_strings_sketch_result_no_false_negatives);

/* function declarations */
Datum pg_frequent_strings_sketch_recv(PG_FUNCTION_ARGS);
Datum pg_frequent_strings_sketch_send(PG_FUNCTION_ARGS);
Datum pg_frequent_strings_sketch_add_item(PG_FUNCTION_ARGS);
Datum pg_frequent_strings_sketch_merge(PG_FUNCTION_ARGS);
Datum pg_frequent_strings_sketch_from_internal(PG_FUNCTION_ARGS);
Datum pg_frequent_strings_sketch_to_string(PG_FUNCTION_ARGS);
Datum pg_frequent_strings_sketch_result_no_false_positives(PG_FUNCTION_ARGS);
Datum pg_frequent_strings_sketch_result_no_false_negatives(PG_FUNCTION_ARGS);

Datum frequent_strings_sketch_get_result(PG_FUNCTION_ARGS, bool);


Datum pg_frequent_strings_sketch_add_item(PG_FUNCTION_ARGS) {
  void* sketchptr;
  unsigned lg_k;
  const VarChar* str;
  unsigned long long weight;

  MemoryContext oldcontext;
  MemoryContext aggcontext;

  if (PG_ARGISNULL(0) && PG_ARGISNULL(2)) {
    PG_RETURN_NULL();
  } else if (PG_ARGISNULL(2)) {
    PG_RETURN_POINTER(PG_GETARG_POINTER(0)); // no update value. return unmodified state
  }

  if (!AggCheckCallContext(fcinfo, &aggcontext)) {
    elog(ERROR, "frequent_strings_sketch_add_item called in non-aggregate context");
  }
  oldcontext = MemoryContextSwitchTo(aggcontext);

  if (PG_ARGISNULL(0)) {
    lg_k = PG_GETARG_INT32(1);
    sketchptr = frequent_strings_sketch_new(lg_k);
  } else {
    sketchptr = PG_GETARG_POINTER(0);
  }

  str = PG_GETARG_VARCHAR_P(2);

  // optional weight
  if (PG_NARGS() == 3) {
    weight = 1;
  } else {
    weight = PG_GETARG_INT64(3);
  }

  frequent_strings_sketch_update(sketchptr, VARDATA(str), VARSIZE(str) - VARHDRSZ, weight);

  MemoryContextSwitchTo(oldcontext);

  PG_RETURN_POINTER(sketchptr);
}

Datum pg_frequent_strings_sketch_merge(PG_FUNCTION_ARGS) {
  void* unionptr;
  bytea* sketch_bytes;
  void* sketchptr;
  unsigned lg_k;

  MemoryContext oldcontext;
  MemoryContext aggcontext;

  if (PG_ARGISNULL(0) && PG_ARGISNULL(2)) {
    PG_RETURN_NULL();
  } else if (PG_ARGISNULL(2)) {
    PG_RETURN_POINTER(PG_GETARG_POINTER(0)); // no update value. return unmodified state
  }

  if (!AggCheckCallContext(fcinfo, &aggcontext)) {
    elog(ERROR, "frequent_strings_sketch_merge called in non-aggregate context");
  }
  oldcontext = MemoryContextSwitchTo(aggcontext);

  if (PG_ARGISNULL(0)) {
    lg_k = PG_GETARG_INT32(1);
    unionptr = frequent_strings_sketch_new(lg_k);
  } else {
    unionptr = PG_GETARG_POINTER(0);
  }

  sketch_bytes = PG_GETARG_BYTEA_P(2);
  sketchptr = frequent_strings_sketch_deserialize(VARDATA(sketch_bytes), VARSIZE(sketch_bytes) - VARHDRSZ);
  frequent_strings_sketch_merge(unionptr, sketchptr);
  frequent_strings_sketch_delete(sketchptr);

  MemoryContextSwitchTo(oldcontext);

  PG_RETURN_POINTER(unionptr);
}

Datum pg_frequent_strings_sketch_from_internal(PG_FUNCTION_ARGS) {
  void* sketchptr;
  bytea* bytes_out;
  MemoryContext aggcontext;
  if (PG_ARGISNULL(0)) PG_RETURN_NULL();
  if (!AggCheckCallContext(fcinfo, &aggcontext)) {
    elog(ERROR, "frequent_strings_sketch_from_internal called in non-aggregate context");
  }
  sketchptr = PG_GETARG_POINTER(0);
  bytes_out = frequent_strings_sketch_serialize(sketchptr);
  frequent_strings_sketch_delete(sketchptr);
  PG_RETURN_BYTEA_P(bytes_out);
}

Datum pg_frequent_strings_sketch_to_string(PG_FUNCTION_ARGS) {
  const bytea* bytes_in;
  void* sketchptr;
  bool print_items;
  char* str;
  bytes_in = PG_GETARG_BYTEA_P(0);
  if (PG_NARGS() > 1) {
    print_items = PG_GETARG_BOOL(1);
  } else {
    print_items = false;
  }
  sketchptr = frequent_strings_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ);
  str = frequent_strings_sketch_to_string(sketchptr, print_items);
  frequent_strings_sketch_delete(sketchptr);
  PG_RETURN_TEXT_P(cstring_to_text(str));
}

Datum frequent_strings_sketch_get_result(PG_FUNCTION_ARGS, bool no_false_positives) {
  FuncCallContext* funcctx;
  TupleDesc tupdesc;
  const bytea* bytes_in;
  void* sketchptr;
  unsigned long long threshold;

  if (SRF_IS_FIRSTCALL()) {
    MemoryContext oldcontext;
    funcctx = SRF_FIRSTCALL_INIT();
    oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
    if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) {
      ereport(
        ERROR,
        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("function returning record called in context that cannot accept type record"))
      );
    }

    if (PG_ARGISNULL(0)) SRF_RETURN_DONE(funcctx);
    bytes_in = PG_GETARG_BYTEA_P(0);
    sketchptr = frequent_strings_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ);
    if (PG_ARGISNULL(1)) {
      threshold = 0;
    } else {
      threshold = PG_GETARG_INT64(1);
    }

    funcctx->user_fctx = frequent_strings_sketch_get_frequent_items(sketchptr, no_false_positives, threshold);
    funcctx->max_calls = ((struct frequent_strings_sketch_result*) funcctx->user_fctx)->num;
    funcctx->attinmeta = TupleDescGetAttInMetadata(tupdesc);

    frequent_strings_sketch_delete(sketchptr);

    MemoryContextSwitchTo(oldcontext);
  }

  funcctx = SRF_PERCALL_SETUP();

  if (funcctx->call_cntr < funcctx->max_calls) {
    char       **values;
    HeapTuple    tuple;
    Datum        result;

    struct frequent_strings_sketch_result* frequent_strings = funcctx->user_fctx;
    struct frequent_strings_sketch_result_row* row = &frequent_strings->rows[funcctx->call_cntr];

    values = (char**) palloc(4 * sizeof(char*));
    values[0] = row->str;
    values[1] = (char*) palloc(21);
    values[2] = (char*) palloc(21);
    values[3] = (char*) palloc(21);

    snprintf(values[1], 21, "%llu", row->estimate);
    snprintf(values[2], 21, "%llu", row->lower_bound);
    snprintf(values[3], 21, "%llu", row->upper_bound);

    tuple = BuildTupleFromCStrings(funcctx->attinmeta, values);

    result = HeapTupleGetDatum(tuple);

    pfree(values[1]);
    pfree(values[2]);
    pfree(values[3]);
    pfree(values);

    SRF_RETURN_NEXT(funcctx, result);
  } else {
    SRF_RETURN_DONE(funcctx);
  }
}

Datum pg_frequent_strings_sketch_result_no_false_positives(PG_FUNCTION_ARGS) {
  return frequent_strings_sketch_get_result(fcinfo, true);
}

Datum pg_frequent_strings_sketch_result_no_false_negatives(PG_FUNCTION_ARGS) {
  return frequent_strings_sketch_get_result(fcinfo, false);
}
