/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include <postgres.h>
#include <fmgr.h>
#include <utils/lsyscache.h>
#include <utils/builtins.h>
#include <utils/array.h>
#include <catalog/pg_type.h>

#include "req_float_sketch_c_adapter.h"
#include "base64.h"

/* PG_FUNCTION_INFO_V1 macro to pass functions to postgres */
PG_FUNCTION_INFO_V1(pg_req_float_sketch_add_item);
PG_FUNCTION_INFO_V1(pg_req_float_sketch_get_rank);
PG_FUNCTION_INFO_V1(pg_req_float_sketch_get_quantile);
PG_FUNCTION_INFO_V1(pg_req_float_sketch_get_n);
PG_FUNCTION_INFO_V1(pg_req_float_sketch_to_string);
PG_FUNCTION_INFO_V1(pg_req_float_sketch_merge);
PG_FUNCTION_INFO_V1(pg_req_float_sketch_from_internal);
PG_FUNCTION_INFO_V1(pg_req_float_sketch_get_pmf);
PG_FUNCTION_INFO_V1(pg_req_float_sketch_get_cdf);
PG_FUNCTION_INFO_V1(pg_req_float_sketch_get_quantiles);
PG_FUNCTION_INFO_V1(pg_req_float_sketch_get_histogram);

/* function declarations */
Datum pg_req_float_sketch_recv(PG_FUNCTION_ARGS);
Datum pg_req_float_sketch_send(PG_FUNCTION_ARGS);
Datum pg_req_float_sketch_add_item(PG_FUNCTION_ARGS);
Datum pg_req_float_sketch_get_rank(PG_FUNCTION_ARGS);
Datum pg_req_float_sketch_get_quantile(PG_FUNCTION_ARGS);
Datum pg_req_float_sketch_get_n(PG_FUNCTION_ARGS);
Datum pg_req_float_sketch_to_string(PG_FUNCTION_ARGS);
Datum pg_req_float_sketch_merge(PG_FUNCTION_ARGS);
Datum pg_req_float_sketch_from_internal(PG_FUNCTION_ARGS);
Datum pg_req_float_sketch_get_pmf(PG_FUNCTION_ARGS);
Datum pg_req_float_sketch_get_cdf(PG_FUNCTION_ARGS);
Datum pg_req_float_sketch_get_quantiles(PG_FUNCTION_ARGS);
Datum pg_req_float_sketch_get_histogram(PG_FUNCTION_ARGS);

static const unsigned DEFAULT_NUM_BINS = 10;

Datum pg_req_float_sketch_add_item(PG_FUNCTION_ARGS) {
  void* sketchptr;
  float value;
  int k;
  bool hra;

  MemoryContext oldcontext;
  MemoryContext aggcontext;

  if (PG_ARGISNULL(0) && PG_ARGISNULL(1)) {
    PG_RETURN_NULL();
  } else if (PG_ARGISNULL(1)) {
    PG_RETURN_POINTER(PG_GETARG_POINTER(0)); // no update value. return unmodified state
  }

  if (!AggCheckCallContext(fcinfo, &aggcontext)) {
    elog(ERROR, "req_float_sketch_add_item called in non-aggregate context");
  }
  oldcontext = MemoryContextSwitchTo(aggcontext);

  if (PG_ARGISNULL(0)) {
    k = PG_NARGS() > 2 ? PG_GETARG_INT32(2) : DEFAULT_K;
    hra = PG_NARGS() > 3 ? PG_GETARG_BOOL(3) : true;
    sketchptr = req_float_sketch_new(k, hra);
  } else {
    sketchptr = PG_GETARG_POINTER(0);
  }

  value = PG_GETARG_FLOAT4(1);
  req_float_sketch_update(sketchptr, value);

  MemoryContextSwitchTo(oldcontext);

  PG_RETURN_POINTER(sketchptr);
}

