// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "exprs/aggregate-functions.h"

#include <algorithm>
#include <map>
#include <sstream>
#include <utility>
#include <cmath>

#include <boost/random/ranlux.hpp>
#include <boost/random/uniform_int.hpp>

#include "codegen/impala-ir.h"
#include "common/logging.h"
#include "exprs/anyval-util.h"
#include "exprs/hll-bias.h"
#include "runtime/date-value.h"
#include "runtime/decimal-value.inline.h"
#include "runtime/runtime-state.h"
#include "runtime/string-value.inline.h"
#include "runtime/timestamp-value.h"
#include "runtime/timestamp-value.inline.h"
#include "util/arithmetic-util.h"
#include "util/mpfit-util.h"

#include "common/names.h"

using boost::uniform_int;
using boost::ranlux64_3;
using std::make_pair;
using std::map;
using std::min_element;
using std::nth_element;
using std::pop_heap;
using std::push_heap;

namespace {
// Threshold for each precision where it's better to use linear counting instead
// of the bias corrected estimate.
static float HllThreshold(int p) {
  switch (p) {
    case 4:
      return 10.0;
    case 5:
      return 20.0;
    case 6:
      return 40.0;
    case 7:
      return 80.0;
    case 8:
      return 220.0;
    case 9:
      return 400.0;
    case 10:
      return 900.0;
    case 11:
      return 1800.0;
    case 12:
      return 3100.0;
    case 13:
      return 6500.0;
    case 14:
      return 11500.0;
    case 15:
      return 20000.0;
    case 16:
      return 50000.0;
    case 17:
      return 120000.0;
    case 18:
      return 350000.0;
  }
  return 0.0;
}

// Implements k nearest neighbor interpolation for k=6,
// we choose 6 bassed on the HLL++ paper
int64_t HllEstimateBias(int64_t estimate) {
  const size_t K = 6;

  // Precision index into data arrays
  // We don't have data for precisions less than 4
  DCHECK(impala::AggregateFunctions::HLL_PRECISION >= 4);
  static constexpr size_t idx = impala::AggregateFunctions::HLL_PRECISION - 4;

  // Calculate the square of the difference of this estimate to all
  // precalculated estimates for a particular precision
  map<double, size_t> distances;
  for (size_t i = 0;
      i < impala::HLL_DATA_SIZES[idx] / sizeof(double); ++i) {
    double val = estimate - impala::HLL_RAW_ESTIMATE_DATA[idx][i];
    distances.insert(make_pair(val * val, i));
  }

  size_t nearest[K];
  size_t j = 0;
  // Use a sorted map to find the K closest estimates to our initial estimate
  for (map<double, size_t>::iterator it = distances.begin();
       j < K && it != distances.end(); ++it, ++j) {
    nearest[j] = it->second;
  }

  // Compute the average bias correction the K closest estimates
  double bias = 0.0;
  for (size_t i = 0; i < K; ++i) {
    bias += impala::HLL_BIAS_DATA[idx][nearest[i]];
  }

  return bias / K;
}

}

