blob: b0cacf30ea4b43d34b10e97b3dadfa6cbc77a00b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "util/quantile_state.h"
#include <string.h>
#include <cmath>
#include <ostream>
#include <utility>
#include "common/logging.h"
#include "util/coding.h"
#include "util/slice.h"
#include "util/tdigest.h"
namespace doris {
QuantileState::QuantileState() : _type(EMPTY), _compression(QUANTILE_STATE_COMPRESSION_MIN) {}
QuantileState::QuantileState(float compression) : _type(EMPTY), _compression(compression) {}
QuantileState::QuantileState(const Slice& slice) {
if (!deserialize(slice)) {
_type = EMPTY;
}
}
size_t QuantileState::get_serialized_size() {
size_t size = 1 + sizeof(float); // type(QuantileStateType) + compression(float)
switch (_type) {
case EMPTY:
break;
case SINGLE:
size += sizeof(double);
break;
case EXPLICIT:
size += sizeof(uint16_t) + sizeof(double) * _explicit_data.size();
break;
case TDIGEST:
size += _tdigest_ptr->serialized_size();
break;
}
return size;
}
void QuantileState::set_compression(float compression) {
DCHECK(compression >= QUANTILE_STATE_COMPRESSION_MIN &&
compression <= QUANTILE_STATE_COMPRESSION_MAX);
this->_compression = compression;
}
bool QuantileState::is_valid(const Slice& slice) {
if (slice.size < 1) {
return false;
}
const uint8_t* ptr = (uint8_t*)slice.data;
const uint8_t* end = (uint8_t*)slice.data + slice.size;
float compress_value = *reinterpret_cast<const float*>(ptr);
if (compress_value < QUANTILE_STATE_COMPRESSION_MIN ||
compress_value > QUANTILE_STATE_COMPRESSION_MAX) {
return false;
}
ptr += sizeof(float);
auto type = (QuantileStateType)*ptr++;
switch (type) {
case EMPTY:
break;
case SINGLE: {
if ((ptr + sizeof(double)) > end) {
return false;
}
ptr += sizeof(double);
break;
}
case EXPLICIT: {
if ((ptr + sizeof(uint16_t)) > end) {
return false;
}
uint16_t num_explicits = decode_fixed16_le(ptr);
ptr += sizeof(uint16_t);
ptr += num_explicits * sizeof(double);
break;
}
case TDIGEST: {
if ((ptr + sizeof(uint32_t)) > end) {
return false;
}
uint32_t tdigest_serialized_length = decode_fixed32_le(ptr);
ptr += tdigest_serialized_length;
break;
}
default:
return false;
}
return ptr == end;
}
double QuantileState::get_explicit_value_by_percentile(float percentile) const {
DCHECK(_type == EXPLICIT);
int n = _explicit_data.size();
std::vector<double> sorted_data(_explicit_data.begin(), _explicit_data.end());
std::sort(sorted_data.begin(), sorted_data.end());
double index = (n - 1) * percentile;
int intIdx = (int)index;
if (intIdx == n - 1) {
return sorted_data[intIdx];
}
return sorted_data[intIdx + 1] * (index - intIdx) + sorted_data[intIdx] * (intIdx + 1 - index);
}
double QuantileState::get_value_by_percentile(float percentile) const {
DCHECK(percentile >= 0 && percentile <= 1);
switch (_type) {
case EMPTY: {
return NAN;
}
case SINGLE: {
return _single_data;
}
case EXPLICIT: {
return get_explicit_value_by_percentile(percentile);
}
case TDIGEST: {
return _tdigest_ptr->quantile(percentile);
}
default:
break;
}
return NAN;
}
bool QuantileState::deserialize(const Slice& slice) {
DCHECK(_type == EMPTY);
// in case of insert error data caused be crashed
if (slice.data == nullptr || slice.size <= 0) {
return false;
}
// check input is valid
if (!is_valid(slice)) {
LOG(WARNING) << "QuantileState deserialize failed: slice is invalid";
return false;
}
const uint8_t* ptr = (uint8_t*)slice.data;
_compression = *reinterpret_cast<const float*>(ptr);
ptr += sizeof(float);
// first byte : type
_type = (QuantileStateType)*ptr++;
switch (_type) {
case EMPTY:
// 1: empty
break;
case SINGLE: {
// 2: single_data value
_single_data = *reinterpret_cast<const double*>(ptr);
ptr += sizeof(double);
break;
}
case EXPLICIT: {
// 3: number of explicit values
// make sure that num_explicit is positive
uint16_t num_explicits = decode_fixed16_le(ptr);
ptr += sizeof(uint16_t);
_explicit_data.reserve(std::min(num_explicits * 2, QUANTILE_STATE_EXPLICIT_NUM));
_explicit_data.resize(num_explicits);
memcpy(&_explicit_data[0], ptr, num_explicits * sizeof(double));
ptr += num_explicits * sizeof(double);
break;
}
case TDIGEST: {
// 4: Tdigest object value
_tdigest_ptr = std::make_shared<TDigest>(0);
_tdigest_ptr->unserialize(ptr);
break;
}
default:
// revert type to EMPTY
_type = EMPTY;
return false;
}
return true;
}
size_t QuantileState::serialize(uint8_t* dst) const {
uint8_t* ptr = dst;
*reinterpret_cast<float*>(ptr) = _compression;
ptr += sizeof(float);
switch (_type) {
case EMPTY: {
*ptr++ = EMPTY;
break;
}
case SINGLE: {
*ptr++ = SINGLE;
*reinterpret_cast<double*>(ptr) = _single_data;
ptr += sizeof(double);
break;
}
case EXPLICIT: {
*ptr++ = EXPLICIT;
uint16_t size = _explicit_data.size();
*reinterpret_cast<uint16_t*>(ptr) = size;
ptr += sizeof(uint16_t);
memcpy(ptr, &_explicit_data[0], size * sizeof(double));
ptr += size * sizeof(double);
break;
}
case TDIGEST: {
*ptr++ = TDIGEST;
size_t tdigest_size = _tdigest_ptr->serialize(ptr);
ptr += tdigest_size;
break;
}
default:
break;
}
return ptr - dst;
}
void QuantileState::merge(const QuantileState& other) {
switch (other._type) {
case EMPTY:
break;
case SINGLE: {
add_value(other._single_data);
break;
}
case EXPLICIT: {
switch (_type) {
case EMPTY:
_type = EXPLICIT;
_explicit_data = other._explicit_data;
break;
case SINGLE:
_type = EXPLICIT;
_explicit_data = other._explicit_data;
add_value(_single_data);
break;
case EXPLICIT:
if (_explicit_data.size() + other._explicit_data.size() > QUANTILE_STATE_EXPLICIT_NUM) {
_type = TDIGEST;
_tdigest_ptr = std::make_shared<TDigest>(_compression);
for (int i = 0; i < _explicit_data.size(); i++) {
_tdigest_ptr->add(_explicit_data[i]);
}
for (int i = 0; i < other._explicit_data.size(); i++) {
_tdigest_ptr->add(other._explicit_data[i]);
}
} else {
_explicit_data.insert(_explicit_data.end(), other._explicit_data.begin(),
other._explicit_data.end());
}
break;
case TDIGEST:
for (int i = 0; i < other._explicit_data.size(); i++) {
_tdigest_ptr->add(other._explicit_data[i]);
}
break;
default:
break;
}
break;
}
case TDIGEST: {
switch (_type) {
case EMPTY:
_type = TDIGEST;
_tdigest_ptr = other._tdigest_ptr;
break;
case SINGLE:
_type = TDIGEST;
_tdigest_ptr = other._tdigest_ptr;
_tdigest_ptr->add(_single_data);
break;
case EXPLICIT:
_type = TDIGEST;
_tdigest_ptr = other._tdigest_ptr;
for (int i = 0; i < _explicit_data.size(); i++) {
_tdigest_ptr->add(_explicit_data[i]);
}
break;
case TDIGEST:
_tdigest_ptr->merge(other._tdigest_ptr.get());
break;
default:
break;
}
break;
}
default:
return;
}
}
void QuantileState::add_value(const double& value) {
switch (_type) {
case EMPTY:
_single_data = value;
_type = SINGLE;
break;
case SINGLE:
_explicit_data.emplace_back(_single_data);
_explicit_data.emplace_back(value);
_type = EXPLICIT;
break;
case EXPLICIT:
if (_explicit_data.size() == QUANTILE_STATE_EXPLICIT_NUM) {
_tdigest_ptr = std::make_shared<TDigest>(_compression);
for (int i = 0; i < _explicit_data.size(); i++) {
_tdigest_ptr->add(_explicit_data[i]);
}
_explicit_data.clear();
_explicit_data.shrink_to_fit();
_type = TDIGEST;
} else {
_explicit_data.emplace_back(value);
}
break;
case TDIGEST:
_tdigest_ptr->add(value);
break;
}
}
void QuantileState::clear() {
_type = EMPTY;
_tdigest_ptr.reset();
_explicit_data.clear();
_explicit_data.shrink_to_fit();
}
} // namespace doris