// blob: a9b03a5f4106260607b063947bd7a0d3f2d6e408 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Processors/Transforms/WindowTransform.h
// and modified by Doris
#pragma once
#include <glog/logging.h>
#include <algorithm>
#include <boost/iterator/iterator_facade.hpp>
#include <cstdint>
#include <memory>
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/aggregate_functions/aggregate_function_reader_first_last.h"
#include "vec/columns/column.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_vector.h"
#include "vec/common/assert_cast.h"
#include "vec/common/string_ref.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type_number.h"
namespace doris::vectorized {
#include "common/compile_check_begin.h"
class Arena;
class BufferReadable;
class BufferWritable;
// Per-partition state for row_number(): a running row counter that starts at 0.
struct RowNumberData {
    int64_t count {0};
};
// Window function `row_number`.
// Every call to add() or add_range_single_place() bumps the counter kept in
// RowNumberData by one; the current counter value is what gets written to the
// Int64 result column.
class WindowFunctionRowNumber final
        : public IAggregateFunctionDataHelper<RowNumberData, WindowFunctionRowNumber> {
public:
    WindowFunctionRowNumber(const DataTypes& argument_types_)
            : IAggregateFunctionDataHelper(argument_types_) {}

    String get_name() const override { return "row_number"; }

    DataTypePtr get_return_type() const override { return std::make_shared<DataTypeInt64>(); }

    // Counts one more row; the column arguments are not inspected.
    void add(AggregateDataPtr place, const IColumn**, ssize_t, Arena&) const override {
        auto& state = data(place);
        state.count += 1;
    }

    // Counts one more row; partition and frame bounds are not used.
    void add_range_single_place(int64_t partition_start, int64_t partition_end, int64_t frame_start,
                                int64_t frame_end, AggregateDataPtr place, const IColumn** columns,
                                Arena&, UInt8*, UInt8*) const override {
        auto& state = data(place);
        state.count += 1;
    }

    // Puts the counter back to 0.
    void reset(AggregateDataPtr place) const override {
        auto& state = WindowFunctionRowNumber::data(place);
        state.count = 0;
    }

    // Appends the current counter value to `to` (an Int64 column).
    void insert_result_into(ConstAggregateDataPtr place, IColumn& to) const override {
        auto& result_column = assert_cast<ColumnInt64&, TypeCheckOnRelease::DISABLE>(to);
        result_column.get_data().push_back(WindowFunctionRowNumber::data(place).count);
    }

    bool result_column_could_resize() const override { return true; }

    // Overwrites rows [start, end) of `to` with the current counter value.
    void insert_result_into_range(ConstAggregateDataPtr __restrict place, IColumn& to,
                                  const size_t start, const size_t end) const override {
        auto& result_column = assert_cast<ColumnInt64&, TypeCheckOnRelease::DISABLE>(to);
        for (size_t row = start; row < end; ++row) {
            result_column.get_data()[row] = WindowFunctionRowNumber::data(place).count;
        }
    }

    // merge / serialize / deserialize are deliberately empty for this function.
    void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena&) const override {}
    void serialize(ConstAggregateDataPtr place, BufferWritable& buf) const override {}
    void deserialize(AggregateDataPtr place, BufferReadable& buf, Arena&) const override {}
};
// Per-partition state for rank().
struct RankData {
    // Rank of the current peer group (0 until the first group is seen).
    int64_t rank {0};
    // Row count of the most recent peer group; added to `rank` when the next group starts.
    int64_t count {1};
    // frame_start of the most recent peer group, -1 when none has been seen.
    int64_t peer_group_start {-1};
};
// Window function `rank`.
// The frame [frame_start, frame_end) handed to add_range_single_place() is treated
// as the current peer group: when a new group starts, the rank advances by the
// size of the previous group, which produces gaps after ties.
class WindowFunctionRank final : public IAggregateFunctionDataHelper<RankData, WindowFunctionRank> {
public:
    WindowFunctionRank(const DataTypes& argument_types_)
            : IAggregateFunctionDataHelper(argument_types_) {}

    String get_name() const override { return "rank"; }

    DataTypePtr get_return_type() const override { return std::make_shared<DataTypeInt64>(); }

    // Row-at-a-time path: simply advances the rank by one.
    void add(AggregateDataPtr place, const IColumn**, ssize_t, Arena&) const override {
        auto& state = data(place);
        state.rank += 1;
    }