Datum pg_req_float_sketch_get_rank(PG_FUNCTION_ARGS) {
  const bytea* bytes_in;
  void* sketchptr;
  float value;
  double rank;
  bool inclusive;
  bytes_in = PG_GETARG_BYTEA_P(0);
  sketchptr = req_float_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ);
  value = PG_GETARG_FLOAT4(1);
  inclusive = PG_NARGS() > 2 ? PG_GETARG_BOOL(2) : false;
  rank = req_float_sketch_get_rank(sketchptr, value, inclusive);
  req_float_sketch_delete(sketchptr);
  PG_RETURN_FLOAT8(rank);
}

Datum pg_req_float_sketch_get_quantile(PG_FUNCTION_ARGS) {
  const bytea* bytes_in;
  void* sketchptr;
  float value;
  double rank;
  bool inclusive;
  bytes_in = PG_GETARG_BYTEA_P(0);
  sketchptr = req_float_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ);
  rank = PG_GETARG_FLOAT8(1);
  inclusive = PG_NARGS() > 2 ? PG_GETARG_BOOL(2) : false;
  value = req_float_sketch_get_quantile(sketchptr, rank, inclusive);
  req_float_sketch_delete(sketchptr);
  PG_RETURN_FLOAT4(value);
}

Datum pg_req_float_sketch_get_n(PG_FUNCTION_ARGS) {
  const bytea* bytes_in;
  void* sketchptr;
  uint64 n;
  bytes_in = PG_GETARG_BYTEA_P(0);
  sketchptr = req_float_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ);
  n = req_float_sketch_get_n(sketchptr);
  req_float_sketch_delete(sketchptr);
  PG_RETURN_INT64(n);
}

Datum pg_req_float_sketch_to_string(PG_FUNCTION_ARGS) {
  const bytea* bytes_in;
  void* sketchptr;
  char* str;
  bytes_in = PG_GETARG_BYTEA_P(0);
  sketchptr = req_float_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ);
  str = req_float_sketch_to_string(sketchptr);
  req_float_sketch_delete(sketchptr);
  PG_RETURN_TEXT_P(cstring_to_text(str));
}

Datum pg_req_float_sketch_merge(PG_FUNCTION_ARGS) {
  void* unionptr;
  bytea* sketch_bytes;
  void* sketchptr;
  int k;
  bool hra;

  MemoryContext oldcontext;
  MemoryContext aggcontext;

  if (PG_ARGISNULL(0) && PG_ARGISNULL(1)) {
    PG_RETURN_NULL();
  } else if (PG_ARGISNULL(1)) {
    PG_RETURN_POINTER(PG_GETARG_POINTER(0)); // no update value. return unmodified state
  }

  if (!AggCheckCallContext(fcinfo, &aggcontext)) {
    elog(ERROR, "req_float_sketch_merge called in non-aggregate context");
  }
  oldcontext = MemoryContextSwitchTo(aggcontext);

  if (PG_ARGISNULL(0)) {
    k = PG_NARGS() > 2 ? PG_GETARG_INT32(2) : DEFAULT_K;
    hra = PG_NARGS() > 3 ? PG_GETARG_BOOL(3) : true;
    unionptr = req_float_sketch_new(k ? k : DEFAULT_K, hra);
  } else {
    unionptr = PG_GETARG_POINTER(0);
  }

  sketch_bytes = PG_GETARG_BYTEA_P(1);
  sketchptr = req_float_sketch_deserialize(VARDATA(sketch_bytes), VARSIZE(sketch_bytes) - VARHDRSZ);
  req_float_sketch_merge(unionptr, sketchptr);
  req_float_sketch_delete(sketchptr);

  MemoryContextSwitchTo(oldcontext);

  PG_RETURN_POINTER(unionptr);
}

Datum pg_req_float_sketch_from_internal(PG_FUNCTION_ARGS) {
  void* sketchptr;
  struct ptr_with_size bytes_out;
  MemoryContext aggcontext;

  if (PG_ARGISNULL(0)) PG_RETURN_NULL();
  if (!AggCheckCallContext(fcinfo, &aggcontext)) {
    elog(ERROR, "req_float_sketch_from_internal called in non-aggregate context");
  }
  sketchptr = PG_GETARG_POINTER(0);
  bytes_out = req_float_sketch_serialize(sketchptr, VARHDRSZ);
  req_float_sketch_delete(sketchptr);
  SET_VARSIZE(bytes_out.ptr, bytes_out.size);
  PG_RETURN_BYTEA_P(bytes_out.ptr);
}