namespace impala {

// This function initializes StringVal 'dst' with a newly allocated buffer of
// 'buf_len' bytes. The new buffer will be filled with zero. If allocation fails,
// 'dst' will be set to a null string. This allows execution to continue until the
// next time GetQueryStatus() is called (see IMPALA-2756).
static void AllocBuffer(FunctionContext* ctx, StringVal* dst, size_t buf_len) {
  uint8_t* ptr = ctx->Allocate(buf_len);
  if (UNLIKELY(ptr == NULL && buf_len != 0)) {
    DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
    *dst = StringVal::null();
  } else {
    *dst = StringVal(ptr, buf_len);
    // Avoid memset() with NULL ptr as it's undefined.
    if (LIKELY(ptr != NULL)) memset(ptr, 0, buf_len);
  }
}

// This function initializes StringVal 'dst' with a newly allocated buffer of
// 'buf_len' bytes and copies the content of StringVal 'src' into it.
// If allocation fails, 'dst' will be set to a null string.
static void CopyStringVal(FunctionContext* ctx, const StringVal& src, StringVal* dst) {
  if (src.is_null) {
    *dst = StringVal::null();
  } else {
    uint8_t* copy = ctx->Allocate(src.len);
    if (UNLIKELY(copy == NULL)) {
      // Zero-length allocation always returns a hard-coded pointer.
      DCHECK(src.len != 0 && !ctx->impl()->state()->GetQueryStatus().ok());
      *dst = StringVal::null();
    } else {
      *dst = StringVal(copy, src.len);
      memcpy(dst->ptr, src.ptr, src.len);
    }
  }
}

// Converts any UDF Val Type to a string representation
template <typename T>
StringVal ToStringVal(FunctionContext* context, T val) {
  stringstream ss;
  ss << val;
  const string &str = ss.str();
  return StringVal::CopyFrom(
      context, reinterpret_cast<const uint8_t*>(str.c_str()), str.size());
}

constexpr int AggregateFunctions::HLL_PRECISION;
constexpr int AggregateFunctions::HLL_LEN;

void AggregateFunctions::InitNull(FunctionContext*, AnyVal* dst) {
  dst->is_null = true;
}

template<typename T>
void AggregateFunctions::InitZero(FunctionContext*, T* dst) {
  dst->is_null = false;
  dst->val = 0;
}

template<>
void AggregateFunctions::InitZero(FunctionContext*, DecimalVal* dst) {
  dst->is_null = false;
  dst->val16 = 0; // Also initializes val4 and val8 to 0.
}

template <typename T>
void AggregateFunctions::UpdateVal(FunctionContext* ctx, const T& src, T* dst) {
  *dst = src;
}

template <>
void AggregateFunctions::UpdateVal(FunctionContext* ctx, const StringVal& src,
    StringVal* dst) {
  if (src.is_null) {
    if (!dst->is_null) ctx->Free(dst->ptr);
    *dst = StringVal::null();
    return;
  }

  uint8_t* new_ptr;
  if (dst->is_null) {
    new_ptr = ctx->Allocate(src.len);
  } else {
    new_ptr = ctx->Reallocate(dst->ptr, src.len);
  }
  // Note that a zero-length string is not the same as StringVal::null().
  RETURN_IF_NULL(ctx, new_ptr);
  dst->ptr = new_ptr;
  memcpy(dst->ptr, src.ptr, src.len);
  dst->is_null = false;
  dst->len = src.len;
}

StringVal AggregateFunctions::StringValGetValue(
    FunctionContext* ctx, const StringVal& src) {
  if (src.is_null) return src;
  return StringVal::CopyFrom(ctx, src.ptr, src.len);
}

StringVal AggregateFunctions::StringValSerializeOrFinalize(
    FunctionContext* ctx, const StringVal& src) {
  StringVal result = StringValGetValue(ctx, src);
  if (!src.is_null) ctx->Free(src.ptr);
  return result;
}

void AggregateFunctions::CountUpdate(
    FunctionContext*, const AnyVal& src, BigIntVal* dst) {
  DCHECK(!dst->is_null);
  if (!src.is_null) ++dst->val;
}

void AggregateFunctions::CountStarUpdate(FunctionContext*, BigIntVal* dst) {
  DCHECK(!dst->is_null);
  ++dst->val;
}

void AggregateFunctions::CountRemove(
    FunctionContext*, const AnyVal& src, BigIntVal* dst) {
  DCHECK(!dst->is_null);
  if (!src.is_null) {
    --dst->val;
    DCHECK_GE(dst->val, 0);
  }
}

void AggregateFunctions::CountStarRemove(FunctionContext*, BigIntVal* dst) {
  DCHECK(!dst->is_null);
  --dst->val;
  DCHECK_GE(dst->val, 0);
}

void AggregateFunctions::CountMerge(FunctionContext*, const BigIntVal& src,
    BigIntVal* dst) {
  DCHECK(!dst->is_null);
  DCHECK(!src.is_null);
  dst->val += src.val;
}

struct AvgState {
  double sum;
  int64_t count;
};

void AggregateFunctions::AvgInit(FunctionContext* ctx, StringVal* dst) {
  // avg() uses a preallocated FIXED_UDA_INTERMEDIATE intermediate value.
  DCHECK_EQ(dst->len, sizeof(AvgState));
  AvgState* avg = reinterpret_cast<AvgState*>(dst->ptr);
  avg->sum = 0.0;
  avg->count = 0;
}

template <typename T>
void AggregateFunctions::AvgUpdate(FunctionContext* ctx, const T& src, StringVal* dst) {
  if (src.is_null) return;
  DCHECK(dst->ptr != NULL);
  DCHECK_EQ(sizeof(AvgState), dst->len);
  AvgState* avg = reinterpret_cast<AvgState*>(dst->ptr);
  avg->sum += src.val;
  ++avg->count;
}

template <typename T>
void AggregateFunctions::AvgRemove(FunctionContext* ctx, const T& src, StringVal* dst) {
  // Remove doesn't need to explicitly check the number of calls to Update() or Remove()
  // because Finalize() returns NULL if count is 0.
  if (src.is_null) return;
  DCHECK(dst->ptr != NULL);
  DCHECK_EQ(sizeof(AvgState), dst->len);
  AvgState* avg = reinterpret_cast<AvgState*>(dst->ptr);
  avg->sum -= src.val;
  --avg->count;
  DCHECK_GE(avg->count, 0);
}

void AggregateFunctions::AvgMerge(FunctionContext* ctx, const StringVal& src,
    StringVal* dst) {
  const AvgState* src_struct = reinterpret_cast<const AvgState*>(src.ptr);
  DCHECK(dst->ptr != NULL);
  DCHECK_EQ(sizeof(AvgState), dst->len);
  AvgState* dst_struct = reinterpret_cast<AvgState*>(dst->ptr);
  dst_struct->sum += src_struct->sum;
  dst_struct->count += src_struct->count;
}

DoubleVal AggregateFunctions::AvgGetValue(FunctionContext* ctx, const StringVal& src) {
  AvgState* val_struct = reinterpret_cast<AvgState*>(src.ptr);
  if (val_struct->count == 0) return DoubleVal::null();
  return DoubleVal(val_struct->sum / val_struct->count);
}

DoubleVal AggregateFunctions::AvgFinalize(FunctionContext* ctx, const StringVal& src) {
  if (UNLIKELY(src.is_null)) return DoubleVal::null();
  DoubleVal result = AvgGetValue(ctx, src);
  return result;
}

void AggregateFunctions::TimestampAvgUpdate(FunctionContext* ctx,
    const TimestampVal& src, StringVal* dst) {
  if (src.is_null) return;
  DCHECK(dst->ptr != NULL);
  DCHECK_EQ(sizeof(AvgState), dst->len);
  AvgState* avg = reinterpret_cast<AvgState*>(dst->ptr);
  const TimestampValue& tm_src = TimestampValue::FromTimestampVal(src);
  double val;
  if (tm_src.ToSubsecondUnixTime(ctx->impl()->state()->local_time_zone(), &val)) {
    avg->sum += val;
    ++avg->count;
  }
}

void AggregateFunctions::TimestampAvgRemove(FunctionContext* ctx,
    const TimestampVal& src, StringVal* dst) {
  if (src.is_null) return;
  DCHECK(dst->ptr != NULL);
  DCHECK_EQ(sizeof(AvgState), dst->len);
  AvgState* avg = reinterpret_cast<AvgState*>(dst->ptr);
  const TimestampValue& tm_src = TimestampValue::FromTimestampVal(src);
  double val;
  if (tm_src.ToSubsecondUnixTime(ctx->impl()->state()->local_time_zone(), &val)) {
    avg->sum -= val;
    --avg->count;
    DCHECK_GE(avg->count, 0);
  }
}

TimestampVal AggregateFunctions::TimestampAvgGetValue(FunctionContext* ctx,
    const StringVal& src) {
  AvgState* val_struct = reinterpret_cast<AvgState*>(src.ptr);
  if (val_struct->count == 0) return TimestampVal::null();
  const TimestampValue& tv = TimestampValue::FromSubsecondUnixTime(
      val_struct->sum / val_struct->count, ctx->impl()->state()->local_time_zone());
  if (tv.HasDate()) {
    TimestampVal result;
    tv.ToTimestampVal(&result);
    return result;
  } else {
    return TimestampVal::null();
  }
}

TimestampVal AggregateFunctions::TimestampAvgFinalize(FunctionContext* ctx,
    const StringVal& src) {
  if (UNLIKELY(src.is_null)) return TimestampVal::null();
  TimestampVal result = TimestampAvgGetValue(ctx, src);
  return result;
}

// We saw some failures on the release build because GCC was emitting an instruction
// to operate on the int128_t that assumed the pointer was aligned (typically it isn't).
// We mark the struct with a "packed" attribute, so that the compiler does not expect it
// to be aligned. This should not have a negative performance impact on modern CPUs.
struct __attribute__ ((__packed__)) DecimalAvgState {
  __int128_t sum_val16; // Always uses max precision decimal.
  int64_t count;
};

void AggregateFunctions::DecimalAvgInit(FunctionContext* ctx, StringVal* dst) {
  // avg() uses a preallocated FIXED_UDA_INTERMEDIATE intermediate value.
  DCHECK_EQ(dst->len, sizeof(DecimalAvgState));
  DecimalAvgState* avg = reinterpret_cast<DecimalAvgState*>(dst->ptr);
  avg->sum_val16 = 0;
  avg->count = 0;
}

void AggregateFunctions::DecimalAvgUpdate(FunctionContext* ctx, const DecimalVal& src,
    StringVal* dst) {
  DecimalAvgAddOrRemove(ctx, src, dst, false);
}

void AggregateFunctions::DecimalAvgRemove(FunctionContext* ctx, const DecimalVal& src,
    StringVal* dst) {
  DecimalAvgAddOrRemove(ctx, src, dst, true);
}

// Always inline in IR so that constants can be replaced.
IR_ALWAYS_INLINE void AggregateFunctions::DecimalAvgAddOrRemove(FunctionContext* ctx,
    const DecimalVal& src, StringVal* dst, bool remove) {
  if (src.is_null) return;
  DCHECK(dst->ptr != NULL);
  DCHECK_EQ(sizeof(DecimalAvgState), dst->len);
  DecimalAvgState* avg = reinterpret_cast<DecimalAvgState*>(dst->ptr);
  bool decimal_v2 = ctx->impl()->GetConstFnAttr(FunctionContextImpl::DECIMAL_V2);

  // Since the src and dst are guaranteed to be the same scale, we can just
  // do a simple add.
  int m = remove ? -1 : 1;
  switch (ctx->impl()->GetConstFnAttr(FunctionContextImpl::ARG_TYPE_SIZE, 0)) {
    case 4:
      avg->sum_val16 += m * src.val4;
      if (UNLIKELY(decimal_v2 &&
          abs(avg->sum_val16) > DecimalUtil::MAX_UNSCALED_DECIMAL16)) {
        ctx->SetError("Avg computation overflowed");
      }
      break;
    case 8:
      avg->sum_val16 += m * src.val8;
      if (UNLIKELY(decimal_v2 &&
          abs(avg->sum_val16) > DecimalUtil::MAX_UNSCALED_DECIMAL16)) {
        ctx->SetError("Avg computation overflowed");
      }
      break;
    case 16:
      if (UNLIKELY(decimal_v2 && (avg->sum_val16 >= 0) == (src.val16 >= 0) &&
          abs(avg->sum_val16) > DecimalUtil::MAX_UNSCALED_DECIMAL16 - abs(src.val16))) {
        // We can't check for overflow after performing the addition like in the other
        // cases because the result may not fit into int128.
        ctx->SetError("Avg computation overflowed");
      }
      avg->sum_val16 += m * src.val16;
      break;
    default:
      DCHECK(false) << "Invalid byte size";
  }
  if (remove) {
    --avg->count;
    DCHECK_GE(avg->count, 0);
  } else {
    ++avg->count;
  }
}

void AggregateFunctions::DecimalAvgMerge(FunctionContext* ctx,
    const StringVal& src, StringVal* dst) {
  const DecimalAvgState* src_struct =
      reinterpret_cast<const DecimalAvgState*>(src.ptr);
  DCHECK(dst->ptr != NULL);
  DCHECK_EQ(sizeof(DecimalAvgState), dst->len);
  DecimalAvgState* dst_struct = reinterpret_cast<DecimalAvgState*>(dst->ptr);
  bool decimal_v2 = ctx->impl()->GetConstFnAttr(FunctionContextImpl::DECIMAL_V2);
  bool overflow = decimal_v2 &&
      abs(dst_struct->sum_val16) >
      DecimalUtil::MAX_UNSCALED_DECIMAL16 - abs(src_struct->sum_val16);
  if (UNLIKELY(overflow)) ctx->SetError("Avg computation overflowed");
  dst_struct->sum_val16 =
      ArithmeticUtil::AsUnsigned<std::plus>(dst_struct->sum_val16, src_struct->sum_val16);
  dst_struct->count += src_struct->count;
}

DecimalVal AggregateFunctions::DecimalAvgGetValue(FunctionContext* ctx,
    const StringVal& src) {
  DecimalAvgState* val_struct = reinterpret_cast<DecimalAvgState*>(src.ptr);
  if (val_struct->count == 0) return DecimalVal::null();
  Decimal16Value sum(val_struct->sum_val16);
  Decimal16Value count(val_struct->count);

  int output_precision =
      ctx->impl()->GetConstFnAttr(FunctionContextImpl::RETURN_TYPE_PRECISION);
  int output_scale = ctx->impl()->GetConstFnAttr(FunctionContextImpl::RETURN_TYPE_SCALE);
  // The scale of the accumulated sum is set to the scale of the input type.
  int sum_scale = ctx->impl()->GetConstFnAttr(FunctionContextImpl::ARG_TYPE_SCALE, 0);
  bool is_nan = false;
  bool overflow = false;
  bool decimal_v2 = ctx->impl()->GetConstFnAttr(FunctionContextImpl::DECIMAL_V2);
  Decimal16Value result = sum.Divide<int128_t>(sum_scale, count, 0 /* count's scale */,
      output_precision, output_scale, decimal_v2, &is_nan, &overflow);
  if (UNLIKELY(is_nan)) return DecimalVal::null();
  if (UNLIKELY(overflow)) {
    if (decimal_v2) {
      ctx->SetError("Avg computation overflowed");
    } else {
      ctx->AddWarning("Avg computation overflowed, returning NULL");
    }
    return DecimalVal::null();
  }
  return DecimalVal(result.value());
}

DecimalVal AggregateFunctions::DecimalAvgFinalize(FunctionContext* ctx,
    const StringVal& src) {
  if (UNLIKELY(src.is_null)) return DecimalVal::null();
  DecimalVal result = DecimalAvgGetValue(ctx, src);
  return result;
}

template<typename SRC_VAL, typename DST_VAL>
void AggregateFunctions::SumUpdate(FunctionContext* ctx, const SRC_VAL& src,
    DST_VAL* dst) {
  if (src.is_null) {
    // Do not count null values towards the number of updates
    ctx->impl()->IncrementNumUpdates(-1);
    return;
  }
  if (dst->is_null) InitZero<DST_VAL>(ctx, dst);
  dst->val = ArithmeticUtil::Compute<std::plus, decltype(dst->val)>(dst->val, src.val);
}

template<typename SRC_VAL, typename DST_VAL>
void AggregateFunctions::SumRemove(FunctionContext* ctx, const SRC_VAL& src,
    DST_VAL* dst) {
  // Do not count null values towards the number of removes
  if (src.is_null) ctx->impl()->IncrementNumRemoves(-1);
  if (ctx->impl()->num_removes() >= ctx->impl()->num_updates()) {
    *dst = DST_VAL::null();
    return;
  }
  if (src.is_null) return;
  if (dst->is_null) InitZero<DST_VAL>(ctx, dst);
  dst->val -= src.val;
}

void AggregateFunctions::SumDecimalUpdate(FunctionContext* ctx,
    const DecimalVal& src, DecimalVal* dst) {
  SumDecimalAddOrSubtract(ctx, src, dst);
}

void AggregateFunctions::SumDecimalRemove(FunctionContext* ctx,
    const DecimalVal& src, DecimalVal* dst) {
  if (ctx->impl()->num_removes() >= ctx->impl()->num_updates()) {
    *dst = DecimalVal::null();
    return;
  }
  SumDecimalAddOrSubtract(ctx, src, dst, true);
}

// Always inline in IR so that constants can be replaced.
IR_ALWAYS_INLINE void AggregateFunctions::SumDecimalAddOrSubtract(FunctionContext* ctx,
    const DecimalVal& src, DecimalVal* dst, bool subtract) {
  if (src.is_null) return;
  if (dst->is_null) InitZero<DecimalVal>(ctx, dst);
  int precision = ctx->impl()->GetConstFnAttr(FunctionContextImpl::ARG_TYPE_PRECISION, 0);
  bool decimal_v2 = ctx->impl()->GetConstFnAttr(FunctionContextImpl::DECIMAL_V2);
  // Since the src and dst are guaranteed to be the same scale, we can just
  // do a simple add.
  int m = subtract ? -1 : 1;
  if (precision <= 9) {
    dst->val16 += m * src.val4;
    if (UNLIKELY(decimal_v2 &&
        abs(dst->val16) > DecimalUtil::MAX_UNSCALED_DECIMAL16)) {
      ctx->SetError("Sum computation overflowed");
    }
  } else if (precision <= 19) {
    dst->val16 += m * src.val8;
    if (UNLIKELY(decimal_v2 &&
        abs(dst->val16) > DecimalUtil::MAX_UNSCALED_DECIMAL16)) {
      ctx->SetError("Sum computation overflowed");
    }
  } else {
    if (UNLIKELY(decimal_v2 && (dst->val16 >= 0) == (src.val16 >= 0) &&
        abs(dst->val16) > DecimalUtil::MAX_UNSCALED_DECIMAL16 - abs(src.val16))) {
      // We can't check for overflow after performing the addition like in the other
      // cases because the result may not fit into int128.
      ctx->SetError("Sum computation overflowed");
    }
    dst->val16 += m * src.val16;
  }
}

void AggregateFunctions::SumDecimalMerge(FunctionContext* ctx,
    const DecimalVal& src, DecimalVal* dst) {
  if (src.is_null) return;
  if (dst->is_null) InitZero<DecimalVal>(ctx, dst);
  bool decimal_v2 = ctx->impl()->GetConstFnAttr(FunctionContextImpl::DECIMAL_V2);
  bool overflow = decimal_v2 &&
      abs(dst->val16) > DecimalUtil::MAX_UNSCALED_DECIMAL16 - abs(src.val16);
  if (UNLIKELY(overflow)) ctx->SetError("Sum computation overflowed");
  dst->val16 = ArithmeticUtil::AsUnsigned<std::plus>(dst->val16, src.val16);
}

template<typename T>
void AggregateFunctions::Min(FunctionContext*, const T& src, T* dst) {
  if (src.is_null) return;
  if (dst->is_null || src.val < dst->val) *dst = src;
}

template<typename T>
void AggregateFunctions::Max(FunctionContext*, const T& src, T* dst) {
  if (src.is_null) return;
  if (dst->is_null || src.val > dst->val) *dst = src;
}

void AggregateFunctions::InitNullString(FunctionContext* c, StringVal* dst) {
  dst->is_null = true;
  dst->ptr = NULL;
  dst->len = 0;
}

// For DoubleVal and FloatVal, we have to handle NaN specially.  If 'val' != 'val', then
// 'val' must be NaN, and if any of the values that are inserted are NaN, then we return
// NaN. So, if 'src.val != src.val', set 'dst' to it.
template <>
void AggregateFunctions::Min(FunctionContext*, const FloatVal& src, FloatVal* dst) {
  if (src.is_null) return;
  if (dst->is_null || src.val < dst->val || src.val != src.val) *dst = src;
}

template <>
void AggregateFunctions::Max(FunctionContext*, const FloatVal& src, FloatVal* dst) {
  if (src.is_null) return;
  if (dst->is_null || src.val > dst->val || src.val != src.val) *dst = src;
}

template <>
void AggregateFunctions::Min(FunctionContext*, const DoubleVal& src, DoubleVal* dst) {
  if (src.is_null) return;
  if (dst->is_null || src.val < dst->val || src.val != src.val) *dst = src;
}

template <>
void AggregateFunctions::Max(FunctionContext*, const DoubleVal& src, DoubleVal* dst) {
  if (src.is_null) return;
  if (dst->is_null || src.val > dst->val || src.val != src.val) *dst = src;
}

template<>
void AggregateFunctions::Min(FunctionContext* ctx, const StringVal& src, StringVal* dst) {
  if (src.is_null) return;
  if (dst->is_null ||
      StringValue::FromStringVal(src) < StringValue::FromStringVal(*dst)) {
    if (!dst->is_null) ctx->Free(dst->ptr);
    CopyStringVal(ctx, src, dst);
  }
}

template<>
void AggregateFunctions::Max(FunctionContext* ctx, const StringVal& src, StringVal* dst) {
  if (src.is_null) return;
  if (dst->is_null ||
      StringValue::FromStringVal(src) > StringValue::FromStringVal(*dst)) {
    if (!dst->is_null) ctx->Free(dst->ptr);
    CopyStringVal(ctx, src, dst);
  }
}

template<>
void AggregateFunctions::Min(FunctionContext* ctx,
    const DecimalVal& src, DecimalVal* dst) {
  if (src.is_null) return;
  int precision = ctx->impl()->GetConstFnAttr(FunctionContextImpl::ARG_TYPE_PRECISION, 0);
  if (precision <= 9) {
    if (dst->is_null || src.val4 < dst->val4) *dst = src;
  } else if (precision <= 19) {
    if (dst->is_null || src.val8 < dst->val8) *dst = src;
  } else {
    if (dst->is_null || src.val16 < dst->val16) *dst = src;
  }
}

template<>
void AggregateFunctions::Max(FunctionContext* ctx,
    const DecimalVal& src, DecimalVal* dst) {
  if (src.is_null) return;
  int precision = ctx->impl()->GetConstFnAttr(FunctionContextImpl::ARG_TYPE_PRECISION, 0);
  if (precision <= 9) {
    if (dst->is_null || src.val4 > dst->val4) *dst = src;
  } else if (precision <= 19) {
    if (dst->is_null || src.val8 > dst->val8) *dst = src;
  } else {
    if (dst->is_null || src.val16 > dst->val16) *dst = src;
  }
}

template<>
void AggregateFunctions::Min(FunctionContext*,
    const TimestampVal& src, TimestampVal* dst) {
  if (src.is_null) return;
  if (dst->is_null) {
    *dst = src;
    return;
  }
  TimestampValue src_tv = TimestampValue::FromTimestampVal(src);
  TimestampValue dst_tv = TimestampValue::FromTimestampVal(*dst);
  if (src_tv < dst_tv) *dst = src;
}

template<>
void AggregateFunctions::Max(FunctionContext*,
    const TimestampVal& src, TimestampVal* dst) {
  if (src.is_null) return;
  if (dst->is_null) {
    *dst = src;
    return;
  }
  TimestampValue src_tv = TimestampValue::FromTimestampVal(src);
  TimestampValue dst_tv = TimestampValue::FromTimestampVal(*dst);
  if (src_tv > dst_tv) *dst = src;
}

// StringConcat intermediate state starts with the length of the first
// separator, followed by the accumulated string.  The accumulated
// string starts with the separator of the first value that arrived in
// StringConcatUpdate().
typedef int StringConcatHeader;

// Delimiter to use if the separator is not provided.
static inline StringVal ALWAYS_INLINE DefaultStringConcatDelim() {
  return StringVal(reinterpret_cast<uint8_t*>(const_cast<char*>(", ")), 2);
}

void AggregateFunctions::StringConcatUpdate(
    FunctionContext* ctx, const StringVal& src, StringVal* result) {
  StringConcatUpdate(ctx, src, DefaultStringConcatDelim(), result);
}

void AggregateFunctions::StringConcatUpdate(FunctionContext* ctx, const StringVal& src,
    const StringVal& separator, StringVal* result) {
  if (src.is_null) return;
  const StringVal default_delim = DefaultStringConcatDelim();
  const StringVal* sep = separator.is_null ? &default_delim : &separator;
  if (result->is_null) {
    // Header of the intermediate state holds the length of the first separator.
    const int header_len = sizeof(StringConcatHeader);
    DCHECK(header_len == sizeof(sep->len));
    AllocBuffer(ctx, result, header_len);
    if (UNLIKELY(result->is_null)) {
      DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
      return;
    }
    *reinterpret_cast<StringConcatHeader*>(result->ptr) = sep->len;
  }
  unsigned new_len = result->len + sep->len + src.len;
  if (LIKELY(new_len <= StringVal::MAX_LENGTH)) {
    uint8_t* ptr = ctx->Reallocate(result->ptr, new_len);
    if (LIKELY(ptr != NULL)) {
      memcpy(ptr + result->len, sep->ptr, sep->len);
      memcpy(ptr + result->len + sep->len, src.ptr, src.len);
      result->ptr = ptr;
      result->len = new_len;
    } else {
      DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
    }
  } else {
    ctx->SetError("Concatenated string length larger than allowed limit of "
        "1 GB character data.");
  }
}

void AggregateFunctions::StringConcatMerge(FunctionContext* ctx,
    const StringVal& src, StringVal* result) {
  if (src.is_null) return;
  const int header_len = sizeof(StringConcatHeader);
  if (result->is_null) {
    AllocBuffer(ctx, result, header_len);
    if (UNLIKELY(result->is_null)) {
      DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
      return;
    }
    // Copy the header from the first intermediate value.
    *reinterpret_cast<StringConcatHeader*>(result->ptr) =
        *reinterpret_cast<StringConcatHeader*>(src.ptr);
  }
  // Append the string portion of the intermediate src to result (omit src's header).
  unsigned buf_len = src.len - header_len;
  unsigned new_len = result->len + buf_len;
  if (LIKELY(new_len <= StringVal::MAX_LENGTH)) {
    uint8_t* ptr = ctx->Reallocate(result->ptr, new_len);
    if (LIKELY(ptr != NULL)) {
      memcpy(ptr + result->len, src.ptr + header_len, buf_len);
      result->ptr = ptr;
      result->len = new_len;
    } else {
      DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
    }
  } else {
    ctx->SetError("Concatenated string length larger than allowed limit of "
        "1 GB character data.");
  }
}

StringVal AggregateFunctions::StringConcatFinalize(FunctionContext* ctx,
    const StringVal& src) {
  if (UNLIKELY(src.is_null)) return src;
  const int header_len = sizeof(StringConcatHeader);
  DCHECK(src.len >= header_len);
  int sep_len = *reinterpret_cast<StringConcatHeader*>(src.ptr);
  DCHECK(src.len >= header_len + sep_len);
  // Remove the header and the first separator.
  StringVal result = StringVal::CopyFrom(ctx, src.ptr + header_len + sep_len,
      src.len - header_len - sep_len);
  ctx->Free(src.ptr);
  return result;
}

// Compute distinctpc and distinctpcsa using Flajolet and Martin's algorithm
// (Probabilistic Counting Algorithms for Data Base Applications)
// We have implemented two variants here: one with stochastic averaging (with PCSA
// postfix) and one without.
// There are 4 phases to compute the aggregate:
//   1. allocate a bitmap, stored in the aggregation tuple's output string slot
//   2. update the bitmap per row (UpdateDistinctEstimateSlot)
//   3. for distributed plan, merge the bitmaps from all the nodes
//      (UpdateMergeEstimateSlot)
//   4. compute the estimate using the bitmaps when all the rows are processed
//      (FinalizeEstimateSlot)
const static int NUM_PC_BITMAPS = 64; // number of bitmaps
const static int PC_BITMAP_LENGTH = 32; // the length of each bit map
const static float PC_THETA = 0.77351f; // the magic number to compute the final result
const static float PC_K = -1.75f; // the magic correction for low cardinalities

// Size of the distinct estimate bit map - Probabilistic Counting Algorithms for Data
// Base Applications (Flajolet and Martin)
//
// The bitmap is a 64bit(1st index) x 32bit(2nd index) matrix.
// So, the string length of 256 byte is enough.
// The layout is:
//   row  1: 8bit 8bit 8bit 8bit
//   row  2: 8bit 8bit 8bit 8bit
//   ...     ..
//   ...     ..
//   row 64: 8bit 8bit 8bit 8bit
//
// Using 32bit length, we can count up to 10^8. This will not be enough for Fact table
// primary key, but once we approach the limit, we could interpret the result as
// "every row is distinct".
const static int PC_INTERMEDIATE_BYTES = NUM_PC_BITMAPS * PC_BITMAP_LENGTH / 8;

void AggregateFunctions::PcInit(FunctionContext* c, StringVal* dst) {
  // The distinctpc*() functions use a preallocated FIXED_UDA_INTERMEDIATE intermediate
  // value.
  DCHECK_EQ(dst->len, PC_INTERMEDIATE_BYTES);
  memset(dst->ptr, 0, PC_INTERMEDIATE_BYTES);
}

static inline void SetDistinctEstimateBit(uint8_t* bitmap,
    uint32_t row_index, uint32_t bit_index) {
  // We need to convert Bitmap[alpha,index] into the index of the string.
  // alpha tells which of the 32bit we've to jump to.
  // index then lead us to the byte and bit.
  uint32_t *int_bitmap = reinterpret_cast<uint32_t*>(bitmap);
  int_bitmap[row_index] |= (1 << bit_index);
}

static inline bool GetDistinctEstimateBit(uint8_t* bitmap,
    uint32_t row_index, uint32_t bit_index) {
  uint32_t *int_bitmap = reinterpret_cast<uint32_t*>(bitmap);
  return ((int_bitmap[row_index] & (1 << bit_index)) > 0);
}

template<typename T>
void AggregateFunctions::PcUpdate(FunctionContext* c, const T& input, StringVal* dst) {
  DCHECK_EQ(dst->len, PC_INTERMEDIATE_BYTES);
  if (input.is_null) return;
  // Core of the algorithm. This is a direct translation of the code in the paper.
  // Please see the paper for details. For simple averaging, we need to compute hash
  // values NUM_PC_BITMAPS times using NUM_PC_BITMAPS different hash functions (by using a
  // different seed).
  for (int i = 0; i < NUM_PC_BITMAPS; ++i) {
    uint32_t hash_value = AnyValUtil::Hash(input, *c->GetArgType(0), i);
    const int bit_index = BitUtil::CountTrailingZeros(hash_value, PC_BITMAP_LENGTH - 1);
    // Set bitmap[i, bit_index] to 1
    SetDistinctEstimateBit(dst->ptr, i, bit_index);
  }
}

template<typename T>
void AggregateFunctions::PcsaUpdate(FunctionContext* c, const T& input, StringVal* dst) {
  DCHECK_EQ(dst->len, PC_INTERMEDIATE_BYTES);
  if (input.is_null) return;

  // Core of the algorithm. This is a direct translation of the code in the paper.
  // Please see the paper for details. Using stochastic averaging, we only need to
  // the hash value once for each row.
  uint32_t hash_value = AnyValUtil::Hash(input, *c->GetArgType(0), 0);
  uint32_t row_index = hash_value % NUM_PC_BITMAPS;

  // We want the zero-based position of the least significant 1-bit in binary
  // representation of hash_value. BitUtil::CountTrailingZeros(x,y) does exactly this
  // because it returns the number of trailing 0-bits in x (or y if x is zero).
  const int bit_index =
      BitUtil::CountTrailingZeros(hash_value / NUM_PC_BITMAPS, PC_BITMAP_LENGTH - 1);

  // Set bitmap[row_index, bit_index] to 1
  SetDistinctEstimateBit(dst->ptr, row_index, bit_index);
}

string DistinctEstimateBitMapToString(uint8_t* v) {
  stringstream debugstr;
  for (int i = 0; i < NUM_PC_BITMAPS; ++i) {
    for (int j = 0; j < PC_BITMAP_LENGTH; ++j) {
      // print bitmap[i][j]
      debugstr << GetDistinctEstimateBit(v, i, j);
    }
    debugstr << "\n";
  }
  debugstr << "\n";
  return debugstr.str();
}

void AggregateFunctions::PcMerge(FunctionContext* c,
    const StringVal& src, StringVal* dst) {
  DCHECK(!src.is_null);
  DCHECK(!dst->is_null);
  DCHECK_EQ(src.len, PC_INTERMEDIATE_BYTES);
  DCHECK_EQ(dst->len, PC_INTERMEDIATE_BYTES);

  // Merge the bits
  // I think _mm_or_ps can do it, but perf doesn't really matter here. We call this only
  // once group per node.
  for (int i = 0; i < PC_INTERMEDIATE_BYTES; ++i) {
    *(dst->ptr + i) |= *(src.ptr + i);
  }

  VLOG_ROW << "UpdateMergeEstimateSlot Src Bit map:\n"
           << DistinctEstimateBitMapToString(src.ptr);
  VLOG_ROW << "UpdateMergeEstimateSlot Dst Bit map:\n"
           << DistinctEstimateBitMapToString(dst->ptr);
}

static double DistinctEstimateFinalize(const StringVal& src) {
  DCHECK(!src.is_null);
  DCHECK_EQ(src.len, PC_INTERMEDIATE_BYTES);
  VLOG_ROW << "FinalizeEstimateSlot Bit map:\n"
           << DistinctEstimateBitMapToString(src.ptr);

  // We haven't processed any rows if none of the bits are set. Therefore, we have zero
  // distinct rows. We're overwriting the result in the same string buffer we've
  // allocated.
  bool is_empty = true;
  for (int i = 0; i < PC_INTERMEDIATE_BYTES; ++i) {
    if (src.ptr[i] != 0) {
      is_empty = false;
      break;
    }
  }
  if (is_empty) return 0;

  // Convert the bitmap to a number, please see the paper for details
  // In short, we count the average number of leading 1s (per row) in the bit map.
  // The number is proportional to the log2(1/NUM_PC_BITMAPS of  the actual number of
  // distinct).
  // To get the actual number of distinct, we'll do 2^avg / PC_THETA.
  // PC_THETA is a magic number.
  int sum = 0;
  for (int i = 0; i < NUM_PC_BITMAPS; ++i) {
    int row_bit_count = 0;
    // Count the number of leading ones for each row in the bitmap
    // We could have used the build in __builtin_clz to count of number of leading zeros
    // but we first need to invert the 1 and 0.
    while (GetDistinctEstimateBit(src.ptr, i, row_bit_count) &&
        row_bit_count < PC_BITMAP_LENGTH) {
      ++row_bit_count;
    }
    sum += row_bit_count;
  }
  double avg = static_cast<double>(sum) / static_cast<double>(NUM_PC_BITMAPS);
  // We apply a correction for small cardinalities based on equation (6) from
  // Scheuermann et al DialM-POMC '07 so the above equation becomes
  // (2^avg - 2^PC_K*avg) / PC_THETA
  double result = (pow(static_cast<double>(2), avg) -
                   pow(static_cast<double>(2), avg * PC_K)) / PC_THETA;
  return result;
}

BigIntVal AggregateFunctions::PcFinalize(FunctionContext* c, const StringVal& src) {
  if (UNLIKELY(src.is_null)) return BigIntVal::null();
  double estimate = DistinctEstimateFinalize(src);
  return static_cast<int64_t>(estimate);
}

BigIntVal AggregateFunctions::PcsaFinalize(FunctionContext* c, const StringVal& src) {
  if (UNLIKELY(src.is_null)) return BigIntVal::null();
  // When using stochastic averaging, the result has to be multiplied by NUM_PC_BITMAPS.
  double estimate = DistinctEstimateFinalize(src) * NUM_PC_BITMAPS;
  return static_cast<int64_t>(estimate);
}

// Histogram constants
// TODO: Expose as constant argument parameters to the UDA.
const static int NUM_BUCKETS = 100;
const static int NUM_SAMPLES_PER_BUCKET = 200;

template <typename T>
struct ReservoirSample {
  // Sample value
  T val;
  // Key on which the samples are sorted.
  double key;