    // Advances the rank when frame_start differs from the remembered peer group
    // start, then records the size of the current peer group.
    void add_range_single_place(int64_t partition_start, int64_t partition_end, int64_t frame_start,
                                int64_t frame_end, AggregateDataPtr place, const IColumn** columns,
                                Arena&, UInt8*, UInt8*) const override {
        auto& state = WindowFunctionRank::data(place);
        const bool entered_new_peer_group = state.peer_group_start != frame_start;
        if (entered_new_peer_group) {
            state.peer_group_start = frame_start;
            // Skip past every row of the previous peer group.
            state.rank += state.count;
        }
        state.count = frame_end - frame_start;
    }

    // Restores the initial state (rank 0, count 1, no peer group seen).
    void reset(AggregateDataPtr place) const override {
        auto& state = WindowFunctionRank::data(place);
        state.rank = 0;
        state.count = 1;
        state.peer_group_start = -1;
    }

    // Appends the current rank to `to` (an Int64 column).
    void insert_result_into(ConstAggregateDataPtr place, IColumn& to) const override {
        auto& result_column = assert_cast<ColumnInt64&, TypeCheckOnRelease::DISABLE>(to);
        result_column.get_data().push_back(data(place).rank);
    }

    bool result_column_could_resize() const override { return true; }

    // Overwrites rows [start, end) of `to` with the current rank.
    void insert_result_into_range(ConstAggregateDataPtr __restrict place, IColumn& to,
                                  const size_t start, const size_t end) const override {
        auto& result_column = assert_cast<ColumnInt64&, TypeCheckOnRelease::DISABLE>(to);
        for (size_t row = start; row < end; ++row) {
            result_column.get_data()[row] = WindowFunctionRank::data(place).rank;
        }
    }