Datum pg_req_float_sketch_get_pmf(PG_FUNCTION_ARGS) {
  const bytea* bytes_in;
  void* sketchptr;

  // input array of split points
  ArrayType* arr_in;
  Oid elmtype_in;
  int16 elmlen_in;
  bool elmbyval_in;
  char elmalign_in;
  Datum* data_in;
  bool* nulls_in;
  int arr_len_in;
  float* split_points;
  bool inclusive;

  // output array of fractions
  Datum* result;
  ArrayType* arr_out;
  int16 elmlen_out;
  bool elmbyval_out;
  char elmalign_out;
  int arr_len_out;

  int i;

  bytes_in = PG_GETARG_BYTEA_P(0);
  sketchptr = req_float_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ);

  arr_in = PG_GETARG_ARRAYTYPE_P(1);
  elmtype_in = ARR_ELEMTYPE(arr_in);
  get_typlenbyvalalign(elmtype_in, &elmlen_in, &elmbyval_in, &elmalign_in);
  deconstruct_array(arr_in, elmtype_in, elmlen_in, elmbyval_in, elmalign_in, &data_in, &nulls_in, &arr_len_in);

  split_points = palloc(sizeof(float) * arr_len_in);
  for (i = 0; i < arr_len_in; i++) {
    split_points[i] = DatumGetFloat4(data_in[i]);
  }

  inclusive = PG_NARGS() > 2 ? PG_GETARG_BOOL(2) : false;

  result = (Datum*) req_float_sketch_get_pmf_or_cdf(sketchptr, split_points, arr_len_in, false, false, inclusive);
  pfree(split_points);

  // construct output array of fractions
  arr_len_out = arr_len_in + 1; // N split points devide the number line into N+1 intervals
  get_typlenbyvalalign(FLOAT8OID, &elmlen_out, &elmbyval_out, &elmalign_out);
  arr_out = construct_array(result, arr_len_out, FLOAT8OID, elmlen_out, elmbyval_out, elmalign_out);

  req_float_sketch_delete(sketchptr);

  PG_RETURN_ARRAYTYPE_P(arr_out);
}

Datum pg_req_float_sketch_get_cdf(PG_FUNCTION_ARGS) {
  const bytea* bytes_in;
  void* sketchptr;

  // input array of split points
  ArrayType* arr_in;
  Oid elmtype_in;
  int16 elmlen_in;
  bool elmbyval_in;
  char elmalign_in;
  Datum* data_in;
  bool* nulls_in;
  int arr_len_in;
  float* split_points;
  bool inclusive;

  // output array of fractions
  Datum* result;
  ArrayType* arr_out;
  int16 elmlen_out;
  bool elmbyval_out;
  char elmalign_out;
  int arr_len_out;

  int i;

  bytes_in = PG_GETARG_BYTEA_P(0);
  sketchptr = req_float_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ);

  arr_in = PG_GETARG_ARRAYTYPE_P(1);
  elmtype_in = ARR_ELEMTYPE(arr_in);
  get_typlenbyvalalign(elmtype_in, &elmlen_in, &elmbyval_in, &elmalign_in);
  deconstruct_array(arr_in, elmtype_in, elmlen_in, elmbyval_in, elmalign_in, &data_in, &nulls_in, &arr_len_in);

  split_points = palloc(sizeof(float) * arr_len_in);
  for (i = 0; i < arr_len_in; i++) {
    split_points[i] = DatumGetFloat4(data_in[i]);
  }

  inclusive = PG_NARGS() > 2 ? PG_GETARG_BOOL(2) : false;

  result = (Datum*) req_float_sketch_get_pmf_or_cdf(sketchptr, split_points, arr_len_in, true, false, inclusive);
  pfree(split_points);

  // construct output array of fractions
  arr_len_out = arr_len_in + 1; // N split points devide the number line into N+1 intervals
  get_typlenbyvalalign(FLOAT8OID, &elmlen_out, &elmbyval_out, &elmalign_out);
  arr_out = construct_array(result, arr_len_out, FLOAT8OID, elmlen_out, elmbyval_out, elmalign_out);

  req_float_sketch_delete(sketchptr);

  PG_RETURN_ARRAYTYPE_P(arr_out);
}