  ReservoirSample() : key(-1) { }
  ReservoirSample(const T& val) : val(val), key(-1) { }

  // Gets a copy of the sample value that allocates memory from ctx, if necessary.
  T GetValue(FunctionContext* ctx) { return val; }
};

// Maximum length of a string sample.
const static int MAX_STRING_SAMPLE_LEN = 10;

// Template specialization for StringVal because we do not store the StringVal itself.
// Instead, we keep fixed size arrays and truncate longer strings if necessary.
template <>
struct ReservoirSample<StringVal> {
  uint8_t val[MAX_STRING_SAMPLE_LEN];
  int len; // Size of string (up to MAX_STRING_SAMPLE_LEN)
  double key;

  ReservoirSample() : len(0), key(-1) { }

  ReservoirSample(const StringVal& string_val) : key(-1) {
    len = min(string_val.len, MAX_STRING_SAMPLE_LEN);
    memcpy(&val[0], string_val.ptr, len);
  }

  // Gets a copy of the sample value that allocates memory from ctx, if necessary.
  StringVal GetValue(FunctionContext* ctx) {
    return StringVal::CopyFrom(ctx, &val[0], len);
  }
};

template <typename T>
bool SampleValLess(const ReservoirSample<T>& i, const ReservoirSample<T>& j) {
  return i.val.val < j.val.val;
}

template <>
bool SampleValLess(const ReservoirSample<StringVal>& i,
    const ReservoirSample<StringVal>& j) {
  int n = min(i.len, j.len);
  int result = memcmp(&i.val[0], &j.val[0], n);
  if (result == 0) return i.len < j.len;
  return result < 0;
}

template <>
bool SampleValLess(const ReservoirSample<DecimalVal>& i,
    const ReservoirSample<DecimalVal>& j) {
  // Also handles val4 and val8 - the DecimalVal memory layout ensures the least
  // significant bits overlap in memory.
  return i.val.val16 < j.val.val16;
}

template <>
bool SampleValLess(const ReservoirSample<TimestampVal>& i,
    const ReservoirSample<TimestampVal>& j) {
  if (i.val.date == j.val.date) return i.val.time_of_day < j.val.time_of_day;
  else return i.val.date < j.val.date;
}

template <typename T>
bool SampleKeyGreater(const ReservoirSample<T>& i, const ReservoirSample<T>& j) {
  return i.key > j.key;
}

// Keeps track of the current state of the reservoir sampling algorithm. The samples are
// stored in a dynamically sized array. Initially, the the samples array is stored in a
// separate memory allocation. This class is responsible for managing the memory of the
// array and reallocating when the array is full. When this object is serialized into an
// output buffer, the samples array is inlined into the output buffer as well.
template <typename T>
class ReservoirSampleState {
 public:
  ReservoirSampleState(FunctionContext* ctx)
    : num_samples_(0),
      capacity_(INIT_CAPACITY),
      source_size_(0),
      sample_array_inline_(false),
      samples_(NULL) {
    // Allocate some initial memory for the samples array.
    size_t buffer_len = sizeof(ReservoirSample<T>) * capacity_;
    uint8_t* ptr = ctx->Allocate(buffer_len);
    if (ptr == NULL) {
      DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
      return;
    }
    samples_ = reinterpret_cast<ReservoirSample<T>*>(ptr);
  }

  // Returns a pointer to a ReservoirSample at idx.
  ReservoirSample<T>* GetSample(int64_t idx) {
    DCHECK(samples_ != NULL);
    DCHECK_LT(idx, num_samples_);
    DCHECK_LE(num_samples_, capacity_);
    DCHECK_GE(idx, 0);
    return &samples_[idx];
  }

  // Adds a sample and increments the source size. Doubles the capacity of the sample
  // array if necessary. If max capacity is reached, randomly evicts a sample (as
  // required by the algorithm). Returns false if the attempt to double the capacity
  // fails, true otherwise.
  bool AddSample(FunctionContext* ctx, const ReservoirSample<T>& s) {
    DCHECK(samples_ != NULL);
    DCHECK_LE(num_samples_, MAX_CAPACITY);
    if (num_samples_ < MAX_CAPACITY) {
      if (num_samples_ == capacity_) {
        bool result = IncreaseCapacity(ctx, capacity_ * 2);
        if (!result) return false;
      }
      DCHECK_LT(num_samples_, capacity_);
      samples_[num_samples_++] = s;
    } else {
      DCHECK_EQ(num_samples_, MAX_CAPACITY);
      DCHECK(!sample_array_inline_);
      int64_t idx = GetNext64(source_size_);
      if (idx < MAX_CAPACITY) samples_[idx] = s;
    }
    ++source_size_;
    return true;
  }

  // Same as above.
  bool AddSample(FunctionContext* ctx, const T& s) {
    return AddSample(ctx, ReservoirSample<T>(s));
  }

  // Returns a buffer with a serialized ReservoirSampleState and the array of samples it
  // contains. The samples array must not be inlined; i.e. it must be in a separate memory
  // allocation. Returns a buffer containing this object and inlined samples array. The
  // memory containing this object and the samples array is freed. The serialized object
  // in the output buffer requires a call to Deserialize() before use.
  StringVal Serialize(FunctionContext* ctx) {
    DCHECK(samples_ != NULL);
    DCHECK(!sample_array_inline_);
    // Assign keys to the samples that haven't been set (i.e. if serializing after
    // Update()). In weighted reservoir sampling the keys are typically assigned as the
    // sources are being sampled, but this requires maintaining the samples in sorted
    // order (by key) and it accomplishes the same thing at this point because all data
    // points coming into Update() get the same weight. When the samples are later merged,
    // they do have different weights (set here) that are proportional to the source_size,
    // i.e. samples selected from a larger stream are more likely to end up in the final
    // sample set. In order to avoid the extra overhead in Update(), we approximate the
    // keys by picking random numbers in the range
    // [(SOURCE_SIZE - SAMPLE_SIZE)/(SOURCE_SIZE), 1]. This weights the keys by
    // SOURCE_SIZE and implies that the samples picked had the highest keys, because
    // values not sampled would have keys between 0 and
    // (SOURCE_SIZE - SAMPLE_SIZE)/(SOURCE_SIZE).
    for (int i = 0; i < num_samples_; ++i) {
      if (samples_[i].key >= 0) continue;
      int r = rand() % num_samples_;
      samples_[i].key = ((double) source_size_ - r) / source_size_;
    }
    capacity_ = num_samples_;
    sample_array_inline_ = true;

    size_t buffer_len = sizeof(ReservoirSampleState<T>) +
        sizeof(ReservoirSample<T>) * num_samples_;
    StringVal dst(ctx, buffer_len);
    if (LIKELY(!dst.is_null)) {
      memcpy(dst.ptr, reinterpret_cast<uint8_t*>(this), sizeof(ReservoirSampleState<T>));
      memcpy(dst.ptr + sizeof(ReservoirSampleState<T>),
          reinterpret_cast<uint8_t*>(samples_),
          sizeof(ReservoirSample<T>) * num_samples_);
    }
    ctx->Free(reinterpret_cast<uint8_t*>(samples_));
    ctx->Free(reinterpret_cast<uint8_t*>(this));
    return dst;
  }

  // Updates the pointer to the samples array. Must be called before using this object in
  // Merge().
  void Deserialize() {
    DCHECK(sample_array_inline_);
    samples_ = reinterpret_cast<ReservoirSample<T>*>(this + 1);
  }