    // merge / serialize / deserialize are deliberately empty for this function.
    void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena&) const override {}
    void serialize(ConstAggregateDataPtr place, BufferWritable& buf) const override {}
    void deserialize(AggregateDataPtr place, BufferReadable& buf, Arena&) const override {}
};
// Per-partition state for dense_rank().
struct DenseRankData {
    // Rank of the current peer group (0 until the first group is seen).
    int64_t rank {0};
    // frame_start of the most recent peer group, -1 when none has been seen.
    int64_t peer_group_start {-1};
};
// Window function `dense_rank`.
// The rank grows by exactly one each time add_range_single_place() sees a
// frame_start different from the remembered peer group start, so ties leave no gaps.
class WindowFunctionDenseRank final
        : public IAggregateFunctionDataHelper<DenseRankData, WindowFunctionDenseRank> {
public:
    WindowFunctionDenseRank(const DataTypes& argument_types_)
            : IAggregateFunctionDataHelper(argument_types_) {}

    String get_name() const override { return "dense_rank"; }

    DataTypePtr get_return_type() const override { return std::make_shared<DataTypeInt64>(); }

    // Row-at-a-time path: simply advances the rank by one.
    void add(AggregateDataPtr place, const IColumn**, ssize_t, Arena&) const override {
        auto& state = data(place);
        state.rank += 1;
    }

    // Bumps the rank once per new peer group; repeated calls for the same
    // frame_start leave the state untouched.
    void add_range_single_place(int64_t partition_start, int64_t partition_end, int64_t frame_start,
                                int64_t frame_end, AggregateDataPtr place, const IColumn** columns,
                                Arena&, UInt8*, UInt8*) const override {
        auto& state = WindowFunctionDenseRank::data(place);
        if (state.peer_group_start == frame_start) {
            return;
        }
        state.peer_group_start = frame_start;
        state.rank++;
    }

    // Restores the initial state (rank 0, no peer group seen).
    void reset(AggregateDataPtr place) const override {
        auto& state = WindowFunctionDenseRank::data(place);
        state.rank = 0;
        state.peer_group_start = -1;
    }

    // Appends the current rank to `to` (an Int64 column).
    void insert_result_into(ConstAggregateDataPtr place, IColumn& to) const override {
        auto& result_column = assert_cast<ColumnInt64&, TypeCheckOnRelease::DISABLE>(to);
        result_column.get_data().push_back(data(place).rank);
    }

    bool result_column_could_resize() const override { return true; }

    // Overwrites rows [start, end) of `to` with the current rank.
    void insert_result_into_range(ConstAggregateDataPtr __restrict place, IColumn& to,
                                  const size_t start, const size_t end) const override {
        auto& result_column = assert_cast<ColumnInt64&, TypeCheckOnRelease::DISABLE>(to);
        for (size_t row = start; row < end; ++row) {
            result_column.get_data()[row] = WindowFunctionDenseRank::data(place).rank;
        }
    }

    // merge / serialize / deserialize are deliberately empty for this function.
    void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena&) const override {}
    void serialize(ConstAggregateDataPtr place, BufferWritable& buf) const override {}
    void deserialize(AggregateDataPtr place, BufferReadable& buf, Arena&) const override {}
};
// Per-partition state for percent_rank().
struct PercentRankData {
    // Rank of the current peer group (0 until the first group is seen).
    int64_t rank {0};
    // Row count of the most recent peer group; added to `rank` when the next group starts.
    int64_t count {1};
    // frame_start of the most recent peer group, -1 when none has been seen.
    int64_t peer_group_start {-1};
    // Number of rows in the partition (partition_end - partition_start).
    int64_t partition_size {0};
};
// Window function `percent_rank`.
// Tracks the rank the same way WindowFunctionRank does and reports
// (rank - 1) / (partition_size - 1) as a Float64; partitions with at most one
// row yield 0.0.
class WindowFunctionPercentRank final
        : public IAggregateFunctionDataHelper<PercentRankData, WindowFunctionPercentRank> {
private:
    // Converts a rank into its relative position inside a partition of
    // `total_rows` rows. Guards against dividing by zero for tiny partitions.
    static double _cal_percent(int64_t rank, int64_t total_rows) {
        if (total_rows <= 1) {
            return 0.0;
        }
        return static_cast<double>(rank - 1) / static_cast<double>(total_rows - 1);
    }

public:
    WindowFunctionPercentRank(const DataTypes& argument_types_)
            : IAggregateFunctionDataHelper(argument_types_) {}

    String get_name() const override { return "percent_rank"; }

    DataTypePtr get_return_type() const override { return std::make_shared<DataTypeFloat64>(); }

    // The row-at-a-time path does nothing for this function.
    void add(AggregateDataPtr place, const IColumn**, ssize_t, Arena&) const override {}

    // On a new peer group: advance the rank by the previous group's size and
    // refresh the partition size. Always records the current group's size.
    void add_range_single_place(int64_t partition_start, int64_t partition_end, int64_t frame_start,
                                int64_t frame_end, AggregateDataPtr place, const IColumn** columns,
                                Arena&, UInt8*, UInt8*) const override {
        auto& state = WindowFunctionPercentRank::data(place);
        const bool entered_new_peer_group = state.peer_group_start != frame_start;
        if (entered_new_peer_group) {
            state.peer_group_start = frame_start;
            state.rank += state.count;
            // There is no hook that fires when a new partition begins, so the
            // partition-level value is recomputed at every peer group change.
            state.partition_size = partition_end - partition_start;
        }
        state.count = frame_end - frame_start;
    }

    // Restores the initial state.
    void reset(AggregateDataPtr place) const override {
        auto& state = WindowFunctionPercentRank::data(place);
        state.rank = 0;
        state.count = 1;
        state.peer_group_start = -1;
        state.partition_size = 0;
    }

    // Appends the current percent rank to `to` (a Float64 column).
    void insert_result_into(ConstAggregateDataPtr place, IColumn& to) const override {
        const auto& state = data(place);
        const double percent_rank = _cal_percent(state.rank, state.partition_size);
        auto& result_column = assert_cast<ColumnFloat64&, TypeCheckOnRelease::DISABLE>(to);
        result_column.get_data().push_back(percent_rank);
    }

    bool result_column_could_resize() const override { return true; }

    // Overwrites rows [start, end) of `to` with the current percent rank.
    void insert_result_into_range(ConstAggregateDataPtr __restrict place, IColumn& to,
                                  const size_t start, const size_t end) const override {
        auto& result_column = assert_cast<ColumnFloat64&, TypeCheckOnRelease::DISABLE>(to);
        const auto& state = data(place);
        const double percent_rank = _cal_percent(state.rank, state.partition_size);
        for (size_t row = start; row < end; ++row) {
            result_column.get_data()[row] = percent_rank;
        }
    }

    // merge / serialize / deserialize are deliberately empty for this function.
    void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena&) const override {}
    void serialize(ConstAggregateDataPtr place, BufferWritable& buf) const override {}
    void deserialize(AggregateDataPtr place, BufferReadable& buf, Arena&) const override {}
};
// Per-partition state for cume_dist(); the result is numerator / denominator.
struct CumeDistData {
    // Rows covered so far: the sum of the sizes of the peer groups already seen.
    int64_t numerator {0};
    // Partition row count; 0 means "not initialised yet".
    int64_t denominator {0};
    // frame_start of the most recent peer group, -1 when none has been seen.
    int64_t peer_group_start {-1};
};
// Window function `cume_dist`.
// Accumulates the number of rows in all peer groups seen so far (numerator) and
// divides it by the partition row count (denominator) to produce a Float64.
class WindowFunctionCumeDist final
        : public IAggregateFunctionDataHelper<CumeDistData, WindowFunctionCumeDist> {
private:
    // Lazily initialises the denominator with the partition size; a value of 0
    // is used as the "not yet set" marker.
    static void check_default(AggregateDataPtr place, int64_t partition_start,
                              int64_t partition_end) {
        auto& state = data(place);
        if (state.denominator != 0) {
            return;
        }
        state.denominator = partition_end - partition_start;
    }

    // numerator / denominator as a double. No guard against a zero denominator:
    // the division is performed exactly as written.
    static double _ratio(const CumeDistData& state) {
        return static_cast<double>(state.numerator) / static_cast<double>(state.denominator);
    }

public:
    WindowFunctionCumeDist(const DataTypes& argument_types_)
            : IAggregateFunctionDataHelper(argument_types_) {}

    String get_name() const override { return "cume_dist"; }

    DataTypePtr get_return_type() const override { return std::make_shared<DataTypeFloat64>(); }

    // The row-at-a-time path does nothing for this function.
    void add(AggregateDataPtr place, const IColumn**, ssize_t, Arena&) const override {}

    // Makes sure the denominator is set, then adds the size of the frame to the
    // numerator the first time a given frame_start (peer group) is seen.
    void add_range_single_place(int64_t partition_start, int64_t partition_end, int64_t frame_start,
                                int64_t frame_end, AggregateDataPtr place, const IColumn** columns,
                                Arena&, UInt8*, UInt8*) const override {
        check_default(place, partition_start, partition_end);
        auto& state = WindowFunctionCumeDist::data(place);
        if (state.peer_group_start == frame_start) {
            return;
        }
        state.peer_group_start = frame_start;
        state.numerator += frame_end - frame_start;
    }

    // Restores the initial state, including the "denominator not set" marker.
    void reset(AggregateDataPtr place) const override {
        auto& state = WindowFunctionCumeDist::data(place);
        state.numerator = 0;
        state.denominator = 0;
        state.peer_group_start = -1;
    }

    // Appends the current cumulative distribution value to `to` (a Float64 column).
    void insert_result_into(ConstAggregateDataPtr place, IColumn& to) const override {
        const double cume_dist = _ratio(data(place));
        auto& result_column = assert_cast<ColumnFloat64&, TypeCheckOnRelease::DISABLE>(to);
        result_column.get_data().push_back(cume_dist);
    }

    bool result_column_could_resize() const override { return true; }

    // Overwrites rows [start, end) of `to` with the current cumulative distribution value.
    void insert_result_into_range(ConstAggregateDataPtr __restrict place, IColumn& to,
                                  const size_t start, const size_t end) const override {
        auto& result_column = assert_cast<ColumnFloat64&, TypeCheckOnRelease::DISABLE>(to);
        const double cume_dist = _ratio(data(place));
        for (size_t row = start; row < end; ++row) {
            result_column.get_data()[row] = cume_dist;
        }
    }

    // merge / serialize / deserialize are deliberately empty for this function.
    void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena&) const override {}
    void serialize(ConstAggregateDataPtr place, BufferWritable& buf) const override {}
    void deserialize(AggregateDataPtr place, BufferReadable& buf, Arena&) const override {}
};
// Per-partition state for ntile().
struct NTileData {
    // 1-based bucket number computed for the most recently processed row.
    int64_t bucket_index {0};
    // Number of rows processed since the last reset.
    int64_t rows {0};
};
// Window function `ntile(n)`.
// Splits a partition into `n` buckets whose sizes differ by at most one row and
// reports the 1-based bucket number of each row as an Int64. When the partition
// size is not divisible by `n`, the first (partition_size % n) buckets hold one
// extra row ("big" buckets); the remaining ones are "small" buckets.
class WindowFunctionNTile final
        : public IAggregateFunctionDataHelper<NTileData, WindowFunctionNTile>,
          UnaryExpression,
          NullableAggregateFunction {
public:
    WindowFunctionNTile(const DataTypes& argument_types_)
            : IAggregateFunctionDataHelper(argument_types_) {}

    String get_name() const override { return "ntile"; }

    DataTypePtr get_return_type() const override { return std::make_shared<DataTypeInt64>(); }

    // The row-at-a-time path does nothing for this function.
    void add(AggregateDataPtr place, const IColumn**, ssize_t, Arena&) const override {}

    // Computes the bucket of the next row of the partition.
    // The bucket count is read from row 0 of columns[0]; the frame bounds are unused.
    // NOTE(review): bucket_num is used as a divisor, so it must be non-zero;
    // presumably the planner guarantees a positive value - confirm.
    void add_range_single_place(int64_t partition_start, int64_t partition_end, int64_t frame_start,
                                int64_t frame_end, AggregateDataPtr place, const IColumn** columns,
                                Arena&, UInt8*, UInt8*) const override {
        // some variables are partition related, but there is no chance to init them
        // when the new partition arrives, so we calculate them every time now.
        // Partition = big_bucket_num * big_bucket_size + small_bucket_num * small_bucket_size
        auto& state = WindowFunctionNTile::data(place);
        // 0-based index of this row, counted from the last reset().
        int64_t row_index = ++state.rows - 1;
        int64_t bucket_num = columns[0]->get_int(0);
        int64_t partition_size = partition_end - partition_start;
        int64_t small_bucket_size = partition_size / bucket_num;
        int64_t big_bucket_num = partition_size % bucket_num;
        // Big buckets come first, each holding (small_bucket_size + 1) rows.
        int64_t first_small_bucket_row_index = big_bucket_num * (small_bucket_size + 1);
        if (row_index >= first_small_bucket_row_index) {
            // small_bucket_size can't be zero here: if it were, every row of the
            // partition would fall into a big bucket and this branch would need
            // row_index >= partition_size.
            state.bucket_index = big_bucket_num + 1 +
                                 (row_index - first_small_bucket_row_index) / small_bucket_size;
        } else {
            state.bucket_index = row_index / (small_bucket_size + 1) + 1;
        }
    }

    // Restores the initial state. Both fields are cleared so that no bucket
    // number from a previous partition can be observed after a reset, matching
    // the other window functions in this file, which reset all of their state.
    void reset(AggregateDataPtr place) const override {
        auto& state = WindowFunctionNTile::data(place);
        state.rows = 0;
        state.bucket_index = 0;
    }

    // Appends the current bucket number to `to` (an Int64 column).
    void insert_result_into(ConstAggregateDataPtr place, IColumn& to) const override {
        assert_cast<ColumnInt64&, TypeCheckOnRelease::DISABLE>(to).get_data().push_back(
                WindowFunctionNTile::data(place).bucket_index);
    }

    bool result_column_could_resize() const override { return true; }

    // Overwrites rows [start, end) of `to` with the current bucket number.
    void insert_result_into_range(ConstAggregateDataPtr __restrict place, IColumn& to,
                                  const size_t start, const size_t end) const override {
        auto& column = assert_cast<ColumnInt64&, TypeCheckOnRelease::DISABLE>(to);
        for (size_t i = start; i < end; ++i) {
            column.get_data()[i] = WindowFunctionNTile::data(place).bucket_index;
        }
    }

    // merge / serialize / deserialize are deliberately empty for this function.
    void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena&) const override {}
    void serialize(ConstAggregateDataPtr place, BufferWritable& buf) const override {}
    void deserialize(AggregateDataPtr place, BufferReadable& buf, Arena&) const override {}
};
// State shared by first_value / last_value: the reader first/last state plus a
// way to mark the stored value as NULL.
template <typename ColVecType, bool result_is_nullable, bool arg_is_nullable>
struct FirstLastData
        : public ReaderFirstAndLastData<ColVecType, result_is_nullable, arg_is_nullable, false> {
    // Clears the stored value by resetting the inherited _data_value.
    void set_is_null() {
        this->_data_value.reset();
    }
};
// State for nth_value: FirstLastData plus bookkeeping about the frame.
template <typename ColVecType, bool result_is_nullable, bool arg_is_nullable>
struct NthValueData : public FirstLastData<ColVecType, result_is_nullable, arg_is_nullable> {
    // Frame start position remembered by the nth_value implementation.
    int64_t _frame_start_pose = 0;
    // Running total of frame rows maintained by the nth_value implementation.
    int64_t _frame_total_rows = 0;

