blob: 175f9c62de7920de3c2b65d345d1547ccbdbbe5b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/ClickHouse/ClickHouse/blob/master/src/AggregateFunctions/AggregateFunctionMinMaxAny.h
// and modified by Doris
#pragma once
#include <fmt/format.h>
#include <glog/logging.h>
#include <string.h>
#include <limits>
#include <memory>
#include <string>
#include <type_traits>
#include <vector>
#include "common/cast_set.h"
#include "common/compare.h"
#include "common/logging.h"
#include "runtime/primitive_type.h"
#include "runtime/type_limit.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/columns/column.h"
#include "vec/columns/column_array.h"
#include "vec/columns/column_fixed_length_object.h"
#include "vec/columns/column_string.h"
#include "vec/common/assert_cast.h"
#include "vec/common/string_buffer.hpp"
#include "vec/common/string_ref.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_fixed_length_object.h"
#include "vec/data_types/data_type_string.h"
#include "vec/io/io_helper.h"
namespace doris {
// Opens the compile-check region; it is closed by "common/compile_check_end.h" at the end of this
// file. NOTE(review): presumably this only toggles diagnostics (pragmas) — confirm in that header.
#include "common/compile_check_begin.h"
// Forward declarations of names this header may refer to without needing their full definitions
// (Arena, for example, is only taken by reference below).
namespace vectorized {
class Arena;
template <PrimitiveType T>
class ColumnDecimal;
template <PrimitiveType T>
class ColumnVector;
} // namespace vectorized
} // namespace doris
namespace doris::vectorized {
/// For numeric values.
template <PrimitiveType T>
struct SingleValueDataFixed {
private:
using Self = SingleValueDataFixed;
bool has_value =
false; /// We need to remember if at least one value has been passed. This is necessary for AggregateFunctionIf.
typename PrimitiveTypeTraits<T>::ColumnItemType value;
public:
SingleValueDataFixed() = default;
SingleValueDataFixed(bool has_value_, typename PrimitiveTypeTraits<T>::ColumnItemType value_)
: has_value(has_value_), value(value_) {}
bool has() const { return has_value; }
constexpr static bool IsFixedLength = true;
void set_to_min_max(bool max) {
value = max ? Compare::max_value<typename PrimitiveTypeTraits<T>::ColumnItemType>()
: Compare::min_value<typename PrimitiveTypeTraits<T>::ColumnItemType>();
}
void change_if(const IColumn& column, size_t row_num, bool less) {
has_value = true;
value = less ? Compare::min(assert_cast<const typename PrimitiveTypeTraits<T>::ColumnType&,
TypeCheckOnRelease::DISABLE>(column)
.get_data()[row_num],
value)
: Compare::max(assert_cast<const typename PrimitiveTypeTraits<T>::ColumnType&,
TypeCheckOnRelease::DISABLE>(column)
.get_data()[row_num],
value);
}
void insert_result_into(IColumn& to) const {
if (has()) {
assert_cast<typename PrimitiveTypeTraits<T>::ColumnType&>(to).get_data().push_back(
value);
} else {
assert_cast<typename PrimitiveTypeTraits<T>::ColumnType&>(to).insert_default();
}
}
void reset() {
if (has()) {
has_value = false;
}
}
void write(BufferWritable& buf) const {
buf.write_binary(has());
if (has()) {
buf.write_binary(value);
}
}
void read(BufferReadable& buf, Arena&) {
buf.read_binary(has_value);
if (has()) {
buf.read_binary(value);
}
}
void change(const IColumn& column, size_t row_num, Arena&) {
has_value = true;
value = assert_cast<const typename PrimitiveTypeTraits<T>::ColumnType&,
TypeCheckOnRelease::DISABLE>(column)
.get_data()[row_num];
}
/// Assuming to.has()
void change(const Self& to, Arena&) {
has_value = true;
value = to.value;
}
bool change_if_less(const IColumn& column, size_t row_num, Arena& arena) {
if (!has() || Compare::less(assert_cast<const typename PrimitiveTypeTraits<T>::ColumnType&,
TypeCheckOnRelease::DISABLE>(column)
.get_data()[row_num],
value)) {
change(column, row_num, arena);
return true;
} else {
return false;
}
}
bool change_if_less(const Self& to, Arena& arena) {
if (to.has() && (!has() || Compare::less(to.value, value))) {
change(to, arena);
return true;
} else {
return false;
}
}
bool change_if_greater(const IColumn& column, size_t row_num, Arena& arena) {
if (!has() ||
Compare::greater(assert_cast<const typename PrimitiveTypeTraits<T>::ColumnType&,
TypeCheckOnRelease::DISABLE>(column)
.get_data()[row_num],
value)) {
change(column, row_num, arena);
return true;
} else {
return false;
}
}
bool check_if_equal(const IColumn& column, size_t row_num) const {
if (!has()) {
return false;
}
return Compare::equal(assert_cast<const typename PrimitiveTypeTraits<T>::ColumnType&,
TypeCheckOnRelease::DISABLE>(column)
.get_data()[row_num],
value);
}
bool change_if_greater(const Self& to, Arena& arena) {
if (to.has() && (!has() || Compare::greater(to.value, value))) {
change(to, arena);
return true;
} else {
return false;
}
}
void change_first_time(const IColumn& column, size_t row_num, Arena& arena) {
if (UNLIKELY(!has())) {
change(column, row_num, arena);
}
}
void change_first_time(const Self& to, Arena& arena) {
if (UNLIKELY(!has() && to.has())) {
change(to, arena);
}
}
};
/// For decimal values.
template <PrimitiveType T>
struct SingleValueDataDecimal {
private:
using Self = SingleValueDataDecimal;
bool has_value =
false; /// We need to remember if at least one value has been passed. This is necessary for AggregateFunctionIf.
typename PrimitiveTypeTraits<T>::ColumnItemType value;
public:
SingleValueDataDecimal() = default;
SingleValueDataDecimal(bool has_value_, typename PrimitiveTypeTraits<T>::ColumnItemType value_)
: has_value(has_value_), value(value_) {}
bool has() const { return has_value; }
constexpr static bool IsFixedLength = true;
void set_to_min_max(bool max) {
value = max ? Compare::max_value<typename PrimitiveTypeTraits<T>::ColumnItemType>()
: Compare::min_value<typename PrimitiveTypeTraits<T>::ColumnItemType>();
}
void change_if(const IColumn& column, size_t row_num, bool less) {
has_value = true;
value = less ? Compare::min(assert_cast<const typename PrimitiveTypeTraits<T>::ColumnType&,
TypeCheckOnRelease::DISABLE>(column)
.get_data()[row_num],
value)
: Compare::max(assert_cast<const typename PrimitiveTypeTraits<T>::ColumnType&,
TypeCheckOnRelease::DISABLE>(column)
.get_data()[row_num],
value);
}
void insert_result_into(IColumn& to) const {
if (has()) {
assert_cast<typename PrimitiveTypeTraits<T>::ColumnType&>(to).insert_data(
(const char*)&value, 0);
} else {
assert_cast<typename PrimitiveTypeTraits<T>::ColumnType&>(to).insert_default();
}
}
void reset() {
if (has()) {
has_value = false;
}
}
void write(BufferWritable& buf) const {
buf.write_binary(has());
if (has()) {
buf.write_binary(value);
}
}
void read(BufferReadable& buf, Arena&) {
buf.read_binary(has_value);
if (has()) {
buf.read_binary(value);
}
}
void change(const IColumn& column, size_t row_num, Arena&) {
has_value = true;
value = assert_cast<const typename PrimitiveTypeTraits<T>::ColumnType&,
TypeCheckOnRelease::DISABLE>(column)
.get_data()[row_num];
}
/// Assuming to.has()
void change(const Self& to, Arena&) {
has_value = true;
value = to.value;
}
bool change_if_less(const IColumn& column, size_t row_num, Arena& arena) {
if (!has() || Compare::less(assert_cast<const typename PrimitiveTypeTraits<T>::ColumnType&,
TypeCheckOnRelease::DISABLE>(column)
.get_data()[row_num],
value)) {
change(column, row_num, arena);
return true;
} else {
return false;
}
}
bool change_if_less(const Self& to, Arena& arena) {
if (to.has() && (!has() || Compare::less(to.value, value))) {
change(to, arena);
return true;
} else {
return false;
}
}
bool change_if_greater(const IColumn& column, size_t row_num, Arena& arena) {
if (!has() ||
Compare::greater(assert_cast<const typename PrimitiveTypeTraits<T>::ColumnType&,
TypeCheckOnRelease::DISABLE>(column)
.get_data()[row_num],
value)) {
change(column, row_num, arena);
return true;
} else {
return false;
}
}
bool change_if_greater(const Self& to, Arena& arena) {
if (to.has() && (!has() || Compare::greater(to.value, value))) {
change(to, arena);
return true;
} else {
return false;
}
}
bool check_if_equal(const IColumn& column, size_t row_num) const {
if (!has()) {
return false;
}
return Compare::equal(assert_cast<const typename PrimitiveTypeTraits<T>::ColumnType&,
TypeCheckOnRelease::DISABLE>(column)
.get_data()[row_num],
value);
}
void change_first_time(const IColumn& column, size_t row_num, Arena& arena) {
if (UNLIKELY(!has())) {
change(column, row_num, arena);
}
}
void change_first_time(const Self& to, Arena& arena) {
if (UNLIKELY(!has() && to.has())) {
change(to, arena);
}
}
};
/** Single-value state for strings (values read from ColumnString).
  * Values of at most MAX_SMALL_STRING_SIZE bytes are stored inline in `small_data`; longer values
  * are copied into the heap buffer `large_data`, which is kept and reused while it is big enough.
  * NOTE It could also be suitable for arrays of numbers.
  */
struct SingleValueDataString {
private:
    using Self = SingleValueDataString;
    // `size` is written to the serialized state as an Int32 (see write()/read()), so the length
    // fields stay Int32 for compatibility even though that triggers 64->32-bit narrowing elsewhere.
    Int32 size = -1;    /// -1 indicates that there is no value.
    Int32 capacity = 0; /// Bytes allocated in large_data: a power of two, zero, or exactly the
                        /// requested size when rounding up to a power of two would overflow Int32.
    std::unique_ptr<char[]> large_data;

public:
    static constexpr Int32 AUTOMATIC_STORAGE_SIZE = 64;
    static constexpr Int32 MAX_SMALL_STRING_SIZE =
            AUTOMATIC_STORAGE_SIZE - sizeof(size) - sizeof(capacity) - sizeof(large_data);

private:
    char small_data[MAX_SMALL_STRING_SIZE]; /// Inline storage; no terminating zero is stored.