  // Merges the samples in "other_state" into the current state by following the
  // reservoir sampling algorithm. If necessary, increases the capacity to fit the
  // samples from "other_state". In the case of failure to increase the size of the
  // array, returns.
  void Merge(FunctionContext* ctx, ReservoirSampleState<T>* other_state) {
    DCHECK(samples_ != NULL);
    DCHECK_GT(capacity_, 0);
    other_state->Deserialize();
    int src_idx = 0;
    // We can increase the capacity significantly here and skip several doublings because
    // we know the number of elements in the other state up front.
    if (capacity_ < MAX_CAPACITY) {
      int necessary_capacity = num_samples_ + other_state->num_samples();
      if (capacity_ < necessary_capacity) {
        bool result = IncreaseCapacity(ctx, necessary_capacity);
        if (!result) return;
      }
    }

    // First, fill up the dst samples if they don't already exist. The samples are now
    // ordered as a min-heap on the key.
    while (num_samples_ < MAX_CAPACITY && src_idx < other_state->num_samples()) {
      DCHECK_GE(other_state->GetSample(src_idx)->key, 0);
      bool result = AddSample(ctx, *other_state->GetSample(src_idx++));
      if (!result) return;
      push_heap(&samples_[0], &samples_[num_samples_], SampleKeyGreater<T>);
    }

    // Then for every sample from source, take the sample if the key is greater than
    // the minimum key in the min-heap.
    while (src_idx < other_state->num_samples()) {
      DCHECK_GE(other_state->GetSample(src_idx)->key, 0);
      if (other_state->GetSample(src_idx)->key > samples_[0].key) {
        pop_heap(&samples_[0], &samples_[num_samples_], SampleKeyGreater<T>);
        samples_[MAX_CAPACITY - 1] = *other_state->GetSample(src_idx);
        push_heap(&samples_[0], &samples_[num_samples_], SampleKeyGreater<T>);
      }
      ++src_idx;
    }

    source_size_ += other_state->source_size();
  }

  // Returns the median element.
  T GetMedian(FunctionContext* ctx) {
    if (num_samples_ == 0) return T::null();
    ReservoirSample<T>* mid_point = GetSample(num_samples_ / 2);
    nth_element(&samples_[0], mid_point, &samples_[num_samples_], SampleValLess<T>);
    return mid_point->GetValue(ctx);
  }

  // Sorts the samples.
  void SortSamples() {
    sort(&samples_[0], &samples_[num_samples_], SampleValLess<T>);
  }

  // Deletes this object by freeing the memory that contains the array of samples (if not
  // inlined) and itself.
  void Delete(FunctionContext* ctx) {
    if (!sample_array_inline_) ctx->Free(reinterpret_cast<uint8_t*>(samples_));
    ctx->Free(reinterpret_cast<uint8_t*>(this));
  }

  int num_samples() { return num_samples_; }
  int64_t source_size() { return source_size_; }

 private:
  // The initial capacity of the samples array.
  const static int INIT_CAPACITY = 16;

  // Maximum capacity of the samples array.
  const static int MAX_CAPACITY = NUM_BUCKETS * NUM_SAMPLES_PER_BUCKET;

  // Number of collected samples.
  int num_samples_;

  // Size of the "samples_" array.
  int capacity_;

  // Number of values over which the samples were collected.
  int64_t source_size_;

  // Random number generator for generating 64-bit integers
  // TODO: Replace with mt19937_64 when upgrading boost
  ranlux64_3 rng_;

  // True if the array of samples is in the same memory allocation as this object. If
  // false, this object is responsible for freeing the memory.
  bool sample_array_inline_;

  // Points to the array of ReservoirSamples. The array may be located inline (right after
  // this object), or in a separate memory allocation.
  ReservoirSample<T>* samples_;

  // Increases the capacity of the "samples_" array to "new_capacity" rounded up to a
  // power of two by reallocating. Should only be called if the samples array is not
  // inline. Returns false if the operation fails.
  bool IncreaseCapacity(FunctionContext* ctx, int new_capacity) {
    DCHECK(samples_ != NULL);
    DCHECK(!sample_array_inline_);
    DCHECK_LT(capacity_, MAX_CAPACITY);
    DCHECK_GT(new_capacity, capacity_);
    new_capacity = BitUtil::RoundUpToPowerOfTwo(new_capacity);
    if (new_capacity > MAX_CAPACITY) new_capacity = MAX_CAPACITY;
    size_t buffer_len = sizeof(ReservoirSample<T>) * new_capacity;
    uint8_t* ptr = ctx->Reallocate(reinterpret_cast<uint8_t*>(samples_), buffer_len);
    if (ptr == NULL) {
      DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
      return false;
    }
    samples_ = reinterpret_cast<ReservoirSample<T>*>(ptr);
    capacity_ = new_capacity;
    return true;
  }

  // Returns a random integer in the range [0, max].
  int64_t GetNext64(int64_t max) {
    uniform_int<int64_t> dist(0, max);
    return dist(rng_);
  }
};

template <typename T>
void AggregateFunctions::ReservoirSampleInit(FunctionContext* ctx, StringVal* dst) {
  AllocBuffer(ctx, dst, sizeof(ReservoirSampleState<T>));
  if (UNLIKELY(dst->is_null)) {
    DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
    return;
  }
  ReservoirSampleState<T>* dst_state =
      reinterpret_cast<ReservoirSampleState<T>*>(dst->ptr);
  *dst_state = ReservoirSampleState<T>(ctx);
}

template <typename T>
void AggregateFunctions::ReservoirSampleUpdate(FunctionContext* ctx, const T& src,
    StringVal* dst) {
  if (src.is_null) return;
  DCHECK(!dst->is_null);
  ReservoirSampleState<T>* dst_state =
      reinterpret_cast<ReservoirSampleState<T>*>(dst->ptr);
  dst_state->AddSample(ctx, src);
}

template <typename T>
StringVal AggregateFunctions::ReservoirSampleSerialize(FunctionContext* ctx,
    const StringVal& src) {
  if (UNLIKELY(src.is_null)) return src;
  ReservoirSampleState<T>* src_state =
      reinterpret_cast<ReservoirSampleState<T>*>(src.ptr);
  StringVal result = src_state->Serialize(ctx);
  return result;
}

template <typename T>
void AggregateFunctions::ReservoirSampleMerge(FunctionContext* ctx,
    const StringVal& src, StringVal* dst) {
  if (src.is_null) return;
  DCHECK(!dst->is_null);
  DCHECK(!src.is_null);
  ReservoirSampleState<T>* src_state =
      reinterpret_cast<ReservoirSampleState<T>*>(src.ptr);
  ReservoirSampleState<T>* dst_state =
      reinterpret_cast<ReservoirSampleState<T>*>(dst->ptr);
  dst_state->Merge(ctx, src_state);
}

template <typename T>
void PrintSample(const ReservoirSample<T>& v, ostream* os) { *os << v.val.val; }

template <>
void PrintSample(const ReservoirSample<TinyIntVal>& v, ostream* os) {
  *os << static_cast<int32_t>(v.val.val);
}

template <>
void PrintSample(const ReservoirSample<StringVal>& v, ostream* os) {
  string s(reinterpret_cast<const char*>(&v.val[0]), v.len);
  *os << s;
}

template <>
void PrintSample(const ReservoirSample<DecimalVal>& v, ostream* os) {
  // Also handles val4 and val8 - the DecimalVal memory layout ensures the least
  // significant bits overlap in memory.
  *os << v.val.val16;
}

template <>
void PrintSample(const ReservoirSample<TimestampVal>& v, ostream* os) {
  *os << TimestampValue::FromTimestampVal(v.val).ToString();
}

template <>
void PrintSample(const ReservoirSample<DateVal>& v, ostream* os) {
  *os << DateValue::FromDateVal(v.val);
}

template <typename T>
StringVal AggregateFunctions::ReservoirSampleFinalize(FunctionContext* ctx,
    const StringVal& src) {
  if (UNLIKELY(src.is_null)) return src;
  ReservoirSampleState<T>* src_state =
      reinterpret_cast<ReservoirSampleState<T>*>(src.ptr);

  stringstream out;
  for (int i = 0; i < src_state->num_samples(); ++i) {
    PrintSample<T>(*src_state->GetSample(i), &out);
    if (i < (src_state->num_samples() - 1)) out << ", ";
  }
  const string& out_str = out.str();
  StringVal result_str = StringVal::CopyFrom(ctx,
      reinterpret_cast<const uint8_t*>(out_str.c_str()), out_str.size());
  src_state->Delete(ctx);
  return result_str;
}

template <typename T>
StringVal AggregateFunctions::HistogramFinalize(FunctionContext* ctx,
    const StringVal& src) {
  if (UNLIKELY(src.is_null)) return src;

  ReservoirSampleState<T>* src_state =
      reinterpret_cast<ReservoirSampleState<T>*>(src.ptr);
  src_state->SortSamples();

  stringstream out;
  int num_buckets = min(src_state->num_samples(), NUM_BUCKETS);
  int samples_per_bucket = max(src_state->num_samples() / NUM_BUCKETS, 1);
  for (int bucket_idx = 0; bucket_idx < num_buckets; ++bucket_idx) {
    int sample_idx = (bucket_idx + 1) * samples_per_bucket - 1;
    PrintSample<T>(*(src_state->GetSample(sample_idx)), &out);
    if (bucket_idx < (num_buckets - 1)) out << ", ";
  }
  const string& out_str = out.str();
  StringVal result_str = StringVal::CopyFrom(ctx,
      reinterpret_cast<const uint8_t*>(out_str.c_str()), out_str.size());
  src_state->Delete(ctx);
  return result_str;
}

template <typename T>
T AggregateFunctions::AppxMedianFinalize(FunctionContext* ctx, const StringVal& src) {
  if (UNLIKELY(src.is_null)) return T::null();
  ReservoirSampleState<T>* src_state =
      reinterpret_cast<ReservoirSampleState<T>*>(src.ptr);
  T result = src_state->GetMedian(ctx);
  src_state->Delete(ctx);
  return result;
}

void AggregateFunctions::HllInit(FunctionContext* ctx, StringVal* dst) {
  // The HLL functions use a preallocated FIXED_UDA_INTERMEDIATE intermediate value.
  DCHECK_EQ(dst->len, HLL_LEN);
  memset(dst->ptr, 0, HLL_LEN);
}

template <typename T>
void AggregateFunctions::HllUpdate(FunctionContext* ctx, const T& src, StringVal* dst) {
  if (src.is_null) return;
  DCHECK(!dst->is_null);
  DCHECK_EQ(dst->len, HLL_LEN);
  uint64_t hash_value =
      AnyValUtil::Hash64(src, *ctx->GetArgType(0), HashUtil::FNV64_SEED);
  // Use the lower bits to index into the number of streams and then find the first 1 bit
  // after the index bits.
  int idx = hash_value & (HLL_LEN - 1);
  const uint8_t first_one_bit =
      1 + BitUtil::CountTrailingZeros(
              hash_value >> HLL_PRECISION, sizeof(hash_value) * CHAR_BIT - HLL_PRECISION);
  dst->ptr[idx] = ::max(dst->ptr[idx], first_one_bit);
}

// Specialize for DecimalVal to allow substituting decimal size.
template <>
void AggregateFunctions::HllUpdate(
    FunctionContext* ctx, const DecimalVal& src, StringVal* dst) {
  if (src.is_null) return;
  DCHECK(!dst->is_null);
  DCHECK_EQ(dst->len, HLL_LEN);
  int byte_size = ctx->impl()->GetConstFnAttr(FunctionContextImpl::ARG_TYPE_SIZE, 0);
  uint64_t hash_value = AnyValUtil::HashDecimal64(src, byte_size, HashUtil::FNV64_SEED);
  if (hash_value != 0) {
    // Use the lower bits to index into the number of streams and then
    // find the first 1 bit after the index bits.
    int idx = hash_value & (HLL_LEN - 1);
    uint8_t first_one_bit = __builtin_ctzl(hash_value >> HLL_PRECISION) + 1;
    dst->ptr[idx] = ::max(dst->ptr[idx], first_one_bit);
  }
}

void AggregateFunctions::HllMerge(
    FunctionContext* ctx, const StringVal& src, StringVal* dst) {
  DCHECK(!dst->is_null);
  DCHECK(!src.is_null);
  DCHECK_EQ(dst->len, HLL_LEN);
  DCHECK_EQ(src.len, HLL_LEN);
  for (int i = 0; i < src.len; ++i) {
    dst->ptr[i] = ::max(dst->ptr[i], src.ptr[i]);
  }
}

uint64_t AggregateFunctions::HllFinalEstimate(const uint8_t* buckets) {
  DCHECK(buckets != NULL);

  // Empirical constants for the algorithm.
  float alpha = 0;
  if (HLL_LEN == 16) {
    alpha = 0.673f;
  } else if (HLL_LEN == 32) {
    alpha = 0.697f;
  } else if (HLL_LEN == 64) {
    alpha = 0.709f;
  } else {
    alpha = 0.7213f / (1 + 1.079f / HLL_LEN);
  }

  float harmonic_mean = 0;
  int num_zero_registers = 0;
  for (int i = 0; i < HLL_LEN; ++i) {
    harmonic_mean += ldexp(1.0f, -buckets[i]);
    if (buckets[i] == 0) ++num_zero_registers;
  }
  harmonic_mean = 1.0f / harmonic_mean;
  int64_t estimate = alpha * HLL_LEN * HLL_LEN * harmonic_mean;
  // Adjust for Hll bias based on Hll++ algorithm
  if (estimate <= 5 * HLL_LEN) {
    estimate -= HllEstimateBias(estimate);
  }

  if (num_zero_registers == 0) return estimate;

  // Estimated cardinality is too low. Hll is too inaccurate here, instead use
  // linear counting.
  int64_t h = HLL_LEN * log(static_cast<float>(HLL_LEN) / num_zero_registers);

  return (h <= HllThreshold(HLL_PRECISION)) ? h : estimate;
}

BigIntVal AggregateFunctions::HllFinalize(FunctionContext* ctx, const StringVal& src) {
  if (UNLIKELY(src.is_null)) return BigIntVal::null();
  uint64_t estimate = HllFinalEstimate(src.ptr);
  return estimate;
}

/// Intermediate aggregation state for the SampledNdv() function.
/// Stores NUM_HLL_BUCKETS of the form <row_count, hll_state>.
/// The 'row_count' keeps track of how many input rows were aggregated into that
/// bucket, and the 'hll_state' is an intermediate aggregation state of HyperLogLog.
/// See the header comments on the SampledNdv() function for more details.
class SampledNdvState {
 public:
  /// Empirically determined number of HLL buckets. Power of two for fast modulo.
  static const uint32_t NUM_HLL_BUCKETS = 32;

  /// A bucket contains an update count and an HLL intermediate state.
  static constexpr int64_t BUCKET_SIZE = sizeof(int64_t) + AggregateFunctions::HLL_LEN;

  /// Sampling percent which was given as the second argument to SampledNdv().
  /// Stored here to avoid existing issues with passing constant arguments to all
  /// aggregation phases and because we convert the sampling percent argument from
  /// decimal to double. See IMPALA-6179.
  double sample_perc;

  /// Counts the number of Update() calls. Used for determining which bucket to update.
  int64_t total_row_count;