    // Drops the stored value and puts all bookkeeping back to its initial state.
    void reset() {
        this->_frame_total_rows = 0;
        this->_frame_start_pose = 0;
        this->_has_value = false;
        this->_data_value.reset();
    }
};
// A Value that additionally exposes a NULL test and raw access to the cell it points at.
template <typename ColVecType, bool arg_is_nullable>
struct BaseValue : public Value<ColVecType, arg_is_nullable> {
    // A value without a backing column pointer represents NULL.
    bool is_null() const {
        return nullptr == this->_ptr;
    }

    // _ptr may refer to either the first or the third argument column, which
    // makes a static cast to a concrete column type awkward, so the cell is
    // fetched through the virtual get_data_at() instead.
    StringRef get_value() const {
        return this->_ptr->get_data_at(this->_offset);
    }
};
template <typename ColVecType, bool result_is_nullable, bool arg_is_nullable>
struct LeadLagData {
public:
static constexpr bool result_nullable = result_is_nullable;
void reset() {
_data_value.reset();
_is_inited = false;
_offset_value = 0;
}
void insert_result_into(IColumn& to) const {
if constexpr (result_is_nullable) {
if (_data_value.is_null()) {
auto& col = assert_cast<ColumnNullable&>(to);
col.insert_default();
} else {
auto& col = assert_cast<ColumnNullable&>(to);
StringRef value = _data_value.get_value();
col.insert_data(value.data, value.size);
}
} else {
StringRef value = _data_value.get_value();
to.insert_data(value.data, value.size);
}
}
void set_value(const IColumn** columns, size_t pos) {
if constexpr (arg_is_nullable) {
if (assert_cast<const ColumnNullable*, TypeCheckOnRelease::DISABLE>(columns[0])
->is_null_at(pos)) {
// ptr == nullptr means nullable
_data_value.reset();
return;
}
}
// here ptr is pointer to nullable column or not null column from first
_data_value.set_value(columns[0], pos);
}
void set_value_from_default(const IColumn* column, size_t pos) {
DCHECK_GE(pos, 0);
if (is_column_nullable(*column)) {
const auto* nullable_column =
assert_cast<const ColumnNullable*, TypeCheckOnRelease::DISABLE>(column);
if (nullable_column->is_null_at(pos)) {
this->_data_value.reset();
} else {
this->_data_value.set_value(nullable_column->get_nested_column_ptr().get(), pos);
}
} else {
this->_data_value.set_value(column, pos);
}
}
void set_offset_value(const IColumn* column) {
if (!_is_inited) {
const auto* column_number = assert_cast<const ColumnInt64*>(column);
_offset_value = column_number->get_data()[0];
_is_inited = true;
}
}
int64_t get_offset_value() const { return _offset_value; }
private:
BaseValue<ColVecType, arg_is_nullable> _data_value;
bool _is_inited = false;
int64_t _offset_value = 0;
};
// lead(expr, offset, default): picks the row at frame_end - 1, or the default
// value when the frame runs past the end of the partition.
template <typename Data, bool = false>
struct WindowFunctionLeadImpl : Data {
    void add_range_single_place(int64_t partition_start, int64_t partition_end, int64_t frame_start,
                                int64_t frame_end, const IColumn** columns) {
        const bool frame_inside_partition = frame_end <= partition_end;
        if (frame_inside_partition) {
            this->set_value(columns, frame_end - 1);
            return;
        }
        // The target row lies beyond the partition, e.g. lead(column, 10, default_value)
        // on a 3-row partition: fall back to the default value. The offset is the
        // second argument; subtracting it from frame_end - 1 gives the row of the
        // default column (third argument) to read.
        this->set_offset_value(columns[1]);
        const auto default_row = frame_end - 1 - this->get_offset_value();
        this->set_value_from_default(columns[2], default_row);
    }