Datum pg_req_float_sketch_get_quantiles(PG_FUNCTION_ARGS) {
  const bytea* bytes_in;
  void* sketchptr;

  // input array of fractions
  ArrayType* arr_in;
  Oid elmtype_in;
  int16 elmlen_in;
  bool elmbyval_in;
  char elmalign_in;
  Datum* data_in;
  bool* nulls_in;
  int arr_len;
  double* fractions;
  bool inclusive;

  // output array of quantiles
  Datum* quantiles;
  ArrayType* arr_out;
  int16 elmlen_out;
  bool elmbyval_out;
  char elmalign_out;

  int i;

  bytes_in = PG_GETARG_BYTEA_P(0);
  sketchptr = req_float_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ);

  arr_in = PG_GETARG_ARRAYTYPE_P(1);
  elmtype_in = ARR_ELEMTYPE(arr_in);
  get_typlenbyvalalign(elmtype_in, &elmlen_in, &elmbyval_in, &elmalign_in);
  deconstruct_array(arr_in, elmtype_in, elmlen_in, elmbyval_in, elmalign_in, &data_in, &nulls_in, &arr_len);

  fractions = palloc(sizeof(double) * arr_len);
  for (i = 0; i < arr_len; i++) {
    fractions[i] = DatumGetFloat8(data_in[i]);
  }

  inclusive = PG_NARGS() > 2 ? PG_GETARG_BOOL(2) : false;

  quantiles = (Datum*) req_float_sketch_get_quantiles(sketchptr, fractions, arr_len, inclusive);
  pfree(fractions);

  // construct output array of quantiles
  get_typlenbyvalalign(FLOAT4OID, &elmlen_out, &elmbyval_out, &elmalign_out);
  arr_out = construct_array(quantiles, arr_len, FLOAT4OID, elmlen_out, elmbyval_out, elmalign_out);

  req_float_sketch_delete(sketchptr);

  PG_RETURN_ARRAYTYPE_P(arr_out);
}

Datum pg_req_float_sketch_get_histogram(PG_FUNCTION_ARGS) {
  const bytea* bytes_in;
  void* sketchptr;
  int num_bins;
  bool inclusive;

  // output array of bins
  Datum* result;
  ArrayType* arr_out;
  int16 elmlen_out;
  bool elmbyval_out;
  char elmalign_out;
  int arr_len_out;

  int i;

  bytes_in = PG_GETARG_BYTEA_P(0);
  sketchptr = req_float_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ);

  num_bins = PG_NARGS() > 1 ? PG_GETARG_INT32(1) : DEFAULT_NUM_BINS;
  if (num_bins < 2) {
    elog(ERROR, "at least two bins expected");
  }

  inclusive = PG_NARGS() > 2 ? PG_GETARG_BOOL(2) : false;

  float* split_points = palloc(sizeof(float) * (num_bins - 1));
  const float min_value = req_float_sketch_get_quantile(sketchptr, 0, inclusive);
  const float max_value = req_float_sketch_get_quantile(sketchptr, 1, inclusive);
  const float delta = (max_value - min_value) / num_bins;
  for (i = 0; i < num_bins - 1; i++) {
    split_points[i] = min_value + delta * (i + 1);
  }
  result = (Datum*) req_float_sketch_get_pmf_or_cdf(sketchptr, split_points, num_bins - 1, false, true, inclusive);
  pfree(split_points);

  // construct output array
  arr_len_out = num_bins;
  get_typlenbyvalalign(FLOAT8OID, &elmlen_out, &elmbyval_out, &elmalign_out);
  arr_out = construct_array(result, arr_len_out, FLOAT8OID, elmlen_out, elmbyval_out, elmalign_out);

  req_float_sketch_delete(sketchptr);

  PG_RETURN_ARRAYTYPE_P(arr_out);
}