  /// Array of buckets.
  struct {
    int64_t row_count;
    uint8_t hll[AggregateFunctions::HLL_LEN];
  } buckets[NUM_HLL_BUCKETS];
};

void AggregateFunctions::SampledNdvInit(FunctionContext* ctx, StringVal* dst) {
  // Uses a preallocated FIXED_UDA_INTERMEDIATE intermediate value.
  DCHECK_EQ(dst->len, sizeof(SampledNdvState));
  memset(dst->ptr, 0, sizeof(SampledNdvState));

  DoubleVal* sample_perc = reinterpret_cast<DoubleVal*>(ctx->GetConstantArg(1));
  if (sample_perc == nullptr) return;
  // Guaranteed by the FE.
  DCHECK(!sample_perc->is_null);
  DCHECK_GE(sample_perc->val, 0.0);
  DCHECK_LE(sample_perc->val, 1.0);
  SampledNdvState* state = reinterpret_cast<SampledNdvState*>(dst->ptr);
  state->sample_perc = sample_perc->val;
}

/// Incorporate the 'src' into one of the intermediate HLLs, which will be used by
/// Finalize() to generate a set of the (x,y) data points.
template <typename T>
void AggregateFunctions::SampledNdvUpdate(FunctionContext* ctx, const T& src,
    const DoubleVal& sample_perc, StringVal* dst) {
  SampledNdvState* state = reinterpret_cast<SampledNdvState*>(dst->ptr);
  int64_t bucket_idx = state->total_row_count % SampledNdvState::NUM_HLL_BUCKETS;
  StringVal hll_dst = StringVal(state->buckets[bucket_idx].hll, HLL_LEN);
  HllUpdate(ctx, src, &hll_dst);
  ++state->buckets[bucket_idx].row_count;
  ++state->total_row_count;
}

void AggregateFunctions::SampledNdvMerge(FunctionContext* ctx, const StringVal& src,
    StringVal* dst) {
  SampledNdvState* src_state = reinterpret_cast<SampledNdvState*>(src.ptr);
  SampledNdvState* dst_state = reinterpret_cast<SampledNdvState*>(dst->ptr);
  for (int i = 0; i < SampledNdvState::NUM_HLL_BUCKETS; ++i) {
    StringVal src_hll = StringVal(src_state->buckets[i].hll, HLL_LEN);
    StringVal dst_hll = StringVal(dst_state->buckets[i].hll, HLL_LEN);
    HllMerge(ctx, src_hll, &dst_hll);
    dst_state->buckets[i].row_count += src_state->buckets[i].row_count;
  }
  // Total count. Not really needed after Update() but kept for sanity checking.
  dst_state->total_row_count += src_state->total_row_count;
  // Propagate sampling percent to Finalize().
  dst_state->sample_perc = src_state->sample_perc;
}

BigIntVal AggregateFunctions::SampledNdvFinalize(FunctionContext* ctx,
    const StringVal& src) {
  SampledNdvState* state = reinterpret_cast<SampledNdvState*>(src.ptr);

  // Generate 'num_points' data points with x=row_count and y=ndv_estimate. These points
  // are used to fit a function for the NDV growth and estimate the real NDV.
  constexpr int num_points =
      SampledNdvState::NUM_HLL_BUCKETS * SampledNdvState::NUM_HLL_BUCKETS;
  int64_t counts[num_points] = { 0 };
  int64_t ndvs[num_points] = { 0 };

  int64_t min_ndv = numeric_limits<int64_t>::max();
  int64_t min_count = numeric_limits<int64_t>::max();
  // We have a fixed number of HLL intermediates to generate data points. Any unique
  // subset of intermediates can be combined to create a new data point. It was
  // empirically determined that 'num_data' points is typically sufficient and there are
  // diminishing returns from generating additional data points.
  // The generation method below was chosen for its simplicity. It successively merges
  // buckets in a rolling window of size NUM_HLL_BUCKETS. Repeating the last data point
  // where all buckets are merged biases the curve fitting to hit that data point which
  // makes sense because that's likely the most accurate one. The number of data points
  // are sufficient for reasonable accuracy.
  int pidx = 0;
  for (int i = 0; i < SampledNdvState::NUM_HLL_BUCKETS; ++i) {
    uint8_t merged_hll_data[HLL_LEN];
    memset(merged_hll_data, 0, HLL_LEN);
    StringVal merged_hll(merged_hll_data, HLL_LEN);
    int64_t merged_count = 0;
    for (int j = 0; j < SampledNdvState::NUM_HLL_BUCKETS; ++j) {
      int bucket_idx = (i + j) % SampledNdvState::NUM_HLL_BUCKETS;
      merged_count += state->buckets[bucket_idx].row_count;
      counts[pidx] = merged_count;
      StringVal hll = StringVal(state->buckets[bucket_idx].hll, HLL_LEN);
      HllMerge(ctx, hll, &merged_hll);
      ndvs[pidx] = HllFinalEstimate(merged_hll.ptr);
      ++pidx;
    }
    min_count = std::min(min_count, state->buckets[i].row_count);
    min_ndv = std::min(min_ndv, ndvs[i * SampledNdvState::NUM_HLL_BUCKETS]);
  }
  // Based on the point-generation method above the last elements represent the data
  // point where all buckets are merged.
  int64_t max_count = counts[num_points - 1];
  int64_t max_ndv = ndvs[num_points - 1];

  // Scale all values to [0,1] since some objective functions require it (e.g., Sigmoid).
  double count_scale = max_count - min_count;
  double ndv_scale = max_ndv - min_ndv;
  if (count_scale == 0) count_scale = 1.0;
  if (ndv_scale == 0) ndv_scale = 1.0;
  double scaled_counts[num_points];
  double scaled_ndvs[num_points];
  for (int i = 0; i < num_points; ++i) {
    scaled_counts[i] = counts[i] / count_scale;
    scaled_ndvs[i] = ndvs[i] / ndv_scale;
  }

  // List of objective functions. Curve fitting will select the best values for the
  // parameters a, b, c, d.
  vector<ObjectiveFunction> ndv_fns;
  // Linear function: f(x) = a + b * x
  ndv_fns.push_back(ObjectiveFunction("LIN", 2,
      [](double x, const double* params) -> double {
        return params[0] + params[1] * x;
      }
  ));
  // Logarithmic function: f(x) = a + b * log(x)
  ndv_fns.push_back(ObjectiveFunction("LOG", 2,
      [](double x, const double* params) -> double {
        return params[0] + params[1] * log(x);
      }
  ));
  // Power function: f(x) = a + b * pow(x, c)
  ndv_fns.push_back(ObjectiveFunction("POW", 3,
      [](double x, const double* params) -> double {
        return params[0] + params[1] * pow(x, params[2]);
      }
  ));
  // Sigmoid function: f(x) = a + b * (c / (c + pow(d, -x)))
  ndv_fns.push_back(ObjectiveFunction("SIG", 4,
      [](double x, const double* params) -> double {
        return params[0] + params[1] * (params[2] / (params[2] + pow(params[3], -x)));
      }
  ));

  // Perform least mean squares fitting on all objective functions.
  vector<ObjectiveFunction> valid_ndv_fns;
  for (ObjectiveFunction& f: ndv_fns) {
    if(f.LmsFit(scaled_counts, scaled_ndvs, num_points)) {
      valid_ndv_fns.push_back(std::move(f));
    }
  }

  // Select the best-fit function for estimating the NDV.
  auto best_fit_fn = min_element(valid_ndv_fns.begin(), valid_ndv_fns.end(),
      [](const ObjectiveFunction& a, const ObjectiveFunction& b) -> bool {
        return a.GetError() < b.GetError();
      }
  );

  // Compute the extrapolated NDV based on the extrapolated row count.
  double extrap_count = max_count / state->sample_perc;
  double scaled_extrap_count = extrap_count / count_scale;
  double scaled_extrap_ndv = best_fit_fn->GetY(scaled_extrap_count);
  return round(scaled_extrap_ndv * ndv_scale);
}

template <typename T>
void AggregateFunctions::AggIfUpdate(
    FunctionContext* ctx, const BooleanVal& cond, const T& src, T* dst) {
  DCHECK(!cond.is_null);
  if (cond.val) *dst = src;
}

template <>
void AggregateFunctions::AggIfUpdate(
    FunctionContext* ctx, const BooleanVal& cond, const StringVal& src, StringVal* dst) {
  DCHECK(!cond.is_null);
  if (cond.val) CopyStringVal(ctx, src, dst);
}

template <typename T>
void AggregateFunctions::AggIfMerge(FunctionContext*, const T& src, T* dst) {
  *dst = src;
}

template <>
void AggregateFunctions::AggIfMerge(
    FunctionContext* ctx, const StringVal& src, StringVal* dst) {
  CopyStringVal(ctx, src, dst);
}

template <typename T>
T AggregateFunctions::AggIfFinalize(FunctionContext*, const T& src) {
  return src;
}

template <>
StringVal AggregateFunctions::AggIfFinalize(FunctionContext* ctx, const StringVal& src) {
  StringVal result = StringValGetValue(ctx, src);
  if (!src.is_null) ctx->Free(src.ptr);
  return result;
}

// An implementation of a simple single pass variance algorithm. A standard UDA must
// be single pass (i.e. does not scan the table more than once), so the most canonical
// two pass approach is not practical.
struct KnuthVarianceState {
  double mean;
  double m2;
  int64_t count;
};

// Set pop=true for population variance, false for sample variance
static double ComputeKnuthVariance(const KnuthVarianceState& state, bool pop) {
  // Return zero for 1 tuple specified by
  // http://docs.oracle.com/cd/B19306_01/server.102/b14200/functions212.htm
  if (state.count == 1) return 0.0;
  if (pop) return state.m2 / state.count;
  return state.m2 / (state.count - 1);
}

void AggregateFunctions::KnuthVarInit(FunctionContext* ctx, StringVal* dst) {
  // The Knuth variance functions use a preallocated FIXED_UDA_INTERMEDIATE intermediate
  // value.
  DCHECK_EQ(dst->len, sizeof(KnuthVarianceState));
  memset(dst->ptr, 0, dst->len);
}

template <typename T>
void AggregateFunctions::KnuthVarUpdate(FunctionContext* ctx, const T& src,
    StringVal* dst) {
  DCHECK(!dst->is_null);
  DCHECK_EQ(dst->len, sizeof(KnuthVarianceState));
  if (src.is_null) return;
  KnuthVarianceState* state = reinterpret_cast<KnuthVarianceState*>(dst->ptr);
  double temp = 1 + state->count;
  double delta = src.val - state->mean;
  double r = delta / temp;
  state->mean += r;
  state->m2 += state->count * delta * r;
  state->count = temp;
}

void AggregateFunctions::KnuthVarMerge(FunctionContext* ctx, const StringVal& src,
    StringVal* dst) {
  DCHECK(!dst->is_null);
  DCHECK_EQ(dst->len, sizeof(KnuthVarianceState));
  DCHECK(!src.is_null);
  DCHECK_EQ(src.len, sizeof(KnuthVarianceState));
  // Reference implementation:
  // http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
  KnuthVarianceState* src_state = reinterpret_cast<KnuthVarianceState*>(src.ptr);
  KnuthVarianceState* dst_state = reinterpret_cast<KnuthVarianceState*>(dst->ptr);
  if (src_state->count == 0) return;
  double delta = dst_state->mean - src_state->mean;
  double sum_count = dst_state->count + src_state->count;
  dst_state->mean = src_state->mean + delta * (dst_state->count / sum_count);
  dst_state->m2 = (src_state->m2) + dst_state->m2 +
      (delta * delta) * (src_state->count * dst_state->count / sum_count);
  dst_state->count = sum_count;
}

DoubleVal AggregateFunctions::KnuthVarFinalize(
    FunctionContext* ctx, const StringVal& state_sv) {
  DCHECK(!state_sv.is_null);
  KnuthVarianceState* state = reinterpret_cast<KnuthVarianceState*>(state_sv.ptr);
  if (state->count == 0 || state->count == 1) return DoubleVal::null();
  double variance = ComputeKnuthVariance(*state, false);
  return DoubleVal(variance);
}

DoubleVal AggregateFunctions::KnuthVarPopFinalize(FunctionContext* ctx,
                                                  const StringVal& state_sv) {
  DCHECK(!state_sv.is_null);
  DCHECK_EQ(state_sv.len, sizeof(KnuthVarianceState));
  KnuthVarianceState* state = reinterpret_cast<KnuthVarianceState*>(state_sv.ptr);
  if (state->count == 0) return DoubleVal::null();
  return ComputeKnuthVariance(*state, true);
}

DoubleVal AggregateFunctions::KnuthStddevFinalize(FunctionContext* ctx,
    const StringVal& state_sv) {
  DCHECK(!state_sv.is_null);
  DCHECK_EQ(state_sv.len, sizeof(KnuthVarianceState));
  KnuthVarianceState* state = reinterpret_cast<KnuthVarianceState*>(state_sv.ptr);
  if (state->count == 0 || state->count == 1) return DoubleVal::null();
  return sqrt(ComputeKnuthVariance(*state, false));
}

DoubleVal AggregateFunctions::KnuthStddevPopFinalize(FunctionContext* ctx,
    const StringVal& state_sv) {
  DCHECK(!state_sv.is_null);
  DCHECK_EQ(state_sv.len, sizeof(KnuthVarianceState));
  KnuthVarianceState* state = reinterpret_cast<KnuthVarianceState*>(state_sv.ptr);
  if (state->count == 0) return DoubleVal::null();
  return sqrt(ComputeKnuthVariance(*state, true));
}

struct RankState {
  int64_t rank;
  int64_t count;
  RankState() : rank(1), count(0) { }
};

void AggregateFunctions::RankInit(FunctionContext* ctx, StringVal* dst) {
  // The rank functions use a preallocated FIXED_UDA_INTERMEDIATE intermediate value.
  DCHECK_EQ(dst->len, sizeof(RankState));
  *reinterpret_cast<RankState*>(dst->ptr) = RankState();
}

void AggregateFunctions::RankUpdate(FunctionContext* ctx, StringVal* dst) {
  DCHECK(!dst->is_null);
  DCHECK_EQ(dst->len, sizeof(RankState));
  RankState* state = reinterpret_cast<RankState*>(dst->ptr);
  ++state->count;
}

void AggregateFunctions::DenseRankUpdate(FunctionContext* ctx, StringVal* dst) { }

BigIntVal AggregateFunctions::RankGetValue(FunctionContext* ctx,
    StringVal& src_val) {
  DCHECK(!src_val.is_null);
  DCHECK_EQ(src_val.len, sizeof(RankState));
  RankState* state = reinterpret_cast<RankState*>(src_val.ptr);
  DCHECK_GT(state->count, 0);
  DCHECK_GT(state->rank, 0);
  int64_t result = state->rank;

  // Prepares future calls for the next rank
  state->rank += state->count;
  state->count = 0;
  return BigIntVal(result);
}

BigIntVal AggregateFunctions::DenseRankGetValue(FunctionContext* ctx,
    StringVal& src_val) {
  DCHECK(!src_val.is_null);
  DCHECK_EQ(src_val.len, sizeof(RankState));
  RankState* state = reinterpret_cast<RankState*>(src_val.ptr);
  DCHECK_EQ(state->count, 0);
  DCHECK_GT(state->rank, 0);
  int64_t result = state->rank;

  // Prepares future calls for the next rank
  ++state->rank;
  return BigIntVal(result);
}

BigIntVal AggregateFunctions::RankFinalize(FunctionContext* ctx,
    StringVal& src_val) {
  if (UNLIKELY(src_val.is_null)) return BigIntVal::null();
  DCHECK_EQ(src_val.len, sizeof(RankState));
  RankState* state = reinterpret_cast<RankState*>(src_val.ptr);
  int64_t result = state->rank;
  return BigIntVal(result);
}

template <typename T>
void AggregateFunctions::LastValRemove(FunctionContext* ctx, const T& src, T* dst) {
  if (ctx->impl()->num_removes() >= ctx->impl()->num_updates()) *dst = T::null();
}

template <>
void AggregateFunctions::LastValRemove(FunctionContext* ctx, const StringVal& src,
    StringVal* dst) {
  if (ctx->impl()->num_removes() >= ctx->impl()->num_updates()) {
    if (!dst->is_null) ctx->Free(dst->ptr);
    *dst = StringVal::null();
  }
}

// Returns the current size of the window.
inline int GetWindowSize(FunctionContext* ctx) {
  return ctx->impl()->num_updates() - ctx->impl()->num_removes();
}

// LastValIgnoreNulls is a wrapper around LastVal. It works by not calling UpdateVal
// if the value being added to the window is null, so that we will return the most
// recently seen non-null value.
// The one special case to consider is when all of the values in the window are null
// and we therefore need to return null. To handle this, we track the number of nulls
// currently in the window, and set the value to be returned to null if the number of
// nulls is the same as the window size.
template <typename T>
struct LastValIgnoreNullsState {
  T last_val;
  // Number of nulls currently in the window, to detect when the window only has nulls.
  int64_t num_nulls;
};

template <typename T>
void AggregateFunctions::LastValIgnoreNullsInit(FunctionContext* ctx, StringVal* dst) {
  AllocBuffer(ctx, dst, sizeof(LastValIgnoreNullsState<T>));
  LastValIgnoreNullsState<T>* state =
      reinterpret_cast<LastValIgnoreNullsState<T>*>(dst->ptr);
  state->last_val = T::null();
  state->num_nulls = 0;
}

template <typename T>
void AggregateFunctions::LastValIgnoreNullsUpdate(FunctionContext* ctx, const T& src,
      StringVal* dst) {
  DCHECK(dst->ptr != NULL);
  DCHECK_EQ(sizeof(LastValIgnoreNullsState<T>), dst->len);
  LastValIgnoreNullsState<T>* state =
      reinterpret_cast<LastValIgnoreNullsState<T>*>(dst->ptr);

  if (!src.is_null) {
    UpdateVal(ctx, src, &state->last_val);
  } else {
    ++state->num_nulls;
    DCHECK_LE(state->num_nulls, GetWindowSize(ctx));
    if (GetWindowSize(ctx) == state->num_nulls) {
      // Call UpdateVal here to set the value to null because it handles deallocation
      // of StringVals correctly.
      UpdateVal(ctx, T::null(), &state->last_val);
    }
  }
}

template <typename T>
void AggregateFunctions::LastValIgnoreNullsRemove(FunctionContext* ctx, const T& src,
      StringVal* dst) {
  DCHECK(dst->ptr != NULL);
  DCHECK_EQ(sizeof(LastValIgnoreNullsState<T>), dst->len);
  LastValIgnoreNullsState<T>* state =
      reinterpret_cast<LastValIgnoreNullsState<T>*>(dst->ptr);
  LastValRemove(ctx, src, &state->last_val);

  if (src.is_null) --state->num_nulls;
  DCHECK_GE(state->num_nulls, 0);
  if (GetWindowSize(ctx) == state->num_nulls) {
    // Call UpdateVal here to set the value to null because it handles deallocation
    // of StringVals correctly.
    UpdateVal(ctx, T::null(), &state->last_val);
  }
}

template <typename T>
T AggregateFunctions::LastValIgnoreNullsGetValue(FunctionContext* ctx,
    const StringVal& src) {
  DCHECK(!src.is_null);
  DCHECK_EQ(sizeof(LastValIgnoreNullsState<T>), src.len);
  LastValIgnoreNullsState<T>* state =
      reinterpret_cast<LastValIgnoreNullsState<T>*>(src.ptr);
  return state->last_val;
}

template <>
StringVal AggregateFunctions::LastValIgnoreNullsGetValue(FunctionContext* ctx,
    const StringVal& src) {
  DCHECK(!src.is_null);
  DCHECK_EQ(sizeof(LastValIgnoreNullsState<StringVal>), src.len);
  LastValIgnoreNullsState<StringVal>* state =
      reinterpret_cast<LastValIgnoreNullsState<StringVal>*>(src.ptr);

  if (state->last_val.is_null) {
    return StringVal::null();
  } else {
    return StringVal::CopyFrom(ctx, state->last_val.ptr, state->last_val.len);
  }
}

template <typename T>
T AggregateFunctions::LastValIgnoreNullsFinalize(FunctionContext* ctx,
      const StringVal& src) {
  DCHECK(!src.is_null);
  T result = LastValIgnoreNullsGetValue<T>(ctx, src);
  ctx->Free(src.ptr);
  return result;
}

template <>
StringVal AggregateFunctions::LastValIgnoreNullsFinalize(FunctionContext* ctx,
      const StringVal& src) {
  DCHECK(!src.is_null);
  LastValIgnoreNullsState<StringVal>* state =
      reinterpret_cast<LastValIgnoreNullsState<StringVal>*>(src.ptr);
  StringVal result = LastValIgnoreNullsGetValue<StringVal>(ctx, src);
  if (!state->last_val.is_null) ctx->Free(state->last_val.ptr);
  ctx->Free(src.ptr);
  return result;
}

template <typename T>
void AggregateFunctions::FirstValUpdate(FunctionContext* ctx, const T& src, T* dst) {
  // The first call to FirstValUpdate sets the value of dst.
  if (ctx->impl()->num_updates() > 1) return;
  // num_updates is incremented before calling Update(), so it should never be 0.
  // Remove() should never be called for FIRST_VALUE.
  DCHECK_GT(ctx->impl()->num_updates(), 0);
  DCHECK_EQ(ctx->impl()->num_removes(), 0);
  *dst = src;
}

template <>
void AggregateFunctions::FirstValUpdate(FunctionContext* ctx, const StringVal& src,
    StringVal* dst) {
  if (ctx->impl()->num_updates() > 1) return;
  DCHECK_GT(ctx->impl()->num_updates(), 0);
  DCHECK_EQ(ctx->impl()->num_removes(), 0);
  if (src.is_null) {
    *dst = StringVal::null();
    return;
  }
  CopyStringVal(ctx, src, dst);
}

template <typename T>
void AggregateFunctions::FirstValRewriteUpdate(FunctionContext* ctx, const T& src,
    const BigIntVal&, T* dst) {
  UpdateVal<T>(ctx, src, dst);
}

template <typename T>
void AggregateFunctions::FirstValIgnoreNullsUpdate(FunctionContext*, const T& src,
    T* dst) {
  // Store the first non-null value encountered, unlike FirstValUpdate which always stores
  // the first value even if it is null.
  if (!dst->is_null || src.is_null) return;
  *dst = src;
}

template <>
void AggregateFunctions::FirstValIgnoreNullsUpdate(FunctionContext* ctx,
    const StringVal& src, StringVal* dst) {
  // Store the first non-null value encountered, unlike FirstValUpdate which always stores
  // the first value even if it is null.
  if (!dst->is_null || src.is_null) return;
  CopyStringVal(ctx, src, dst);
}

template <typename T>
void AggregateFunctions::OffsetFnInit(FunctionContext* ctx, T* dst) {
  DCHECK_EQ(ctx->GetNumArgs(), 3);
  DCHECK(ctx->IsArgConstant(1));
  DCHECK(ctx->IsArgConstant(2));
  DCHECK_EQ(ctx->GetArgType(0)->type, ctx->GetArgType(2)->type);
  *dst = *static_cast<T*>(ctx->GetConstantArg(2));
}

template <>
void AggregateFunctions::OffsetFnInit(FunctionContext* ctx, StringVal* dst) {
  DCHECK_EQ(ctx->GetNumArgs(), 3);
  DCHECK(ctx->IsArgConstant(1));
  DCHECK(ctx->IsArgConstant(2));
  DCHECK_EQ(ctx->GetArgType(0)->type, ctx->GetArgType(2)->type);
  CopyStringVal(ctx, *static_cast<StringVal*>(ctx->GetConstantArg(2)), dst);
}

template <typename T>
void AggregateFunctions::OffsetFnUpdate(FunctionContext* ctx, const T& src,
    const BigIntVal&, const T& default_value, T* dst) {
  UpdateVal(ctx, src, dst);
}

// Stamp out the templates for the types we need.
template void AggregateFunctions::InitZero<BigIntVal>(FunctionContext*, BigIntVal* dst);

template void AggregateFunctions::UpdateVal<BooleanVal>(
    FunctionContext*, const BooleanVal& src, BooleanVal* dst);
template void AggregateFunctions::UpdateVal<TinyIntVal>(
    FunctionContext*, const TinyIntVal& src, TinyIntVal* dst);
template void AggregateFunctions::UpdateVal<SmallIntVal>(
    FunctionContext*, const SmallIntVal& src, SmallIntVal* dst);
template void AggregateFunctions::UpdateVal<IntVal>(
    FunctionContext*, const IntVal& src, IntVal* dst);
template void AggregateFunctions::UpdateVal<BigIntVal>(
    FunctionContext*, const BigIntVal& src, BigIntVal* dst);
template void AggregateFunctions::UpdateVal<FloatVal>(
    FunctionContext*, const FloatVal& src, FloatVal* dst);
template void AggregateFunctions::UpdateVal<DoubleVal>(
    FunctionContext*, const DoubleVal& src, DoubleVal* dst);
template void AggregateFunctions::UpdateVal<TimestampVal>(
    FunctionContext*, const TimestampVal& src, TimestampVal* dst);
template void AggregateFunctions::UpdateVal<DecimalVal>(
    FunctionContext*, const DecimalVal& src, DecimalVal* dst);
template void AggregateFunctions::UpdateVal<DateVal>(
    FunctionContext*, const DateVal& src, DateVal* dst);

template void AggregateFunctions::AvgUpdate<BigIntVal>(
    FunctionContext* ctx, const BigIntVal& input, StringVal* dst);
template void AggregateFunctions::AvgUpdate<DoubleVal>(
    FunctionContext* ctx, const DoubleVal& input, StringVal* dst);
template void AggregateFunctions::AvgUpdate<DateVal>(
    FunctionContext* ctx, const DateVal& input, StringVal* dst);
template void AggregateFunctions::AvgRemove<BigIntVal>(
    FunctionContext* ctx, const BigIntVal& input, StringVal* dst);
template void AggregateFunctions::AvgRemove<DoubleVal>(
    FunctionContext* ctx, const DoubleVal& input, StringVal* dst);
template void AggregateFunctions::AvgRemove<DateVal>(
    FunctionContext* ctx, const DateVal& input, StringVal* dst);

template void AggregateFunctions::SumUpdate<TinyIntVal, BigIntVal>(
    FunctionContext*, const TinyIntVal& src, BigIntVal* dst);
template void AggregateFunctions::SumUpdate<SmallIntVal, BigIntVal>(
    FunctionContext*, const SmallIntVal& src, BigIntVal* dst);
template void AggregateFunctions::SumUpdate<IntVal, BigIntVal>(
    FunctionContext*, const IntVal& src, BigIntVal* dst);
template void AggregateFunctions::SumUpdate<BigIntVal, BigIntVal>(
    FunctionContext*, const BigIntVal& src, BigIntVal* dst);
template void AggregateFunctions::SumUpdate<FloatVal, DoubleVal>(
    FunctionContext*, const FloatVal& src, DoubleVal* dst);
template void AggregateFunctions::SumUpdate<DoubleVal, DoubleVal>(
    FunctionContext*, const DoubleVal& src, DoubleVal* dst);

template void AggregateFunctions::SumRemove<TinyIntVal, BigIntVal>(
    FunctionContext*, const TinyIntVal& src, BigIntVal* dst);
template void AggregateFunctions::SumRemove<SmallIntVal, BigIntVal>(
    FunctionContext*, const SmallIntVal& src, BigIntVal* dst);
template void AggregateFunctions::SumRemove<IntVal, BigIntVal>(
    FunctionContext*, const IntVal& src, BigIntVal* dst);
template void AggregateFunctions::SumRemove<BigIntVal, BigIntVal>(
    FunctionContext*, const BigIntVal& src, BigIntVal* dst);
template void AggregateFunctions::SumRemove<FloatVal, DoubleVal>(
    FunctionContext*, const FloatVal& src, DoubleVal* dst);
template void AggregateFunctions::SumRemove<DoubleVal, DoubleVal>(
    FunctionContext*, const DoubleVal& src, DoubleVal* dst);

template void AggregateFunctions::Min<BooleanVal>(
    FunctionContext*, const BooleanVal& src, BooleanVal* dst);
template void AggregateFunctions::Min<TinyIntVal>(
    FunctionContext*, const TinyIntVal& src, TinyIntVal* dst);
template void AggregateFunctions::Min<SmallIntVal>(
    FunctionContext*, const SmallIntVal& src, SmallIntVal* dst);
template void AggregateFunctions::Min<IntVal>(
    FunctionContext*, const IntVal& src, IntVal* dst);
template void AggregateFunctions::Min<BigIntVal>(
    FunctionContext*, const BigIntVal& src, BigIntVal* dst);
template void AggregateFunctions::Min<DateVal>(
    FunctionContext*, const DateVal& src, DateVal* dst);

template void AggregateFunctions::Max<BooleanVal>(
    FunctionContext*, const BooleanVal& src, BooleanVal* dst);
template void AggregateFunctions::Max<TinyIntVal>(
    FunctionContext*, const TinyIntVal& src, TinyIntVal* dst);
template void AggregateFunctions::Max<SmallIntVal>(
    FunctionContext*, const SmallIntVal& src, SmallIntVal* dst);
template void AggregateFunctions::Max<IntVal>(
    FunctionContext*, const IntVal& src, IntVal* dst);
template void AggregateFunctions::Max<BigIntVal>(
    FunctionContext*, const BigIntVal& src, BigIntVal* dst);
template void AggregateFunctions::Max<DateVal>(
    FunctionContext*, const DateVal& src, DateVal* dst);

template void AggregateFunctions::PcUpdate(
    FunctionContext*, const BooleanVal&, StringVal*);
template void AggregateFunctions::PcUpdate(
    FunctionContext*, const TinyIntVal&, StringVal*);
template void AggregateFunctions::PcUpdate(
    FunctionContext*, const SmallIntVal&, StringVal*);
template void AggregateFunctions::PcUpdate(
    FunctionContext*, const IntVal&, StringVal*);
template void AggregateFunctions::PcUpdate(
    FunctionContext*, const BigIntVal&, StringVal*);
template void AggregateFunctions::PcUpdate(
    FunctionContext*, const FloatVal&, StringVal*);
template void AggregateFunctions::PcUpdate(
    FunctionContext*, const DoubleVal&, StringVal*);
template void AggregateFunctions::PcUpdate(
    FunctionContext*, const StringVal&, StringVal*);
template void AggregateFunctions::PcUpdate(
    FunctionContext*, const TimestampVal&, StringVal*);
template void AggregateFunctions::PcUpdate(
    FunctionContext*, const DecimalVal&, StringVal*);
template void AggregateFunctions::PcUpdate(
    FunctionContext*, const DateVal&, StringVal*);

template void AggregateFunctions::PcsaUpdate(
    FunctionContext*, const BooleanVal&, StringVal*);
template void AggregateFunctions::PcsaUpdate(
    FunctionContext*, const TinyIntVal&, StringVal*);
template void AggregateFunctions::PcsaUpdate(
    FunctionContext*, const SmallIntVal&, StringVal*);
template void AggregateFunctions::PcsaUpdate(
    FunctionContext*, const IntVal&, StringVal*);
template void AggregateFunctions::PcsaUpdate(
    FunctionContext*, const BigIntVal&, StringVal*);
template void AggregateFunctions::PcsaUpdate(
    FunctionContext*, const FloatVal&, StringVal*);
template void AggregateFunctions::PcsaUpdate(
    FunctionContext*, const DoubleVal&, StringVal*);
template void AggregateFunctions::PcsaUpdate(
    FunctionContext*, const StringVal&, StringVal*);
template void AggregateFunctions::PcsaUpdate(
    FunctionContext*, const TimestampVal&, StringVal*);
template void AggregateFunctions::PcsaUpdate(
    FunctionContext*, const DecimalVal&, StringVal*);
template void AggregateFunctions::PcsaUpdate(
    FunctionContext*, const DateVal&, StringVal*);

template void AggregateFunctions::ReservoirSampleInit<BooleanVal>(
    FunctionContext*, StringVal*);
template void AggregateFunctions::ReservoirSampleInit<TinyIntVal>(
    FunctionContext*, StringVal*);
template void AggregateFunctions::ReservoirSampleInit<SmallIntVal>(
    FunctionContext*, StringVal*);
template void AggregateFunctions::ReservoirSampleInit<IntVal>(
    FunctionContext*, StringVal*);
template void AggregateFunctions::ReservoirSampleInit<BigIntVal>(
    FunctionContext*, StringVal*);
template void AggregateFunctions::ReservoirSampleInit<FloatVal>(
    FunctionContext*, StringVal*);
template void AggregateFunctions::ReservoirSampleInit<DoubleVal>(
    FunctionContext*, StringVal*);
template void AggregateFunctions::ReservoirSampleInit<StringVal>(
    FunctionContext*, StringVal*);
template void AggregateFunctions::ReservoirSampleInit<TimestampVal>(
    FunctionContext*, StringVal*);
template void AggregateFunctions::ReservoirSampleInit<DecimalVal>(
    FunctionContext*, StringVal*);
template void AggregateFunctions::ReservoirSampleInit<DateVal>(
    FunctionContext*, StringVal*);

template void AggregateFunctions::ReservoirSampleUpdate(
    FunctionContext*, const BooleanVal&, StringVal*);
template void AggregateFunctions::ReservoirSampleUpdate(
    FunctionContext*, const TinyIntVal&, StringVal*);
template void AggregateFunctions::ReservoirSampleUpdate(
    FunctionContext*, const SmallIntVal&, StringVal*);
template void AggregateFunctions::ReservoirSampleUpdate(
    FunctionContext*, const IntVal&, StringVal*);
template void AggregateFunctions::ReservoirSampleUpdate(
    FunctionContext*, const BigIntVal&, StringVal*);
template void AggregateFunctions::ReservoirSampleUpdate(
    FunctionContext*, const FloatVal&, StringVal*);
template void AggregateFunctions::ReservoirSampleUpdate(
    FunctionContext*, const DoubleVal&, StringVal*);
template void AggregateFunctions::ReservoirSampleUpdate(
    FunctionContext*, const StringVal&, StringVal*);
template void AggregateFunctions::ReservoirSampleUpdate(
    FunctionContext*, const TimestampVal&, StringVal*);
template void AggregateFunctions::ReservoirSampleUpdate(
    FunctionContext*, const DecimalVal&, StringVal*);
template void AggregateFunctions::ReservoirSampleUpdate(
    FunctionContext*, const DateVal&, StringVal*);

template StringVal AggregateFunctions::ReservoirSampleSerialize<BooleanVal>(
    FunctionContext*, const StringVal&);
template StringVal AggregateFunctions::ReservoirSampleSerialize<TinyIntVal>(
    FunctionContext*, const StringVal&);
template StringVal AggregateFunctions::ReservoirSampleSerialize<SmallIntVal>(
    FunctionContext*, const StringVal&);
template StringVal AggregateFunctions::ReservoirSampleSerialize<IntVal>(
    FunctionContext*, const StringVal&);
template StringVal AggregateFunctions::ReservoirSampleSerialize<BigIntVal>(
    FunctionContext*, const StringVal&);
template StringVal AggregateFunctions::ReservoirSampleSerialize<FloatVal>(
    FunctionContext*, const StringVal&);
template StringVal AggregateFunctions::ReservoirSampleSerialize<DoubleVal>(
    FunctionContext*, const StringVal&);
template StringVal AggregateFunctions::ReservoirSampleSerialize<StringVal>(
    FunctionContext*, const StringVal&);
template StringVal AggregateFunctions::ReservoirSampleSerialize<TimestampVal>(
    FunctionContext*, const StringVal&);
template StringVal AggregateFunctions::ReservoirSampleSerialize<DecimalVal>(
    FunctionContext*, const StringVal&);
template StringVal AggregateFunctions::ReservoirSampleSerialize<DateVal>(
    FunctionContext*, const StringVal&);

template void AggregateFunctions::ReservoirSampleMerge<BooleanVal>(
    FunctionContext*, const StringVal&, StringVal*);
template void AggregateFunctions::ReservoirSampleMerge<TinyIntVal>(
    FunctionContext*, const StringVal&, StringVal*);
template void AggregateFunctions::ReservoirSampleMerge<SmallIntVal>(
    FunctionContext*, const StringVal&, StringVal*);
template void AggregateFunctions::ReservoirSampleMerge<IntVal>(
    FunctionContext*, const StringVal&, StringVal*);
template void AggregateFunctions::ReservoirSampleMerge<BigIntVal>(
    FunctionContext*, const StringVal&, StringVal*);
template void AggregateFunctions::ReservoirSampleMerge<FloatVal>(
    FunctionContext*, const StringVal&, StringVal*);
template void AggregateFunctions::ReservoirSampleMerge<DoubleVal>(
    FunctionContext*, const StringVal&, StringVal*);
template void AggregateFunctions::ReservoirSampleMerge<StringVal>(
    FunctionContext*, const StringVal&, StringVal*);
template void AggregateFunctions::ReservoirSampleMerge<TimestampVal>(
    FunctionContext*, const StringVal&, StringVal*);
template void AggregateFunctions::ReservoirSampleMerge<DecimalVal>(
    FunctionContext*, const StringVal&, StringVal*);
template void AggregateFunctions::ReservoirSampleMerge<DateVal>(
    FunctionContext*, const StringVal&, StringVal*);

template StringVal AggregateFunctions::ReservoirSampleFinalize<BooleanVal>(
    FunctionContext*, const StringVal&);
template StringVal AggregateFunctions::ReservoirSampleFinalize<TinyIntVal>(
    FunctionContext*, const StringVal&);
template StringVal AggregateFunctions::ReservoirSampleFinalize<SmallIntVal>(
    FunctionContext*, const StringVal&);
template StringVal AggregateFunctions::ReservoirSampleFinalize<IntVal>(
    FunctionContext*, const StringVal&);
template StringVal AggregateFunctions::ReservoirSampleFinalize<BigIntVal>(
    FunctionContext*, const StringVal&);
template StringVal AggregateFunctions::ReservoirSampleFinalize<FloatVal>(
    FunctionContext*, const StringVal&);
template StringVal AggregateFunctions::ReservoirSampleFinalize<DoubleVal>(
    FunctionContext*, const StringVal&);
template StringVal AggregateFunctions::ReservoirSampleFinalize<StringVal>(
    FunctionContext*, const StringVal&);
template StringVal AggregateFunctions::ReservoirSampleFinalize<TimestampVal>(
    FunctionContext*, const StringVal&);
template StringVal AggregateFunctions::ReservoirSampleFinalize<DecimalVal>(
    FunctionContext*, const StringVal&);
template StringVal AggregateFunctions::ReservoirSampleFinalize<DateVal>(
    FunctionContext*, const StringVal&);

template StringVal AggregateFunctions::HistogramFinalize<BooleanVal>(
    FunctionContext*, const StringVal&);
template StringVal AggregateFunctions::HistogramFinalize<TinyIntVal>(
    FunctionContext*, const StringVal&);
template StringVal AggregateFunctions::HistogramFinalize<SmallIntVal>(
    FunctionContext*, const StringVal&);
template StringVal AggregateFunctions::HistogramFinalize<IntVal>(
    FunctionContext*, const StringVal&);
template StringVal AggregateFunctions::HistogramFinalize<BigIntVal>(
    FunctionContext*, const StringVal&);
template StringVal AggregateFunctions::HistogramFinalize<FloatVal>(
    FunctionContext*, const StringVal&);
template StringVal AggregateFunctions::HistogramFinalize<DoubleVal>(
    FunctionContext*, const StringVal&);
template StringVal AggregateFunctions::HistogramFinalize<StringVal>(
    FunctionContext*, const StringVal&);
template StringVal AggregateFunctions::HistogramFinalize<TimestampVal>(
    FunctionContext*, const StringVal&);
template StringVal AggregateFunctions::HistogramFinalize<DecimalVal>(
    FunctionContext*, const StringVal&);
template StringVal AggregateFunctions::HistogramFinalize<DateVal>(
    FunctionContext*, const StringVal&);

template BooleanVal AggregateFunctions::AppxMedianFinalize<BooleanVal>(
    FunctionContext*, const StringVal&);
template TinyIntVal AggregateFunctions::AppxMedianFinalize<TinyIntVal>(
    FunctionContext*, const StringVal&);
template SmallIntVal AggregateFunctions::AppxMedianFinalize<SmallIntVal>(
    FunctionContext*, const StringVal&);
template IntVal AggregateFunctions::AppxMedianFinalize<IntVal>(
    FunctionContext*, const StringVal&);
template BigIntVal AggregateFunctions::AppxMedianFinalize<BigIntVal>(
    FunctionContext*, const StringVal&);
template FloatVal AggregateFunctions::AppxMedianFinalize<FloatVal>(
    FunctionContext*, const StringVal&);
template DoubleVal AggregateFunctions::AppxMedianFinalize<DoubleVal>(
    FunctionContext*, const StringVal&);
template StringVal AggregateFunctions::AppxMedianFinalize<StringVal>(
    FunctionContext*, const StringVal&);
template TimestampVal AggregateFunctions::AppxMedianFinalize<TimestampVal>(
    FunctionContext*, const StringVal&);
template DecimalVal AggregateFunctions::AppxMedianFinalize<DecimalVal>(
    FunctionContext*, const StringVal&);
template DateVal AggregateFunctions::AppxMedianFinalize<DateVal>(
    FunctionContext*, const StringVal&);

template void AggregateFunctions::HllUpdate(
    FunctionContext*, const BooleanVal&, StringVal*);
template void AggregateFunctions::HllUpdate(
    FunctionContext*, const TinyIntVal&, StringVal*);
template void AggregateFunctions::HllUpdate(
    FunctionContext*, const SmallIntVal&, StringVal*);
template void AggregateFunctions::HllUpdate(
    FunctionContext*, const IntVal&, StringVal*);
template void AggregateFunctions::HllUpdate(
    FunctionContext*, const BigIntVal&, StringVal*);
template void AggregateFunctions::HllUpdate(
    FunctionContext*, const FloatVal&, StringVal*);
template void AggregateFunctions::HllUpdate(
    FunctionContext*, const DoubleVal&, StringVal*);
template void AggregateFunctions::HllUpdate(
    FunctionContext*, const StringVal&, StringVal*);
template void AggregateFunctions::HllUpdate(
    FunctionContext*, const TimestampVal&, StringVal*);
template void AggregateFunctions::HllUpdate(
    FunctionContext*, const DateVal&, StringVal*);

template void AggregateFunctions::SampledNdvUpdate(
    FunctionContext*, const BooleanVal&, const DoubleVal&, StringVal*);
template void AggregateFunctions::SampledNdvUpdate(
    FunctionContext*, const TinyIntVal&, const DoubleVal&, StringVal*);
template void AggregateFunctions::SampledNdvUpdate(
    FunctionContext*, const SmallIntVal&, const DoubleVal&, StringVal*);
template void AggregateFunctions::SampledNdvUpdate(
    FunctionContext*, const IntVal&, const DoubleVal&, StringVal*);
template void AggregateFunctions::SampledNdvUpdate(
    FunctionContext*, const BigIntVal&, const DoubleVal&, StringVal*);
template void AggregateFunctions::SampledNdvUpdate(
    FunctionContext*, const FloatVal&, const DoubleVal&, StringVal*);
template void AggregateFunctions::SampledNdvUpdate(
    FunctionContext*, const DoubleVal&, const DoubleVal&, StringVal*);
template void AggregateFunctions::SampledNdvUpdate(
    FunctionContext*, const StringVal&, const DoubleVal&, StringVal*);
template void AggregateFunctions::SampledNdvUpdate(
    FunctionContext*, const TimestampVal&, const DoubleVal&, StringVal*);
template void AggregateFunctions::SampledNdvUpdate(
    FunctionContext*, const DecimalVal&, const DoubleVal&, StringVal*);
template void AggregateFunctions::SampledNdvUpdate(
    FunctionContext*, const DateVal&, const DoubleVal&, StringVal*);

template void AggregateFunctions::AggIfUpdate(
    FunctionContext*, const BooleanVal& cond, const BooleanVal& src, BooleanVal* dst);
template void AggregateFunctions::AggIfUpdate(
    FunctionContext*, const BooleanVal& cond, const TinyIntVal& src, TinyIntVal* dst);
template void AggregateFunctions::AggIfUpdate(
    FunctionContext*, const BooleanVal& cond, const SmallIntVal& src, SmallIntVal* dst);
template void AggregateFunctions::AggIfUpdate(
    FunctionContext*, const BooleanVal& cond, const IntVal& src, IntVal* dst);
template void AggregateFunctions::AggIfUpdate(
    FunctionContext*, const BooleanVal& cond, const BigIntVal& src, BigIntVal* dst);
template void AggregateFunctions::AggIfUpdate(
    FunctionContext*, const BooleanVal& cond, const FloatVal& src, FloatVal* dst);
template void AggregateFunctions::AggIfUpdate(
    FunctionContext*, const BooleanVal& cond, const DoubleVal& src, DoubleVal* dst);
template void AggregateFunctions::AggIfUpdate(
    FunctionContext*, const BooleanVal& cond, const TimestampVal& src, TimestampVal* dst);
template void AggregateFunctions::AggIfUpdate(
    FunctionContext*, const BooleanVal& cond, const DecimalVal& src, DecimalVal* dst);
template void AggregateFunctions::AggIfUpdate(
    FunctionContext*, const BooleanVal& cond, const DateVal& src, DateVal* dst);

template void AggregateFunctions::AggIfMerge(
    FunctionContext*, const BooleanVal& src, BooleanVal* dst);
template void AggregateFunctions::AggIfMerge(
    FunctionContext*, const TinyIntVal& src, TinyIntVal* dst);
template void AggregateFunctions::AggIfMerge(
    FunctionContext*, const SmallIntVal& src, SmallIntVal* dst);
template void AggregateFunctions::AggIfMerge(
    FunctionContext*, const IntVal& src, IntVal* dst);
template void AggregateFunctions::AggIfMerge(
    FunctionContext*, const BigIntVal& src, BigIntVal* dst);
template void AggregateFunctions::AggIfMerge(
    FunctionContext*, const FloatVal& src, FloatVal* dst);
template void AggregateFunctions::AggIfMerge(
    FunctionContext*, const DoubleVal& src, DoubleVal* dst);
template void AggregateFunctions::AggIfMerge(
    FunctionContext*, const TimestampVal& src, TimestampVal* dst);
template void AggregateFunctions::AggIfMerge(
    FunctionContext*, const DecimalVal& src, DecimalVal* dst);
template void AggregateFunctions::AggIfMerge(
    FunctionContext*, const DateVal& src, DateVal* dst);

template BooleanVal AggregateFunctions::AggIfFinalize(
    FunctionContext*, const BooleanVal& src);
template TinyIntVal AggregateFunctions::AggIfFinalize(
    FunctionContext*, const TinyIntVal& src);
template SmallIntVal AggregateFunctions::AggIfFinalize(
    FunctionContext*, const SmallIntVal& src);
template IntVal AggregateFunctions::AggIfFinalize(FunctionContext*, const IntVal& src);
template BigIntVal AggregateFunctions::AggIfFinalize(
    FunctionContext*, const BigIntVal& src);
template FloatVal AggregateFunctions::AggIfFinalize(
    FunctionContext*, const FloatVal& src);
template DoubleVal AggregateFunctions::AggIfFinalize(
    FunctionContext*, const DoubleVal& src);
template TimestampVal AggregateFunctions::AggIfFinalize(
    FunctionContext*, const TimestampVal& src);
template DecimalVal AggregateFunctions::AggIfFinalize(
    FunctionContext*, const DecimalVal& src);
template DateVal AggregateFunctions::AggIfFinalize(
    FunctionContext*, const DateVal& src);

template void AggregateFunctions::KnuthVarUpdate(
    FunctionContext*, const TinyIntVal&, StringVal*);
template void AggregateFunctions::KnuthVarUpdate(
    FunctionContext*, const SmallIntVal&, StringVal*);
template void AggregateFunctions::KnuthVarUpdate(
    FunctionContext*, const IntVal&, StringVal*);
template void AggregateFunctions::KnuthVarUpdate(
    FunctionContext*, const BigIntVal&, StringVal*);
template void AggregateFunctions::KnuthVarUpdate(
    FunctionContext*, const FloatVal&, StringVal*);
template void AggregateFunctions::KnuthVarUpdate(
    FunctionContext*, const DoubleVal&, StringVal*);

template void AggregateFunctions::LastValRemove<BooleanVal>(
    FunctionContext*, const BooleanVal& src, BooleanVal* dst);
template void AggregateFunctions::LastValRemove<TinyIntVal>(
    FunctionContext*, const TinyIntVal& src, TinyIntVal* dst);
template void AggregateFunctions::LastValRemove<SmallIntVal>(
    FunctionContext*, const SmallIntVal& src, SmallIntVal* dst);
template void AggregateFunctions::LastValRemove<IntVal>(
    FunctionContext*, const IntVal& src, IntVal* dst);
template void AggregateFunctions::LastValRemove<BigIntVal>(
    FunctionContext*, const BigIntVal& src, BigIntVal* dst);
template void AggregateFunctions::LastValRemove<FloatVal>(
    FunctionContext*, const FloatVal& src, FloatVal* dst);
template void AggregateFunctions::LastValRemove<DoubleVal>(
    FunctionContext*, const DoubleVal& src, DoubleVal* dst);
template void AggregateFunctions::LastValRemove<TimestampVal>(
    FunctionContext*, const TimestampVal& src, TimestampVal* dst);
template void AggregateFunctions::LastValRemove<DecimalVal>(
    FunctionContext*, const DecimalVal& src, DecimalVal* dst);
template void AggregateFunctions::LastValRemove<DateVal>(
    FunctionContext*, const DateVal& src, DateVal* dst);

template void AggregateFunctions::LastValIgnoreNullsInit<BooleanVal>(
    FunctionContext*, StringVal*);
template void AggregateFunctions::LastValIgnoreNullsInit<TinyIntVal>(
    FunctionContext*, StringVal*);
template void AggregateFunctions::LastValIgnoreNullsInit<SmallIntVal>(
    FunctionContext*, StringVal*);
template void AggregateFunctions::LastValIgnoreNullsInit<IntVal>(
    FunctionContext*, StringVal*);
template void AggregateFunctions::LastValIgnoreNullsInit<BigIntVal>(
    FunctionContext*, StringVal*);
template void AggregateFunctions::LastValIgnoreNullsInit<FloatVal>(
    FunctionContext*, StringVal*);
template void AggregateFunctions::LastValIgnoreNullsInit<DoubleVal>(
    FunctionContext*, StringVal*);
template void AggregateFunctions::LastValIgnoreNullsInit<StringVal>(
    FunctionContext*, StringVal*);
template void AggregateFunctions::LastValIgnoreNullsInit<TimestampVal>(
    FunctionContext*, StringVal*);
template void AggregateFunctions::LastValIgnoreNullsInit<DecimalVal>(
    FunctionContext*, StringVal*);
template void AggregateFunctions::LastValIgnoreNullsInit<DateVal>(
    FunctionContext*, StringVal*);

template void AggregateFunctions::LastValIgnoreNullsUpdate<BooleanVal>(
    FunctionContext*, const BooleanVal& src, StringVal* dst);
template void AggregateFunctions::LastValIgnoreNullsUpdate<TinyIntVal>(
    FunctionContext*, const TinyIntVal& src, StringVal* dst);
template void AggregateFunctions::LastValIgnoreNullsUpdate<SmallIntVal>(
    FunctionContext*, const SmallIntVal& src, StringVal* dst);
template void AggregateFunctions::LastValIgnoreNullsUpdate<IntVal>(
    FunctionContext*, const IntVal& src, StringVal* dst);
template void AggregateFunctions::LastValIgnoreNullsUpdate<BigIntVal>(
    FunctionContext*, const BigIntVal& src, StringVal* dst);
template void AggregateFunctions::LastValIgnoreNullsUpdate<FloatVal>(
    FunctionContext*, const FloatVal& src, StringVal* dst);
template void AggregateFunctions::LastValIgnoreNullsUpdate<DoubleVal>(
    FunctionContext*, const DoubleVal& src, StringVal* dst);
template void AggregateFunctions::LastValIgnoreNullsUpdate<StringVal>(
    FunctionContext*, const StringVal& src, StringVal* dst);
template void AggregateFunctions::LastValIgnoreNullsUpdate<TimestampVal>(
    FunctionContext*, const TimestampVal& src, StringVal* dst);
template void AggregateFunctions::LastValIgnoreNullsUpdate<DecimalVal>(
    FunctionContext*, const DecimalVal& src, StringVal* dst);
template void AggregateFunctions::LastValIgnoreNullsUpdate<DateVal>(
    FunctionContext*, const DateVal& src, StringVal* dst);

template void AggregateFunctions::LastValIgnoreNullsRemove<BooleanVal>(
    FunctionContext*, const BooleanVal& src, StringVal* dst);
template void AggregateFunctions::LastValIgnoreNullsRemove<TinyIntVal>(
    FunctionContext*, const TinyIntVal& src, StringVal* dst);
template void AggregateFunctions::LastValIgnoreNullsRemove<SmallIntVal>(
    FunctionContext*, const SmallIntVal& src, StringVal* dst);
template void AggregateFunctions::LastValIgnoreNullsRemove<IntVal>(
    FunctionContext*, const IntVal& src, StringVal* dst);
template void AggregateFunctions::LastValIgnoreNullsRemove<BigIntVal>(
    FunctionContext*, const BigIntVal& src, StringVal* dst);
template void AggregateFunctions::LastValIgnoreNullsRemove<FloatVal>(
    FunctionContext*, const FloatVal& src, StringVal* dst);
template void AggregateFunctions::LastValIgnoreNullsRemove<DoubleVal>(
    FunctionContext*, const DoubleVal& src, StringVal* dst);
template void AggregateFunctions::LastValIgnoreNullsRemove<StringVal>(
    FunctionContext*, const StringVal& src, StringVal* dst);
template void AggregateFunctions::LastValIgnoreNullsRemove<TimestampVal>(
    FunctionContext*, const TimestampVal& src, StringVal* dst);
template void AggregateFunctions::LastValIgnoreNullsRemove<DecimalVal>(
    FunctionContext*, const DecimalVal& src, StringVal* dst);
template void AggregateFunctions::LastValIgnoreNullsRemove<DateVal>(
    FunctionContext*, const DateVal& src, StringVal* dst);

template BooleanVal AggregateFunctions::LastValIgnoreNullsGetValue<BooleanVal>(
    FunctionContext*, const StringVal&);
template TinyIntVal AggregateFunctions::LastValIgnoreNullsGetValue<TinyIntVal>(
    FunctionContext*, const StringVal&);
template SmallIntVal AggregateFunctions::LastValIgnoreNullsGetValue<SmallIntVal>(
    FunctionContext*, const StringVal&);
template IntVal AggregateFunctions::LastValIgnoreNullsGetValue<IntVal>(
    FunctionContext*, const StringVal&);
template BigIntVal AggregateFunctions::LastValIgnoreNullsGetValue<BigIntVal>(
    FunctionContext*, const StringVal&);
template FloatVal AggregateFunctions::LastValIgnoreNullsGetValue<FloatVal>(
    FunctionContext*, const StringVal&);
template DoubleVal AggregateFunctions::LastValIgnoreNullsGetValue<DoubleVal>(
    FunctionContext*, const StringVal&);
template TimestampVal AggregateFunctions::LastValIgnoreNullsGetValue<TimestampVal>(
    FunctionContext*, const StringVal&);
template DecimalVal AggregateFunctions::LastValIgnoreNullsGetValue<DecimalVal>(
    FunctionContext*, const StringVal&);
template DateVal AggregateFunctions::LastValIgnoreNullsGetValue<DateVal>(
    FunctionContext*, const StringVal&);

template BooleanVal AggregateFunctions::LastValIgnoreNullsFinalize<BooleanVal>(
    FunctionContext*, const StringVal&);
template TinyIntVal AggregateFunctions::LastValIgnoreNullsFinalize<TinyIntVal>(
    FunctionContext*, const StringVal&);
template SmallIntVal AggregateFunctions::LastValIgnoreNullsFinalize<SmallIntVal>(
    FunctionContext*, const StringVal&);
template IntVal AggregateFunctions::LastValIgnoreNullsFinalize<IntVal>(
    FunctionContext*, const StringVal&);
template BigIntVal AggregateFunctions::LastValIgnoreNullsFinalize<BigIntVal>(
    FunctionContext*, const StringVal&);
template FloatVal AggregateFunctions::LastValIgnoreNullsFinalize<FloatVal>(
    FunctionContext*, const StringVal&);
template DoubleVal AggregateFunctions::LastValIgnoreNullsFinalize<DoubleVal>(
    FunctionContext*, const StringVal&);
template TimestampVal AggregateFunctions::LastValIgnoreNullsFinalize<TimestampVal>(
    FunctionContext*, const StringVal&);
template DecimalVal AggregateFunctions::LastValIgnoreNullsFinalize<DecimalVal>(
    FunctionContext*, const StringVal&);
template DateVal AggregateFunctions::LastValIgnoreNullsFinalize<DateVal>(
    FunctionContext*, const StringVal&);

template void AggregateFunctions::FirstValUpdate<BooleanVal>(
    FunctionContext*, const BooleanVal& src, BooleanVal* dst);
template void AggregateFunctions::FirstValUpdate<TinyIntVal>(
    FunctionContext*, const TinyIntVal& src, TinyIntVal* dst);
template void AggregateFunctions::FirstValUpdate<SmallIntVal>(
    FunctionContext*, const SmallIntVal& src, SmallIntVal* dst);
template void AggregateFunctions::FirstValUpdate<IntVal>(
    FunctionContext*, const IntVal& src, IntVal* dst);
template void AggregateFunctions::FirstValUpdate<BigIntVal>(
    FunctionContext*, const BigIntVal& src, BigIntVal* dst);
template void AggregateFunctions::FirstValUpdate<FloatVal>(
    FunctionContext*, const FloatVal& src, FloatVal* dst);
template void AggregateFunctions::FirstValUpdate<DoubleVal>(
    FunctionContext*, const DoubleVal& src, DoubleVal* dst);
template void AggregateFunctions::FirstValUpdate<TimestampVal>(
    FunctionContext*, const TimestampVal& src, TimestampVal* dst);
template void AggregateFunctions::FirstValUpdate<DecimalVal>(
    FunctionContext*, const DecimalVal& src, DecimalVal* dst);
template void AggregateFunctions::FirstValUpdate<DateVal>(
    FunctionContext*, const DateVal& src, DateVal* dst);

template void AggregateFunctions::FirstValRewriteUpdate<BooleanVal>(
    FunctionContext*, const BooleanVal& src, const BigIntVal&, BooleanVal* dst);
template void AggregateFunctions::FirstValRewriteUpdate<TinyIntVal>(
    FunctionContext*, const TinyIntVal& src, const BigIntVal&, TinyIntVal* dst);
template void AggregateFunctions::FirstValRewriteUpdate<SmallIntVal>(
    FunctionContext*, const SmallIntVal& src, const BigIntVal&, SmallIntVal* dst);
template void AggregateFunctions::FirstValRewriteUpdate<IntVal>(
    FunctionContext*, const IntVal& src, const BigIntVal&, IntVal* dst);
template void AggregateFunctions::FirstValRewriteUpdate<BigIntVal>(
    FunctionContext*, const BigIntVal& src, const BigIntVal&, BigIntVal* dst);
template void AggregateFunctions::FirstValRewriteUpdate<FloatVal>(
    FunctionContext*, const FloatVal& src, const BigIntVal&, FloatVal* dst);
template void AggregateFunctions::FirstValRewriteUpdate<DoubleVal>(
    FunctionContext*, const DoubleVal& src, const BigIntVal&, DoubleVal* dst);
template void AggregateFunctions::FirstValRewriteUpdate<StringVal>(
    FunctionContext*, const StringVal& src, const BigIntVal&, StringVal* dst);
template void AggregateFunctions::FirstValRewriteUpdate<TimestampVal>(
    FunctionContext*, const TimestampVal& src, const BigIntVal&, TimestampVal* dst);
template void AggregateFunctions::FirstValRewriteUpdate<DecimalVal>(
    FunctionContext*, const DecimalVal& src, const BigIntVal&, DecimalVal* dst);
template void AggregateFunctions::FirstValRewriteUpdate<DateVal>(
    FunctionContext*, const DateVal& src, const BigIntVal&, DateVal* dst);

template void AggregateFunctions::FirstValIgnoreNullsUpdate<BooleanVal>(
    FunctionContext*, const BooleanVal& src, BooleanVal* dst);
template void AggregateFunctions::FirstValIgnoreNullsUpdate<TinyIntVal>(
    FunctionContext*, const TinyIntVal& src, TinyIntVal* dst);
template void AggregateFunctions::FirstValIgnoreNullsUpdate<SmallIntVal>(
    FunctionContext*, const SmallIntVal& src, SmallIntVal* dst);
template void AggregateFunctions::FirstValIgnoreNullsUpdate<IntVal>(
    FunctionContext*, const IntVal& src, IntVal* dst);
template void AggregateFunctions::FirstValIgnoreNullsUpdate<BigIntVal>(
    FunctionContext*, const BigIntVal& src, BigIntVal* dst);
template void AggregateFunctions::FirstValIgnoreNullsUpdate<FloatVal>(
    FunctionContext*, const FloatVal& src, FloatVal* dst);
template void AggregateFunctions::FirstValIgnoreNullsUpdate<DoubleVal>(
    FunctionContext*, const DoubleVal& src, DoubleVal* dst);
template void AggregateFunctions::FirstValIgnoreNullsUpdate<TimestampVal>(
    FunctionContext*, const TimestampVal& src, TimestampVal* dst);
template void AggregateFunctions::FirstValIgnoreNullsUpdate<DecimalVal>(
    FunctionContext*, const DecimalVal& src, DecimalVal* dst);
template void AggregateFunctions::FirstValIgnoreNullsUpdate<DateVal>(
    FunctionContext*, const DateVal& src, DateVal* dst);

template void AggregateFunctions::OffsetFnInit<BooleanVal>(
    FunctionContext*, BooleanVal*);
template void AggregateFunctions::OffsetFnInit<TinyIntVal>(
    FunctionContext*, TinyIntVal*);
template void AggregateFunctions::OffsetFnInit<SmallIntVal>(
    FunctionContext*, SmallIntVal*);
template void AggregateFunctions::OffsetFnInit<IntVal>(
    FunctionContext*, IntVal*);
template void AggregateFunctions::OffsetFnInit<BigIntVal>(
    FunctionContext*, BigIntVal*);
template void AggregateFunctions::OffsetFnInit<FloatVal>(
    FunctionContext*, FloatVal*);
template void AggregateFunctions::OffsetFnInit<DoubleVal>(
    FunctionContext*, DoubleVal*);
template void AggregateFunctions::OffsetFnInit<TimestampVal>(
    FunctionContext*, TimestampVal*);
template void AggregateFunctions::OffsetFnInit<DecimalVal>(
    FunctionContext*, DecimalVal*);
template void AggregateFunctions::OffsetFnInit<DateVal>(
    FunctionContext*, DateVal*);

template void AggregateFunctions::OffsetFnUpdate<BooleanVal>(
    FunctionContext*, const BooleanVal& src, const BigIntVal&, const BooleanVal&,
    BooleanVal* dst);
template void AggregateFunctions::OffsetFnUpdate<TinyIntVal>(
    FunctionContext*, const TinyIntVal& src, const BigIntVal&, const TinyIntVal&,
    TinyIntVal* dst);
template void AggregateFunctions::OffsetFnUpdate<SmallIntVal>(
    FunctionContext*, const SmallIntVal& src, const BigIntVal&, const SmallIntVal&,
    SmallIntVal* dst);
template void AggregateFunctions::OffsetFnUpdate<IntVal>(
    FunctionContext*, const IntVal& src, const BigIntVal&, const IntVal&, IntVal* dst);
template void AggregateFunctions::OffsetFnUpdate<BigIntVal>(
    FunctionContext*, const BigIntVal& src, const BigIntVal&, const BigIntVal&,
    BigIntVal* dst);
template void AggregateFunctions::OffsetFnUpdate<FloatVal>(
    FunctionContext*, const FloatVal& src, const BigIntVal&, const FloatVal&,
    FloatVal* dst);
template void AggregateFunctions::OffsetFnUpdate<DoubleVal>(
    FunctionContext*, const DoubleVal& src, const BigIntVal&, const DoubleVal&,
    DoubleVal* dst);
template void AggregateFunctions::OffsetFnUpdate<StringVal>(
    FunctionContext*, const StringVal& src, const BigIntVal&, const StringVal&,
    StringVal* dst);
template void AggregateFunctions::OffsetFnUpdate<TimestampVal>(
    FunctionContext*, const TimestampVal& src, const BigIntVal&, const TimestampVal&,
    TimestampVal* dst);
template void AggregateFunctions::OffsetFnUpdate<DecimalVal>(
    FunctionContext*, const DecimalVal& src, const BigIntVal&, const DecimalVal&,
    DecimalVal* dst);
template void AggregateFunctions::OffsetFnUpdate<DateVal>(
    FunctionContext*, const DateVal& src, const BigIntVal&, const DateVal&,
    DateVal* dst);
}