    static const char* name() { return "lead"; }
};
// lag(expr, offset, default): picks the row at frame_end - 1, or the default
// value when the frame ends at or before the start of the partition.
template <typename Data, bool = false>
struct WindowFunctionLagImpl : Data {
    void add_range_single_place(int64_t partition_start, int64_t partition_end, int64_t frame_start,
                                int64_t frame_end, const IColumn** columns) {
        const bool frame_reaches_partition = frame_end > partition_start;
        if (frame_reaches_partition) {
            this->set_value(columns, frame_end - 1);
            return;
        }
        // The target row lies before the partition, e.g. a frame like
        // [unbounded preceding(0), offset preceding(-123)]: fall back to the
        // default value. Adding the offset (second argument) to frame_end - 1
        // gives the row of the default column (third argument) to read.
        this->set_offset_value(columns[1]);
        const auto default_row = frame_end - 1 + this->get_offset_value();
        this->set_value_from_default(columns[2], default_row);
    }

    static const char* name() { return "lag"; }
};
// first_value(expr [ignore nulls]): stores the first row of the frame, clamped
// to the partition. With arg_ignore_null, leading NULL rows are skipped.
template <typename Data, bool arg_ignore_null = false>
struct WindowFunctionFirstImpl : Data {
    void add_range_single_place(int64_t partition_start, int64_t partition_end, int64_t frame_start,
                                int64_t frame_end, const IColumn** columns) {
        // A previously stored value is kept when
        //   - nulls are not ignored, or
        //   - nulls are ignored and the stored value is not NULL.
        const bool keep_stored_value =
                this->has_set_value() && (!arg_ignore_null || !this->is_null());
        if (keep_stored_value) {
            return;
        }
        DCHECK_LE(frame_start, frame_end);
        const bool frame_overlaps_partition =
                frame_start < partition_end && frame_end > partition_start;
        if (!frame_overlaps_partition) {
            this->set_is_null();
            return;
        }
        int64_t first_row = std::max<int64_t>(frame_start, partition_start);
        if constexpr (arg_ignore_null) {
            const int64_t rows_end = std::min<int64_t>(frame_end, partition_end);
            if (columns[0]->is_nullable()) {
                const auto& arg_nullable = assert_cast<const ColumnNullable&>(*columns[0]);
                // Valid rows are [first_row, rows_end). Skip NULL rows, but never
                // move past the last valid row: if every row is NULL, that last
                // row is the one handed to set_value().
                while (first_row < rows_end - 1 && arg_nullable.is_null_at(first_row)) {
                    ++first_row;
                }
            }
        }
        this->set_value(columns, first_row);
    }