    /// Capacity to allocate for a value of `required` bytes: the next power of two, or exactly
    /// `required` when that power of two does not fit into Int32 (values larger than 1 GiB).
    /// Casting the rounded size straight to Int32 would wrap to a negative capacity and make the
    /// following `new char[capacity]` fail.
    static Int32 grow_capacity(Int32 required) {
        const auto rounded = round_up_to_power_of_two_or_zero(required);
        if (rounded > static_cast<size_t>(std::numeric_limits<Int32>::max())) {
            return required;
        }
        return static_cast<Int32>(rounded);
    }

public:
    ~SingleValueDataString() = default;
    constexpr static bool IsFixedLength = false;
    bool has() const { return size >= 0; }

    /// Start of the stored bytes (inline buffer for short values, heap buffer otherwise).
    const char* get_data() const {
        return size <= MAX_SMALL_STRING_SIZE ? small_data : large_data.get();
    }

    /// Appends the stored string to `to`, or the column default when nothing was stored.
    void insert_result_into(IColumn& to) const {
        if (has()) {
            assert_cast<ColumnString&>(to).insert_data(get_data(), size);
        } else {
            assert_cast<ColumnString&>(to).insert_default();
        }
    }

    /// Clears the value and releases the heap buffer.
    void reset() {
        if (size != -1) {
            size = -1;
            capacity = 0;
            large_data = nullptr;
        }
    }

