blob: 86db5afec8fd572fc5c203ebd954dc8bec3effdb [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/columns/column.h"
#include "vec/columns/column_array.h"
#include "vec/columns/column_decimal.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
#include "vec/common/assert_cast.h"
#include "vec/common/string_buffer.hpp"
#include "vec/common/string_ref.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_array.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/io/io_helper.h"
namespace doris::vectorized {
#include "common/compile_check_begin.h"
class Arena;
template <PrimitiveType T>
struct AggregateFunctionArrayAggData {
static constexpr PrimitiveType PType = T;
using ElementType = typename PrimitiveTypeTraits<T>::ColumnItemType;
using ColVecType = typename PrimitiveTypeTraits<T>::ColumnType;
using Self = AggregateFunctionArrayAggData<T>;
MutableColumnPtr column_data;
ColVecType* nested_column = nullptr;
NullMap* null_map = nullptr;
AggregateFunctionArrayAggData(const DataTypes& argument_types) {
DataTypePtr column_type = make_nullable(argument_types[0]);
column_data = column_type->create_column();
null_map = &(assert_cast<ColumnNullable&>(*column_data).get_null_map_data());
nested_column = assert_cast<ColVecType*>(
assert_cast<ColumnNullable&>(*column_data).get_nested_column_ptr().get());
}
void add(const IColumn& column, size_t row_num) {
const auto& col = assert_cast<const ColumnNullable&, TypeCheckOnRelease::DISABLE>(column);
const auto& vec =
assert_cast<const ColVecType&, TypeCheckOnRelease::DISABLE>(col.get_nested_column())
.get_data();
null_map->push_back(col.get_null_map_data()[row_num]);
nested_column->get_data().push_back(vec[row_num]);
DCHECK(null_map->size() == nested_column->size());
}
void deserialize_and_merge(const IColumn& column, size_t row_num) {
const auto& to_arr = assert_cast<const ColumnArray&>(column);
const auto& to_nested_col = to_arr.get_data();
const auto* col_null = assert_cast<const ColumnNullable*>(&to_nested_col);
const auto& vec = assert_cast<const ColVecType&>(col_null->get_nested_column()).get_data();
auto start = to_arr.get_offsets()[row_num - 1];
auto end = start + to_arr.get_offsets()[row_num] - to_arr.get_offsets()[row_num - 1];
for (auto i = start; i < end; ++i) {
null_map->push_back(col_null->get_null_map_data()[i]);
nested_column->get_data().push_back(vec[i]);
}
}
void reset() {
null_map->clear();
nested_column->clear();
}
void insert_result_into(IColumn& to) const {
auto& to_arr = assert_cast<ColumnArray&>(to);
auto& to_nested_col = to_arr.get_data();
auto* col_null = assert_cast<ColumnNullable*>(&to_nested_col);
auto& vec = assert_cast<ColVecType&>(col_null->get_nested_column()).get_data();
size_t num_rows = null_map->size();
auto& nested_column_data = nested_column->get_data();
for (size_t i = 0; i < num_rows; ++i) {
col_null->get_null_map_data().push_back((*null_map)[i]);
vec.push_back(nested_column_data[i]);
}
to_arr.get_offsets().push_back(to_nested_col.size());
}
void write(BufferWritable& buf) const {
const size_t size = null_map->size();
buf.write_binary(size);
for (size_t i = 0; i < size; i++) {
buf.write_binary(null_map->data()[i]);
}
for (size_t i = 0; i < size; i++) {
buf.write_binary(nested_column->get_data()[i]);
}
}
void read(BufferReadable& buf) {
DCHECK(null_map);
DCHECK(null_map->empty());
size_t size = 0;
buf.read_binary(size);
null_map->resize(size);
nested_column->reserve(size);
for (size_t i = 0; i < size; i++) {
buf.read_binary(null_map->data()[i]);
}
ElementType data_value;
for (size_t i = 0; i < size; i++) {
buf.read_binary(data_value);
nested_column->get_data().push_back(data_value);
}
}
void merge(const Self& rhs) {
column_data->insert_range_from(*rhs.column_data, 0, rhs.column_data->size());
}
};
template <PrimitiveType T>
requires(is_string_type(T))
struct AggregateFunctionArrayAggData<T> {
static constexpr PrimitiveType PType = T;
using ElementType = StringRef;
using ColVecType = ColumnString;
using Self = AggregateFunctionArrayAggData<T>;
MutableColumnPtr column_data;
ColVecType* nested_column = nullptr;
NullMap* null_map = nullptr;
AggregateFunctionArrayAggData(const DataTypes& argument_types) {
DataTypePtr column_type = make_nullable(argument_types[0]);
column_data = column_type->create_column();
null_map = &(assert_cast<ColumnNullable&>(*column_data).get_null_map_data());
nested_column = assert_cast<ColVecType*>(
assert_cast<ColumnNullable&>(*column_data).get_nested_column_ptr().get());
}
void add(const IColumn& column, size_t row_num) {
const auto& col = assert_cast<const ColumnNullable&, TypeCheckOnRelease::DISABLE>(column);
const auto& vec = assert_cast<const ColVecType&, TypeCheckOnRelease::DISABLE>(
col.get_nested_column());
null_map->push_back(col.get_null_map_data()[row_num]);
nested_column->insert_from(vec, row_num);
DCHECK(null_map->size() == nested_column->size());
}
void deserialize_and_merge(const IColumn& column, size_t row_num) {
auto& to_arr = assert_cast<const ColumnArray&>(column);
auto& to_nested_col = to_arr.get_data();
auto col_null = assert_cast<const ColumnNullable*>(&to_nested_col);
const auto& vec = assert_cast<const ColVecType&>(col_null->get_nested_column());
auto start = to_arr.get_offsets()[row_num - 1];
auto end = start + to_arr.get_offsets()[row_num] - to_arr.get_offsets()[row_num - 1];
for (auto i = start; i < end; ++i) {
null_map->push_back(col_null->get_null_map_data()[i]);
nested_column->insert_from(vec, i);
}
}
void reset() {
null_map->clear();
nested_column->clear();
}
void insert_result_into(IColumn& to) const {
auto& to_arr = assert_cast<ColumnArray&>(to);
auto& to_nested_col = to_arr.get_data();
auto* col_null = assert_cast<ColumnNullable*>(&to_nested_col);
auto& vec = assert_cast<ColVecType&>(col_null->get_nested_column());
size_t num_rows = null_map->size();
for (size_t i = 0; i < num_rows; ++i) {
col_null->get_null_map_data().push_back((*null_map)[i]);
vec.insert_from(*nested_column, i);
}
to_arr.get_offsets().push_back(to_nested_col.size());
}
void write(BufferWritable& buf) const {
const size_t size = null_map->size();
buf.write_binary(size);
for (size_t i = 0; i < size; i++) {
buf.write_binary(null_map->data()[i]);
}
for (size_t i = 0; i < size; i++) {
buf.write_binary(nested_column->get_data_at(i));
}
}
void read(BufferReadable& buf) {
DCHECK(null_map);
DCHECK(null_map->empty());
size_t size = 0;
buf.read_binary(size);
null_map->resize(size);
nested_column->reserve(size);
for (size_t i = 0; i < size; i++) {
buf.read_binary(null_map->data()[i]);
}
StringRef s;
for (size_t i = 0; i < size; i++) {
buf.read_binary(s);
nested_column->insert_data(s.data, s.size);
}
}
void merge(const Self& rhs) {
column_data->insert_range_from(*rhs.column_data, 0, rhs.column_data->size());
}
};
template <PrimitiveType T>
requires(!is_string_type(T) && !is_int_or_bool(T) && !is_float_or_double(T) && !is_decimal(T) &&
!is_date_type(T) && !is_ip(T))
struct AggregateFunctionArrayAggData<T> {
static constexpr PrimitiveType PType = T;
using ElementType = StringRef;
using Self = AggregateFunctionArrayAggData<T>;
MutableColumnPtr column_data;
AggregateFunctionArrayAggData(const DataTypes& argument_types) {
DataTypePtr column_type = argument_types[0];
column_data = column_type->create_column();
}
void add(const IColumn& column, size_t row_num) { column_data->insert_from(column, row_num); }
void deserialize_and_merge(const IColumn& column, size_t row_num) {
const auto& to_arr = assert_cast<const ColumnArray&>(column);
const auto& to_nested_col = to_arr.get_data();
auto start = to_arr.get_offsets()[row_num - 1];
auto end = start + to_arr.get_offsets()[row_num] - to_arr.get_offsets()[row_num - 1];
for (auto i = start; i < end; ++i) {
column_data->insert_from(to_nested_col, i);
}
}
void reset() { column_data->clear(); }
void insert_result_into(IColumn& to) const {
auto& to_arr = assert_cast<ColumnArray&>(to);
auto& to_nested_col = to_arr.get_data();
size_t num_rows = column_data->size();
for (size_t i = 0; i < num_rows; ++i) {
to_nested_col.insert_from(*column_data, i);
}
to_arr.get_offsets().push_back(to_nested_col.size());
}
void write(BufferWritable& buf) const {
throw Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "array_agg not support write");
}
void read(BufferReadable& buf) {
throw Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "array_agg not support read");
}
void merge(const Self& rhs) {
throw Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "array_agg not support merge");
}
};
//ShowNull is just used to support array_agg because array_agg needs to display NULL
//todo: Supports order by sorting for array_agg
template <typename Data>
class AggregateFunctionArrayAgg
: public IAggregateFunctionDataHelper<Data, AggregateFunctionArrayAgg<Data>, true>,
UnaryExpression,
NotNullableAggregateFunction {
public:
AggregateFunctionArrayAgg(const DataTypes& argument_types_)
: IAggregateFunctionDataHelper<Data, AggregateFunctionArrayAgg<Data>, true>(
{argument_types_}),
return_type(std::make_shared<DataTypeArray>(make_nullable(argument_types_[0]))) {}
std::string get_name() const override { return "array_agg"; }
DataTypePtr get_return_type() const override { return return_type; }
void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num,
Arena& arena) const override {
this->data(place).add(*columns[0], row_num);
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
Arena& arena) const override {
this->data(place).merge(this->data(rhs));
}
void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override {
this->data(place).write(buf);
}
void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
Arena&) const override {
this->data(place).read(buf);
}
void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override {
auto& to_arr = assert_cast<ColumnArray&>(to);
auto& to_nested_col = to_arr.get_data();
DCHECK(to_nested_col.is_nullable());
this->data(place).insert_result_into(to);
}
void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place,
IColumn& to) const override {
this->data(place).insert_result_into(to);
}
void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, const IColumn& column,
Arena& arena) const override {
const size_t num_rows = column.size();
for (size_t i = 0; i != num_rows; ++i) {
this->data(place).deserialize_and_merge(column, i);
}
}
void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const IColumn* column, Arena& arena,
const size_t num_rows) const override {
for (size_t i = 0; i != num_rows; ++i) {
this->data(places[i] + offset).deserialize_and_merge(*column, i);
}
}
void deserialize_from_column(AggregateDataPtr places, const IColumn& column, Arena& arena,
size_t num_rows) const override {
for (size_t i = 0; i != num_rows; ++i) {
this->data(places).deserialize_and_merge(column, i);
}
}
void deserialize_and_merge_from_column_range(AggregateDataPtr __restrict place,
const IColumn& column, size_t begin, size_t end,
Arena& arena) const override {
DCHECK(end <= column.size() && begin <= end)
<< ", begin:" << begin << ", end:" << end << ", column.size():" << column.size();
for (size_t i = begin; i <= end; ++i) {
this->data(place).deserialize_and_merge(column, i);
}
}
void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const IColumn* column,
Arena& arena, const size_t num_rows) const override {
for (size_t i = 0; i != num_rows; ++i) {
if (places[i]) {
this->data(places[i] + offset).deserialize_and_merge(*column, i);
}
}
}
void serialize_to_column(const std::vector<AggregateDataPtr>& places, size_t offset,
MutableColumnPtr& dst, const size_t num_rows) const override {
for (size_t i = 0; i != num_rows; ++i) {
Data& data_ = this->data(places[i] + offset);
data_.insert_result_into(*dst);
}
}
void streaming_agg_serialize_to_column(const IColumn** columns, MutableColumnPtr& dst,
const size_t num_rows, Arena& arena) const override {
auto& to_arr = assert_cast<ColumnArray&>(*dst);
auto& to_nested_col = to_arr.get_data();
DCHECK(num_rows == columns[0]->size());
auto* col_null = assert_cast<ColumnNullable*>(&to_nested_col);
const auto& col_src = assert_cast<const ColumnNullable&>(*(columns[0]));
for (size_t i = 0; i < num_rows; ++i) {
col_null->get_null_map_data().push_back(col_src.get_null_map_data()[i]);
if constexpr (is_string_type(Data::PType)) {
auto& vec = assert_cast<ColumnString&, TypeCheckOnRelease::DISABLE>(
col_null->get_nested_column());
const auto& vec_src = assert_cast<const ColumnString&, TypeCheckOnRelease::DISABLE>(
col_src.get_nested_column());
vec.insert_from(vec_src, i);
} else if constexpr (!is_string_type(Data::PType) && !is_int_or_bool(Data::PType) &&
!is_float_or_double(Data::PType) && !is_decimal(Data::PType) &&
!is_date_type(Data::PType) && !is_ip(Data::PType)) {
auto& vec = col_null->get_nested_column();
vec.insert_from(col_src.get_nested_column(), i);
} else {
using ColVecType = typename PrimitiveTypeTraits<Data::PType>::ColumnType;
auto& vec = assert_cast<ColVecType&, TypeCheckOnRelease::DISABLE>(
col_null->get_nested_column())
.get_data();
auto& vec_src = assert_cast<const ColVecType&, TypeCheckOnRelease::DISABLE>(
col_src.get_nested_column())
.get_data();
vec.push_back(vec_src[i]);
}
to_arr.get_offsets().push_back(to_nested_col.size());
}
}
MutableColumnPtr create_serialize_column() const override {
return get_serialized_type()->create_column();
}
DataTypePtr get_serialized_type() const override { return return_type; }
private:
DataTypePtr return_type;
};
} // namespace doris::vectorized
#include "common/compile_check_end.h"