    static const char* name() { return "first_value"; }
};
// last_value(expr [ignore nulls]): stores the last row of the frame, clamped to
// the partition. With arg_ignore_null, trailing NULL rows are skipped.
template <typename Data, bool arg_ignore_null = false>
struct WindowFunctionLastImpl : Data {
    void add_range_single_place(int64_t partition_start, int64_t partition_end, int64_t frame_start,
                                int64_t frame_end, const IColumn** columns) {
        DCHECK_LE(frame_start, frame_end);
        const bool frame_outside_partition =
                (frame_end <= partition_start) || (frame_start >= partition_end);
        if (frame_outside_partition) {
            // A value stored by an earlier call is reused (frames such as "rows
            // unbounded preceding and M following" are evaluated cumulatively);
            // otherwise the result is NULL.
            const bool reuse_stored_value =
                    this->has_set_value() && (!arg_ignore_null || !this->is_null());
            if (!reuse_stored_value) {
                this->set_is_null();
            }
            return;
        }
        const int64_t rows_end = std::min<int64_t>(frame_end, partition_end);
        if constexpr (arg_ignore_null) {
            const int64_t rows_begin = std::max<int64_t>(frame_start, partition_start);
            if (columns[0]->is_nullable()) {
                const auto& arg_nullable = assert_cast<const ColumnNullable&>(*columns[0]);
                // Walk backwards over [rows_begin, rows_end) looking for a non-NULL row.
                for (int64_t row = rows_end - 1; row >= rows_begin; --row) {
                    if (!arg_nullable.is_null_at(row)) {
                        this->set_value(columns, row);
                        return;
                    }
                }
                // Nothing but NULLs in range: keep a value stored earlier if
                // there is one, otherwise the result is NULL.
                if (!this->has_set_value()) {
                    this->set_is_null();
                }
                return;
            }
        }
        this->set_value(columns, rows_end - 1);
    }