    /// Serialized layout: Int32 size, then `size` raw bytes when size >= 0.
    void write(BufferWritable& buf) const {
        buf.write_binary(size);
        if (has()) {
            buf.write(get_data(), size);
        }
    }

    void read(BufferReadable& buf, Arena&) {
        Int32 rhs_size;
        buf.read_binary(rhs_size);
        if (rhs_size >= 0) {
            if (rhs_size <= MAX_SMALL_STRING_SIZE) {
                /// Don't free large_data here.
                size = rhs_size;
                if (size > 0) {
                    buf.read(small_data, size);
                }
            } else {
                if (capacity < rhs_size) {
                    capacity = grow_capacity(rhs_size);
                    large_data.reset(new char[capacity]);
                }
                size = rhs_size;
                buf.read(large_data.get(), size);
            }
        } else {
            /// Don't free large_data here.
            size = rhs_size;
        }
    }

    StringRef get_string_ref() const { return StringRef(get_data(), size); }

    /// Stores a copy of `value`; lengths that do not fit into Int32 are rejected by cast_set.
    void change_impl(StringRef value, Arena&) {
        Int32 value_size = cast_set<Int32>(value.size);
        if (value_size <= MAX_SMALL_STRING_SIZE) {
            /// Don't free large_data here.
            size = value_size;
            if (size > 0) {
                memcpy(small_data, value.data, size);
            }
        } else {
            if (capacity < value_size) {
                /// Grow the heap buffer; an existing large-enough buffer is reused as is.
                capacity = grow_capacity(value_size);
                large_data.reset(new char[capacity]);
            }
            size = value_size;
            memcpy(large_data.get(), value.data, size);
        }
    }

