| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "exprs/aggregate-functions.h" |
| |
| #include <algorithm> |
| #include <map> |
| #include <sstream> |
| #include <utility> |
| #include <cmath> |
| |
| #include <boost/random/ranlux.hpp> |
| #include <boost/random/uniform_int.hpp> |
| |
| #include "codegen/impala-ir.h" |
| #include "common/logging.h" |
| #include "exprs/anyval-util.h" |
| #include "exprs/hll-bias.h" |
| #include "runtime/date-value.h" |
| #include "runtime/decimal-value.inline.h" |
| #include "runtime/runtime-state.h" |
| #include "runtime/string-value.inline.h" |
| #include "runtime/timestamp-value.h" |
| #include "runtime/timestamp-value.inline.h" |
| #include "util/arithmetic-util.h" |
| #include "util/mpfit-util.h" |
| |
| #include "common/names.h" |
| |
| using boost::uniform_int; |
| using boost::ranlux64_3; |
| using std::make_pair; |
| using std::map; |
| using std::min_element; |
| using std::nth_element; |
| using std::pop_heap; |
| using std::push_heap; |
| |
| namespace { |
| // Threshold for each precision where it's better to use linear counting instead |
| // of the bias corrected estimate. |
| static float HllThreshold(int p) { |
| switch (p) { |
| case 4: |
| return 10.0; |
| case 5: |
| return 20.0; |
| case 6: |
| return 40.0; |
| case 7: |
| return 80.0; |
| case 8: |
| return 220.0; |
| case 9: |
| return 400.0; |
| case 10: |
| return 900.0; |
| case 11: |
| return 1800.0; |
| case 12: |
| return 3100.0; |
| case 13: |
| return 6500.0; |
| case 14: |
| return 11500.0; |
| case 15: |
| return 20000.0; |
| case 16: |
| return 50000.0; |
| case 17: |
| return 120000.0; |
| case 18: |
| return 350000.0; |
| } |
| return 0.0; |
| } |
| |
| // Implements k nearest neighbor interpolation for k=6, |
| // we choose 6 bassed on the HLL++ paper |
| int64_t HllEstimateBias(int64_t estimate) { |
| const size_t K = 6; |
| |
| // Precision index into data arrays |
| // We don't have data for precisions less than 4 |
| DCHECK(impala::AggregateFunctions::HLL_PRECISION >= 4); |
| static constexpr size_t idx = impala::AggregateFunctions::HLL_PRECISION - 4; |
| |
| // Calculate the square of the difference of this estimate to all |
| // precalculated estimates for a particular precision |
| map<double, size_t> distances; |
| for (size_t i = 0; |
| i < impala::HLL_DATA_SIZES[idx] / sizeof(double); ++i) { |
| double val = estimate - impala::HLL_RAW_ESTIMATE_DATA[idx][i]; |
| distances.insert(make_pair(val * val, i)); |
| } |
| |
| size_t nearest[K]; |
| size_t j = 0; |
| // Use a sorted map to find the K closest estimates to our initial estimate |
| for (map<double, size_t>::iterator it = distances.begin(); |
| j < K && it != distances.end(); ++it, ++j) { |
| nearest[j] = it->second; |
| } |
| |
| // Compute the average bias correction the K closest estimates |
| double bias = 0.0; |
| for (size_t i = 0; i < K; ++i) { |
| bias += impala::HLL_BIAS_DATA[idx][nearest[i]]; |
| } |
| |
| return bias / K; |
| } |
| |
| } |
| |
| namespace impala { |
| |
| // This function initializes StringVal 'dst' with a newly allocated buffer of |
| // 'buf_len' bytes. The new buffer will be filled with zero. If allocation fails, |
| // 'dst' will be set to a null string. This allows execution to continue until the |
| // next time GetQueryStatus() is called (see IMPALA-2756). |
| static void AllocBuffer(FunctionContext* ctx, StringVal* dst, size_t buf_len) { |
| uint8_t* ptr = ctx->Allocate(buf_len); |
| if (UNLIKELY(ptr == NULL && buf_len != 0)) { |
| DCHECK(!ctx->impl()->state()->GetQueryStatus().ok()); |
| *dst = StringVal::null(); |
| } else { |
| *dst = StringVal(ptr, buf_len); |
| // Avoid memset() with NULL ptr as it's undefined. |
| if (LIKELY(ptr != NULL)) memset(ptr, 0, buf_len); |
| } |
| } |
| |
| // This function initializes StringVal 'dst' with a newly allocated buffer of |
| // 'buf_len' bytes and copies the content of StringVal 'src' into it. |
| // If allocation fails, 'dst' will be set to a null string. |
| static void CopyStringVal(FunctionContext* ctx, const StringVal& src, StringVal* dst) { |
| if (src.is_null) { |
| *dst = StringVal::null(); |
| } else { |
| uint8_t* copy = ctx->Allocate(src.len); |
| if (UNLIKELY(copy == NULL)) { |
| // Zero-length allocation always returns a hard-coded pointer. |
| DCHECK(src.len != 0 && !ctx->impl()->state()->GetQueryStatus().ok()); |
| *dst = StringVal::null(); |
| } else { |
| *dst = StringVal(copy, src.len); |
| memcpy(dst->ptr, src.ptr, src.len); |
| } |
| } |
| } |
| |
| // Converts any UDF Val Type to a string representation |
| template <typename T> |
| StringVal ToStringVal(FunctionContext* context, T val) { |
| stringstream ss; |
| ss << val; |
| const string &str = ss.str(); |
| return StringVal::CopyFrom( |
| context, reinterpret_cast<const uint8_t*>(str.c_str()), str.size()); |
| } |
| |
| constexpr int AggregateFunctions::HLL_PRECISION; |
| constexpr int AggregateFunctions::HLL_LEN; |
| |
| void AggregateFunctions::InitNull(FunctionContext*, AnyVal* dst) { |
| dst->is_null = true; |
| } |
| |
| template<typename T> |
| void AggregateFunctions::InitZero(FunctionContext*, T* dst) { |
| dst->is_null = false; |
| dst->val = 0; |
| } |
| |
| template<> |
| void AggregateFunctions::InitZero(FunctionContext*, DecimalVal* dst) { |
| dst->is_null = false; |
| dst->val16 = 0; // Also initializes val4 and val8 to 0. |
| } |
| |
| template <typename T> |
| void AggregateFunctions::UpdateVal(FunctionContext* ctx, const T& src, T* dst) { |
| *dst = src; |
| } |
| |
| template <> |
| void AggregateFunctions::UpdateVal(FunctionContext* ctx, const StringVal& src, |
| StringVal* dst) { |
| if (src.is_null) { |
| if (!dst->is_null) ctx->Free(dst->ptr); |
| *dst = StringVal::null(); |
| return; |
| } |
| |
| uint8_t* new_ptr; |
| if (dst->is_null) { |
| new_ptr = ctx->Allocate(src.len); |
| } else { |
| new_ptr = ctx->Reallocate(dst->ptr, src.len); |
| } |
| // Note that a zero-length string is not the same as StringVal::null(). |
| RETURN_IF_NULL(ctx, new_ptr); |
| dst->ptr = new_ptr; |
| memcpy(dst->ptr, src.ptr, src.len); |
| dst->is_null = false; |
| dst->len = src.len; |
| } |
| |
| StringVal AggregateFunctions::StringValGetValue( |
| FunctionContext* ctx, const StringVal& src) { |
| if (src.is_null) return src; |
| return StringVal::CopyFrom(ctx, src.ptr, src.len); |
| } |
| |
| StringVal AggregateFunctions::StringValSerializeOrFinalize( |
| FunctionContext* ctx, const StringVal& src) { |
| StringVal result = StringValGetValue(ctx, src); |
| if (!src.is_null) ctx->Free(src.ptr); |
| return result; |
| } |
| |
| void AggregateFunctions::CountUpdate( |
| FunctionContext*, const AnyVal& src, BigIntVal* dst) { |
| DCHECK(!dst->is_null); |
| if (!src.is_null) ++dst->val; |
| } |
| |
| void AggregateFunctions::CountStarUpdate(FunctionContext*, BigIntVal* dst) { |
| DCHECK(!dst->is_null); |
| ++dst->val; |
| } |
| |
| void AggregateFunctions::CountRemove( |
| FunctionContext*, const AnyVal& src, BigIntVal* dst) { |
| DCHECK(!dst->is_null); |
| if (!src.is_null) { |
| --dst->val; |
| DCHECK_GE(dst->val, 0); |
| } |
| } |
| |
| void AggregateFunctions::CountStarRemove(FunctionContext*, BigIntVal* dst) { |
| DCHECK(!dst->is_null); |
| --dst->val; |
| DCHECK_GE(dst->val, 0); |
| } |
| |
| void AggregateFunctions::CountMerge(FunctionContext*, const BigIntVal& src, |
| BigIntVal* dst) { |
| DCHECK(!dst->is_null); |
| DCHECK(!src.is_null); |
| dst->val += src.val; |
| } |
| |
| struct AvgState { |
| double sum; |
| int64_t count; |
| }; |
| |
| void AggregateFunctions::AvgInit(FunctionContext* ctx, StringVal* dst) { |
| // avg() uses a preallocated FIXED_UDA_INTERMEDIATE intermediate value. |
| DCHECK_EQ(dst->len, sizeof(AvgState)); |
| AvgState* avg = reinterpret_cast<AvgState*>(dst->ptr); |
| avg->sum = 0.0; |
| avg->count = 0; |
| } |
| |
| template <typename T> |
| void AggregateFunctions::AvgUpdate(FunctionContext* ctx, const T& src, StringVal* dst) { |
| if (src.is_null) return; |
| DCHECK(dst->ptr != NULL); |
| DCHECK_EQ(sizeof(AvgState), dst->len); |
| AvgState* avg = reinterpret_cast<AvgState*>(dst->ptr); |
| avg->sum += src.val; |
| ++avg->count; |
| } |
| |
| template <typename T> |
| void AggregateFunctions::AvgRemove(FunctionContext* ctx, const T& src, StringVal* dst) { |
| // Remove doesn't need to explicitly check the number of calls to Update() or Remove() |
| // because Finalize() returns NULL if count is 0. |
| if (src.is_null) return; |
| DCHECK(dst->ptr != NULL); |
| DCHECK_EQ(sizeof(AvgState), dst->len); |
| AvgState* avg = reinterpret_cast<AvgState*>(dst->ptr); |
| avg->sum -= src.val; |
| --avg->count; |
| DCHECK_GE(avg->count, 0); |
| } |
| |
| void AggregateFunctions::AvgMerge(FunctionContext* ctx, const StringVal& src, |
| StringVal* dst) { |
| const AvgState* src_struct = reinterpret_cast<const AvgState*>(src.ptr); |
| DCHECK(dst->ptr != NULL); |
| DCHECK_EQ(sizeof(AvgState), dst->len); |
| AvgState* dst_struct = reinterpret_cast<AvgState*>(dst->ptr); |
| dst_struct->sum += src_struct->sum; |
| dst_struct->count += src_struct->count; |
| } |
| |
| DoubleVal AggregateFunctions::AvgGetValue(FunctionContext* ctx, const StringVal& src) { |
| AvgState* val_struct = reinterpret_cast<AvgState*>(src.ptr); |
| if (val_struct->count == 0) return DoubleVal::null(); |
| return DoubleVal(val_struct->sum / val_struct->count); |
| } |
| |
| DoubleVal AggregateFunctions::AvgFinalize(FunctionContext* ctx, const StringVal& src) { |
| if (UNLIKELY(src.is_null)) return DoubleVal::null(); |
| DoubleVal result = AvgGetValue(ctx, src); |
| return result; |
| } |
| |
| void AggregateFunctions::TimestampAvgUpdate(FunctionContext* ctx, |
| const TimestampVal& src, StringVal* dst) { |
| if (src.is_null) return; |
| DCHECK(dst->ptr != NULL); |
| DCHECK_EQ(sizeof(AvgState), dst->len); |
| AvgState* avg = reinterpret_cast<AvgState*>(dst->ptr); |
| const TimestampValue& tm_src = TimestampValue::FromTimestampVal(src); |
| double val; |
| if (tm_src.ToSubsecondUnixTime(ctx->impl()->state()->local_time_zone(), &val)) { |
| avg->sum += val; |
| ++avg->count; |
| } |
| } |
| |
| void AggregateFunctions::TimestampAvgRemove(FunctionContext* ctx, |
| const TimestampVal& src, StringVal* dst) { |
| if (src.is_null) return; |
| DCHECK(dst->ptr != NULL); |
| DCHECK_EQ(sizeof(AvgState), dst->len); |
| AvgState* avg = reinterpret_cast<AvgState*>(dst->ptr); |
| const TimestampValue& tm_src = TimestampValue::FromTimestampVal(src); |
| double val; |
| if (tm_src.ToSubsecondUnixTime(ctx->impl()->state()->local_time_zone(), &val)) { |
| avg->sum -= val; |
| --avg->count; |
| DCHECK_GE(avg->count, 0); |
| } |
| } |
| |
| TimestampVal AggregateFunctions::TimestampAvgGetValue(FunctionContext* ctx, |
| const StringVal& src) { |
| AvgState* val_struct = reinterpret_cast<AvgState*>(src.ptr); |
| if (val_struct->count == 0) return TimestampVal::null(); |
| const TimestampValue& tv = TimestampValue::FromSubsecondUnixTime( |
| val_struct->sum / val_struct->count, ctx->impl()->state()->local_time_zone()); |
| if (tv.HasDate()) { |
| TimestampVal result; |
| tv.ToTimestampVal(&result); |
| return result; |
| } else { |
| return TimestampVal::null(); |
| } |
| } |
| |
| TimestampVal AggregateFunctions::TimestampAvgFinalize(FunctionContext* ctx, |
| const StringVal& src) { |
| if (UNLIKELY(src.is_null)) return TimestampVal::null(); |
| TimestampVal result = TimestampAvgGetValue(ctx, src); |
| return result; |
| } |
| |
| // We saw some failures on the release build because GCC was emitting an instruction |
| // to operate on the int128_t that assumed the pointer was aligned (typically it isn't). |
| // We mark the struct with a "packed" attribute, so that the compiler does not expect it |
| // to be aligned. This should not have a negative performance impact on modern CPUs. |
| struct __attribute__ ((__packed__)) DecimalAvgState { |
| __int128_t sum_val16; // Always uses max precision decimal. |
| int64_t count; |
| }; |
| |
| void AggregateFunctions::DecimalAvgInit(FunctionContext* ctx, StringVal* dst) { |
| // avg() uses a preallocated FIXED_UDA_INTERMEDIATE intermediate value. |
| DCHECK_EQ(dst->len, sizeof(DecimalAvgState)); |
| DecimalAvgState* avg = reinterpret_cast<DecimalAvgState*>(dst->ptr); |
| avg->sum_val16 = 0; |
| avg->count = 0; |
| } |
| |
| void AggregateFunctions::DecimalAvgUpdate(FunctionContext* ctx, const DecimalVal& src, |
| StringVal* dst) { |
| DecimalAvgAddOrRemove(ctx, src, dst, false); |
| } |
| |
| void AggregateFunctions::DecimalAvgRemove(FunctionContext* ctx, const DecimalVal& src, |
| StringVal* dst) { |
| DecimalAvgAddOrRemove(ctx, src, dst, true); |
| } |
| |
| // Always inline in IR so that constants can be replaced. |
| IR_ALWAYS_INLINE void AggregateFunctions::DecimalAvgAddOrRemove(FunctionContext* ctx, |
| const DecimalVal& src, StringVal* dst, bool remove) { |
| if (src.is_null) return; |
| DCHECK(dst->ptr != NULL); |
| DCHECK_EQ(sizeof(DecimalAvgState), dst->len); |
| DecimalAvgState* avg = reinterpret_cast<DecimalAvgState*>(dst->ptr); |
| bool decimal_v2 = ctx->impl()->GetConstFnAttr(FunctionContextImpl::DECIMAL_V2); |
| |
| // Since the src and dst are guaranteed to be the same scale, we can just |
| // do a simple add. |
| int m = remove ? -1 : 1; |
| switch (ctx->impl()->GetConstFnAttr(FunctionContextImpl::ARG_TYPE_SIZE, 0)) { |
| case 4: |
| avg->sum_val16 += m * src.val4; |
| if (UNLIKELY(decimal_v2 && |
| abs(avg->sum_val16) > DecimalUtil::MAX_UNSCALED_DECIMAL16)) { |
| ctx->SetError("Avg computation overflowed"); |
| } |
| break; |
| case 8: |
| avg->sum_val16 += m * src.val8; |
| if (UNLIKELY(decimal_v2 && |
| abs(avg->sum_val16) > DecimalUtil::MAX_UNSCALED_DECIMAL16)) { |
| ctx->SetError("Avg computation overflowed"); |
| } |
| break; |
| case 16: |
| if (UNLIKELY(decimal_v2 && (avg->sum_val16 >= 0) == (src.val16 >= 0) && |
| abs(avg->sum_val16) > DecimalUtil::MAX_UNSCALED_DECIMAL16 - abs(src.val16))) { |
| // We can't check for overflow after performing the addition like in the other |
| // cases because the result may not fit into int128. |
| ctx->SetError("Avg computation overflowed"); |
| } |
| avg->sum_val16 += m * src.val16; |
| break; |
| default: |
| DCHECK(false) << "Invalid byte size"; |
| } |
| if (remove) { |
| --avg->count; |
| DCHECK_GE(avg->count, 0); |
| } else { |
| ++avg->count; |
| } |
| } |
| |
| void AggregateFunctions::DecimalAvgMerge(FunctionContext* ctx, |
| const StringVal& src, StringVal* dst) { |
| const DecimalAvgState* src_struct = |
| reinterpret_cast<const DecimalAvgState*>(src.ptr); |
| DCHECK(dst->ptr != NULL); |
| DCHECK_EQ(sizeof(DecimalAvgState), dst->len); |
| DecimalAvgState* dst_struct = reinterpret_cast<DecimalAvgState*>(dst->ptr); |
| bool decimal_v2 = ctx->impl()->GetConstFnAttr(FunctionContextImpl::DECIMAL_V2); |
| bool overflow = decimal_v2 && |
| abs(dst_struct->sum_val16) > |
| DecimalUtil::MAX_UNSCALED_DECIMAL16 - abs(src_struct->sum_val16); |
| if (UNLIKELY(overflow)) ctx->SetError("Avg computation overflowed"); |
| dst_struct->sum_val16 = |
| ArithmeticUtil::AsUnsigned<std::plus>(dst_struct->sum_val16, src_struct->sum_val16); |
| dst_struct->count += src_struct->count; |
| } |
| |
| DecimalVal AggregateFunctions::DecimalAvgGetValue(FunctionContext* ctx, |
| const StringVal& src) { |
| DecimalAvgState* val_struct = reinterpret_cast<DecimalAvgState*>(src.ptr); |
| if (val_struct->count == 0) return DecimalVal::null(); |
| Decimal16Value sum(val_struct->sum_val16); |
| Decimal16Value count(val_struct->count); |
| |
| int output_precision = |
| ctx->impl()->GetConstFnAttr(FunctionContextImpl::RETURN_TYPE_PRECISION); |
| int output_scale = ctx->impl()->GetConstFnAttr(FunctionContextImpl::RETURN_TYPE_SCALE); |
| // The scale of the accumulated sum is set to the scale of the input type. |
| int sum_scale = ctx->impl()->GetConstFnAttr(FunctionContextImpl::ARG_TYPE_SCALE, 0); |
| bool is_nan = false; |
| bool overflow = false; |
| bool decimal_v2 = ctx->impl()->GetConstFnAttr(FunctionContextImpl::DECIMAL_V2); |
| Decimal16Value result = sum.Divide<int128_t>(sum_scale, count, 0 /* count's scale */, |
| output_precision, output_scale, decimal_v2, &is_nan, &overflow); |
| if (UNLIKELY(is_nan)) return DecimalVal::null(); |
| if (UNLIKELY(overflow)) { |
| if (decimal_v2) { |
| ctx->SetError("Avg computation overflowed"); |
| } else { |
| ctx->AddWarning("Avg computation overflowed, returning NULL"); |
| } |
| return DecimalVal::null(); |
| } |
| return DecimalVal(result.value()); |
| } |
| |
| DecimalVal AggregateFunctions::DecimalAvgFinalize(FunctionContext* ctx, |
| const StringVal& src) { |
| if (UNLIKELY(src.is_null)) return DecimalVal::null(); |
| DecimalVal result = DecimalAvgGetValue(ctx, src); |
| return result; |
| } |
| |
| template<typename SRC_VAL, typename DST_VAL> |
| void AggregateFunctions::SumUpdate(FunctionContext* ctx, const SRC_VAL& src, |
| DST_VAL* dst) { |
| if (src.is_null) { |
| // Do not count null values towards the number of updates |
| ctx->impl()->IncrementNumUpdates(-1); |
| return; |
| } |
| if (dst->is_null) InitZero<DST_VAL>(ctx, dst); |
| dst->val = ArithmeticUtil::Compute<std::plus, decltype(dst->val)>(dst->val, src.val); |
| } |
| |
| template<typename SRC_VAL, typename DST_VAL> |
| void AggregateFunctions::SumRemove(FunctionContext* ctx, const SRC_VAL& src, |
| DST_VAL* dst) { |
| // Do not count null values towards the number of removes |
| if (src.is_null) ctx->impl()->IncrementNumRemoves(-1); |
| if (ctx->impl()->num_removes() >= ctx->impl()->num_updates()) { |
| *dst = DST_VAL::null(); |
| return; |
| } |
| if (src.is_null) return; |
| if (dst->is_null) InitZero<DST_VAL>(ctx, dst); |
| dst->val -= src.val; |
| } |
| |
| void AggregateFunctions::SumDecimalUpdate(FunctionContext* ctx, |
| const DecimalVal& src, DecimalVal* dst) { |
| SumDecimalAddOrSubtract(ctx, src, dst); |
| } |
| |
| void AggregateFunctions::SumDecimalRemove(FunctionContext* ctx, |
| const DecimalVal& src, DecimalVal* dst) { |
| if (ctx->impl()->num_removes() >= ctx->impl()->num_updates()) { |
| *dst = DecimalVal::null(); |
| return; |
| } |
| SumDecimalAddOrSubtract(ctx, src, dst, true); |
| } |
| |
| // Always inline in IR so that constants can be replaced. |
| IR_ALWAYS_INLINE void AggregateFunctions::SumDecimalAddOrSubtract(FunctionContext* ctx, |
| const DecimalVal& src, DecimalVal* dst, bool subtract) { |
| if (src.is_null) return; |
| if (dst->is_null) InitZero<DecimalVal>(ctx, dst); |
| int precision = ctx->impl()->GetConstFnAttr(FunctionContextImpl::ARG_TYPE_PRECISION, 0); |
| bool decimal_v2 = ctx->impl()->GetConstFnAttr(FunctionContextImpl::DECIMAL_V2); |
| // Since the src and dst are guaranteed to be the same scale, we can just |
| // do a simple add. |
| int m = subtract ? -1 : 1; |
| if (precision <= 9) { |
| dst->val16 += m * src.val4; |
| if (UNLIKELY(decimal_v2 && |
| abs(dst->val16) > DecimalUtil::MAX_UNSCALED_DECIMAL16)) { |
| ctx->SetError("Sum computation overflowed"); |
| } |
| } else if (precision <= 19) { |
| dst->val16 += m * src.val8; |
| if (UNLIKELY(decimal_v2 && |
| abs(dst->val16) > DecimalUtil::MAX_UNSCALED_DECIMAL16)) { |
| ctx->SetError("Sum computation overflowed"); |
| } |
| } else { |
| if (UNLIKELY(decimal_v2 && (dst->val16 >= 0) == (src.val16 >= 0) && |
| abs(dst->val16) > DecimalUtil::MAX_UNSCALED_DECIMAL16 - abs(src.val16))) { |
| // We can't check for overflow after performing the addition like in the other |
| // cases because the result may not fit into int128. |
| ctx->SetError("Sum computation overflowed"); |
| } |
| dst->val16 += m * src.val16; |
| } |
| } |
| |
| void AggregateFunctions::SumDecimalMerge(FunctionContext* ctx, |
| const DecimalVal& src, DecimalVal* dst) { |
| if (src.is_null) return; |
| if (dst->is_null) InitZero<DecimalVal>(ctx, dst); |
| bool decimal_v2 = ctx->impl()->GetConstFnAttr(FunctionContextImpl::DECIMAL_V2); |
| bool overflow = decimal_v2 && |
| abs(dst->val16) > DecimalUtil::MAX_UNSCALED_DECIMAL16 - abs(src.val16); |
| if (UNLIKELY(overflow)) ctx->SetError("Sum computation overflowed"); |
| dst->val16 = ArithmeticUtil::AsUnsigned<std::plus>(dst->val16, src.val16); |
| } |
| |
| template<typename T> |
| void AggregateFunctions::Min(FunctionContext*, const T& src, T* dst) { |
| if (src.is_null) return; |
| if (dst->is_null || src.val < dst->val) *dst = src; |
| } |
| |
| template<typename T> |
| void AggregateFunctions::Max(FunctionContext*, const T& src, T* dst) { |
| if (src.is_null) return; |
| if (dst->is_null || src.val > dst->val) *dst = src; |
| } |
| |
| void AggregateFunctions::InitNullString(FunctionContext* c, StringVal* dst) { |
| dst->is_null = true; |
| dst->ptr = NULL; |
| dst->len = 0; |
| } |
| |
| // For DoubleVal and FloatVal, we have to handle NaN specially. If 'val' != 'val', then |
| // 'val' must be NaN, and if any of the values that are inserted are NaN, then we return |
| // NaN. So, if 'src.val != src.val', set 'dst' to it. |
| template <> |
| void AggregateFunctions::Min(FunctionContext*, const FloatVal& src, FloatVal* dst) { |
| if (src.is_null) return; |
| if (dst->is_null || src.val < dst->val || src.val != src.val) *dst = src; |
| } |
| |
| template <> |
| void AggregateFunctions::Max(FunctionContext*, const FloatVal& src, FloatVal* dst) { |
| if (src.is_null) return; |
| if (dst->is_null || src.val > dst->val || src.val != src.val) *dst = src; |
| } |
| |
| template <> |
| void AggregateFunctions::Min(FunctionContext*, const DoubleVal& src, DoubleVal* dst) { |
| if (src.is_null) return; |
| if (dst->is_null || src.val < dst->val || src.val != src.val) *dst = src; |
| } |
| |
| template <> |
| void AggregateFunctions::Max(FunctionContext*, const DoubleVal& src, DoubleVal* dst) { |
| if (src.is_null) return; |
| if (dst->is_null || src.val > dst->val || src.val != src.val) *dst = src; |
| } |
| |
| template<> |
| void AggregateFunctions::Min(FunctionContext* ctx, const StringVal& src, StringVal* dst) { |
| if (src.is_null) return; |
| if (dst->is_null || |
| StringValue::FromStringVal(src) < StringValue::FromStringVal(*dst)) { |
| if (!dst->is_null) ctx->Free(dst->ptr); |
| CopyStringVal(ctx, src, dst); |
| } |
| } |
| |
| template<> |
| void AggregateFunctions::Max(FunctionContext* ctx, const StringVal& src, StringVal* dst) { |
| if (src.is_null) return; |
| if (dst->is_null || |
| StringValue::FromStringVal(src) > StringValue::FromStringVal(*dst)) { |
| if (!dst->is_null) ctx->Free(dst->ptr); |
| CopyStringVal(ctx, src, dst); |
| } |
| } |
| |
| template<> |
| void AggregateFunctions::Min(FunctionContext* ctx, |
| const DecimalVal& src, DecimalVal* dst) { |
| if (src.is_null) return; |
| int precision = ctx->impl()->GetConstFnAttr(FunctionContextImpl::ARG_TYPE_PRECISION, 0); |
| if (precision <= 9) { |
| if (dst->is_null || src.val4 < dst->val4) *dst = src; |
| } else if (precision <= 19) { |
| if (dst->is_null || src.val8 < dst->val8) *dst = src; |
| } else { |
| if (dst->is_null || src.val16 < dst->val16) *dst = src; |
| } |
| } |
| |
| template<> |
| void AggregateFunctions::Max(FunctionContext* ctx, |
| const DecimalVal& src, DecimalVal* dst) { |
| if (src.is_null) return; |
| int precision = ctx->impl()->GetConstFnAttr(FunctionContextImpl::ARG_TYPE_PRECISION, 0); |
| if (precision <= 9) { |
| if (dst->is_null || src.val4 > dst->val4) *dst = src; |
| } else if (precision <= 19) { |
| if (dst->is_null || src.val8 > dst->val8) *dst = src; |
| } else { |
| if (dst->is_null || src.val16 > dst->val16) *dst = src; |
| } |
| } |
| |
| template<> |
| void AggregateFunctions::Min(FunctionContext*, |
| const TimestampVal& src, TimestampVal* dst) { |
| if (src.is_null) return; |
| if (dst->is_null) { |
| *dst = src; |
| return; |
| } |
| TimestampValue src_tv = TimestampValue::FromTimestampVal(src); |
| TimestampValue dst_tv = TimestampValue::FromTimestampVal(*dst); |
| if (src_tv < dst_tv) *dst = src; |
| } |
| |
| template<> |
| void AggregateFunctions::Max(FunctionContext*, |
| const TimestampVal& src, TimestampVal* dst) { |
| if (src.is_null) return; |
| if (dst->is_null) { |
| *dst = src; |
| return; |
| } |
| TimestampValue src_tv = TimestampValue::FromTimestampVal(src); |
| TimestampValue dst_tv = TimestampValue::FromTimestampVal(*dst); |
| if (src_tv > dst_tv) *dst = src; |
| } |
| |
| // StringConcat intermediate state starts with the length of the first |
| // separator, followed by the accumulated string. The accumulated |
| // string starts with the separator of the first value that arrived in |
| // StringConcatUpdate(). |
| typedef int StringConcatHeader; |
| |
| // Delimiter to use if the separator is not provided. |
| static inline StringVal ALWAYS_INLINE DefaultStringConcatDelim() { |
| return StringVal(reinterpret_cast<uint8_t*>(const_cast<char*>(", ")), 2); |
| } |
| |
| void AggregateFunctions::StringConcatUpdate( |
| FunctionContext* ctx, const StringVal& src, StringVal* result) { |
| StringConcatUpdate(ctx, src, DefaultStringConcatDelim(), result); |
| } |
| |
| void AggregateFunctions::StringConcatUpdate(FunctionContext* ctx, const StringVal& src, |
| const StringVal& separator, StringVal* result) { |
| if (src.is_null) return; |
| const StringVal default_delim = DefaultStringConcatDelim(); |
| const StringVal* sep = separator.is_null ? &default_delim : &separator; |
| if (result->is_null) { |
| // Header of the intermediate state holds the length of the first separator. |
| const int header_len = sizeof(StringConcatHeader); |
| DCHECK(header_len == sizeof(sep->len)); |
| AllocBuffer(ctx, result, header_len); |
| if (UNLIKELY(result->is_null)) { |
| DCHECK(!ctx->impl()->state()->GetQueryStatus().ok()); |
| return; |
| } |
| *reinterpret_cast<StringConcatHeader*>(result->ptr) = sep->len; |
| } |
| unsigned new_len = result->len + sep->len + src.len; |
| if (LIKELY(new_len <= StringVal::MAX_LENGTH)) { |
| uint8_t* ptr = ctx->Reallocate(result->ptr, new_len); |
| if (LIKELY(ptr != NULL)) { |
| memcpy(ptr + result->len, sep->ptr, sep->len); |
| memcpy(ptr + result->len + sep->len, src.ptr, src.len); |
| result->ptr = ptr; |
| result->len = new_len; |
| } else { |
| DCHECK(!ctx->impl()->state()->GetQueryStatus().ok()); |
| } |
| } else { |
| ctx->SetError("Concatenated string length larger than allowed limit of " |
| "1 GB character data."); |
| } |
| } |
| |
| void AggregateFunctions::StringConcatMerge(FunctionContext* ctx, |
| const StringVal& src, StringVal* result) { |
| if (src.is_null) return; |
| const int header_len = sizeof(StringConcatHeader); |
| if (result->is_null) { |
| AllocBuffer(ctx, result, header_len); |
| if (UNLIKELY(result->is_null)) { |
| DCHECK(!ctx->impl()->state()->GetQueryStatus().ok()); |
| return; |
| } |
| // Copy the header from the first intermediate value. |
| *reinterpret_cast<StringConcatHeader*>(result->ptr) = |
| *reinterpret_cast<StringConcatHeader*>(src.ptr); |
| } |
| // Append the string portion of the intermediate src to result (omit src's header). |
| unsigned buf_len = src.len - header_len; |
| unsigned new_len = result->len + buf_len; |
| if (LIKELY(new_len <= StringVal::MAX_LENGTH)) { |
| uint8_t* ptr = ctx->Reallocate(result->ptr, new_len); |
| if (LIKELY(ptr != NULL)) { |
| memcpy(ptr + result->len, src.ptr + header_len, buf_len); |
| result->ptr = ptr; |
| result->len = new_len; |
| } else { |
| DCHECK(!ctx->impl()->state()->GetQueryStatus().ok()); |
| } |
| } else { |
| ctx->SetError("Concatenated string length larger than allowed limit of " |
| "1 GB character data."); |
| } |
| } |
| |
| StringVal AggregateFunctions::StringConcatFinalize(FunctionContext* ctx, |
| const StringVal& src) { |
| if (UNLIKELY(src.is_null)) return src; |
| const int header_len = sizeof(StringConcatHeader); |
| DCHECK(src.len >= header_len); |
| int sep_len = *reinterpret_cast<StringConcatHeader*>(src.ptr); |
| DCHECK(src.len >= header_len + sep_len); |
| // Remove the header and the first separator. |
| StringVal result = StringVal::CopyFrom(ctx, src.ptr + header_len + sep_len, |
| src.len - header_len - sep_len); |
| ctx->Free(src.ptr); |
| return result; |
| } |
| |
| // Compute distinctpc and distinctpcsa using Flajolet and Martin's algorithm |
| // (Probabilistic Counting Algorithms for Data Base Applications) |
| // We have implemented two variants here: one with stochastic averaging (with PCSA |
| // postfix) and one without. |
| // There are 4 phases to compute the aggregate: |
| // 1. allocate a bitmap, stored in the aggregation tuple's output string slot |
| // 2. update the bitmap per row (UpdateDistinctEstimateSlot) |
| // 3. for distributed plan, merge the bitmaps from all the nodes |
| // (UpdateMergeEstimateSlot) |
| // 4. compute the estimate using the bitmaps when all the rows are processed |
| // (FinalizeEstimateSlot) |
| const static int NUM_PC_BITMAPS = 64; // number of bitmaps |
| const static int PC_BITMAP_LENGTH = 32; // the length of each bit map |
| const static float PC_THETA = 0.77351f; // the magic number to compute the final result |
| const static float PC_K = -1.75f; // the magic correction for low cardinalities |
| |
| // Size of the distinct estimate bit map - Probabilistic Counting Algorithms for Data |
| // Base Applications (Flajolet and Martin) |
| // |
| // The bitmap is a 64bit(1st index) x 32bit(2nd index) matrix. |
| // So, the string length of 256 byte is enough. |
| // The layout is: |
| // row 1: 8bit 8bit 8bit 8bit |
| // row 2: 8bit 8bit 8bit 8bit |
| // ... .. |
| // ... .. |
| // row 64: 8bit 8bit 8bit 8bit |
| // |
| // Using 32bit length, we can count up to 10^8. This will not be enough for Fact table |
| // primary key, but once we approach the limit, we could interpret the result as |
| // "every row is distinct". |
| const static int PC_INTERMEDIATE_BYTES = NUM_PC_BITMAPS * PC_BITMAP_LENGTH / 8; |
| |
| void AggregateFunctions::PcInit(FunctionContext* c, StringVal* dst) { |
| // The distinctpc*() functions use a preallocated FIXED_UDA_INTERMEDIATE intermediate |
| // value. |
| DCHECK_EQ(dst->len, PC_INTERMEDIATE_BYTES); |
| memset(dst->ptr, 0, PC_INTERMEDIATE_BYTES); |
| } |
| |
| static inline void SetDistinctEstimateBit(uint8_t* bitmap, |
| uint32_t row_index, uint32_t bit_index) { |
| // We need to convert Bitmap[alpha,index] into the index of the string. |
| // alpha tells which of the 32bit we've to jump to. |
| // index then lead us to the byte and bit. |
| uint32_t *int_bitmap = reinterpret_cast<uint32_t*>(bitmap); |
| int_bitmap[row_index] |= (1 << bit_index); |
| } |
| |
| static inline bool GetDistinctEstimateBit(uint8_t* bitmap, |
| uint32_t row_index, uint32_t bit_index) { |
| uint32_t *int_bitmap = reinterpret_cast<uint32_t*>(bitmap); |
| return ((int_bitmap[row_index] & (1 << bit_index)) > 0); |
| } |
| |
| template<typename T> |
| void AggregateFunctions::PcUpdate(FunctionContext* c, const T& input, StringVal* dst) { |
| DCHECK_EQ(dst->len, PC_INTERMEDIATE_BYTES); |
| if (input.is_null) return; |
| // Core of the algorithm. This is a direct translation of the code in the paper. |
| // Please see the paper for details. For simple averaging, we need to compute hash |
| // values NUM_PC_BITMAPS times using NUM_PC_BITMAPS different hash functions (by using a |
| // different seed). |
| for (int i = 0; i < NUM_PC_BITMAPS; ++i) { |
| uint32_t hash_value = AnyValUtil::Hash(input, *c->GetArgType(0), i); |
| const int bit_index = BitUtil::CountTrailingZeros(hash_value, PC_BITMAP_LENGTH - 1); |
| // Set bitmap[i, bit_index] to 1 |
| SetDistinctEstimateBit(dst->ptr, i, bit_index); |
| } |
| } |
| |
| template<typename T> |
| void AggregateFunctions::PcsaUpdate(FunctionContext* c, const T& input, StringVal* dst) { |
| DCHECK_EQ(dst->len, PC_INTERMEDIATE_BYTES); |
| if (input.is_null) return; |
| |
| // Core of the algorithm. This is a direct translation of the code in the paper. |
| // Please see the paper for details. Using stochastic averaging, we only need to |
| // the hash value once for each row. |
| uint32_t hash_value = AnyValUtil::Hash(input, *c->GetArgType(0), 0); |
| uint32_t row_index = hash_value % NUM_PC_BITMAPS; |
| |
| // We want the zero-based position of the least significant 1-bit in binary |
| // representation of hash_value. BitUtil::CountTrailingZeros(x,y) does exactly this |
| // because it returns the number of trailing 0-bits in x (or y if x is zero). |
| const int bit_index = |
| BitUtil::CountTrailingZeros(hash_value / NUM_PC_BITMAPS, PC_BITMAP_LENGTH - 1); |
| |
| // Set bitmap[row_index, bit_index] to 1 |
| SetDistinctEstimateBit(dst->ptr, row_index, bit_index); |
| } |
| |
| string DistinctEstimateBitMapToString(uint8_t* v) { |
| stringstream debugstr; |
| for (int i = 0; i < NUM_PC_BITMAPS; ++i) { |
| for (int j = 0; j < PC_BITMAP_LENGTH; ++j) { |
| // print bitmap[i][j] |
| debugstr << GetDistinctEstimateBit(v, i, j); |
| } |
| debugstr << "\n"; |
| } |
| debugstr << "\n"; |
| return debugstr.str(); |
| } |
| |
| void AggregateFunctions::PcMerge(FunctionContext* c, |
| const StringVal& src, StringVal* dst) { |
| DCHECK(!src.is_null); |
| DCHECK(!dst->is_null); |
| DCHECK_EQ(src.len, PC_INTERMEDIATE_BYTES); |
| DCHECK_EQ(dst->len, PC_INTERMEDIATE_BYTES); |
| |
| // Merge the bits |
| // I think _mm_or_ps can do it, but perf doesn't really matter here. We call this only |
| // once group per node. |
| for (int i = 0; i < PC_INTERMEDIATE_BYTES; ++i) { |
| *(dst->ptr + i) |= *(src.ptr + i); |
| } |
| |
| VLOG_ROW << "UpdateMergeEstimateSlot Src Bit map:\n" |
| << DistinctEstimateBitMapToString(src.ptr); |
| VLOG_ROW << "UpdateMergeEstimateSlot Dst Bit map:\n" |
| << DistinctEstimateBitMapToString(dst->ptr); |
| } |
| |
| static double DistinctEstimateFinalize(const StringVal& src) { |
| DCHECK(!src.is_null); |
| DCHECK_EQ(src.len, PC_INTERMEDIATE_BYTES); |
| VLOG_ROW << "FinalizeEstimateSlot Bit map:\n" |
| << DistinctEstimateBitMapToString(src.ptr); |
| |
| // We haven't processed any rows if none of the bits are set. Therefore, we have zero |
| // distinct rows. We're overwriting the result in the same string buffer we've |
| // allocated. |
| bool is_empty = true; |
| for (int i = 0; i < PC_INTERMEDIATE_BYTES; ++i) { |
| if (src.ptr[i] != 0) { |
| is_empty = false; |
| break; |
| } |
| } |
| if (is_empty) return 0; |
| |
| // Convert the bitmap to a number, please see the paper for details |
| // In short, we count the average number of leading 1s (per row) in the bit map. |
| // The number is proportional to the log2(1/NUM_PC_BITMAPS of the actual number of |
| // distinct). |
| // To get the actual number of distinct, we'll do 2^avg / PC_THETA. |
| // PC_THETA is a magic number. |
| int sum = 0; |
| for (int i = 0; i < NUM_PC_BITMAPS; ++i) { |
| int row_bit_count = 0; |
| // Count the number of leading ones for each row in the bitmap |
| // We could have used the build in __builtin_clz to count of number of leading zeros |
| // but we first need to invert the 1 and 0. |
| while (GetDistinctEstimateBit(src.ptr, i, row_bit_count) && |
| row_bit_count < PC_BITMAP_LENGTH) { |
| ++row_bit_count; |
| } |
| sum += row_bit_count; |
| } |
| double avg = static_cast<double>(sum) / static_cast<double>(NUM_PC_BITMAPS); |
| // We apply a correction for small cardinalities based on equation (6) from |
| // Scheuermann et al DialM-POMC '07 so the above equation becomes |
| // (2^avg - 2^PC_K*avg) / PC_THETA |
| double result = (pow(static_cast<double>(2), avg) - |
| pow(static_cast<double>(2), avg * PC_K)) / PC_THETA; |
| return result; |
| } |
| |
| BigIntVal AggregateFunctions::PcFinalize(FunctionContext* c, const StringVal& src) { |
| if (UNLIKELY(src.is_null)) return BigIntVal::null(); |
| double estimate = DistinctEstimateFinalize(src); |
| return static_cast<int64_t>(estimate); |
| } |
| |
| BigIntVal AggregateFunctions::PcsaFinalize(FunctionContext* c, const StringVal& src) { |
| if (UNLIKELY(src.is_null)) return BigIntVal::null(); |
| // When using stochastic averaging, the result has to be multiplied by NUM_PC_BITMAPS. |
| double estimate = DistinctEstimateFinalize(src) * NUM_PC_BITMAPS; |
| return static_cast<int64_t>(estimate); |
| } |
| |
| // Histogram constants |
| // TODO: Expose as constant argument parameters to the UDA. |
| const static int NUM_BUCKETS = 100; |
| const static int NUM_SAMPLES_PER_BUCKET = 200; |
| |
| template <typename T> |
| struct ReservoirSample { |
| // Sample value |
| T val; |
| // Key on which the samples are sorted. |
| double key; |
| |
| ReservoirSample() : key(-1) { } |
| ReservoirSample(const T& val) : val(val), key(-1) { } |
| |
| // Gets a copy of the sample value that allocates memory from ctx, if necessary. |
| T GetValue(FunctionContext* ctx) { return val; } |
| }; |
| |
| // Maximum length of a string sample. |
| const static int MAX_STRING_SAMPLE_LEN = 10; |
| |
| // Template specialization for StringVal because we do not store the StringVal itself. |
| // Instead, we keep fixed size arrays and truncate longer strings if necessary. |
| template <> |
| struct ReservoirSample<StringVal> { |
| uint8_t val[MAX_STRING_SAMPLE_LEN]; |
| int len; // Size of string (up to MAX_STRING_SAMPLE_LEN) |
| double key; |
| |
| ReservoirSample() : len(0), key(-1) { } |
| |
| ReservoirSample(const StringVal& string_val) : key(-1) { |
| len = min(string_val.len, MAX_STRING_SAMPLE_LEN); |
| memcpy(&val[0], string_val.ptr, len); |
| } |
| |
| // Gets a copy of the sample value that allocates memory from ctx, if necessary. |
| StringVal GetValue(FunctionContext* ctx) { |
| return StringVal::CopyFrom(ctx, &val[0], len); |
| } |
| }; |
| |
| template <typename T> |
| bool SampleValLess(const ReservoirSample<T>& i, const ReservoirSample<T>& j) { |
| return i.val.val < j.val.val; |
| } |
| |
| template <> |
| bool SampleValLess(const ReservoirSample<StringVal>& i, |
| const ReservoirSample<StringVal>& j) { |
| int n = min(i.len, j.len); |
| int result = memcmp(&i.val[0], &j.val[0], n); |
| if (result == 0) return i.len < j.len; |
| return result < 0; |
| } |
| |
| template <> |
| bool SampleValLess(const ReservoirSample<DecimalVal>& i, |
| const ReservoirSample<DecimalVal>& j) { |
| // Also handles val4 and val8 - the DecimalVal memory layout ensures the least |
| // significant bits overlap in memory. |
| return i.val.val16 < j.val.val16; |
| } |
| |
| template <> |
| bool SampleValLess(const ReservoirSample<TimestampVal>& i, |
| const ReservoirSample<TimestampVal>& j) { |
| if (i.val.date == j.val.date) return i.val.time_of_day < j.val.time_of_day; |
| else return i.val.date < j.val.date; |
| } |
| |
| template <typename T> |
| bool SampleKeyGreater(const ReservoirSample<T>& i, const ReservoirSample<T>& j) { |
| return i.key > j.key; |
| } |
| |
| // Keeps track of the current state of the reservoir sampling algorithm. The samples are |
| // stored in a dynamically sized array. Initially, the the samples array is stored in a |
| // separate memory allocation. This class is responsible for managing the memory of the |
| // array and reallocating when the array is full. When this object is serialized into an |
| // output buffer, the samples array is inlined into the output buffer as well. |
| template <typename T> |
| class ReservoirSampleState { |
| public: |
| ReservoirSampleState(FunctionContext* ctx) |
| : num_samples_(0), |
| capacity_(INIT_CAPACITY), |
| source_size_(0), |
| sample_array_inline_(false), |
| samples_(NULL) { |
| // Allocate some initial memory for the samples array. |
| size_t buffer_len = sizeof(ReservoirSample<T>) * capacity_; |
| uint8_t* ptr = ctx->Allocate(buffer_len); |
| if (ptr == NULL) { |
| DCHECK(!ctx->impl()->state()->GetQueryStatus().ok()); |
| return; |
| } |
| samples_ = reinterpret_cast<ReservoirSample<T>*>(ptr); |
| } |
| |
| // Returns a pointer to a ReservoirSample at idx. |
| ReservoirSample<T>* GetSample(int64_t idx) { |
| DCHECK(samples_ != NULL); |
| DCHECK_LT(idx, num_samples_); |
| DCHECK_LE(num_samples_, capacity_); |
| DCHECK_GE(idx, 0); |
| return &samples_[idx]; |
| } |
| |
| // Adds a sample and increments the source size. Doubles the capacity of the sample |
| // array if necessary. If max capacity is reached, randomly evicts a sample (as |
| // required by the algorithm). Returns false if the attempt to double the capacity |
| // fails, true otherwise. |
| bool AddSample(FunctionContext* ctx, const ReservoirSample<T>& s) { |
| DCHECK(samples_ != NULL); |
| DCHECK_LE(num_samples_, MAX_CAPACITY); |
| if (num_samples_ < MAX_CAPACITY) { |
| if (num_samples_ == capacity_) { |
| bool result = IncreaseCapacity(ctx, capacity_ * 2); |
| if (!result) return false; |
| } |
| DCHECK_LT(num_samples_, capacity_); |
| samples_[num_samples_++] = s; |
| } else { |
| DCHECK_EQ(num_samples_, MAX_CAPACITY); |
| DCHECK(!sample_array_inline_); |
| int64_t idx = GetNext64(source_size_); |
| if (idx < MAX_CAPACITY) samples_[idx] = s; |
| } |
| ++source_size_; |
| return true; |
| } |
| |
| // Same as above. |
| bool AddSample(FunctionContext* ctx, const T& s) { |
| return AddSample(ctx, ReservoirSample<T>(s)); |
| } |
| |
| // Returns a buffer with a serialized ReservoirSampleState and the array of samples it |
| // contains. The samples array must not be inlined; i.e. it must be in a separate memory |
| // allocation. Returns a buffer containing this object and inlined samples array. The |
| // memory containing this object and the samples array is freed. The serialized object |
| // in the output buffer requires a call to Deserialize() before use. |
| StringVal Serialize(FunctionContext* ctx) { |
| DCHECK(samples_ != NULL); |
| DCHECK(!sample_array_inline_); |
| // Assign keys to the samples that haven't been set (i.e. if serializing after |
| // Update()). In weighted reservoir sampling the keys are typically assigned as the |
| // sources are being sampled, but this requires maintaining the samples in sorted |
| // order (by key) and it accomplishes the same thing at this point because all data |
| // points coming into Update() get the same weight. When the samples are later merged, |
| // they do have different weights (set here) that are proportional to the source_size, |
| // i.e. samples selected from a larger stream are more likely to end up in the final |
| // sample set. In order to avoid the extra overhead in Update(), we approximate the |
| // keys by picking random numbers in the range |
| // [(SOURCE_SIZE - SAMPLE_SIZE)/(SOURCE_SIZE), 1]. This weights the keys by |
| // SOURCE_SIZE and implies that the samples picked had the highest keys, because |
| // values not sampled would have keys between 0 and |
| // (SOURCE_SIZE - SAMPLE_SIZE)/(SOURCE_SIZE). |
| for (int i = 0; i < num_samples_; ++i) { |
| if (samples_[i].key >= 0) continue; |
| int r = rand() % num_samples_; |
| samples_[i].key = ((double) source_size_ - r) / source_size_; |
| } |
| capacity_ = num_samples_; |
| sample_array_inline_ = true; |
| |
| size_t buffer_len = sizeof(ReservoirSampleState<T>) + |
| sizeof(ReservoirSample<T>) * num_samples_; |
| StringVal dst(ctx, buffer_len); |
| if (LIKELY(!dst.is_null)) { |
| memcpy(dst.ptr, reinterpret_cast<uint8_t*>(this), sizeof(ReservoirSampleState<T>)); |
| memcpy(dst.ptr + sizeof(ReservoirSampleState<T>), |
| reinterpret_cast<uint8_t*>(samples_), |
| sizeof(ReservoirSample<T>) * num_samples_); |
| } |
| ctx->Free(reinterpret_cast<uint8_t*>(samples_)); |
| ctx->Free(reinterpret_cast<uint8_t*>(this)); |
| return dst; |
| } |
| |
| // Updates the pointer to the samples array. Must be called before using this object in |
| // Merge(). |
| void Deserialize() { |
| DCHECK(sample_array_inline_); |
| samples_ = reinterpret_cast<ReservoirSample<T>*>(this + 1); |
| } |
| |
| // Merges the samples in "other_state" into the current state by following the |
| // reservoir sampling algorithm. If necessary, increases the capacity to fit the |
| // samples from "other_state". In the case of failure to increase the size of the |
| // array, returns. |
| void Merge(FunctionContext* ctx, ReservoirSampleState<T>* other_state) { |
| DCHECK(samples_ != NULL); |
| DCHECK_GT(capacity_, 0); |
| other_state->Deserialize(); |
| int src_idx = 0; |
| // We can increase the capacity significantly here and skip several doublings because |
| // we know the number of elements in the other state up front. |
| if (capacity_ < MAX_CAPACITY) { |
| int necessary_capacity = num_samples_ + other_state->num_samples(); |
| if (capacity_ < necessary_capacity) { |
| bool result = IncreaseCapacity(ctx, necessary_capacity); |
| if (!result) return; |
| } |
| } |
| |
| // First, fill up the dst samples if they don't already exist. The samples are now |
| // ordered as a min-heap on the key. |
| while (num_samples_ < MAX_CAPACITY && src_idx < other_state->num_samples()) { |
| DCHECK_GE(other_state->GetSample(src_idx)->key, 0); |
| bool result = AddSample(ctx, *other_state->GetSample(src_idx++)); |
| if (!result) return; |
| push_heap(&samples_[0], &samples_[num_samples_], SampleKeyGreater<T>); |
| } |
| |
| // Then for every sample from source, take the sample if the key is greater than |
| // the minimum key in the min-heap. |
| while (src_idx < other_state->num_samples()) { |
| DCHECK_GE(other_state->GetSample(src_idx)->key, 0); |
| if (other_state->GetSample(src_idx)->key > samples_[0].key) { |
| pop_heap(&samples_[0], &samples_[num_samples_], SampleKeyGreater<T>); |
| samples_[MAX_CAPACITY - 1] = *other_state->GetSample(src_idx); |
| push_heap(&samples_[0], &samples_[num_samples_], SampleKeyGreater<T>); |
| } |
| ++src_idx; |
| } |
| |
| source_size_ += other_state->source_size(); |
| } |
| |
| // Returns the median element. |
| T GetMedian(FunctionContext* ctx) { |
| if (num_samples_ == 0) return T::null(); |
| ReservoirSample<T>* mid_point = GetSample(num_samples_ / 2); |
| nth_element(&samples_[0], mid_point, &samples_[num_samples_], SampleValLess<T>); |
| return mid_point->GetValue(ctx); |
| } |
| |
| // Sorts the samples. |
| void SortSamples() { |
| sort(&samples_[0], &samples_[num_samples_], SampleValLess<T>); |
| } |
| |
| // Deletes this object by freeing the memory that contains the array of samples (if not |
| // inlined) and itself. |
| void Delete(FunctionContext* ctx) { |
| if (!sample_array_inline_) ctx->Free(reinterpret_cast<uint8_t*>(samples_)); |
| ctx->Free(reinterpret_cast<uint8_t*>(this)); |
| } |
| |
| int num_samples() { return num_samples_; } |
| int64_t source_size() { return source_size_; } |
| |
| private: |
| // The initial capacity of the samples array. |
| const static int INIT_CAPACITY = 16; |
| |
| // Maximum capacity of the samples array. |
| const static int MAX_CAPACITY = NUM_BUCKETS * NUM_SAMPLES_PER_BUCKET; |
| |
| // Number of collected samples. |
| int num_samples_; |
| |
| // Size of the "samples_" array. |
| int capacity_; |
| |
| // Number of values over which the samples were collected. |
| int64_t source_size_; |
| |
| // Random number generator for generating 64-bit integers |
| // TODO: Replace with mt19937_64 when upgrading boost |
| ranlux64_3 rng_; |
| |
| // True if the array of samples is in the same memory allocation as this object. If |
| // false, this object is responsible for freeing the memory. |
| bool sample_array_inline_; |
| |
| // Points to the array of ReservoirSamples. The array may be located inline (right after |
| // this object), or in a separate memory allocation. |
| ReservoirSample<T>* samples_; |
| |
| // Increases the capacity of the "samples_" array to "new_capacity" rounded up to a |
| // power of two by reallocating. Should only be called if the samples array is not |
| // inline. Returns false if the operation fails. |
| bool IncreaseCapacity(FunctionContext* ctx, int new_capacity) { |
| DCHECK(samples_ != NULL); |
| DCHECK(!sample_array_inline_); |
| DCHECK_LT(capacity_, MAX_CAPACITY); |
| DCHECK_GT(new_capacity, capacity_); |
| new_capacity = BitUtil::RoundUpToPowerOfTwo(new_capacity); |
| if (new_capacity > MAX_CAPACITY) new_capacity = MAX_CAPACITY; |
| size_t buffer_len = sizeof(ReservoirSample<T>) * new_capacity; |
| uint8_t* ptr = ctx->Reallocate(reinterpret_cast<uint8_t*>(samples_), buffer_len); |
| if (ptr == NULL) { |
| DCHECK(!ctx->impl()->state()->GetQueryStatus().ok()); |
| return false; |
| } |
| samples_ = reinterpret_cast<ReservoirSample<T>*>(ptr); |
| capacity_ = new_capacity; |
| return true; |
| } |
| |
| // Returns a random integer in the range [0, max]. |
| int64_t GetNext64(int64_t max) { |
| uniform_int<int64_t> dist(0, max); |
| return dist(rng_); |
| } |
| }; |
| |
| template <typename T> |
| void AggregateFunctions::ReservoirSampleInit(FunctionContext* ctx, StringVal* dst) { |
| AllocBuffer(ctx, dst, sizeof(ReservoirSampleState<T>)); |
| if (UNLIKELY(dst->is_null)) { |
| DCHECK(!ctx->impl()->state()->GetQueryStatus().ok()); |
| return; |
| } |
| ReservoirSampleState<T>* dst_state = |
| reinterpret_cast<ReservoirSampleState<T>*>(dst->ptr); |
| *dst_state = ReservoirSampleState<T>(ctx); |
| } |
| |
| template <typename T> |
| void AggregateFunctions::ReservoirSampleUpdate(FunctionContext* ctx, const T& src, |
| StringVal* dst) { |
| if (src.is_null) return; |
| DCHECK(!dst->is_null); |
| ReservoirSampleState<T>* dst_state = |
| reinterpret_cast<ReservoirSampleState<T>*>(dst->ptr); |
| dst_state->AddSample(ctx, src); |
| } |
| |
| template <typename T> |
| StringVal AggregateFunctions::ReservoirSampleSerialize(FunctionContext* ctx, |
| const StringVal& src) { |
| if (UNLIKELY(src.is_null)) return src; |
| ReservoirSampleState<T>* src_state = |
| reinterpret_cast<ReservoirSampleState<T>*>(src.ptr); |
| StringVal result = src_state->Serialize(ctx); |
| return result; |
| } |
| |
| template <typename T> |
| void AggregateFunctions::ReservoirSampleMerge(FunctionContext* ctx, |
| const StringVal& src, StringVal* dst) { |
| if (src.is_null) return; |
| DCHECK(!dst->is_null); |
| DCHECK(!src.is_null); |
| ReservoirSampleState<T>* src_state = |
| reinterpret_cast<ReservoirSampleState<T>*>(src.ptr); |
| ReservoirSampleState<T>* dst_state = |
| reinterpret_cast<ReservoirSampleState<T>*>(dst->ptr); |
| dst_state->Merge(ctx, src_state); |
| } |
| |
| template <typename T> |
| void PrintSample(const ReservoirSample<T>& v, ostream* os) { *os << v.val.val; } |
| |
| template <> |
| void PrintSample(const ReservoirSample<TinyIntVal>& v, ostream* os) { |
| *os << static_cast<int32_t>(v.val.val); |
| } |
| |
| template <> |
| void PrintSample(const ReservoirSample<StringVal>& v, ostream* os) { |
| string s(reinterpret_cast<const char*>(&v.val[0]), v.len); |
| *os << s; |
| } |
| |
| template <> |
| void PrintSample(const ReservoirSample<DecimalVal>& v, ostream* os) { |
| // Also handles val4 and val8 - the DecimalVal memory layout ensures the least |
| // significant bits overlap in memory. |
| *os << v.val.val16; |
| } |
| |
| template <> |
| void PrintSample(const ReservoirSample<TimestampVal>& v, ostream* os) { |
| *os << TimestampValue::FromTimestampVal(v.val).ToString(); |
| } |
| |
| template <> |
| void PrintSample(const ReservoirSample<DateVal>& v, ostream* os) { |
| *os << DateValue::FromDateVal(v.val); |
| } |
| |
| template <typename T> |
| StringVal AggregateFunctions::ReservoirSampleFinalize(FunctionContext* ctx, |
| const StringVal& src) { |
| if (UNLIKELY(src.is_null)) return src; |
| ReservoirSampleState<T>* src_state = |
| reinterpret_cast<ReservoirSampleState<T>*>(src.ptr); |
| |
| stringstream out; |
| for (int i = 0; i < src_state->num_samples(); ++i) { |
| PrintSample<T>(*src_state->GetSample(i), &out); |
| if (i < (src_state->num_samples() - 1)) out << ", "; |
| } |
| const string& out_str = out.str(); |
| StringVal result_str = StringVal::CopyFrom(ctx, |
| reinterpret_cast<const uint8_t*>(out_str.c_str()), out_str.size()); |
| src_state->Delete(ctx); |
| return result_str; |
| } |
| |
| template <typename T> |
| StringVal AggregateFunctions::HistogramFinalize(FunctionContext* ctx, |
| const StringVal& src) { |
| if (UNLIKELY(src.is_null)) return src; |
| |
| ReservoirSampleState<T>* src_state = |
| reinterpret_cast<ReservoirSampleState<T>*>(src.ptr); |
| src_state->SortSamples(); |
| |
| stringstream out; |
| int num_buckets = min(src_state->num_samples(), NUM_BUCKETS); |
| int samples_per_bucket = max(src_state->num_samples() / NUM_BUCKETS, 1); |
| for (int bucket_idx = 0; bucket_idx < num_buckets; ++bucket_idx) { |
| int sample_idx = (bucket_idx + 1) * samples_per_bucket - 1; |
| PrintSample<T>(*(src_state->GetSample(sample_idx)), &out); |
| if (bucket_idx < (num_buckets - 1)) out << ", "; |
| } |
| const string& out_str = out.str(); |
| StringVal result_str = StringVal::CopyFrom(ctx, |
| reinterpret_cast<const uint8_t*>(out_str.c_str()), out_str.size()); |
| src_state->Delete(ctx); |
| return result_str; |
| } |
| |
| template <typename T> |
| T AggregateFunctions::AppxMedianFinalize(FunctionContext* ctx, const StringVal& src) { |
| if (UNLIKELY(src.is_null)) return T::null(); |
| ReservoirSampleState<T>* src_state = |
| reinterpret_cast<ReservoirSampleState<T>*>(src.ptr); |
| T result = src_state->GetMedian(ctx); |
| src_state->Delete(ctx); |
| return result; |
| } |
| |
| void AggregateFunctions::HllInit(FunctionContext* ctx, StringVal* dst) { |
| // The HLL functions use a preallocated FIXED_UDA_INTERMEDIATE intermediate value. |
| DCHECK_EQ(dst->len, HLL_LEN); |
| memset(dst->ptr, 0, HLL_LEN); |
| } |
| |
| template <typename T> |
| void AggregateFunctions::HllUpdate(FunctionContext* ctx, const T& src, StringVal* dst) { |
| if (src.is_null) return; |
| DCHECK(!dst->is_null); |
| DCHECK_EQ(dst->len, HLL_LEN); |
| uint64_t hash_value = |
| AnyValUtil::Hash64(src, *ctx->GetArgType(0), HashUtil::FNV64_SEED); |
| // Use the lower bits to index into the number of streams and then find the first 1 bit |
| // after the index bits. |
| int idx = hash_value & (HLL_LEN - 1); |
| const uint8_t first_one_bit = |
| 1 + BitUtil::CountTrailingZeros( |
| hash_value >> HLL_PRECISION, sizeof(hash_value) * CHAR_BIT - HLL_PRECISION); |
| dst->ptr[idx] = ::max(dst->ptr[idx], first_one_bit); |
| } |
| |
| // Specialize for DecimalVal to allow substituting decimal size. |
| template <> |
| void AggregateFunctions::HllUpdate( |
| FunctionContext* ctx, const DecimalVal& src, StringVal* dst) { |
| if (src.is_null) return; |
| DCHECK(!dst->is_null); |
| DCHECK_EQ(dst->len, HLL_LEN); |
| int byte_size = ctx->impl()->GetConstFnAttr(FunctionContextImpl::ARG_TYPE_SIZE, 0); |
| uint64_t hash_value = AnyValUtil::HashDecimal64(src, byte_size, HashUtil::FNV64_SEED); |
| if (hash_value != 0) { |
| // Use the lower bits to index into the number of streams and then |
| // find the first 1 bit after the index bits. |
| int idx = hash_value & (HLL_LEN - 1); |
| uint8_t first_one_bit = __builtin_ctzl(hash_value >> HLL_PRECISION) + 1; |
| dst->ptr[idx] = ::max(dst->ptr[idx], first_one_bit); |
| } |
| } |
| |
| void AggregateFunctions::HllMerge( |
| FunctionContext* ctx, const StringVal& src, StringVal* dst) { |
| DCHECK(!dst->is_null); |
| DCHECK(!src.is_null); |
| DCHECK_EQ(dst->len, HLL_LEN); |
| DCHECK_EQ(src.len, HLL_LEN); |
| for (int i = 0; i < src.len; ++i) { |
| dst->ptr[i] = ::max(dst->ptr[i], src.ptr[i]); |
| } |
| } |
| |
| uint64_t AggregateFunctions::HllFinalEstimate(const uint8_t* buckets) { |
| DCHECK(buckets != NULL); |
| |
| // Empirical constants for the algorithm. |
| float alpha = 0; |
| if (HLL_LEN == 16) { |
| alpha = 0.673f; |
| } else if (HLL_LEN == 32) { |
| alpha = 0.697f; |
| } else if (HLL_LEN == 64) { |
| alpha = 0.709f; |
| } else { |
| alpha = 0.7213f / (1 + 1.079f / HLL_LEN); |
| } |
| |
| float harmonic_mean = 0; |
| int num_zero_registers = 0; |
| for (int i = 0; i < HLL_LEN; ++i) { |
| harmonic_mean += ldexp(1.0f, -buckets[i]); |
| if (buckets[i] == 0) ++num_zero_registers; |
| } |
| harmonic_mean = 1.0f / harmonic_mean; |
| int64_t estimate = alpha * HLL_LEN * HLL_LEN * harmonic_mean; |
| // Adjust for Hll bias based on Hll++ algorithm |
| if (estimate <= 5 * HLL_LEN) { |
| estimate -= HllEstimateBias(estimate); |
| } |
| |
| if (num_zero_registers == 0) return estimate; |
| |
| // Estimated cardinality is too low. Hll is too inaccurate here, instead use |
| // linear counting. |
| int64_t h = HLL_LEN * log(static_cast<float>(HLL_LEN) / num_zero_registers); |
| |
| return (h <= HllThreshold(HLL_PRECISION)) ? h : estimate; |
| } |
| |
| BigIntVal AggregateFunctions::HllFinalize(FunctionContext* ctx, const StringVal& src) { |
| if (UNLIKELY(src.is_null)) return BigIntVal::null(); |
| uint64_t estimate = HllFinalEstimate(src.ptr); |
| return estimate; |
| } |
| |
| /// Intermediate aggregation state for the SampledNdv() function. |
| /// Stores NUM_HLL_BUCKETS of the form <row_count, hll_state>. |
| /// The 'row_count' keeps track of how many input rows were aggregated into that |
| /// bucket, and the 'hll_state' is an intermediate aggregation state of HyperLogLog. |
| /// See the header comments on the SampledNdv() function for more details. |
| class SampledNdvState { |
| public: |
| /// Empirically determined number of HLL buckets. Power of two for fast modulo. |
| static const uint32_t NUM_HLL_BUCKETS = 32; |
| |
| /// A bucket contains an update count and an HLL intermediate state. |
| static constexpr int64_t BUCKET_SIZE = sizeof(int64_t) + AggregateFunctions::HLL_LEN; |
| |
| /// Sampling percent which was given as the second argument to SampledNdv(). |
| /// Stored here to avoid existing issues with passing constant arguments to all |
| /// aggregation phases and because we convert the sampling percent argument from |
| /// decimal to double. See IMPALA-6179. |
| double sample_perc; |
| |
| /// Counts the number of Update() calls. Used for determining which bucket to update. |
| int64_t total_row_count; |
| |
| /// Array of buckets. |
| struct { |
| int64_t row_count; |
| uint8_t hll[AggregateFunctions::HLL_LEN]; |
| } buckets[NUM_HLL_BUCKETS]; |
| }; |
| |
| void AggregateFunctions::SampledNdvInit(FunctionContext* ctx, StringVal* dst) { |
| // Uses a preallocated FIXED_UDA_INTERMEDIATE intermediate value. |
| DCHECK_EQ(dst->len, sizeof(SampledNdvState)); |
| memset(dst->ptr, 0, sizeof(SampledNdvState)); |
| |
| DoubleVal* sample_perc = reinterpret_cast<DoubleVal*>(ctx->GetConstantArg(1)); |
| if (sample_perc == nullptr) return; |
| // Guaranteed by the FE. |
| DCHECK(!sample_perc->is_null); |
| DCHECK_GE(sample_perc->val, 0.0); |
| DCHECK_LE(sample_perc->val, 1.0); |
| SampledNdvState* state = reinterpret_cast<SampledNdvState*>(dst->ptr); |
| state->sample_perc = sample_perc->val; |
| } |
| |
| /// Incorporate the 'src' into one of the intermediate HLLs, which will be used by |
| /// Finalize() to generate a set of the (x,y) data points. |
| template <typename T> |
| void AggregateFunctions::SampledNdvUpdate(FunctionContext* ctx, const T& src, |
| const DoubleVal& sample_perc, StringVal* dst) { |
| SampledNdvState* state = reinterpret_cast<SampledNdvState*>(dst->ptr); |
| int64_t bucket_idx = state->total_row_count % SampledNdvState::NUM_HLL_BUCKETS; |
| StringVal hll_dst = StringVal(state->buckets[bucket_idx].hll, HLL_LEN); |
| HllUpdate(ctx, src, &hll_dst); |
| ++state->buckets[bucket_idx].row_count; |
| ++state->total_row_count; |
| } |
| |
| void AggregateFunctions::SampledNdvMerge(FunctionContext* ctx, const StringVal& src, |
| StringVal* dst) { |
| SampledNdvState* src_state = reinterpret_cast<SampledNdvState*>(src.ptr); |
| SampledNdvState* dst_state = reinterpret_cast<SampledNdvState*>(dst->ptr); |
| for (int i = 0; i < SampledNdvState::NUM_HLL_BUCKETS; ++i) { |
| StringVal src_hll = StringVal(src_state->buckets[i].hll, HLL_LEN); |
| StringVal dst_hll = StringVal(dst_state->buckets[i].hll, HLL_LEN); |
| HllMerge(ctx, src_hll, &dst_hll); |
| dst_state->buckets[i].row_count += src_state->buckets[i].row_count; |
| } |
| // Total count. Not really needed after Update() but kept for sanity checking. |
| dst_state->total_row_count += src_state->total_row_count; |
| // Propagate sampling percent to Finalize(). |
| dst_state->sample_perc = src_state->sample_perc; |
| } |
| |
| BigIntVal AggregateFunctions::SampledNdvFinalize(FunctionContext* ctx, |
| const StringVal& src) { |
| SampledNdvState* state = reinterpret_cast<SampledNdvState*>(src.ptr); |
| |
| // Generate 'num_points' data points with x=row_count and y=ndv_estimate. These points |
| // are used to fit a function for the NDV growth and estimate the real NDV. |
| constexpr int num_points = |
| SampledNdvState::NUM_HLL_BUCKETS * SampledNdvState::NUM_HLL_BUCKETS; |
| int64_t counts[num_points] = { 0 }; |
| int64_t ndvs[num_points] = { 0 }; |
| |
| int64_t min_ndv = numeric_limits<int64_t>::max(); |
| int64_t min_count = numeric_limits<int64_t>::max(); |
| // We have a fixed number of HLL intermediates to generate data points. Any unique |
| // subset of intermediates can be combined to create a new data point. It was |
| // empirically determined that 'num_data' points is typically sufficient and there are |
| // diminishing returns from generating additional data points. |
| // The generation method below was chosen for its simplicity. It successively merges |
| // buckets in a rolling window of size NUM_HLL_BUCKETS. Repeating the last data point |
| // where all buckets are merged biases the curve fitting to hit that data point which |
| // makes sense because that's likely the most accurate one. The number of data points |
| // are sufficient for reasonable accuracy. |
| int pidx = 0; |
| for (int i = 0; i < SampledNdvState::NUM_HLL_BUCKETS; ++i) { |
| uint8_t merged_hll_data[HLL_LEN]; |
| memset(merged_hll_data, 0, HLL_LEN); |
| StringVal merged_hll(merged_hll_data, HLL_LEN); |
| int64_t merged_count = 0; |
| for (int j = 0; j < SampledNdvState::NUM_HLL_BUCKETS; ++j) { |
| int bucket_idx = (i + j) % SampledNdvState::NUM_HLL_BUCKETS; |
| merged_count += state->buckets[bucket_idx].row_count; |
| counts[pidx] = merged_count; |
| StringVal hll = StringVal(state->buckets[bucket_idx].hll, HLL_LEN); |
| HllMerge(ctx, hll, &merged_hll); |
| ndvs[pidx] = HllFinalEstimate(merged_hll.ptr); |
| ++pidx; |
| } |
| min_count = std::min(min_count, state->buckets[i].row_count); |
| min_ndv = std::min(min_ndv, ndvs[i * SampledNdvState::NUM_HLL_BUCKETS]); |
| } |
| // Based on the point-generation method above the last elements represent the data |
| // point where all buckets are merged. |
| int64_t max_count = counts[num_points - 1]; |
| int64_t max_ndv = ndvs[num_points - 1]; |
| |
| // Scale all values to [0,1] since some objective functions require it (e.g., Sigmoid). |
| double count_scale = max_count - min_count; |
| double ndv_scale = max_ndv - min_ndv; |
| if (count_scale == 0) count_scale = 1.0; |
| if (ndv_scale == 0) ndv_scale = 1.0; |
| double scaled_counts[num_points]; |
| double scaled_ndvs[num_points]; |
| for (int i = 0; i < num_points; ++i) { |
| scaled_counts[i] = counts[i] / count_scale; |
| scaled_ndvs[i] = ndvs[i] / ndv_scale; |
| } |
| |
| // List of objective functions. Curve fitting will select the best values for the |
| // parameters a, b, c, d. |
| vector<ObjectiveFunction> ndv_fns; |
| // Linear function: f(x) = a + b * x |
| ndv_fns.push_back(ObjectiveFunction("LIN", 2, |
| [](double x, const double* params) -> double { |
| return params[0] + params[1] * x; |
| } |
| )); |
| // Logarithmic function: f(x) = a + b * log(x) |
| ndv_fns.push_back(ObjectiveFunction("LOG", 2, |
| [](double x, const double* params) -> double { |
| return params[0] + params[1] * log(x); |
| } |
| )); |
| // Power function: f(x) = a + b * pow(x, c) |
| ndv_fns.push_back(ObjectiveFunction("POW", 3, |
| [](double x, const double* params) -> double { |
| return params[0] + params[1] * pow(x, params[2]); |
| } |
| )); |
| // Sigmoid function: f(x) = a + b * (c / (c + pow(d, -x))) |
| ndv_fns.push_back(ObjectiveFunction("SIG", 4, |
| [](double x, const double* params) -> double { |
| return params[0] + params[1] * (params[2] / (params[2] + pow(params[3], -x))); |
| } |
| )); |
| |
| // Perform least mean squares fitting on all objective functions. |
| vector<ObjectiveFunction> valid_ndv_fns; |
| for (ObjectiveFunction& f: ndv_fns) { |
| if(f.LmsFit(scaled_counts, scaled_ndvs, num_points)) { |
| valid_ndv_fns.push_back(std::move(f)); |
| } |
| } |
| |
| // Select the best-fit function for estimating the NDV. |
| auto best_fit_fn = min_element(valid_ndv_fns.begin(), valid_ndv_fns.end(), |
| [](const ObjectiveFunction& a, const ObjectiveFunction& b) -> bool { |
| return a.GetError() < b.GetError(); |
| } |
| ); |
| |
| // Compute the extrapolated NDV based on the extrapolated row count. |
| double extrap_count = max_count / state->sample_perc; |
| double scaled_extrap_count = extrap_count / count_scale; |
| double scaled_extrap_ndv = best_fit_fn->GetY(scaled_extrap_count); |
| return round(scaled_extrap_ndv * ndv_scale); |
| } |
| |
| template <typename T> |
| void AggregateFunctions::AggIfUpdate( |
| FunctionContext* ctx, const BooleanVal& cond, const T& src, T* dst) { |
| DCHECK(!cond.is_null); |
| if (cond.val) *dst = src; |
| } |
| |
| template <> |
| void AggregateFunctions::AggIfUpdate( |
| FunctionContext* ctx, const BooleanVal& cond, const StringVal& src, StringVal* dst) { |
| DCHECK(!cond.is_null); |
| if (cond.val) CopyStringVal(ctx, src, dst); |
| } |
| |
| template <typename T> |
| void AggregateFunctions::AggIfMerge(FunctionContext*, const T& src, T* dst) { |
| *dst = src; |
| } |
| |
| template <> |
| void AggregateFunctions::AggIfMerge( |
| FunctionContext* ctx, const StringVal& src, StringVal* dst) { |
| CopyStringVal(ctx, src, dst); |
| } |
| |
| template <typename T> |
| T AggregateFunctions::AggIfFinalize(FunctionContext*, const T& src) { |
| return src; |
| } |
| |
| template <> |
| StringVal AggregateFunctions::AggIfFinalize(FunctionContext* ctx, const StringVal& src) { |
| StringVal result = StringValGetValue(ctx, src); |
| if (!src.is_null) ctx->Free(src.ptr); |
| return result; |
| } |
| |
| // An implementation of a simple single pass variance algorithm. A standard UDA must |
| // be single pass (i.e. does not scan the table more than once), so the most canonical |
| // two pass approach is not practical. |
| struct KnuthVarianceState { |
| double mean; |
| double m2; |
| int64_t count; |
| }; |
| |
| // Set pop=true for population variance, false for sample variance |
| static double ComputeKnuthVariance(const KnuthVarianceState& state, bool pop) { |
| // Return zero for 1 tuple specified by |
| // http://docs.oracle.com/cd/B19306_01/server.102/b14200/functions212.htm |
| if (state.count == 1) return 0.0; |
| if (pop) return state.m2 / state.count; |
| return state.m2 / (state.count - 1); |
| } |
| |
| void AggregateFunctions::KnuthVarInit(FunctionContext* ctx, StringVal* dst) { |
| // The Knuth variance functions use a preallocated FIXED_UDA_INTERMEDIATE intermediate |
| // value. |
| DCHECK_EQ(dst->len, sizeof(KnuthVarianceState)); |
| memset(dst->ptr, 0, dst->len); |
| } |
| |
| template <typename T> |
| void AggregateFunctions::KnuthVarUpdate(FunctionContext* ctx, const T& src, |
| StringVal* dst) { |
| DCHECK(!dst->is_null); |
| DCHECK_EQ(dst->len, sizeof(KnuthVarianceState)); |
| if (src.is_null) return; |
| KnuthVarianceState* state = reinterpret_cast<KnuthVarianceState*>(dst->ptr); |
| double temp = 1 + state->count; |
| double delta = src.val - state->mean; |
| double r = delta / temp; |
| state->mean += r; |
| state->m2 += state->count * delta * r; |
| state->count = temp; |
| } |
| |
| void AggregateFunctions::KnuthVarMerge(FunctionContext* ctx, const StringVal& src, |
| StringVal* dst) { |
| DCHECK(!dst->is_null); |
| DCHECK_EQ(dst->len, sizeof(KnuthVarianceState)); |
| DCHECK(!src.is_null); |
| DCHECK_EQ(src.len, sizeof(KnuthVarianceState)); |
| // Reference implementation: |
| // http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm |
| KnuthVarianceState* src_state = reinterpret_cast<KnuthVarianceState*>(src.ptr); |
| KnuthVarianceState* dst_state = reinterpret_cast<KnuthVarianceState*>(dst->ptr); |
| if (src_state->count == 0) return; |
| double delta = dst_state->mean - src_state->mean; |
| double sum_count = dst_state->count + src_state->count; |
| dst_state->mean = src_state->mean + delta * (dst_state->count / sum_count); |
| dst_state->m2 = (src_state->m2) + dst_state->m2 + |
| (delta * delta) * (src_state->count * dst_state->count / sum_count); |
| dst_state->count = sum_count; |
| } |
| |
| DoubleVal AggregateFunctions::KnuthVarFinalize( |
| FunctionContext* ctx, const StringVal& state_sv) { |
| DCHECK(!state_sv.is_null); |
| KnuthVarianceState* state = reinterpret_cast<KnuthVarianceState*>(state_sv.ptr); |
| if (state->count == 0 || state->count == 1) return DoubleVal::null(); |
| double variance = ComputeKnuthVariance(*state, false); |
| return DoubleVal(variance); |
| } |
| |
| DoubleVal AggregateFunctions::KnuthVarPopFinalize(FunctionContext* ctx, |
| const StringVal& state_sv) { |
| DCHECK(!state_sv.is_null); |
| DCHECK_EQ(state_sv.len, sizeof(KnuthVarianceState)); |
| KnuthVarianceState* state = reinterpret_cast<KnuthVarianceState*>(state_sv.ptr); |
| if (state->count == 0) return DoubleVal::null(); |
| return ComputeKnuthVariance(*state, true); |
| } |
| |
| DoubleVal AggregateFunctions::KnuthStddevFinalize(FunctionContext* ctx, |
| const StringVal& state_sv) { |
| DCHECK(!state_sv.is_null); |
| DCHECK_EQ(state_sv.len, sizeof(KnuthVarianceState)); |
| KnuthVarianceState* state = reinterpret_cast<KnuthVarianceState*>(state_sv.ptr); |
| if (state->count == 0 || state->count == 1) return DoubleVal::null(); |
| return sqrt(ComputeKnuthVariance(*state, false)); |
| } |
| |
| DoubleVal AggregateFunctions::KnuthStddevPopFinalize(FunctionContext* ctx, |
| const StringVal& state_sv) { |
| DCHECK(!state_sv.is_null); |
| DCHECK_EQ(state_sv.len, sizeof(KnuthVarianceState)); |
| KnuthVarianceState* state = reinterpret_cast<KnuthVarianceState*>(state_sv.ptr); |
| if (state->count == 0) return DoubleVal::null(); |
| return sqrt(ComputeKnuthVariance(*state, true)); |
| } |
| |
| struct RankState { |
| int64_t rank; |
| int64_t count; |
| RankState() : rank(1), count(0) { } |
| }; |
| |
| void AggregateFunctions::RankInit(FunctionContext* ctx, StringVal* dst) { |
| // The rank functions use a preallocated FIXED_UDA_INTERMEDIATE intermediate value. |
| DCHECK_EQ(dst->len, sizeof(RankState)); |
| *reinterpret_cast<RankState*>(dst->ptr) = RankState(); |
| } |
| |
| void AggregateFunctions::RankUpdate(FunctionContext* ctx, StringVal* dst) { |
| DCHECK(!dst->is_null); |
| DCHECK_EQ(dst->len, sizeof(RankState)); |
| RankState* state = reinterpret_cast<RankState*>(dst->ptr); |
| ++state->count; |
| } |
| |
| void AggregateFunctions::DenseRankUpdate(FunctionContext* ctx, StringVal* dst) { } |
| |
| BigIntVal AggregateFunctions::RankGetValue(FunctionContext* ctx, |
| StringVal& src_val) { |
| DCHECK(!src_val.is_null); |
| DCHECK_EQ(src_val.len, sizeof(RankState)); |
| RankState* state = reinterpret_cast<RankState*>(src_val.ptr); |
| DCHECK_GT(state->count, 0); |
| DCHECK_GT(state->rank, 0); |
| int64_t result = state->rank; |
| |
| // Prepares future calls for the next rank |
| state->rank += state->count; |
| state->count = 0; |
| return BigIntVal(result); |
| } |
| |
| BigIntVal AggregateFunctions::DenseRankGetValue(FunctionContext* ctx, |
| StringVal& src_val) { |
| DCHECK(!src_val.is_null); |
| DCHECK_EQ(src_val.len, sizeof(RankState)); |
| RankState* state = reinterpret_cast<RankState*>(src_val.ptr); |
| DCHECK_EQ(state->count, 0); |
| DCHECK_GT(state->rank, 0); |
| int64_t result = state->rank; |
| |
| // Prepares future calls for the next rank |
| ++state->rank; |
| return BigIntVal(result); |
| } |
| |
| BigIntVal AggregateFunctions::RankFinalize(FunctionContext* ctx, |
| StringVal& src_val) { |
| if (UNLIKELY(src_val.is_null)) return BigIntVal::null(); |
| DCHECK_EQ(src_val.len, sizeof(RankState)); |
| RankState* state = reinterpret_cast<RankState*>(src_val.ptr); |
| int64_t result = state->rank; |
| return BigIntVal(result); |
| } |
| |
| template <typename T> |
| void AggregateFunctions::LastValRemove(FunctionContext* ctx, const T& src, T* dst) { |
| if (ctx->impl()->num_removes() >= ctx->impl()->num_updates()) *dst = T::null(); |
| } |
| |
| template <> |
| void AggregateFunctions::LastValRemove(FunctionContext* ctx, const StringVal& src, |
| StringVal* dst) { |
| if (ctx->impl()->num_removes() >= ctx->impl()->num_updates()) { |
| if (!dst->is_null) ctx->Free(dst->ptr); |
| *dst = StringVal::null(); |
| } |
| } |
| |
| // Returns the current size of the window. |
| inline int GetWindowSize(FunctionContext* ctx) { |
| return ctx->impl()->num_updates() - ctx->impl()->num_removes(); |
| } |
| |
| // LastValIgnoreNulls is a wrapper around LastVal. It works by not calling UpdateVal |
| // if the value being added to the window is null, so that we will return the most |
| // recently seen non-null value. |
| // The one special case to consider is when all of the values in the window are null |
| // and we therefore need to return null. To handle this, we track the number of nulls |
| // currently in the window, and set the value to be returned to null if the number of |
| // nulls is the same as the window size. |
| template <typename T> |
| struct LastValIgnoreNullsState { |
| T last_val; |
| // Number of nulls currently in the window, to detect when the window only has nulls. |
| int64_t num_nulls; |
| }; |
| |
| template <typename T> |
| void AggregateFunctions::LastValIgnoreNullsInit(FunctionContext* ctx, StringVal* dst) { |
| AllocBuffer(ctx, dst, sizeof(LastValIgnoreNullsState<T>)); |
| LastValIgnoreNullsState<T>* state = |
| reinterpret_cast<LastValIgnoreNullsState<T>*>(dst->ptr); |
| state->last_val = T::null(); |
| state->num_nulls = 0; |
| } |
| |
| template <typename T> |
| void AggregateFunctions::LastValIgnoreNullsUpdate(FunctionContext* ctx, const T& src, |
| StringVal* dst) { |
| DCHECK(dst->ptr != NULL); |
| DCHECK_EQ(sizeof(LastValIgnoreNullsState<T>), dst->len); |
| LastValIgnoreNullsState<T>* state = |
| reinterpret_cast<LastValIgnoreNullsState<T>*>(dst->ptr); |
| |
| if (!src.is_null) { |
| UpdateVal(ctx, src, &state->last_val); |
| } else { |
| ++state->num_nulls; |
| DCHECK_LE(state->num_nulls, GetWindowSize(ctx)); |
| if (GetWindowSize(ctx) == state->num_nulls) { |
| // Call UpdateVal here to set the value to null because it handles deallocation |
| // of StringVals correctly. |
| UpdateVal(ctx, T::null(), &state->last_val); |
| } |
| } |
| } |
| |
| template <typename T> |
| void AggregateFunctions::LastValIgnoreNullsRemove(FunctionContext* ctx, const T& src, |
| StringVal* dst) { |
| DCHECK(dst->ptr != NULL); |
| DCHECK_EQ(sizeof(LastValIgnoreNullsState<T>), dst->len); |
| LastValIgnoreNullsState<T>* state = |
| reinterpret_cast<LastValIgnoreNullsState<T>*>(dst->ptr); |
| LastValRemove(ctx, src, &state->last_val); |
| |
| if (src.is_null) --state->num_nulls; |
| DCHECK_GE(state->num_nulls, 0); |
| if (GetWindowSize(ctx) == state->num_nulls) { |
| // Call UpdateVal here to set the value to null because it handles deallocation |
| // of StringVals correctly. |
| UpdateVal(ctx, T::null(), &state->last_val); |
| } |
| } |
| |
| template <typename T> |
| T AggregateFunctions::LastValIgnoreNullsGetValue(FunctionContext* ctx, |
| const StringVal& src) { |
| DCHECK(!src.is_null); |
| DCHECK_EQ(sizeof(LastValIgnoreNullsState<T>), src.len); |
| LastValIgnoreNullsState<T>* state = |
| reinterpret_cast<LastValIgnoreNullsState<T>*>(src.ptr); |
| return state->last_val; |
| } |
| |
| template <> |
| StringVal AggregateFunctions::LastValIgnoreNullsGetValue(FunctionContext* ctx, |
| const StringVal& src) { |
| DCHECK(!src.is_null); |
| DCHECK_EQ(sizeof(LastValIgnoreNullsState<StringVal>), src.len); |
| LastValIgnoreNullsState<StringVal>* state = |
| reinterpret_cast<LastValIgnoreNullsState<StringVal>*>(src.ptr); |
| |
| if (state->last_val.is_null) { |
| return StringVal::null(); |
| } else { |
| return StringVal::CopyFrom(ctx, state->last_val.ptr, state->last_val.len); |
| } |
| } |
| |
| template <typename T> |
| T AggregateFunctions::LastValIgnoreNullsFinalize(FunctionContext* ctx, |
| const StringVal& src) { |
| DCHECK(!src.is_null); |
| T result = LastValIgnoreNullsGetValue<T>(ctx, src); |
| ctx->Free(src.ptr); |
| return result; |
| } |
| |
| template <> |
| StringVal AggregateFunctions::LastValIgnoreNullsFinalize(FunctionContext* ctx, |
| const StringVal& src) { |
| DCHECK(!src.is_null); |
| LastValIgnoreNullsState<StringVal>* state = |
| reinterpret_cast<LastValIgnoreNullsState<StringVal>*>(src.ptr); |
| StringVal result = LastValIgnoreNullsGetValue<StringVal>(ctx, src); |
| if (!state->last_val.is_null) ctx->Free(state->last_val.ptr); |
| ctx->Free(src.ptr); |
| return result; |
| } |
| |
| template <typename T> |
| void AggregateFunctions::FirstValUpdate(FunctionContext* ctx, const T& src, T* dst) { |
| // The first call to FirstValUpdate sets the value of dst. |
| if (ctx->impl()->num_updates() > 1) return; |
| // num_updates is incremented before calling Update(), so it should never be 0. |
| // Remove() should never be called for FIRST_VALUE. |
| DCHECK_GT(ctx->impl()->num_updates(), 0); |
| DCHECK_EQ(ctx->impl()->num_removes(), 0); |
| *dst = src; |
| } |
| |
| template <> |
| void AggregateFunctions::FirstValUpdate(FunctionContext* ctx, const StringVal& src, |
| StringVal* dst) { |
| if (ctx->impl()->num_updates() > 1) return; |
| DCHECK_GT(ctx->impl()->num_updates(), 0); |
| DCHECK_EQ(ctx->impl()->num_removes(), 0); |
| if (src.is_null) { |
| *dst = StringVal::null(); |
| return; |
| } |
| CopyStringVal(ctx, src, dst); |
| } |
| |
| template <typename T> |
| void AggregateFunctions::FirstValRewriteUpdate(FunctionContext* ctx, const T& src, |
| const BigIntVal&, T* dst) { |
| UpdateVal<T>(ctx, src, dst); |
| } |
| |
| template <typename T> |
| void AggregateFunctions::FirstValIgnoreNullsUpdate(FunctionContext*, const T& src, |
| T* dst) { |
| // Store the first non-null value encountered, unlike FirstValUpdate which always stores |
| // the first value even if it is null. |
| if (!dst->is_null || src.is_null) return; |
| *dst = src; |
| } |
| |
| template <> |
| void AggregateFunctions::FirstValIgnoreNullsUpdate(FunctionContext* ctx, |
| const StringVal& src, StringVal* dst) { |
| // Store the first non-null value encountered, unlike FirstValUpdate which always stores |
| // the first value even if it is null. |
| if (!dst->is_null || src.is_null) return; |
| CopyStringVal(ctx, src, dst); |
| } |
| |
| template <typename T> |
| void AggregateFunctions::OffsetFnInit(FunctionContext* ctx, T* dst) { |
| DCHECK_EQ(ctx->GetNumArgs(), 3); |
| DCHECK(ctx->IsArgConstant(1)); |
| DCHECK(ctx->IsArgConstant(2)); |
| DCHECK_EQ(ctx->GetArgType(0)->type, ctx->GetArgType(2)->type); |
| *dst = *static_cast<T*>(ctx->GetConstantArg(2)); |
| } |
| |
| template <> |
| void AggregateFunctions::OffsetFnInit(FunctionContext* ctx, StringVal* dst) { |
| DCHECK_EQ(ctx->GetNumArgs(), 3); |
| DCHECK(ctx->IsArgConstant(1)); |
| DCHECK(ctx->IsArgConstant(2)); |
| DCHECK_EQ(ctx->GetArgType(0)->type, ctx->GetArgType(2)->type); |
| CopyStringVal(ctx, *static_cast<StringVal*>(ctx->GetConstantArg(2)), dst); |
| } |
| |
| template <typename T> |
| void AggregateFunctions::OffsetFnUpdate(FunctionContext* ctx, const T& src, |
| const BigIntVal&, const T& default_value, T* dst) { |
| UpdateVal(ctx, src, dst); |
| } |
| |
| // Stamp out the templates for the types we need. |
| template void AggregateFunctions::InitZero<BigIntVal>(FunctionContext*, BigIntVal* dst); |
| |
| template void AggregateFunctions::UpdateVal<BooleanVal>( |
| FunctionContext*, const BooleanVal& src, BooleanVal* dst); |
| template void AggregateFunctions::UpdateVal<TinyIntVal>( |
| FunctionContext*, const TinyIntVal& src, TinyIntVal* dst); |
| template void AggregateFunctions::UpdateVal<SmallIntVal>( |
| FunctionContext*, const SmallIntVal& src, SmallIntVal* dst); |
| template void AggregateFunctions::UpdateVal<IntVal>( |
| FunctionContext*, const IntVal& src, IntVal* dst); |
| template void AggregateFunctions::UpdateVal<BigIntVal>( |
| FunctionContext*, const BigIntVal& src, BigIntVal* dst); |
| template void AggregateFunctions::UpdateVal<FloatVal>( |
| FunctionContext*, const FloatVal& src, FloatVal* dst); |
| template void AggregateFunctions::UpdateVal<DoubleVal>( |
| FunctionContext*, const DoubleVal& src, DoubleVal* dst); |
| template void AggregateFunctions::UpdateVal<TimestampVal>( |
| FunctionContext*, const TimestampVal& src, TimestampVal* dst); |
| template void AggregateFunctions::UpdateVal<DecimalVal>( |
| FunctionContext*, const DecimalVal& src, DecimalVal* dst); |
| template void AggregateFunctions::UpdateVal<DateVal>( |
| FunctionContext*, const DateVal& src, DateVal* dst); |
| |
| template void AggregateFunctions::AvgUpdate<BigIntVal>( |
| FunctionContext* ctx, const BigIntVal& input, StringVal* dst); |
| template void AggregateFunctions::AvgUpdate<DoubleVal>( |
| FunctionContext* ctx, const DoubleVal& input, StringVal* dst); |
| template void AggregateFunctions::AvgUpdate<DateVal>( |
| FunctionContext* ctx, const DateVal& input, StringVal* dst); |
| template void AggregateFunctions::AvgRemove<BigIntVal>( |
| FunctionContext* ctx, const BigIntVal& input, StringVal* dst); |
| template void AggregateFunctions::AvgRemove<DoubleVal>( |
| FunctionContext* ctx, const DoubleVal& input, StringVal* dst); |
| template void AggregateFunctions::AvgRemove<DateVal>( |
| FunctionContext* ctx, const DateVal& input, StringVal* dst); |
| |
| template void AggregateFunctions::SumUpdate<TinyIntVal, BigIntVal>( |
| FunctionContext*, const TinyIntVal& src, BigIntVal* dst); |
| template void AggregateFunctions::SumUpdate<SmallIntVal, BigIntVal>( |
| FunctionContext*, const SmallIntVal& src, BigIntVal* dst); |
| template void AggregateFunctions::SumUpdate<IntVal, BigIntVal>( |
| FunctionContext*, const IntVal& src, BigIntVal* dst); |
| template void AggregateFunctions::SumUpdate<BigIntVal, BigIntVal>( |
| FunctionContext*, const BigIntVal& src, BigIntVal* dst); |
| template void AggregateFunctions::SumUpdate<FloatVal, DoubleVal>( |
| FunctionContext*, const FloatVal& src, DoubleVal* dst); |
| template void AggregateFunctions::SumUpdate<DoubleVal, DoubleVal>( |
| FunctionContext*, const DoubleVal& src, DoubleVal* dst); |
| |
| template void AggregateFunctions::SumRemove<TinyIntVal, BigIntVal>( |
| FunctionContext*, const TinyIntVal& src, BigIntVal* dst); |
| template void AggregateFunctions::SumRemove<SmallIntVal, BigIntVal>( |
| FunctionContext*, const SmallIntVal& src, BigIntVal* dst); |
| template void AggregateFunctions::SumRemove<IntVal, BigIntVal>( |
| FunctionContext*, const IntVal& src, BigIntVal* dst); |
| template void AggregateFunctions::SumRemove<BigIntVal, BigIntVal>( |
| FunctionContext*, const BigIntVal& src, BigIntVal* dst); |
| template void AggregateFunctions::SumRemove<FloatVal, DoubleVal>( |
| FunctionContext*, const FloatVal& src, DoubleVal* dst); |
| template void AggregateFunctions::SumRemove<DoubleVal, DoubleVal>( |
| FunctionContext*, const DoubleVal& src, DoubleVal* dst); |
| |
| template void AggregateFunctions::Min<BooleanVal>( |
| FunctionContext*, const BooleanVal& src, BooleanVal* dst); |
| template void AggregateFunctions::Min<TinyIntVal>( |
| FunctionContext*, const TinyIntVal& src, TinyIntVal* dst); |
| template void AggregateFunctions::Min<SmallIntVal>( |
| FunctionContext*, const SmallIntVal& src, SmallIntVal* dst); |
| template void AggregateFunctions::Min<IntVal>( |
| FunctionContext*, const IntVal& src, IntVal* dst); |
| template void AggregateFunctions::Min<BigIntVal>( |
| FunctionContext*, const BigIntVal& src, BigIntVal* dst); |
| template void AggregateFunctions::Min<DateVal>( |
| FunctionContext*, const DateVal& src, DateVal* dst); |
| |
| template void AggregateFunctions::Max<BooleanVal>( |
| FunctionContext*, const BooleanVal& src, BooleanVal* dst); |
| template void AggregateFunctions::Max<TinyIntVal>( |
| FunctionContext*, const TinyIntVal& src, TinyIntVal* dst); |
| template void AggregateFunctions::Max<SmallIntVal>( |
| FunctionContext*, const SmallIntVal& src, SmallIntVal* dst); |
| template void AggregateFunctions::Max<IntVal>( |
| FunctionContext*, const IntVal& src, IntVal* dst); |
| template void AggregateFunctions::Max<BigIntVal>( |
| FunctionContext*, const BigIntVal& src, BigIntVal* dst); |
| template void AggregateFunctions::Max<DateVal>( |
| FunctionContext*, const DateVal& src, DateVal* dst); |
| |
| template void AggregateFunctions::PcUpdate( |
| FunctionContext*, const BooleanVal&, StringVal*); |
| template void AggregateFunctions::PcUpdate( |
| FunctionContext*, const TinyIntVal&, StringVal*); |
| template void AggregateFunctions::PcUpdate( |
| FunctionContext*, const SmallIntVal&, StringVal*); |
| template void AggregateFunctions::PcUpdate( |
| FunctionContext*, const IntVal&, StringVal*); |
| template void AggregateFunctions::PcUpdate( |
| FunctionContext*, const BigIntVal&, StringVal*); |
| template void AggregateFunctions::PcUpdate( |
| FunctionContext*, const FloatVal&, StringVal*); |
| template void AggregateFunctions::PcUpdate( |
| FunctionContext*, const DoubleVal&, StringVal*); |
| template void AggregateFunctions::PcUpdate( |
| FunctionContext*, const StringVal&, StringVal*); |
| template void AggregateFunctions::PcUpdate( |
| FunctionContext*, const TimestampVal&, StringVal*); |
| template void AggregateFunctions::PcUpdate( |
| FunctionContext*, const DecimalVal&, StringVal*); |
| template void AggregateFunctions::PcUpdate( |
| FunctionContext*, const DateVal&, StringVal*); |
| |
| template void AggregateFunctions::PcsaUpdate( |
| FunctionContext*, const BooleanVal&, StringVal*); |
| template void AggregateFunctions::PcsaUpdate( |
| FunctionContext*, const TinyIntVal&, StringVal*); |
| template void AggregateFunctions::PcsaUpdate( |
| FunctionContext*, const SmallIntVal&, StringVal*); |
| template void AggregateFunctions::PcsaUpdate( |
| FunctionContext*, const IntVal&, StringVal*); |
| template void AggregateFunctions::PcsaUpdate( |
| FunctionContext*, const BigIntVal&, StringVal*); |
| template void AggregateFunctions::PcsaUpdate( |
| FunctionContext*, const FloatVal&, StringVal*); |
| template void AggregateFunctions::PcsaUpdate( |
| FunctionContext*, const DoubleVal&, StringVal*); |
| template void AggregateFunctions::PcsaUpdate( |
| FunctionContext*, const StringVal&, StringVal*); |
| template void AggregateFunctions::PcsaUpdate( |
| FunctionContext*, const TimestampVal&, StringVal*); |
| template void AggregateFunctions::PcsaUpdate( |
| FunctionContext*, const DecimalVal&, StringVal*); |
| template void AggregateFunctions::PcsaUpdate( |
| FunctionContext*, const DateVal&, StringVal*); |
| |
| template void AggregateFunctions::ReservoirSampleInit<BooleanVal>( |
| FunctionContext*, StringVal*); |
| template void AggregateFunctions::ReservoirSampleInit<TinyIntVal>( |
| FunctionContext*, StringVal*); |
| template void AggregateFunctions::ReservoirSampleInit<SmallIntVal>( |
| FunctionContext*, StringVal*); |
| template void AggregateFunctions::ReservoirSampleInit<IntVal>( |
| FunctionContext*, StringVal*); |
| template void AggregateFunctions::ReservoirSampleInit<BigIntVal>( |
| FunctionContext*, StringVal*); |
| template void AggregateFunctions::ReservoirSampleInit<FloatVal>( |
| FunctionContext*, StringVal*); |
| template void AggregateFunctions::ReservoirSampleInit<DoubleVal>( |
| FunctionContext*, StringVal*); |
| template void AggregateFunctions::ReservoirSampleInit<StringVal>( |
| FunctionContext*, StringVal*); |
| template void AggregateFunctions::ReservoirSampleInit<TimestampVal>( |
| FunctionContext*, StringVal*); |
| template void AggregateFunctions::ReservoirSampleInit<DecimalVal>( |
| FunctionContext*, StringVal*); |
| template void AggregateFunctions::ReservoirSampleInit<DateVal>( |
| FunctionContext*, StringVal*); |
| |
| template void AggregateFunctions::ReservoirSampleUpdate( |
| FunctionContext*, const BooleanVal&, StringVal*); |
| template void AggregateFunctions::ReservoirSampleUpdate( |
| FunctionContext*, const TinyIntVal&, StringVal*); |
| template void AggregateFunctions::ReservoirSampleUpdate( |
| FunctionContext*, const SmallIntVal&, StringVal*); |
| template void AggregateFunctions::ReservoirSampleUpdate( |
| FunctionContext*, const IntVal&, StringVal*); |
| template void AggregateFunctions::ReservoirSampleUpdate( |
| FunctionContext*, const BigIntVal&, StringVal*); |
| template void AggregateFunctions::ReservoirSampleUpdate( |
| FunctionContext*, const FloatVal&, StringVal*); |
| template void AggregateFunctions::ReservoirSampleUpdate( |
| FunctionContext*, const DoubleVal&, StringVal*); |
| template void AggregateFunctions::ReservoirSampleUpdate( |
| FunctionContext*, const StringVal&, StringVal*); |
| template void AggregateFunctions::ReservoirSampleUpdate( |
| FunctionContext*, const TimestampVal&, StringVal*); |
| template void AggregateFunctions::ReservoirSampleUpdate( |
| FunctionContext*, const DecimalVal&, StringVal*); |
| template void AggregateFunctions::ReservoirSampleUpdate( |
| FunctionContext*, const DateVal&, StringVal*); |
| |
| template StringVal AggregateFunctions::ReservoirSampleSerialize<BooleanVal>( |
| FunctionContext*, const StringVal&); |
| template StringVal AggregateFunctions::ReservoirSampleSerialize<TinyIntVal>( |
| FunctionContext*, const StringVal&); |
| template StringVal AggregateFunctions::ReservoirSampleSerialize<SmallIntVal>( |
| FunctionContext*, const StringVal&); |
| template StringVal AggregateFunctions::ReservoirSampleSerialize<IntVal>( |
| FunctionContext*, const StringVal&); |
| template StringVal AggregateFunctions::ReservoirSampleSerialize<BigIntVal>( |
| FunctionContext*, const StringVal&); |
| template StringVal AggregateFunctions::ReservoirSampleSerialize<FloatVal>( |
| FunctionContext*, const StringVal&); |
| template StringVal AggregateFunctions::ReservoirSampleSerialize<DoubleVal>( |
| FunctionContext*, const StringVal&); |
| template StringVal AggregateFunctions::ReservoirSampleSerialize<StringVal>( |
| FunctionContext*, const StringVal&); |
| template StringVal AggregateFunctions::ReservoirSampleSerialize<TimestampVal>( |
| FunctionContext*, const StringVal&); |
| template StringVal AggregateFunctions::ReservoirSampleSerialize<DecimalVal>( |
| FunctionContext*, const StringVal&); |
| template StringVal AggregateFunctions::ReservoirSampleSerialize<DateVal>( |
| FunctionContext*, const StringVal&); |
| |
| template void AggregateFunctions::ReservoirSampleMerge<BooleanVal>( |
| FunctionContext*, const StringVal&, StringVal*); |
| template void AggregateFunctions::ReservoirSampleMerge<TinyIntVal>( |
| FunctionContext*, const StringVal&, StringVal*); |
| template void AggregateFunctions::ReservoirSampleMerge<SmallIntVal>( |
| FunctionContext*, const StringVal&, StringVal*); |
| template void AggregateFunctions::ReservoirSampleMerge<IntVal>( |
| FunctionContext*, const StringVal&, StringVal*); |
| template void AggregateFunctions::ReservoirSampleMerge<BigIntVal>( |
| FunctionContext*, const StringVal&, StringVal*); |
| template void AggregateFunctions::ReservoirSampleMerge<FloatVal>( |
| FunctionContext*, const StringVal&, StringVal*); |
| template void AggregateFunctions::ReservoirSampleMerge<DoubleVal>( |
| FunctionContext*, const StringVal&, StringVal*); |
| template void AggregateFunctions::ReservoirSampleMerge<StringVal>( |
| FunctionContext*, const StringVal&, StringVal*); |
| template void AggregateFunctions::ReservoirSampleMerge<TimestampVal>( |
| FunctionContext*, const StringVal&, StringVal*); |
| template void AggregateFunctions::ReservoirSampleMerge<DecimalVal>( |
| FunctionContext*, const StringVal&, StringVal*); |
| template void AggregateFunctions::ReservoirSampleMerge<DateVal>( |
| FunctionContext*, const StringVal&, StringVal*); |
| |
| template StringVal AggregateFunctions::ReservoirSampleFinalize<BooleanVal>( |
| FunctionContext*, const StringVal&); |
| template StringVal AggregateFunctions::ReservoirSampleFinalize<TinyIntVal>( |
| FunctionContext*, const StringVal&); |
| template StringVal AggregateFunctions::ReservoirSampleFinalize<SmallIntVal>( |
| FunctionContext*, const StringVal&); |
| template StringVal AggregateFunctions::ReservoirSampleFinalize<IntVal>( |
| FunctionContext*, const StringVal&); |
| template StringVal AggregateFunctions::ReservoirSampleFinalize<BigIntVal>( |
| FunctionContext*, const StringVal&); |
| template StringVal AggregateFunctions::ReservoirSampleFinalize<FloatVal>( |
| FunctionContext*, const StringVal&); |
| template StringVal AggregateFunctions::ReservoirSampleFinalize<DoubleVal>( |
| FunctionContext*, const StringVal&); |
| template StringVal AggregateFunctions::ReservoirSampleFinalize<StringVal>( |
| FunctionContext*, const StringVal&); |
| template StringVal AggregateFunctions::ReservoirSampleFinalize<TimestampVal>( |
| FunctionContext*, const StringVal&); |
| template StringVal AggregateFunctions::ReservoirSampleFinalize<DecimalVal>( |
| FunctionContext*, const StringVal&); |
| template StringVal AggregateFunctions::ReservoirSampleFinalize<DateVal>( |
| FunctionContext*, const StringVal&); |
| |
| template StringVal AggregateFunctions::HistogramFinalize<BooleanVal>( |
| FunctionContext*, const StringVal&); |
| template StringVal AggregateFunctions::HistogramFinalize<TinyIntVal>( |
| FunctionContext*, const StringVal&); |
| template StringVal AggregateFunctions::HistogramFinalize<SmallIntVal>( |
| FunctionContext*, const StringVal&); |
| template StringVal AggregateFunctions::HistogramFinalize<IntVal>( |
| FunctionContext*, const StringVal&); |
| template StringVal AggregateFunctions::HistogramFinalize<BigIntVal>( |
| FunctionContext*, const StringVal&); |
| template StringVal AggregateFunctions::HistogramFinalize<FloatVal>( |
| FunctionContext*, const StringVal&); |
| template StringVal AggregateFunctions::HistogramFinalize<DoubleVal>( |
| FunctionContext*, const StringVal&); |
| template StringVal AggregateFunctions::HistogramFinalize<StringVal>( |
| FunctionContext*, const StringVal&); |
| template StringVal AggregateFunctions::HistogramFinalize<TimestampVal>( |
| FunctionContext*, const StringVal&); |
| template StringVal AggregateFunctions::HistogramFinalize<DecimalVal>( |
| FunctionContext*, const StringVal&); |
| template StringVal AggregateFunctions::HistogramFinalize<DateVal>( |
| FunctionContext*, const StringVal&); |
| |
| template BooleanVal AggregateFunctions::AppxMedianFinalize<BooleanVal>( |
| FunctionContext*, const StringVal&); |
| template TinyIntVal AggregateFunctions::AppxMedianFinalize<TinyIntVal>( |
| FunctionContext*, const StringVal&); |
| template SmallIntVal AggregateFunctions::AppxMedianFinalize<SmallIntVal>( |
| FunctionContext*, const StringVal&); |
| template IntVal AggregateFunctions::AppxMedianFinalize<IntVal>( |
| FunctionContext*, const StringVal&); |
| template BigIntVal AggregateFunctions::AppxMedianFinalize<BigIntVal>( |
| FunctionContext*, const StringVal&); |
| template FloatVal AggregateFunctions::AppxMedianFinalize<FloatVal>( |
| FunctionContext*, const StringVal&); |
| template DoubleVal AggregateFunctions::AppxMedianFinalize<DoubleVal>( |
| FunctionContext*, const StringVal&); |
| template StringVal AggregateFunctions::AppxMedianFinalize<StringVal>( |
| FunctionContext*, const StringVal&); |
| template TimestampVal AggregateFunctions::AppxMedianFinalize<TimestampVal>( |
| FunctionContext*, const StringVal&); |
| template DecimalVal AggregateFunctions::AppxMedianFinalize<DecimalVal>( |
| FunctionContext*, const StringVal&); |
| template DateVal AggregateFunctions::AppxMedianFinalize<DateVal>( |
| FunctionContext*, const StringVal&); |
| |
| template void AggregateFunctions::HllUpdate( |
| FunctionContext*, const BooleanVal&, StringVal*); |
| template void AggregateFunctions::HllUpdate( |
| FunctionContext*, const TinyIntVal&, StringVal*); |
| template void AggregateFunctions::HllUpdate( |
| FunctionContext*, const SmallIntVal&, StringVal*); |
| template void AggregateFunctions::HllUpdate( |
| FunctionContext*, const IntVal&, StringVal*); |
| template void AggregateFunctions::HllUpdate( |
| FunctionContext*, const BigIntVal&, StringVal*); |
| template void AggregateFunctions::HllUpdate( |
| FunctionContext*, const FloatVal&, StringVal*); |
| template void AggregateFunctions::HllUpdate( |
| FunctionContext*, const DoubleVal&, StringVal*); |
| template void AggregateFunctions::HllUpdate( |
| FunctionContext*, const StringVal&, StringVal*); |
| template void AggregateFunctions::HllUpdate( |
| FunctionContext*, const TimestampVal&, StringVal*); |
| template void AggregateFunctions::HllUpdate( |
| FunctionContext*, const DateVal&, StringVal*); |
| |
| template void AggregateFunctions::SampledNdvUpdate( |
| FunctionContext*, const BooleanVal&, const DoubleVal&, StringVal*); |
| template void AggregateFunctions::SampledNdvUpdate( |
| FunctionContext*, const TinyIntVal&, const DoubleVal&, StringVal*); |
| template void AggregateFunctions::SampledNdvUpdate( |
| FunctionContext*, const SmallIntVal&, const DoubleVal&, StringVal*); |
| template void AggregateFunctions::SampledNdvUpdate( |
| FunctionContext*, const IntVal&, const DoubleVal&, StringVal*); |
| template void AggregateFunctions::SampledNdvUpdate( |
| FunctionContext*, const BigIntVal&, const DoubleVal&, StringVal*); |
| template void AggregateFunctions::SampledNdvUpdate( |
| FunctionContext*, const FloatVal&, const DoubleVal&, StringVal*); |
| template void AggregateFunctions::SampledNdvUpdate( |
| FunctionContext*, const DoubleVal&, const DoubleVal&, StringVal*); |
| template void AggregateFunctions::SampledNdvUpdate( |
| FunctionContext*, const StringVal&, const DoubleVal&, StringVal*); |
| template void AggregateFunctions::SampledNdvUpdate( |
| FunctionContext*, const TimestampVal&, const DoubleVal&, StringVal*); |
| template void AggregateFunctions::SampledNdvUpdate( |
| FunctionContext*, const DecimalVal&, const DoubleVal&, StringVal*); |
| template void AggregateFunctions::SampledNdvUpdate( |
| FunctionContext*, const DateVal&, const DoubleVal&, StringVal*); |
| |
| template void AggregateFunctions::AggIfUpdate( |
| FunctionContext*, const BooleanVal& cond, const BooleanVal& src, BooleanVal* dst); |
| template void AggregateFunctions::AggIfUpdate( |
| FunctionContext*, const BooleanVal& cond, const TinyIntVal& src, TinyIntVal* dst); |
| template void AggregateFunctions::AggIfUpdate( |
| FunctionContext*, const BooleanVal& cond, const SmallIntVal& src, SmallIntVal* dst); |
| template void AggregateFunctions::AggIfUpdate( |
| FunctionContext*, const BooleanVal& cond, const IntVal& src, IntVal* dst); |
| template void AggregateFunctions::AggIfUpdate( |
| FunctionContext*, const BooleanVal& cond, const BigIntVal& src, BigIntVal* dst); |
| template void AggregateFunctions::AggIfUpdate( |
| FunctionContext*, const BooleanVal& cond, const FloatVal& src, FloatVal* dst); |
| template void AggregateFunctions::AggIfUpdate( |
| FunctionContext*, const BooleanVal& cond, const DoubleVal& src, DoubleVal* dst); |
| template void AggregateFunctions::AggIfUpdate( |
| FunctionContext*, const BooleanVal& cond, const TimestampVal& src, TimestampVal* dst); |
| template void AggregateFunctions::AggIfUpdate( |
| FunctionContext*, const BooleanVal& cond, const DecimalVal& src, DecimalVal* dst); |
| template void AggregateFunctions::AggIfUpdate( |
| FunctionContext*, const BooleanVal& cond, const DateVal& src, DateVal* dst); |
| |
| template void AggregateFunctions::AggIfMerge( |
| FunctionContext*, const BooleanVal& src, BooleanVal* dst); |
| template void AggregateFunctions::AggIfMerge( |
| FunctionContext*, const TinyIntVal& src, TinyIntVal* dst); |
| template void AggregateFunctions::AggIfMerge( |
| FunctionContext*, const SmallIntVal& src, SmallIntVal* dst); |
| template void AggregateFunctions::AggIfMerge( |
| FunctionContext*, const IntVal& src, IntVal* dst); |
| template void AggregateFunctions::AggIfMerge( |
| FunctionContext*, const BigIntVal& src, BigIntVal* dst); |
| template void AggregateFunctions::AggIfMerge( |
| FunctionContext*, const FloatVal& src, FloatVal* dst); |
| template void AggregateFunctions::AggIfMerge( |
| FunctionContext*, const DoubleVal& src, DoubleVal* dst); |
| template void AggregateFunctions::AggIfMerge( |
| FunctionContext*, const TimestampVal& src, TimestampVal* dst); |
| template void AggregateFunctions::AggIfMerge( |
| FunctionContext*, const DecimalVal& src, DecimalVal* dst); |
| template void AggregateFunctions::AggIfMerge( |
| FunctionContext*, const DateVal& src, DateVal* dst); |
| |
| template BooleanVal AggregateFunctions::AggIfFinalize( |
| FunctionContext*, const BooleanVal& src); |
| template TinyIntVal AggregateFunctions::AggIfFinalize( |
| FunctionContext*, const TinyIntVal& src); |
| template SmallIntVal AggregateFunctions::AggIfFinalize( |
| FunctionContext*, const SmallIntVal& src); |
| template IntVal AggregateFunctions::AggIfFinalize(FunctionContext*, const IntVal& src); |
| template BigIntVal AggregateFunctions::AggIfFinalize( |
| FunctionContext*, const BigIntVal& src); |
| template FloatVal AggregateFunctions::AggIfFinalize( |
| FunctionContext*, const FloatVal& src); |
| template DoubleVal AggregateFunctions::AggIfFinalize( |
| FunctionContext*, const DoubleVal& src); |
| template TimestampVal AggregateFunctions::AggIfFinalize( |
| FunctionContext*, const TimestampVal& src); |
| template DecimalVal AggregateFunctions::AggIfFinalize( |
| FunctionContext*, const DecimalVal& src); |
| template DateVal AggregateFunctions::AggIfFinalize( |
| FunctionContext*, const DateVal& src); |
| |
| template void AggregateFunctions::KnuthVarUpdate( |
| FunctionContext*, const TinyIntVal&, StringVal*); |
| template void AggregateFunctions::KnuthVarUpdate( |
| FunctionContext*, const SmallIntVal&, StringVal*); |
| template void AggregateFunctions::KnuthVarUpdate( |
| FunctionContext*, const IntVal&, StringVal*); |
| template void AggregateFunctions::KnuthVarUpdate( |
| FunctionContext*, const BigIntVal&, StringVal*); |
| template void AggregateFunctions::KnuthVarUpdate( |
| FunctionContext*, const FloatVal&, StringVal*); |
| template void AggregateFunctions::KnuthVarUpdate( |
| FunctionContext*, const DoubleVal&, StringVal*); |
| |
| template void AggregateFunctions::LastValRemove<BooleanVal>( |
| FunctionContext*, const BooleanVal& src, BooleanVal* dst); |
| template void AggregateFunctions::LastValRemove<TinyIntVal>( |
| FunctionContext*, const TinyIntVal& src, TinyIntVal* dst); |
| template void AggregateFunctions::LastValRemove<SmallIntVal>( |
| FunctionContext*, const SmallIntVal& src, SmallIntVal* dst); |
| template void AggregateFunctions::LastValRemove<IntVal>( |
| FunctionContext*, const IntVal& src, IntVal* dst); |
| template void AggregateFunctions::LastValRemove<BigIntVal>( |
| FunctionContext*, const BigIntVal& src, BigIntVal* dst); |
| template void AggregateFunctions::LastValRemove<FloatVal>( |
| FunctionContext*, const FloatVal& src, FloatVal* dst); |
| template void AggregateFunctions::LastValRemove<DoubleVal>( |
| FunctionContext*, const DoubleVal& src, DoubleVal* dst); |
| template void AggregateFunctions::LastValRemove<TimestampVal>( |
| FunctionContext*, const TimestampVal& src, TimestampVal* dst); |
| template void AggregateFunctions::LastValRemove<DecimalVal>( |
| FunctionContext*, const DecimalVal& src, DecimalVal* dst); |
| template void AggregateFunctions::LastValRemove<DateVal>( |
| FunctionContext*, const DateVal& src, DateVal* dst); |
| |
| template void AggregateFunctions::LastValIgnoreNullsInit<BooleanVal>( |
| FunctionContext*, StringVal*); |
| template void AggregateFunctions::LastValIgnoreNullsInit<TinyIntVal>( |
| FunctionContext*, StringVal*); |
| template void AggregateFunctions::LastValIgnoreNullsInit<SmallIntVal>( |
| FunctionContext*, StringVal*); |
| template void AggregateFunctions::LastValIgnoreNullsInit<IntVal>( |
| FunctionContext*, StringVal*); |
| template void AggregateFunctions::LastValIgnoreNullsInit<BigIntVal>( |
| FunctionContext*, StringVal*); |
| template void AggregateFunctions::LastValIgnoreNullsInit<FloatVal>( |
| FunctionContext*, StringVal*); |
| template void AggregateFunctions::LastValIgnoreNullsInit<DoubleVal>( |
| FunctionContext*, StringVal*); |
| template void AggregateFunctions::LastValIgnoreNullsInit<StringVal>( |
| FunctionContext*, StringVal*); |
| template void AggregateFunctions::LastValIgnoreNullsInit<TimestampVal>( |
| FunctionContext*, StringVal*); |
| template void AggregateFunctions::LastValIgnoreNullsInit<DecimalVal>( |
| FunctionContext*, StringVal*); |
| template void AggregateFunctions::LastValIgnoreNullsInit<DateVal>( |
| FunctionContext*, StringVal*); |
| |
| template void AggregateFunctions::LastValIgnoreNullsUpdate<BooleanVal>( |
| FunctionContext*, const BooleanVal& src, StringVal* dst); |
| template void AggregateFunctions::LastValIgnoreNullsUpdate<TinyIntVal>( |
| FunctionContext*, const TinyIntVal& src, StringVal* dst); |
| template void AggregateFunctions::LastValIgnoreNullsUpdate<SmallIntVal>( |
| FunctionContext*, const SmallIntVal& src, StringVal* dst); |
| template void AggregateFunctions::LastValIgnoreNullsUpdate<IntVal>( |
| FunctionContext*, const IntVal& src, StringVal* dst); |
| template void AggregateFunctions::LastValIgnoreNullsUpdate<BigIntVal>( |
| FunctionContext*, const BigIntVal& src, StringVal* dst); |
| template void AggregateFunctions::LastValIgnoreNullsUpdate<FloatVal>( |
| FunctionContext*, const FloatVal& src, StringVal* dst); |
| template void AggregateFunctions::LastValIgnoreNullsUpdate<DoubleVal>( |
| FunctionContext*, const DoubleVal& src, StringVal* dst); |
| template void AggregateFunctions::LastValIgnoreNullsUpdate<StringVal>( |
| FunctionContext*, const StringVal& src, StringVal* dst); |
| template void AggregateFunctions::LastValIgnoreNullsUpdate<TimestampVal>( |
| FunctionContext*, const TimestampVal& src, StringVal* dst); |
| template void AggregateFunctions::LastValIgnoreNullsUpdate<DecimalVal>( |
| FunctionContext*, const DecimalVal& src, StringVal* dst); |
| template void AggregateFunctions::LastValIgnoreNullsUpdate<DateVal>( |
| FunctionContext*, const DateVal& src, StringVal* dst); |
| |
| template void AggregateFunctions::LastValIgnoreNullsRemove<BooleanVal>( |
| FunctionContext*, const BooleanVal& src, StringVal* dst); |
| template void AggregateFunctions::LastValIgnoreNullsRemove<TinyIntVal>( |
| FunctionContext*, const TinyIntVal& src, StringVal* dst); |
| template void AggregateFunctions::LastValIgnoreNullsRemove<SmallIntVal>( |
| FunctionContext*, const SmallIntVal& src, StringVal* dst); |
| template void AggregateFunctions::LastValIgnoreNullsRemove<IntVal>( |
| FunctionContext*, const IntVal& src, StringVal* dst); |
| template void AggregateFunctions::LastValIgnoreNullsRemove<BigIntVal>( |
| FunctionContext*, const BigIntVal& src, StringVal* dst); |
| template void AggregateFunctions::LastValIgnoreNullsRemove<FloatVal>( |
| FunctionContext*, const FloatVal& src, StringVal* dst); |
| template void AggregateFunctions::LastValIgnoreNullsRemove<DoubleVal>( |
| FunctionContext*, const DoubleVal& src, StringVal* dst); |
| template void AggregateFunctions::LastValIgnoreNullsRemove<StringVal>( |
| FunctionContext*, const StringVal& src, StringVal* dst); |
| template void AggregateFunctions::LastValIgnoreNullsRemove<TimestampVal>( |
| FunctionContext*, const TimestampVal& src, StringVal* dst); |
| template void AggregateFunctions::LastValIgnoreNullsRemove<DecimalVal>( |
| FunctionContext*, const DecimalVal& src, StringVal* dst); |
| template void AggregateFunctions::LastValIgnoreNullsRemove<DateVal>( |
| FunctionContext*, const DateVal& src, StringVal* dst); |
| |
| template BooleanVal AggregateFunctions::LastValIgnoreNullsGetValue<BooleanVal>( |
| FunctionContext*, const StringVal&); |
| template TinyIntVal AggregateFunctions::LastValIgnoreNullsGetValue<TinyIntVal>( |
| FunctionContext*, const StringVal&); |
| template SmallIntVal AggregateFunctions::LastValIgnoreNullsGetValue<SmallIntVal>( |
| FunctionContext*, const StringVal&); |
| template IntVal AggregateFunctions::LastValIgnoreNullsGetValue<IntVal>( |
| FunctionContext*, const StringVal&); |
| template BigIntVal AggregateFunctions::LastValIgnoreNullsGetValue<BigIntVal>( |
| FunctionContext*, const StringVal&); |
| template FloatVal AggregateFunctions::LastValIgnoreNullsGetValue<FloatVal>( |
| FunctionContext*, const StringVal&); |
| template DoubleVal AggregateFunctions::LastValIgnoreNullsGetValue<DoubleVal>( |
| FunctionContext*, const StringVal&); |
| template TimestampVal AggregateFunctions::LastValIgnoreNullsGetValue<TimestampVal>( |
| FunctionContext*, const StringVal&); |
| template DecimalVal AggregateFunctions::LastValIgnoreNullsGetValue<DecimalVal>( |
| FunctionContext*, const StringVal&); |
| template DateVal AggregateFunctions::LastValIgnoreNullsGetValue<DateVal>( |
| FunctionContext*, const StringVal&); |
| |
| template BooleanVal AggregateFunctions::LastValIgnoreNullsFinalize<BooleanVal>( |
| FunctionContext*, const StringVal&); |
| template TinyIntVal AggregateFunctions::LastValIgnoreNullsFinalize<TinyIntVal>( |
| FunctionContext*, const StringVal&); |
| template SmallIntVal AggregateFunctions::LastValIgnoreNullsFinalize<SmallIntVal>( |
| FunctionContext*, const StringVal&); |
| template IntVal AggregateFunctions::LastValIgnoreNullsFinalize<IntVal>( |
| FunctionContext*, const StringVal&); |
| template BigIntVal AggregateFunctions::LastValIgnoreNullsFinalize<BigIntVal>( |
| FunctionContext*, const StringVal&); |
| template FloatVal AggregateFunctions::LastValIgnoreNullsFinalize<FloatVal>( |
| FunctionContext*, const StringVal&); |
| template DoubleVal AggregateFunctions::LastValIgnoreNullsFinalize<DoubleVal>( |
| FunctionContext*, const StringVal&); |
| template TimestampVal AggregateFunctions::LastValIgnoreNullsFinalize<TimestampVal>( |
| FunctionContext*, const StringVal&); |
| template DecimalVal AggregateFunctions::LastValIgnoreNullsFinalize<DecimalVal>( |
| FunctionContext*, const StringVal&); |
| template DateVal AggregateFunctions::LastValIgnoreNullsFinalize<DateVal>( |
| FunctionContext*, const StringVal&); |
| |
| template void AggregateFunctions::FirstValUpdate<BooleanVal>( |
| FunctionContext*, const BooleanVal& src, BooleanVal* dst); |
| template void AggregateFunctions::FirstValUpdate<TinyIntVal>( |
| FunctionContext*, const TinyIntVal& src, TinyIntVal* dst); |
| template void AggregateFunctions::FirstValUpdate<SmallIntVal>( |
| FunctionContext*, const SmallIntVal& src, SmallIntVal* dst); |
| template void AggregateFunctions::FirstValUpdate<IntVal>( |
| FunctionContext*, const IntVal& src, IntVal* dst); |
| template void AggregateFunctions::FirstValUpdate<BigIntVal>( |
| FunctionContext*, const BigIntVal& src, BigIntVal* dst); |
| template void AggregateFunctions::FirstValUpdate<FloatVal>( |
| FunctionContext*, const FloatVal& src, FloatVal* dst); |
| template void AggregateFunctions::FirstValUpdate<DoubleVal>( |
| FunctionContext*, const DoubleVal& src, DoubleVal* dst); |
| template void AggregateFunctions::FirstValUpdate<TimestampVal>( |
| FunctionContext*, const TimestampVal& src, TimestampVal* dst); |
| template void AggregateFunctions::FirstValUpdate<DecimalVal>( |
| FunctionContext*, const DecimalVal& src, DecimalVal* dst); |
| template void AggregateFunctions::FirstValUpdate<DateVal>( |
| FunctionContext*, const DateVal& src, DateVal* dst); |
| |
| template void AggregateFunctions::FirstValRewriteUpdate<BooleanVal>( |
| FunctionContext*, const BooleanVal& src, const BigIntVal&, BooleanVal* dst); |
| template void AggregateFunctions::FirstValRewriteUpdate<TinyIntVal>( |
| FunctionContext*, const TinyIntVal& src, const BigIntVal&, TinyIntVal* dst); |
| template void AggregateFunctions::FirstValRewriteUpdate<SmallIntVal>( |
| FunctionContext*, const SmallIntVal& src, const BigIntVal&, SmallIntVal* dst); |
| template void AggregateFunctions::FirstValRewriteUpdate<IntVal>( |
| FunctionContext*, const IntVal& src, const BigIntVal&, IntVal* dst); |
| template void AggregateFunctions::FirstValRewriteUpdate<BigIntVal>( |
| FunctionContext*, const BigIntVal& src, const BigIntVal&, BigIntVal* dst); |
| template void AggregateFunctions::FirstValRewriteUpdate<FloatVal>( |
| FunctionContext*, const FloatVal& src, const BigIntVal&, FloatVal* dst); |
| template void AggregateFunctions::FirstValRewriteUpdate<DoubleVal>( |
| FunctionContext*, const DoubleVal& src, const BigIntVal&, DoubleVal* dst); |
| template void AggregateFunctions::FirstValRewriteUpdate<StringVal>( |
| FunctionContext*, const StringVal& src, const BigIntVal&, StringVal* dst); |
| template void AggregateFunctions::FirstValRewriteUpdate<TimestampVal>( |
| FunctionContext*, const TimestampVal& src, const BigIntVal&, TimestampVal* dst); |
| template void AggregateFunctions::FirstValRewriteUpdate<DecimalVal>( |
| FunctionContext*, const DecimalVal& src, const BigIntVal&, DecimalVal* dst); |
| template void AggregateFunctions::FirstValRewriteUpdate<DateVal>( |
| FunctionContext*, const DateVal& src, const BigIntVal&, DateVal* dst); |
| |
| template void AggregateFunctions::FirstValIgnoreNullsUpdate<BooleanVal>( |
| FunctionContext*, const BooleanVal& src, BooleanVal* dst); |
| template void AggregateFunctions::FirstValIgnoreNullsUpdate<TinyIntVal>( |
| FunctionContext*, const TinyIntVal& src, TinyIntVal* dst); |
| template void AggregateFunctions::FirstValIgnoreNullsUpdate<SmallIntVal>( |
| FunctionContext*, const SmallIntVal& src, SmallIntVal* dst); |
| template void AggregateFunctions::FirstValIgnoreNullsUpdate<IntVal>( |
| FunctionContext*, const IntVal& src, IntVal* dst); |
| template void AggregateFunctions::FirstValIgnoreNullsUpdate<BigIntVal>( |
| FunctionContext*, const BigIntVal& src, BigIntVal* dst); |
| template void AggregateFunctions::FirstValIgnoreNullsUpdate<FloatVal>( |
| FunctionContext*, const FloatVal& src, FloatVal* dst); |
| template void AggregateFunctions::FirstValIgnoreNullsUpdate<DoubleVal>( |
| FunctionContext*, const DoubleVal& src, DoubleVal* dst); |
| template void AggregateFunctions::FirstValIgnoreNullsUpdate<TimestampVal>( |
| FunctionContext*, const TimestampVal& src, TimestampVal* dst); |
| template void AggregateFunctions::FirstValIgnoreNullsUpdate<DecimalVal>( |
| FunctionContext*, const DecimalVal& src, DecimalVal* dst); |
| template void AggregateFunctions::FirstValIgnoreNullsUpdate<DateVal>( |
| FunctionContext*, const DateVal& src, DateVal* dst); |
| |
| template void AggregateFunctions::OffsetFnInit<BooleanVal>( |
| FunctionContext*, BooleanVal*); |
| template void AggregateFunctions::OffsetFnInit<TinyIntVal>( |
| FunctionContext*, TinyIntVal*); |
| template void AggregateFunctions::OffsetFnInit<SmallIntVal>( |
| FunctionContext*, SmallIntVal*); |
| template void AggregateFunctions::OffsetFnInit<IntVal>( |
| FunctionContext*, IntVal*); |
| template void AggregateFunctions::OffsetFnInit<BigIntVal>( |
| FunctionContext*, BigIntVal*); |
| template void AggregateFunctions::OffsetFnInit<FloatVal>( |
| FunctionContext*, FloatVal*); |
| template void AggregateFunctions::OffsetFnInit<DoubleVal>( |
| FunctionContext*, DoubleVal*); |
| template void AggregateFunctions::OffsetFnInit<TimestampVal>( |
| FunctionContext*, TimestampVal*); |
| template void AggregateFunctions::OffsetFnInit<DecimalVal>( |
| FunctionContext*, DecimalVal*); |
| template void AggregateFunctions::OffsetFnInit<DateVal>( |
| FunctionContext*, DateVal*); |
| |
| template void AggregateFunctions::OffsetFnUpdate<BooleanVal>( |
| FunctionContext*, const BooleanVal& src, const BigIntVal&, const BooleanVal&, |
| BooleanVal* dst); |
| template void AggregateFunctions::OffsetFnUpdate<TinyIntVal>( |
| FunctionContext*, const TinyIntVal& src, const BigIntVal&, const TinyIntVal&, |
| TinyIntVal* dst); |
| template void AggregateFunctions::OffsetFnUpdate<SmallIntVal>( |
| FunctionContext*, const SmallIntVal& src, const BigIntVal&, const SmallIntVal&, |
| SmallIntVal* dst); |
| template void AggregateFunctions::OffsetFnUpdate<IntVal>( |
| FunctionContext*, const IntVal& src, const BigIntVal&, const IntVal&, IntVal* dst); |
| template void AggregateFunctions::OffsetFnUpdate<BigIntVal>( |
| FunctionContext*, const BigIntVal& src, const BigIntVal&, const BigIntVal&, |
| BigIntVal* dst); |
| template void AggregateFunctions::OffsetFnUpdate<FloatVal>( |
| FunctionContext*, const FloatVal& src, const BigIntVal&, const FloatVal&, |
| FloatVal* dst); |
| template void AggregateFunctions::OffsetFnUpdate<DoubleVal>( |
| FunctionContext*, const DoubleVal& src, const BigIntVal&, const DoubleVal&, |
| DoubleVal* dst); |
| template void AggregateFunctions::OffsetFnUpdate<StringVal>( |
| FunctionContext*, const StringVal& src, const BigIntVal&, const StringVal&, |
| StringVal* dst); |
| template void AggregateFunctions::OffsetFnUpdate<TimestampVal>( |
| FunctionContext*, const TimestampVal& src, const BigIntVal&, const TimestampVal&, |
| TimestampVal* dst); |
| template void AggregateFunctions::OffsetFnUpdate<DecimalVal>( |
| FunctionContext*, const DecimalVal& src, const BigIntVal&, const DecimalVal&, |
| DecimalVal* dst); |
| template void AggregateFunctions::OffsetFnUpdate<DateVal>( |
| FunctionContext*, const DateVal& src, const BigIntVal&, const DateVal&, |
| DateVal* dst); |
| } |