    static const char* name() { return "last_value"; }
};
// nth_value(expr, n): stores the n-th row (1-based) counted from the remembered
// frame start, or NULL while fewer than n frame rows have been accumulated.
template <typename Data, bool = false>
struct WindowFunctionNthValueImpl : Data {
    void add_range_single_place(int64_t partition_start, int64_t partition_end, int64_t frame_start,
                                int64_t frame_end, const IColumn** columns) {
        DCHECK_LE(frame_start, frame_end);
        // Clamp the frame to the partition.
        const int64_t clamped_start = std::max<int64_t>(frame_start, partition_start);
        const int64_t clamped_end = std::min<int64_t>(frame_end, partition_end);
        // The start position is only captured while no rows have been accumulated.
        if (this->_frame_total_rows == 0) {
            this->_frame_start_pose = clamped_start;
        }
        this->_frame_total_rows += clamped_end - clamped_start;
        // n is read from row 0 of the second argument and turned into a 0-based offset.
        const auto& nth_column =
                assert_cast<const ColumnInt64&, TypeCheckOnRelease::DISABLE>(*columns[1]);
        const int64_t offset = nth_column.get_data()[0] - 1;
        if (offset >= this->_frame_total_rows) {
            // The requested row is beyond the rows accumulated so far.
            this->set_is_null();
            return;
        }
        this->set_value(columns, offset + this->_frame_start_pose);
    }