    void change(const IColumn& column, size_t row_num, Arena& arena) {
        change_impl(
                assert_cast<const ColumnString&, TypeCheckOnRelease::DISABLE>(column).get_data_at(
                        row_num),
                arena);
    }

    /// Assuming to.has()
    void change(const Self& to, Arena& arena) { change_impl(to.get_string_ref(), arena); }

    bool change_if_less(const IColumn& column, size_t row_num, Arena& arena) {
        if (!has() ||
            assert_cast<const ColumnString&, TypeCheckOnRelease::DISABLE>(column).get_data_at(
                    row_num) < get_string_ref()) {
            change(column, row_num, arena);
            return true;
        } else {
            return false;
        }
    }

    bool change_if_greater(const IColumn& column, size_t row_num, Arena& arena) {
        if (!has() ||
            assert_cast<const ColumnString&, TypeCheckOnRelease::DISABLE>(column).get_data_at(
                    row_num) > get_string_ref()) {
            change(column, row_num, arena);
            return true;
        } else {
            return false;
        }
    }

    bool change_if_less(const Self& to, Arena& arena) {
        if (to.has() && (!has() || to.get_string_ref() < get_string_ref())) {
            change(to, arena);
            return true;
        } else {
            return false;
        }
    }

    bool change_if_greater(const Self& to, Arena& arena) {
        if (to.has() && (!has() || to.get_string_ref() > get_string_ref())) {
            change(to, arena);
            return true;
        } else {
            return false;
        }
    }

    /// False when empty; otherwise whether row `row_num` equals the stored string.
    bool check_if_equal(const IColumn& column, size_t row_num) const {
        if (!has()) {
            return false;
        }
        return assert_cast<const ColumnString&, TypeCheckOnRelease::DISABLE>(column).get_data_at(
                       row_num) == get_string_ref();
    }

    /// Only the first value ever offered is kept.
    void change_first_time(const IColumn& column, size_t row_num, Arena& arena) {
        if (UNLIKELY(!has())) {
            change(column, row_num, arena);
        }
    }

    void change_first_time(const Self& to, Arena& arena) {
        if (UNLIKELY(!has() && to.has())) {
            change(to, arena);
        }
    }
};
struct SingleValueDataComplexType {
private:
using Self = SingleValueDataComplexType;
DataTypePtr column_type;
bool has_value = false;
MutableColumnPtr column_data; // a column ptr only save a single value
int be_exec_version = -1;
public:
SingleValueDataComplexType() = default;
SingleValueDataComplexType(const DataTypes& argument_types, int be_version) {
column_type = argument_types[0];
column_data = column_type->create_column();
be_exec_version = be_version;
}
bool has() const { return has_value; }
constexpr static bool IsFixedLength = false;
void insert_result_into(IColumn& to) const {
if (has()) {
to.insert_from(*column_data, 0);
} else {
to.insert_default();
}
}
void reset() {
has_value = false;
column_data->clear();
}
void write(BufferWritable& buf) const {
buf.write_binary(has_value);
if (!has()) {
return;
}
auto size_bytes =
column_type->get_uncompressed_serialized_bytes(*column_data, be_exec_version);
buf.write_binary(size_bytes);
buf.resize(size_bytes);
auto* p = column_type->serialize(*column_data, buf.data(), be_exec_version);
DCHECK_EQ(p, buf.data() + size_bytes);
buf.add_offset(size_bytes);
}
void read(BufferReadable& buf, Arena& arena) {
buf.read_binary(has_value);
if (!has()) {
return;
}
int64_t size;
buf.read_binary(size);
const auto* p = column_type->deserialize(buf.data(), &column_data, be_exec_version);
DCHECK_EQ(p, buf.data() + size);
buf.add_offset(size);
}
void change(const IColumn& column, size_t row_num, Arena&) {
has_value = true;
column_data->clear();
column_data->insert_from(column, row_num);
}
/// Assuming to.has()
void change(const Self& to, Arena&) {
has_value = true;
column_data->clear();
column_data->insert_from(*to.column_data, 0);
}
bool change_if_less(const IColumn& column, size_t row_num, Arena& arena) {
if (!has() || column_data->compare_at(0, row_num, column, -1) == 1) {
change(column, row_num, arena);
return true;
} else {
return false;
}
}
bool change_if_less(const Self& to, Arena& arena) {
if (to.has() && (!has() || column_data->compare_at(0, 0, *to.column_data, -1) == 1)) {
change(to, arena);
return true;
} else {
return false;
}
}
bool change_if_greater(const IColumn& column, size_t row_num, Arena& arena) {
if (!has() || column_data->compare_at(0, row_num, column, -1) == -1) {
change(column, row_num, arena);
return true;
} else {
return false;
}
}
bool change_if_greater(const Self& to, Arena& arena) {
if (to.has() && (!has() || column_data->compare_at(0, 0, *to.column_data, -1) == -1)) {
change(to, arena);
return true;
} else {
return false;
}
}
bool check_if_equal(const IColumn& column, size_t row_num) const {
if (!has()) {
return false;
}
auto type = column_type->get_primitive_type();
if (type == TYPE_BITMAP || type == TYPE_HLL || type == TYPE_QUANTILE_STATE ||
type == TYPE_AGG_STATE) {
return false;
} else {
return !column_data->compare_at(0, row_num, column, -1);
}
}
void change_first_time(const IColumn& column, size_t row_num, Arena& arena) {
if (UNLIKELY(!has())) {
change(column, row_num, arena);
}
}
void change_first_time(const Self& to, Arena& arena) {
if (UNLIKELY(!has() && to.has())) {
change(to, arena);
}
}
};
/// State policy for max(): keeps the greatest value offered.
template <typename Data>
struct AggregateFunctionMaxData : public Data {
    using Self = AggregateFunctionMaxData;
    using Data::IsFixedLength;
    constexpr static bool IS_ANY = false;

    AggregateFunctionMaxData() { this->reset(); }
    AggregateFunctionMaxData(const DataTypes& argument_types, int be_version)
            : Data(argument_types, be_version) {
        this->reset();
    }

    static const char* name() { return "max"; }

    /// Clears the state; fixed-length storage is additionally seeded via set_to_min_max(false).
    void reset() {
        if constexpr (Data::IsFixedLength) {
            this->set_to_min_max(false);
        }
        Data::reset();
    }

    /// Fixed-length storage folds unconditionally with change_if(); others use change_if_greater().
    void change_if_better(const IColumn& column, size_t row_num, Arena& arena) {
        if constexpr (!Data::IsFixedLength) {
            this->change_if_greater(column, row_num, arena);
        } else {
            this->change_if(column, row_num, /*less=*/false);
        }
    }

    void change_if_better(const Self& to, Arena& arena) { this->change_if_greater(to, arena); }
};
/// State policy for min(): keeps the smallest value offered.
template <typename Data>
struct AggregateFunctionMinData : public Data {
    using Self = AggregateFunctionMinData;
    using Data::IsFixedLength;
    constexpr static bool IS_ANY = false;

    AggregateFunctionMinData() { this->reset(); }
    AggregateFunctionMinData(const DataTypes& argument_types, int be_version)
            : Data(argument_types, be_version) {
        this->reset();
    }

    static const char* name() { return "min"; }

    /// Clears the state; fixed-length storage is additionally seeded via set_to_min_max(true).
    void reset() {
        if constexpr (Data::IsFixedLength) {
            this->set_to_min_max(true);
        }
        Data::reset();
    }

    /// Fixed-length storage folds unconditionally with change_if(); others use change_if_less().
    void change_if_better(const IColumn& column, size_t row_num, Arena& arena) {
        if constexpr (!Data::IsFixedLength) {
            this->change_if_less(column, row_num, arena);
        } else {
            this->change_if(column, row_num, /*less=*/true);
        }
    }

    void change_if_better(const Self& to, Arena& arena) { this->change_if_less(to, arena); }
};
/// State policy for the any_value function: keeps the first value offered and ignores the rest.
template <typename Data>
struct AggregateFunctionAnyData : Data {
    using Self = AggregateFunctionAnyData;
    using Data::IsFixedLength;
    constexpr static bool IS_ANY = true;

    AggregateFunctionAnyData() {}
    AggregateFunctionAnyData(const DataTypes& argument_types, int be_version)
            : Data(argument_types, be_version) {}