    static const char* name() { return "nth_value"; }
};
// Adapter that exposes a value-picking state type (`Data`) as an aggregate
// function. Only the range-based window path is implemented: add / merge /
// serialize / deserialize throw. The result type is the first argument's type,
// wrapped in Nullable when Data::result_nullable is set.
template <typename Data>
class WindowFunctionData final
        : public IAggregateFunctionDataHelper<Data, WindowFunctionData<Data>> {
public:
    WindowFunctionData(const DataTypes& argument_types_)
            : IAggregateFunctionDataHelper<Data, WindowFunctionData<Data>>(argument_types_),
              _argument_type(argument_types_[0]) {}

    // The function name is supplied by the state type.
    String get_name() const override { return Data::name(); }

    DataTypePtr get_return_type() const override {
        if constexpr (!Data::result_nullable) {
            return _argument_type;
        } else {
            return make_nullable(_argument_type);
        }
    }

    // Forwards the partition and frame bounds and the argument columns to the state.
    void add_range_single_place(int64_t partition_start, int64_t partition_end, int64_t frame_start,
                                int64_t frame_end, AggregateDataPtr place, const IColumn** columns,
                                Arena&, UInt8*, UInt8*) const override {
        auto& state = this->data(place);
        state.add_range_single_place(partition_start, partition_end, frame_start, frame_end,
                                     columns);
    }

    void reset(AggregateDataPtr place) const override {
        auto& state = this->data(place);
        state.reset();
    }

    void insert_result_into(ConstAggregateDataPtr place, IColumn& to) const override {
        const auto& state = this->data(place);
        state.insert_result_into(to);
    }

    // The operations below are not supported and always throw.
    void add(AggregateDataPtr place, const IColumn** columns, ssize_t row_num,
             Arena&) const override {
        throw doris::Exception(Status::FatalError("WindowFunctionLeadLagData do not support add"));
    }

    void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena&) const override {
        throw doris::Exception(
                Status::FatalError("WindowFunctionLeadLagData do not support merge"));
    }

    void serialize(ConstAggregateDataPtr place, BufferWritable& buf) const override {
        throw doris::Exception(
                Status::FatalError("WindowFunctionLeadLagData do not support serialize"));
    }

    void deserialize(AggregateDataPtr place, BufferReadable& buf, Arena&) const override {
        throw doris::Exception(
                Status::FatalError("WindowFunctionLeadLagData do not support deserialize"));
    }

private:
    // Type of the first argument; the basis of the return type.
    DataTypePtr _argument_type;
};
} // namespace doris::vectorized
#include "common/compile_check_end.h"