    static const char* name() { return "any"; }

    void change_if_better(const IColumn& column, size_t row_num, Arena& arena) {
        this->change_first_time(column, row_num, arena);
    }

    void change_if_better(const Self& to, Arena& arena) { this->change_first_time(to, arena); }
};
/// Aggregate function whose state keeps a single value. `Data` is the state policy
/// (AggregateFunctionMaxData / AggregateFunctionMinData / AggregateFunctionAnyData over one of
/// the SingleValueData* storages) and supplies name(), IsFixedLength and IS_ANY.
/// The result type is the type of the first argument.
template <typename Data>
class AggregateFunctionsSingleValue final
        : public IAggregateFunctionDataHelper<Data, AggregateFunctionsSingleValue<Data>> {
private:
    // Reference to argument_types[0]; returned by get_return_type().
    DataTypePtr& type;
    using Base = IAggregateFunctionDataHelper<Data, AggregateFunctionsSingleValue<Data>>;
    using IAggregateFunction::argument_types;
public:
    AggregateFunctionsSingleValue(const DataTypes& arguments)
            : IAggregateFunctionDataHelper<Data, AggregateFunctionsSingleValue<Data>>(arguments),
              type(this->argument_types[0]) {}
    /// Placement-constructs the state. SingleValueDataComplexType-based states need the argument
    /// types (to create their one-row column) and the BE exec version (for (de)serialization).
    void create(AggregateDataPtr __restrict place) const override {
        if constexpr (std::is_same_v<Data, AggregateFunctionMaxData<SingleValueDataComplexType>> ||
                      std::is_same_v<Data, AggregateFunctionMinData<SingleValueDataComplexType>> ||
                      std::is_same_v<Data, AggregateFunctionAnyData<SingleValueDataComplexType>>) {
            new (place) Data(argument_types, IAggregateFunction::version);
        } else {
            new (place) Data;
        }
    }
    String get_name() const override { return Data::name(); }
    DataTypePtr get_return_type() const override { return type; }
    void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num,
             Arena& arena) const override {
        this->data(place).change_if_better(*columns[0], row_num, arena);
    }
    /// For any_value only row 0 is offered: the state keeps the first value it sees, so later
    /// rows of the batch could not change it.
    void add_batch_single_place(size_t batch_size, AggregateDataPtr place, const IColumn** columns,
                                Arena& arena) const override {
        if constexpr (Data::IS_ANY) {
            DCHECK_GT(batch_size, 0);
            this->data(place).change_if_better(*columns[0], 0, arena);
        } else {
            Base::add_batch_single_place(batch_size, place, columns, arena);
        }
    }
    void reset(AggregateDataPtr place) const override { this->data(place).reset(); }
    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
               Arena& arena) const override {
        this->data(place).change_if_better(this->data(rhs), arena);
    }
    void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override {
        this->data(place).write(buf);
    }
    void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
                     Arena& arena) const override {
        this->data(place).read(buf, arena);
    }
    void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override {
        this->data(place).insert_result_into(to);
    }
    // The *_column methods below share one scheme: fixed-length states are stored whole, one Data
    // object per row, inside a ColumnFixedLengthObject (see create_serialize_column); all other
    // states fall back to the Base implementation.
    void deserialize_from_column(AggregateDataPtr places, const IColumn& column, Arena& arena,
                                 size_t num_rows) const override {
        if constexpr (Data::IsFixedLength) {
            const auto& col = assert_cast<const ColumnFixedLengthObject&>(column);
            auto* column_data = reinterpret_cast<const Data*>(col.get_data().data());
            Data* data = reinterpret_cast<Data*>(places);
            for (size_t i = 0; i != num_rows; ++i) {
                data[i] = column_data[i];
            }
        } else {
            Base::deserialize_from_column(places, column, arena, num_rows);
        }
    }
    void serialize_to_column(const std::vector<AggregateDataPtr>& places, size_t offset,
                             MutableColumnPtr& dst, const size_t num_rows) const override {
        if constexpr (Data::IsFixedLength) {
            auto& dst_column = assert_cast<ColumnFixedLengthObject&>(*dst);
            dst_column.resize(num_rows);
            auto* dst_data = reinterpret_cast<Data*>(dst_column.get_data().data());
            for (size_t i = 0; i != num_rows; ++i) {
                dst_data[i] = this->data(places[i] + offset);
            }
        } else {
            Base::serialize_to_column(places, offset, dst, num_rows);
        }
    }
    /// Turns each input row directly into a one-value state without aggregating first.
    void streaming_agg_serialize_to_column(const IColumn** columns, MutableColumnPtr& dst,
                                           const size_t num_rows, Arena& arena) const override {
        if constexpr (Data::IsFixedLength) {
            auto& dst_column = assert_cast<ColumnFixedLengthObject&>(*dst);
            dst_column.resize(num_rows);
            auto* dst_data = reinterpret_cast<Data*>(dst_column.get_data().data());
            for (size_t i = 0; i != num_rows; ++i) {
                dst_data[i].change(*columns[0], i, arena);
            }
        } else {
            Base::streaming_agg_serialize_to_column(columns, dst, num_rows, arena);
        }
    }
    void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, const IColumn& column,
                                           Arena& arena) const override {
        if constexpr (Data::IsFixedLength) {
            const auto& col = assert_cast<const ColumnFixedLengthObject&>(column);
            auto* column_data = reinterpret_cast<const Data*>(col.get_data().data());
            const size_t num_rows = column.size();
            for (size_t i = 0; i != num_rows; ++i) {
                this->data(place).change_if_better(column_data[i], arena);
            }
        } else {
            Base::deserialize_and_merge_from_column(place, column, arena);
        }
    }
    /// Merges the serialized states in rows begin..end into `place`. `end` is inclusive here (the
    /// loop runs while i <= end). NOTE(review): the DCHECK only requires end <= column.size();
    /// confirm callers never pass end == column.size().
    void deserialize_and_merge_from_column_range(AggregateDataPtr __restrict place,
                                                 const IColumn& column, size_t begin, size_t end,
                                                 Arena& arena) const override {
        if constexpr (Data::IsFixedLength) {
            DCHECK(end <= column.size() && begin <= end) << ", begin:" << begin << ", end:" << end
                                                         << ", column.size():" << column.size();
            auto& col = assert_cast<const ColumnFixedLengthObject&>(column);
            auto* data = reinterpret_cast<const Data*>(col.get_data().data());
            for (size_t i = begin; i <= end; ++i) {
                this->data(place).change_if_better(data[i], arena);
            }
        } else {
            Base::deserialize_and_merge_from_column_range(place, column, begin, end, arena);
        }
    }
    /// Deserializes `num_rows` states into the scratch area `rhs`, merges them into `places`, and
    /// destroys the scratch states when the scope exits (DEFER).
    void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
                                   AggregateDataPtr rhs, const IColumn* column, Arena& arena,
                                   const size_t num_rows) const override {
        this->deserialize_from_column(rhs, *column, arena, num_rows);
        DEFER({ this->destroy_vec(rhs, num_rows); });
        this->merge_vec(places, offset, rhs, arena, num_rows);
    }
    /// Same as deserialize_and_merge_vec, but merges through merge_vec_selected.
    void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
                                            AggregateDataPtr rhs, const IColumn* column,
                                            Arena& arena, const size_t num_rows) const override {
        this->deserialize_from_column(rhs, *column, arena, num_rows);
        DEFER({ this->destroy_vec(rhs, num_rows); });
        this->merge_vec_selected(places, offset, rhs, arena, num_rows);
    }
    void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place,
                                         IColumn& to) const override {
        if constexpr (Data::IsFixedLength) {
            auto& col = assert_cast<ColumnFixedLengthObject&>(to);
            size_t old_size = col.size();
            col.resize(old_size + 1);
            *(reinterpret_cast<Data*>(col.get_data().data()) + old_size) = this->data(place);
        } else {
            Base::serialize_without_key_to_column(place, to);
        }
    }
    /// Fixed-length states: one sizeof(Data) slot per row; otherwise a ColumnString.
    MutableColumnPtr create_serialize_column() const override {
        if constexpr (Data::IsFixedLength) {
            return ColumnFixedLengthObject::create(sizeof(Data));
        } else {
            return ColumnString::create();
        }
    }
    DataTypePtr get_serialized_type() const override {
        if constexpr (Data::IsFixedLength) {
            return std::make_shared<DataTypeFixedLengthObject>();
        } else {
            return std::make_shared<DataTypeString>();
        }
    }
    // Incremental window evaluation is offered for min/max, but not for any_value.
    bool supported_incremental_mode() const override { return !(Data::IS_ANY); }
    /// Sliding-window update. When the previous frame's result may be reused
    /// (*could_use_previous_result), only the row leaving the frame (frame_start - 1) and the row
    /// entering it (frame_end - 1) are considered. If the leaving row equals the current extreme,
    /// the state is reset and recomputed over the whole clamped frame. previous_is_nul / end_is_nul
    /// appear to flag the leaving / entering row as NULL; when has_null is set, columns[1] is read
    /// as a UInt8 null map and rows with a non-zero flag are skipped (presumably the argument's
    /// null map — confirm against the caller).
    void execute_function_with_incremental(int64_t partition_start, int64_t partition_end,
                                           int64_t frame_start, int64_t frame_end,
                                           AggregateDataPtr place, const IColumn** columns,
                                           Arena& arena, bool previous_is_nul, bool end_is_nul,
                                           bool has_null, UInt8* use_null_result,
                                           UInt8* could_use_previous_result) const override {
        int64_t current_frame_start = std::max<int64_t>(frame_start, partition_start);
        int64_t current_frame_end = std::min<int64_t>(frame_end, partition_end);
        if (current_frame_start >= current_frame_end) {
            *use_null_result = true;
            return;
        }
        if (*could_use_previous_result) {
            auto outcoming_pos = frame_start - 1;
            auto incoming_pos = frame_end - 1;
            if (!previous_is_nul && outcoming_pos >= partition_start &&
                outcoming_pos < partition_end) {
                // The value leaving the frame may be the current extreme; if so it cannot be
                // "removed" incrementally, so rebuild the state from the new frame.
                if (this->data(place).check_if_equal(*columns[0], outcoming_pos)) {
                    this->data(place).reset();
                    if (has_null) {
                        const auto& null_map_data =
                                assert_cast<const ColumnUInt8*>(columns[1])->get_data();
                        for (size_t i = current_frame_start; i < current_frame_end; ++i) {
                            if (null_map_data[i] == 0) {
                                this->data(place).change_if_better(*columns[0], i, arena);
                            }
                        }
                    } else {
                        this->add_range_single_place(partition_start, partition_end,
                                                     current_frame_start, current_frame_end, place,
                                                     columns, arena, use_null_result,
                                                     could_use_previous_result);
                    }
                    // The rebuilt frame already contains the entering row.
                    return;
                }
            }
            if (!end_is_nul && incoming_pos >= partition_start && incoming_pos < partition_end) {
                this->data(place).change_if_better(*columns[0], incoming_pos, arena);
            }
        } else {
            this->add_range_single_place(partition_start, partition_end, frame_start, frame_end,
                                         place, columns, arena, use_null_result,
                                         could_use_previous_result);
        }
    }
    /// Folds every row of the frame, clamped to [partition_start, partition_end), into the state.
    /// An empty clamped frame sets *use_null_result only when no previous result can be reused;
    /// a non-empty one clears *use_null_result and enables reuse for the next call.
    void add_range_single_place(int64_t partition_start, int64_t partition_end, int64_t frame_start,
                                int64_t frame_end, AggregateDataPtr place, const IColumn** columns,
                                Arena& arena, UInt8* use_null_result,
                                UInt8* could_use_previous_result) const override {
        auto current_frame_start = std::max<int64_t>(frame_start, partition_start);
        auto current_frame_end = std::min<int64_t>(frame_end, partition_end);
        if (current_frame_start >= current_frame_end) {
            if (!*could_use_previous_result) {
                *use_null_result = true;
            }
        } else {
            for (size_t row_num = current_frame_start; row_num < current_frame_end; ++row_num) {
                this->data(place).change_if_better(*columns[0], row_num, arena);
            }
            *use_null_result = false;
            *could_use_previous_result = true;
        }
    }
};
// Factory declarations only; the definitions are not in this header.
// `Data` is a state-policy template; presumably one of AggregateFunctionMaxData /
// AggregateFunctionMinData / AggregateFunctionAnyData above (confirm against the definitions).
template <template <typename> class Data>
AggregateFunctionPtr create_aggregate_function_single_value(const String& name,
                                                            const DataTypes& argument_types,
                                                            const DataTypePtr& result_type,
                                                            const bool result_is_nullable,
                                                            const AggregateFunctionAttr& attr = {});
// Factory variant used for the any_value function, judging by its name (see its definition).
template <template <typename> class Data>
AggregateFunctionPtr create_aggregate_function_single_value_any_value_function(
        const String& name, const DataTypes& argument_types, const DataTypePtr& result_type,
        const bool result_is_nullable, const AggregateFunctionAttr& attr = {});
} // namespace doris::vectorized
#include "common/compile_check_end.h"