blob: cbf6d0f9bf85c26efc1f60e26276a327a60f5767 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <glog/logging.h>
#include <gtest/gtest-message.h>
#include <gtest/gtest-test-part.h>
#include <stddef.h>
#include <memory>
#include <ostream>
#include "core/column/column_string.h"
#include "core/column/column_vector.h"
#include "core/data_type/data_type_date_or_datetime_v2.h"
#include "core/data_type/data_type_number.h"
#include "core/data_type/data_type_string.h"
#include "core/string_buffer.hpp"
#include "core/value/vdatetime_value.h"
#include "exprs/aggregate/aggregate_function.h"
#include "exprs/aggregate/aggregate_function_simple_factory.h"
#include "gtest/gtest_pred_impl.h"
namespace doris {
class IColumn;
} // namespace doris
namespace doris {
void register_aggregate_function_window_funnel_v2(AggregateFunctionSimpleFactory& factory);
class VWindowFunnelV2Test : public testing::Test {
public:
AggregateFunctionPtr agg_function;
VWindowFunnelV2Test() {}
void SetUp() {
AggregateFunctionSimpleFactory factory = AggregateFunctionSimpleFactory::instance();
DataTypes data_types = {
std::make_shared<DataTypeInt64>(), std::make_shared<DataTypeString>(),
std::make_shared<DataTypeDateTimeV2>(), std::make_shared<DataTypeUInt8>(),
std::make_shared<DataTypeUInt8>(), std::make_shared<DataTypeUInt8>(),
std::make_shared<DataTypeUInt8>()};
agg_function = factory.get("window_funnel_v2", data_types, nullptr, false,
BeExecVersionManager::get_newest_version());
EXPECT_NE(agg_function, nullptr);
}
void TearDown() {}
Arena arena;
};
// A state that never received a row is serialized, deserialized back into
// itself and then merged with a second empty state. Both states are expected to
// report a funnel level of 0.
TEST_F(VWindowFunnelV2Test, testEmpty) {
    std::unique_ptr<char[]> first_storage(new char[agg_function->size_of_data()]);
    AggregateDataPtr first_state = first_storage.get();
    agg_function->create(first_state);

    // Round-trip the empty state through its serialized form.
    ColumnString serialized;
    VectorBufferWriter writer(serialized);
    agg_function->serialize(first_state, writer);
    writer.commit();
    LOG(INFO) << "buf size : " << serialized.size();
    VectorBufferReader reader(serialized.get_data_at(0));
    agg_function->deserialize(first_state, reader, arena);

    // Merge a second, equally empty state into the first one.
    std::unique_ptr<char[]> second_storage(new char[agg_function->size_of_data()]);
    AggregateDataPtr second_state = second_storage.get();
    agg_function->create(second_state);
    agg_function->merge(first_state, second_state, arena);

    ColumnInt32 first_result;
    agg_function->insert_result_into(first_state, first_result);
    EXPECT_EQ(first_result.get_data()[0], 0);

    ColumnInt32 second_result;
    agg_function->insert_result_into(second_state, second_result);
    EXPECT_EQ(second_result.get_data()[0], 0);

    agg_function->destroy(first_state);
    agg_function->destroy(second_state);
}
// Feeds four rows (row i has second i as its timestamp and satisfies only
// condition i + 1) with a window of 2 in "default" mode, then checks that the
// result (3) is the same before serialization and after deserializing into a
// freshly created state.
TEST_F(VWindowFunnelV2Test, testSerialize) {
    constexpr int kRows = 4;

    auto window_col = ColumnInt64::create();
    auto mode_col = ColumnString::create();
    auto ts_col = ColumnDateTimeV2::create();
    for (int row = 0; row < kRows; ++row) {
        mode_col->insert(Field::create_field<TYPE_STRING>("default"));
    }
    for (int second = 0; second < kRows; ++second) {
        VecDateTimeValue moment;
        moment.unchecked_set_time(2022, 2, 28, 0, 0, second);
        auto packed = moment.to_datetime_v2();
        ts_col->insert_data(reinterpret_cast<char*>(&packed), 0);
    }

    // One-hot condition columns: row i is true only for event i + 1.
    auto event1_col = ColumnUInt8::create();
    for (int flag : {1, 0, 0, 0}) {
        event1_col->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    auto event2_col = ColumnUInt8::create();
    for (int flag : {0, 1, 0, 0}) {
        event2_col->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    auto event3_col = ColumnUInt8::create();
    for (int flag : {0, 0, 1, 0}) {
        event3_col->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    auto event4_col = ColumnUInt8::create();
    for (int flag : {0, 0, 0, 1}) {
        event4_col->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }

    for (int row = 0; row < kRows; ++row) {
        window_col->insert(Field::create_field<TYPE_BIGINT>(2));
    }

    std::unique_ptr<char[]> source_storage(new char[agg_function->size_of_data()]);
    AggregateDataPtr source_state = source_storage.get();
    agg_function->create(source_state);

    const IColumn* args[7] = {window_col.get(), mode_col.get(),   ts_col.get(),
                              event1_col.get(), event2_col.get(), event3_col.get(),
                              event4_col.get()};
    for (int row = 0; row < kRows; ++row) {
        agg_function->add(source_state, args, row, arena);
    }

    ColumnInt32 result_before;
    agg_function->insert_result_into(source_state, result_before);
    EXPECT_EQ(result_before.get_data()[0], 3);

    // Serialize, drop the source state, and rebuild it from the buffer.
    ColumnString serialized;
    VectorBufferWriter writer(serialized);
    agg_function->serialize(source_state, writer);
    writer.commit();
    agg_function->destroy(source_state);

    std::unique_ptr<char[]> restored_storage(new char[agg_function->size_of_data()]);
    AggregateDataPtr restored_state = restored_storage.get();
    agg_function->create(restored_state);
    VectorBufferReader reader(serialized.get_data_at(0));
    agg_function->deserialize(restored_state, reader, arena);

    ColumnInt32 result_after;
    agg_function->insert_result_into(restored_state, result_after);
    EXPECT_EQ(result_after.get_data()[0], 3);

    agg_function->destroy(restored_state);
}
// "default" mode, rows added in ascending timestamp order, no merge step.
// Row i has second i as its timestamp and satisfies only condition i + 1.
// For every window value `win` in [0, NUM_CONDS] the expected funnel level is
// win + 1, capped at NUM_CONDS.
TEST_F(VWindowFunnelV2Test, testDefaultSortedNoMerge) {
    const int NUM_CONDS = 4;

    auto column_mode = ColumnString::create();
    for (int i = 0; i < NUM_CONDS; i++) {
        column_mode->insert(Field::create_field<TYPE_STRING>("default"));
    }

    auto column_timestamp = ColumnDateTimeV2::create();
    for (int i = 0; i < NUM_CONDS; i++) {
        VecDateTimeValue time_value;
        time_value.unchecked_set_time(2022, 2, 28, 0, 0, i);
        auto dtv2 = time_value.to_datetime_v2();
        column_timestamp->insert_data(reinterpret_cast<char*>(&dtv2), 0);
    }

    // One-hot condition columns: row i is true only for event i + 1.
    auto column_event1 = ColumnUInt8::create();
    for (int flag : {1, 0, 0, 0}) {
        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    auto column_event2 = ColumnUInt8::create();
    for (int flag : {0, 1, 0, 0}) {
        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    auto column_event3 = ColumnUInt8::create();
    for (int flag : {0, 0, 1, 0}) {
        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    auto column_event4 = ColumnUInt8::create();
    for (int flag : {0, 0, 0, 1}) {
        column_event4->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }

    for (int win = 0; win < NUM_CONDS + 1; win++) {
        auto column_window = ColumnInt64::create();
        for (int i = 0; i < NUM_CONDS; i++) {
            column_window->insert(Field::create_field<TYPE_BIGINT>(win));
        }

        std::unique_ptr<char[]> memory(new char[agg_function->size_of_data()]);
        AggregateDataPtr place = memory.get();
        agg_function->create(place);

        const IColumn* column[7] = {column_window.get(),    column_mode.get(),
                                    column_timestamp.get(), column_event1.get(),
                                    column_event2.get(),    column_event3.get(),
                                    column_event4.get()};
        for (int i = 0; i < NUM_CONDS; i++) {
            agg_function->add(place, column, i, arena);
        }

        ColumnInt32 column_result;
        agg_function->insert_result_into(place, column_result);
        // `win` starts at 0, so no negative-window case exists in this loop; the
        // expectation is simply win + 1 capped at the number of conditions.
        const int expected_level = win < NUM_CONDS ? win + 1 : NUM_CONDS;
        EXPECT_EQ(column_result.get_data()[0], expected_level);

        agg_function->destroy(place);
    }
}
// "default" mode, rows added in ascending timestamp order, result read from a
// second state that the populated state was merged into.
// Row i has second i as its timestamp and satisfies only condition i + 1.
// For every window value `win` in [0, NUM_CONDS] the expected funnel level is
// win + 1, capped at NUM_CONDS.
TEST_F(VWindowFunnelV2Test, testDefaultSortedMerge) {
    const int NUM_CONDS = 4;

    auto column_mode = ColumnString::create();
    for (int i = 0; i < NUM_CONDS; i++) {
        column_mode->insert(Field::create_field<TYPE_STRING>("default"));
    }

    auto column_timestamp = ColumnDateTimeV2::create();
    for (int i = 0; i < NUM_CONDS; i++) {
        VecDateTimeValue time_value;
        time_value.unchecked_set_time(2022, 2, 28, 0, 0, i);
        auto dtv2 = time_value.to_datetime_v2();
        column_timestamp->insert_data(reinterpret_cast<char*>(&dtv2), 0);
    }

    // One-hot condition columns: row i is true only for event i + 1.
    auto column_event1 = ColumnUInt8::create();
    for (int flag : {1, 0, 0, 0}) {
        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    auto column_event2 = ColumnUInt8::create();
    for (int flag : {0, 1, 0, 0}) {
        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    auto column_event3 = ColumnUInt8::create();
    for (int flag : {0, 0, 1, 0}) {
        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    auto column_event4 = ColumnUInt8::create();
    for (int flag : {0, 0, 0, 1}) {
        column_event4->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }

    for (int win = 0; win < NUM_CONDS + 1; win++) {
        auto column_window = ColumnInt64::create();
        for (int i = 0; i < NUM_CONDS; i++) {
            column_window->insert(Field::create_field<TYPE_BIGINT>(win));
        }

        std::unique_ptr<char[]> memory(new char[agg_function->size_of_data()]);
        AggregateDataPtr place = memory.get();
        agg_function->create(place);

        const IColumn* column[7] = {column_window.get(),    column_mode.get(),
                                    column_timestamp.get(), column_event1.get(),
                                    column_event2.get(),    column_event3.get(),
                                    column_event4.get()};
        for (int i = 0; i < NUM_CONDS; i++) {
            agg_function->add(place, column, i, arena);
        }

        // Merge the populated state into a fresh one and read the result there.
        std::unique_ptr<char[]> memory2(new char[agg_function->size_of_data()]);
        AggregateDataPtr place2 = memory2.get();
        agg_function->create(place2);
        agg_function->merge(place2, place, arena);

        ColumnInt32 column_result;
        agg_function->insert_result_into(place2, column_result);
        // `win` starts at 0, so no negative-window case exists in this loop; the
        // expectation is simply win + 1 capped at the number of conditions.
        const int expected_level = win < NUM_CONDS ? win + 1 : NUM_CONDS;
        EXPECT_EQ(column_result.get_data()[0], expected_level);

        agg_function->destroy(place);
        agg_function->destroy(place2);
    }
}
// "default" mode, rows added in descending timestamp order, no merge step.
// Row i has second NUM_CONDS - i as its timestamp and satisfies only condition
// NUM_CONDS - i, so the events are in funnel order once sorted by time.
// For every window value `win` in [0, NUM_CONDS] the expected funnel level is
// win + 1, capped at NUM_CONDS.
TEST_F(VWindowFunnelV2Test, testDefaultReverseSortedNoMerge) {
    const int NUM_CONDS = 4;

    auto column_mode = ColumnString::create();
    for (int i = 0; i < NUM_CONDS; i++) {
        column_mode->insert(Field::create_field<TYPE_STRING>("default"));
    }

    auto column_timestamp = ColumnDateTimeV2::create();
    for (int i = 0; i < NUM_CONDS; i++) {
        VecDateTimeValue time_value;
        time_value.unchecked_set_time(2022, 2, 28, 0, 0, NUM_CONDS - i);
        auto dtv2 = time_value.to_datetime_v2();
        column_timestamp->insert_data(reinterpret_cast<char*>(&dtv2), 0);
    }

    // One-hot condition columns in reverse: row i is true only for event 4 - i.
    auto column_event1 = ColumnUInt8::create();
    for (int flag : {0, 0, 0, 1}) {
        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    auto column_event2 = ColumnUInt8::create();
    for (int flag : {0, 0, 1, 0}) {
        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    auto column_event3 = ColumnUInt8::create();
    for (int flag : {0, 1, 0, 0}) {
        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    auto column_event4 = ColumnUInt8::create();
    for (int flag : {1, 0, 0, 0}) {
        column_event4->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }

    for (int win = 0; win < NUM_CONDS + 1; win++) {
        auto column_window = ColumnInt64::create();
        for (int i = 0; i < NUM_CONDS; i++) {
            column_window->insert(Field::create_field<TYPE_BIGINT>(win));
        }

        std::unique_ptr<char[]> memory(new char[agg_function->size_of_data()]);
        AggregateDataPtr place = memory.get();
        agg_function->create(place);

        const IColumn* column[7] = {column_window.get(),    column_mode.get(),
                                    column_timestamp.get(), column_event1.get(),
                                    column_event2.get(),    column_event3.get(),
                                    column_event4.get()};
        for (int i = 0; i < NUM_CONDS; i++) {
            agg_function->add(place, column, i, arena);
        }

        LOG(INFO) << "win " << win;
        ColumnInt32 column_result;
        agg_function->insert_result_into(place, column_result);
        // `win` starts at 0, so no negative-window case exists in this loop; the
        // expectation is simply win + 1 capped at the number of conditions.
        const int expected_level = win < NUM_CONDS ? win + 1 : NUM_CONDS;
        EXPECT_EQ(column_result.get_data()[0], expected_level);

        agg_function->destroy(place);
    }
}
// "default" mode, rows added in descending timestamp order, result read from a
// second state that the populated state was merged into.
// Row i has second NUM_CONDS - i as its timestamp and satisfies only condition
// NUM_CONDS - i, so the events are in funnel order once sorted by time.
// For every window value `win` in [0, NUM_CONDS] the expected funnel level is
// win + 1, capped at NUM_CONDS.
TEST_F(VWindowFunnelV2Test, testDefaultReverseSortedMerge) {
    const int NUM_CONDS = 4;

    auto column_mode = ColumnString::create();
    for (int i = 0; i < NUM_CONDS; i++) {
        column_mode->insert(Field::create_field<TYPE_STRING>("default"));
    }

    auto column_timestamp = ColumnDateTimeV2::create();
    for (int i = 0; i < NUM_CONDS; i++) {
        VecDateTimeValue time_value;
        time_value.unchecked_set_time(2022, 2, 28, 0, 0, NUM_CONDS - i);
        auto dtv2 = time_value.to_datetime_v2();
        column_timestamp->insert_data(reinterpret_cast<char*>(&dtv2), 0);
    }

    // One-hot condition columns in reverse: row i is true only for event 4 - i.
    auto column_event1 = ColumnUInt8::create();
    for (int flag : {0, 0, 0, 1}) {
        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    auto column_event2 = ColumnUInt8::create();
    for (int flag : {0, 0, 1, 0}) {
        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    auto column_event3 = ColumnUInt8::create();
    for (int flag : {0, 1, 0, 0}) {
        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    auto column_event4 = ColumnUInt8::create();
    for (int flag : {1, 0, 0, 0}) {
        column_event4->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }

    for (int win = 0; win < NUM_CONDS + 1; win++) {
        auto column_window = ColumnInt64::create();
        for (int i = 0; i < NUM_CONDS; i++) {
            column_window->insert(Field::create_field<TYPE_BIGINT>(win));
        }

        std::unique_ptr<char[]> memory(new char[agg_function->size_of_data()]);
        AggregateDataPtr place = memory.get();
        agg_function->create(place);

        const IColumn* column[7] = {column_window.get(),    column_mode.get(),
                                    column_timestamp.get(), column_event1.get(),
                                    column_event2.get(),    column_event3.get(),
                                    column_event4.get()};
        for (int i = 0; i < NUM_CONDS; i++) {
            agg_function->add(place, column, i, arena);
        }

        // Merge the populated state into a fresh one and read the result there.
        std::unique_ptr<char[]> memory2(new char[agg_function->size_of_data()]);
        AggregateDataPtr place2 = memory2.get();
        agg_function->create(place2);
        agg_function->merge(place2, place, arena);

        ColumnInt32 column_result;
        agg_function->insert_result_into(place2, column_result);
        // `win` starts at 0, so no negative-window case exists in this loop; the
        // expectation is simply win + 1 capped at the number of conditions.
        const int expected_level = win < NUM_CONDS ? win + 1 : NUM_CONDS;
        EXPECT_EQ(column_result.get_data()[0], expected_level);

        agg_function->destroy(place);
        agg_function->destroy(place2);
    }
}
// Rows that satisfy no condition are interleaved with matching rows.
// Intent (per the test name): V2 keeps only matched events. Note that this test
// asserts the funnel result only; it does not inspect what the state stores.
//
// Six rows at seconds 0..5, window 10, "default" mode:
//   row 0: event1    row 1: event2    row 2: no match
//   row 3: event3    row 4: no match  row 5: event4
TEST_F(VWindowFunnelV2Test, testOnlyMatchedEventsStored) {
    constexpr int kRows = 6;

    auto mode_col = ColumnString::create();
    for (int row = 0; row < kRows; ++row) {
        mode_col->insert(Field::create_field<TYPE_STRING>("default"));
    }

    auto ts_col = ColumnDateTimeV2::create();
    for (int second = 0; second < kRows; ++second) {
        VecDateTimeValue moment;
        moment.unchecked_set_time(2022, 2, 28, 0, 0, second);
        auto packed = moment.to_datetime_v2();
        ts_col->insert_data(reinterpret_cast<char*>(&packed), 0);
    }

    auto event1_col = ColumnUInt8::create();
    for (int flag : {1, 0, 0, 0, 0, 0}) {
        event1_col->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    auto event2_col = ColumnUInt8::create();
    for (int flag : {0, 1, 0, 0, 0, 0}) {
        event2_col->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    auto event3_col = ColumnUInt8::create();
    for (int flag : {0, 0, 0, 1, 0, 0}) {
        event3_col->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    auto event4_col = ColumnUInt8::create();
    for (int flag : {0, 0, 0, 0, 0, 1}) {
        event4_col->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }

    auto window_col = ColumnInt64::create();
    for (int row = 0; row < kRows; ++row) {
        window_col->insert(Field::create_field<TYPE_BIGINT>(10));
    }

    std::unique_ptr<char[]> storage(new char[agg_function->size_of_data()]);
    AggregateDataPtr state = storage.get();
    agg_function->create(state);

    const IColumn* args[7] = {window_col.get(), mode_col.get(),   ts_col.get(),
                              event1_col.get(), event2_col.get(), event3_col.get(),
                              event4_col.get()};
    for (int row = 0; row < kRows; ++row) {
        agg_function->add(state, args, row, arena);
    }

    ColumnInt32 result;
    agg_function->insert_result_into(state, result);
    // All four events occur in order inside the window, so the level is 4.
    EXPECT_EQ(result.get_data()[0], 4);

    agg_function->destroy(state);
}
// "increase" mode: timestamps along the chain must be strictly increasing.
// Rows sit at seconds 0, 1, 1, 3 and row i satisfies only condition i + 1, so
// the rows for event2 and event3 share a timestamp.
TEST_F(VWindowFunnelV2Test, testIncreaseMode) {
    constexpr int kRows = 4;

    auto mode_col = ColumnString::create();
    for (int row = 0; row < kRows; ++row) {
        mode_col->insert(Field::create_field<TYPE_STRING>("increase"));
    }

    auto ts_col = ColumnDateTimeV2::create();
    // The third row deliberately repeats the second row's timestamp.
    for (int second : {0, 1, 1, 3}) {
        VecDateTimeValue moment;
        moment.unchecked_set_time(2022, 2, 28, 0, 0, second);
        auto packed = moment.to_datetime_v2();
        ts_col->insert_data(reinterpret_cast<char*>(&packed), 0);
    }

    auto event1_col = ColumnUInt8::create();
    for (int flag : {1, 0, 0, 0}) {
        event1_col->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    auto event2_col = ColumnUInt8::create();
    for (int flag : {0, 1, 0, 0}) {
        event2_col->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    auto event3_col = ColumnUInt8::create();
    for (int flag : {0, 0, 1, 0}) {
        event3_col->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    auto event4_col = ColumnUInt8::create();
    for (int flag : {0, 0, 0, 1}) {
        event4_col->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }

    auto window_col = ColumnInt64::create();
    for (int row = 0; row < kRows; ++row) {
        window_col->insert(Field::create_field<TYPE_BIGINT>(10));
    }

    std::unique_ptr<char[]> storage(new char[agg_function->size_of_data()]);
    AggregateDataPtr state = storage.get();
    agg_function->create(state);

    const IColumn* args[7] = {window_col.get(), mode_col.get(),   ts_col.get(),
                              event1_col.get(), event2_col.get(), event3_col.get(),
                              event4_col.get()};
    for (int row = 0; row < kRows; ++row) {
        agg_function->add(state, args, row, arena);
    }

    ColumnInt32 result;
    agg_function->insert_result_into(state, result);
    // Chain: event1 (t=0) -> event2 (t=1). event3 has the same timestamp as
    // event2, which "increase" mode rejects, so the chain stops at level 2.
    EXPECT_EQ(result.get_data()[0], 2);

    agg_function->destroy(state);
}
// "deduplication" mode: a repeated event breaks the chain.
// Five rows at seconds 0..4 carry, in order: event1, event2, event1 (the
// duplicate), event3, event4.
TEST_F(VWindowFunnelV2Test, testDeduplicationMode) {
    constexpr int kRows = 5;

    auto mode_col = ColumnString::create();
    for (int row = 0; row < kRows; ++row) {
        mode_col->insert(Field::create_field<TYPE_STRING>("deduplication"));
    }

    auto ts_col = ColumnDateTimeV2::create();
    for (int second : {0, 1, 2, 3, 4}) {
        VecDateTimeValue moment;
        moment.unchecked_set_time(2022, 2, 28, 0, 0, second);
        auto packed = moment.to_datetime_v2();
        ts_col->insert_data(reinterpret_cast<char*>(&packed), 0);
    }

    auto event1_col = ColumnUInt8::create();
    // The third row repeats event1; that is the duplicate under test.
    for (int flag : {1, 0, 1, 0, 0}) {
        event1_col->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    auto event2_col = ColumnUInt8::create();
    for (int flag : {0, 1, 0, 0, 0}) {
        event2_col->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    auto event3_col = ColumnUInt8::create();
    for (int flag : {0, 0, 0, 1, 0}) {
        event3_col->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    auto event4_col = ColumnUInt8::create();
    for (int flag : {0, 0, 0, 0, 1}) {
        event4_col->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }

    auto window_col = ColumnInt64::create();
    for (int row = 0; row < kRows; ++row) {
        window_col->insert(Field::create_field<TYPE_BIGINT>(10));
    }

    std::unique_ptr<char[]> storage(new char[agg_function->size_of_data()]);
    AggregateDataPtr state = storage.get();
    agg_function->create(state);

    const IColumn* args[7] = {window_col.get(), mode_col.get(),   ts_col.get(),
                              event1_col.get(), event2_col.get(), event3_col.get(),
                              event4_col.get()};
    for (int row = 0; row < kRows; ++row) {
        agg_function->add(state, args, row, arena);
    }

    ColumnInt32 result;
    agg_function->insert_result_into(state, result);
    // First chain: event1 (t=0) -> event2 (t=1); the duplicate event1 at t=2
    // breaks it at level 2. A new chain starting from event1 (t=2) cannot use
    // event3 (t=3) because event2 has not been matched again, so it stays at
    // level 1. Expected result: max(2, 1) = 2.
    EXPECT_EQ(result.get_data()[0], 2);

    agg_function->destroy(state);
}
// "fixed" mode (StarRocks-style): the event level must not jump.
// Five rows at seconds 0..4 carry, in order: event1, event2, event4 (a jump
// that skips event3), event3, event4.
TEST_F(VWindowFunnelV2Test, testFixedMode) {
    constexpr int kRows = 5;

    auto mode_col = ColumnString::create();
    for (int row = 0; row < kRows; ++row) {
        mode_col->insert(Field::create_field<TYPE_STRING>("fixed"));
    }

    auto ts_col = ColumnDateTimeV2::create();
    for (int second : {0, 1, 2, 3, 4}) {
        VecDateTimeValue moment;
        moment.unchecked_set_time(2022, 2, 28, 0, 0, second);
        auto packed = moment.to_datetime_v2();
        ts_col->insert_data(reinterpret_cast<char*>(&packed), 0);
    }

    auto event1_col = ColumnUInt8::create();
    for (int flag : {1, 0, 0, 0, 0}) {
        event1_col->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    auto event2_col = ColumnUInt8::create();
    for (int flag : {0, 1, 0, 0, 0}) {
        event2_col->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    auto event3_col = ColumnUInt8::create();
    for (int flag : {0, 0, 0, 1, 0}) {
        event3_col->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    auto event4_col = ColumnUInt8::create();
    // The third row fires event4 before event3 has been seen (the jump).
    for (int flag : {0, 0, 1, 0, 1}) {
        event4_col->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }

    auto window_col = ColumnInt64::create();
    for (int row = 0; row < kRows; ++row) {
        window_col->insert(Field::create_field<TYPE_BIGINT>(10));
    }

    std::unique_ptr<char[]> storage(new char[agg_function->size_of_data()]);
    AggregateDataPtr state = storage.get();
    agg_function->create(state);

    const IColumn* args[7] = {window_col.get(), mode_col.get(),   ts_col.get(),
                              event1_col.get(), event2_col.get(), event3_col.get(),
                              event4_col.get()};
    for (int row = 0; row < kRows; ++row) {
        agg_function->add(state, args, row, arena);
    }

    ColumnInt32 result;
    agg_function->insert_result_into(state, result);
    // Chain: event1 (t=0) -> event2 (t=1). event4 at t=2 has no event3
    // predecessor, so the chain breaks with max level 2. The later event3 and
    // event4 rows do not start a new complete chain.
    EXPECT_EQ(result.get_data()[0], 2);

    agg_function->destroy(state);
}
// A single row that satisfies several conditions must not advance the chain by
// more than one level (the "continuation flag" logic).
//
// Modelled query: window_funnel(86400, 'default', ts, xwhat=1, xwhat!=2, xwhat=3)
//   row 0 (xwhat=1): cond0=T, cond1=T (1!=2), cond2=F
//   row 1 (xwhat=2): cond0=F, cond1=F (2!=2), cond2=F  -> matches nothing
//   row 2 (xwhat=3): cond0=F, cond1=T (3!=2), cond2=T
//   row 3 (xwhat=4): cond0=F, cond1=T (4!=2), cond2=F
// Expected level: 2 (cond0 from row 0, cond1 from row 2), not 3. If row 0 were
// allowed to satisfy both cond0 and cond1, row 2 would then complete cond2 and
// the result would wrongly be 3.
TEST_F(VWindowFunnelV2Test, testSameRowMultiConditionDefault) {
    // The fixture's function takes four conditions; this scenario needs three,
    // so a dedicated instance is looked up here.
    AggregateFunctionSimpleFactory factory = AggregateFunctionSimpleFactory::instance();
    DataTypes three_cond_types = {
            std::make_shared<DataTypeInt64>(), std::make_shared<DataTypeString>(),
            std::make_shared<DataTypeDateTimeV2>(), std::make_shared<DataTypeUInt8>(),
            std::make_shared<DataTypeUInt8>(), std::make_shared<DataTypeUInt8>()};
    auto funnel3 = factory.get("window_funnel_v2", three_cond_types, nullptr, false,
                               BeExecVersionManager::get_newest_version());
    ASSERT_NE(funnel3, nullptr);

    constexpr int kRows = 4;

    auto mode_col = ColumnString::create();
    for (int row = 0; row < kRows; ++row) {
        mode_col->insert(Field::create_field<TYPE_STRING>("default"));
    }

    auto ts_col = ColumnDateTimeV2::create();
    {
        // row 0: 2022-03-12 10:41:00 (xwhat=1)
        VecDateTimeValue moment;
        moment.unchecked_set_time(2022, 3, 12, 10, 41, 0);
        auto packed = moment.to_datetime_v2();
        ts_col->insert_data(reinterpret_cast<char*>(&packed), 0);
    }
    {
        // row 1: 2022-03-12 13:28:02 (xwhat=2)
        VecDateTimeValue moment;
        moment.unchecked_set_time(2022, 3, 12, 13, 28, 2);
        auto packed = moment.to_datetime_v2();
        ts_col->insert_data(reinterpret_cast<char*>(&packed), 0);
    }
    {
        // row 2: 2022-03-12 16:15:01 (xwhat=3)
        VecDateTimeValue moment;
        moment.unchecked_set_time(2022, 3, 12, 16, 15, 1);
        auto packed = moment.to_datetime_v2();
        ts_col->insert_data(reinterpret_cast<char*>(&packed), 0);
    }
    {
        // row 3: 2022-03-12 19:05:04 (xwhat=4)
        VecDateTimeValue moment;
        moment.unchecked_set_time(2022, 3, 12, 19, 5, 4);
        auto packed = moment.to_datetime_v2();
        ts_col->insert_data(reinterpret_cast<char*>(&packed), 0);
    }

    // cond0: xwhat=1 -> only row 0 matches.
    auto cond0_col = ColumnUInt8::create();
    for (int flag : {1, 0, 0, 0}) {
        cond0_col->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    // cond1: xwhat!=2 -> rows 0, 2 and 3 match.
    auto cond1_col = ColumnUInt8::create();
    for (int flag : {1, 0, 1, 1}) {
        cond1_col->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    // cond2: xwhat=3 -> only row 2 matches.
    auto cond2_col = ColumnUInt8::create();
    for (int flag : {0, 0, 1, 0}) {
        cond2_col->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }

    // Window of 86400 seconds (24 hours).
    auto window_col = ColumnInt64::create();
    for (int row = 0; row < kRows; ++row) {
        window_col->insert(Field::create_field<TYPE_BIGINT>(86400));
    }

    std::unique_ptr<char[]> storage(new char[funnel3->size_of_data()]);
    AggregateDataPtr state = storage.get();
    funnel3->create(state);

    const IColumn* args[6] = {window_col.get(), mode_col.get(),  ts_col.get(),
                              cond0_col.get(),  cond1_col.get(), cond2_col.get()};
    for (int row = 0; row < kRows; ++row) {
        funnel3->add(state, args, row, arena);
    }

    ColumnInt32 result;
    funnel3->insert_result_into(state, result);
    // With the continuation flag, row 0 contributes cond0 only (its cond1 match
    // is a same-row continuation). Row 2's cond1 extends the chain because it is
    // a different row, but row 2's cond2 is again same-row, so the chain stops
    // at level 2.
    EXPECT_EQ(result.get_data()[0], 2);

    funnel3->destroy(state);
}
// Test same-row multi-condition where ALL conditions match the same event name, e.g.
// window_funnel(big_window, 'default', ts, event='登录', event='登录', event='登录', ...)
// (the literal '登录' means "login").
// A single row matching all 4 conditions should only count as level 1 (not 4).
TEST_F(VWindowFunnelV2Test, testSameRowAllConditionsMatch) {
    // A single input row: window of 86400 seconds, "default" mode.
    auto column_window = ColumnInt64::create();
    column_window->insert(Field::create_field<TYPE_BIGINT>(86400));
    auto column_mode = ColumnString::create();
    column_mode->insert(Field::create_field<TYPE_STRING>("default"));

    // The row's timestamp is 2022-02-28 00:00:00.
    VecDateTimeValue only_time;
    only_time.unchecked_set_time(2022, 2, 28, 0, 0, 0);
    auto packed_time = only_time.to_datetime_v2();
    auto column_timestamp = ColumnDateTimeV2::create();
    column_timestamp->insert_data(reinterpret_cast<const char*>(&packed_time), 0);

    // Each of the four condition columns holds one "true" entry for that row.
    auto make_single_true_column = [] {
        auto flags = ColumnUInt8::create();
        flags->insert(Field::create_field<TYPE_BOOLEAN>(1));
        return flags;
    };
    auto column_event1 = make_single_true_column();
    auto column_event2 = make_single_true_column();
    auto column_event3 = make_single_true_column();
    auto column_event4 = make_single_true_column();

    // Uses the fixture's shared aggregate function (four conditions).
    std::unique_ptr<char[]> state_buffer(new char[agg_function->size_of_data()]);
    AggregateDataPtr place = state_buffer.get();
    agg_function->create(place);
    const IColumn* inputs[7] = {column_window.get(), column_mode.get(),   column_timestamp.get(),
                                column_event1.get(), column_event2.get(), column_event3.get(),
                                column_event4.get()};
    agg_function->add(place, inputs, 0, arena);

    ColumnInt32 column_result;
    agg_function->insert_result_into(place, column_result);
    // Each funnel step must come from a different row, so one row that matches
    // every condition can only reach level 1.
    EXPECT_EQ(column_result.get_data()[0], 1);
    agg_function->destroy(place);
}
// Test INCREASE mode: event-0 re-occurrence should not break an already-valid chain.
// Counterexample from code review:
// 3 conditions, window=100s, INCREASE mode
// Row A (t=0s): event1 only
// Row B (t=50s): event1 only (new event-0 overwrites old)
// Row C (t=50s): event2 only
// Row D (t=60s): event3 only
// Correct result: 3 (chain starting from t=0: e1@0 -> e2@50 -> e3@60)
// Bug (before fix): returned 1 because overwriting events_timestamp[0] with t=50
// caused the INCREASE check (50 < 50) to fail for e2.
TEST_F(VWindowFunnelV2Test, testIncreaseModeEvent0Overwrite) {
    // Three-condition variant of window_funnel_v2, requested directly from the factory.
    AggregateFunctionSimpleFactory factory = AggregateFunctionSimpleFactory::instance();
    DataTypes three_cond_types = {
            std::make_shared<DataTypeInt64>(),      std::make_shared<DataTypeString>(),
            std::make_shared<DataTypeDateTimeV2>(), std::make_shared<DataTypeUInt8>(),
            std::make_shared<DataTypeUInt8>(),      std::make_shared<DataTypeUInt8>()};
    auto funnel = factory.get("window_funnel_v2", three_cond_types, nullptr, false,
                              BeExecVersionManager::get_newest_version());
    ASSERT_NE(funnel, nullptr);

    const int NUM_ROWS = 4;

    // Every row: window of 100 seconds, "increase" mode.
    auto column_window = ColumnInt64::create();
    auto column_mode = ColumnString::create();
    for (int row = 0; row != NUM_ROWS; ++row) {
        column_window->insert(Field::create_field<TYPE_BIGINT>(100));
        column_mode->insert(Field::create_field<TYPE_STRING>("increase"));
    }

    // Offsets from 2022-02-28 00:00:00 → row 0: 0s, row 1: 50s, row 2: 50s, row 3: 60s.
    VecDateTimeValue row_times[NUM_ROWS];
    row_times[0].unchecked_set_time(2022, 2, 28, 0, 0, 0);
    row_times[1].unchecked_set_time(2022, 2, 28, 0, 0, 50);
    row_times[2].unchecked_set_time(2022, 2, 28, 0, 0, 50);
    row_times[3].unchecked_set_time(2022, 2, 28, 0, 1, 0);
    auto column_timestamp = ColumnDateTimeV2::create();
    for (auto& when : row_times) {
        auto packed = when.to_datetime_v2();
        column_timestamp->insert_data(reinterpret_cast<const char*>(&packed), 0);
    }

    // Per-row condition matrix:
    //   row 0: event1 only
    //   row 1: event1 only (a second occurrence of event-0)
    //   row 2: event2 only
    //   row 3: event3 only
    const bool event1_rows[NUM_ROWS] = {true, true, false, false};
    const bool event2_rows[NUM_ROWS] = {false, false, true, false};
    const bool event3_rows[NUM_ROWS] = {false, false, false, true};
    auto column_event1 = ColumnUInt8::create();
    auto column_event2 = ColumnUInt8::create();
    auto column_event3 = ColumnUInt8::create();
    for (int row = 0; row != NUM_ROWS; ++row) {
        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(event1_rows[row]));
        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(event2_rows[row]));
        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(event3_rows[row]));
    }

    // Allocate and initialise the aggregate state, then feed the rows in order.
    std::unique_ptr<char[]> state_buffer(new char[funnel->size_of_data()]);
    AggregateDataPtr place = state_buffer.get();
    funnel->create(place);
    const IColumn* inputs[6] = {column_window.get(), column_mode.get(),   column_timestamp.get(),
                                column_event1.get(), column_event2.get(), column_event3.get()};
    for (int row = 0; row != NUM_ROWS; ++row) {
        funnel->add(place, inputs, row, arena);
    }

    ColumnInt32 column_result;
    funnel->insert_result_into(place, column_result);
    // The chain that starts at t=0 must survive the repeated event-0 at t=50:
    // e1@0 -> e2@50 (50 > 0) -> e3@60 (60 > 50), i.e. 3 levels.
    EXPECT_EQ(column_result.get_data()[0], 3);
    funnel->destroy(place);
}
// Test INCREASE mode: later event-0 starts a better chain when early chain cannot complete.
// 3 conditions, window=5s, INCREASE mode
// Row 0 (t=0s): event1
// Row 1 (t=50s): event1 (new start — old chain can't reach e2 within 5s)
// Row 2 (t=51s): event2
// Row 3 (t=52s): event3
// Correct result: 3 (chain starting from t=50: e1@50 -> e2@51 -> e3@52)
TEST_F(VWindowFunnelV2Test, testIncreaseModeNewChainBetter) {
    // Three-condition variant of window_funnel_v2, requested directly from the factory.
    AggregateFunctionSimpleFactory factory = AggregateFunctionSimpleFactory::instance();
    DataTypes three_cond_types = {
            std::make_shared<DataTypeInt64>(),      std::make_shared<DataTypeString>(),
            std::make_shared<DataTypeDateTimeV2>(), std::make_shared<DataTypeUInt8>(),
            std::make_shared<DataTypeUInt8>(),      std::make_shared<DataTypeUInt8>()};
    auto funnel = factory.get("window_funnel_v2", three_cond_types, nullptr, false,
                              BeExecVersionManager::get_newest_version());
    ASSERT_NE(funnel, nullptr);

    const int NUM_ROWS = 4;

    // Every row: window of only 5 seconds, "increase" mode.
    auto column_window = ColumnInt64::create();
    auto column_mode = ColumnString::create();
    for (int row = 0; row != NUM_ROWS; ++row) {
        column_window->insert(Field::create_field<TYPE_BIGINT>(5));
        column_mode->insert(Field::create_field<TYPE_STRING>("increase"));
    }

    // Offsets from 2022-02-28 00:00:00 → row 0: 0s, row 1: 50s, row 2: 51s, row 3: 52s.
    VecDateTimeValue row_times[NUM_ROWS];
    row_times[0].unchecked_set_time(2022, 2, 28, 0, 0, 0);
    row_times[1].unchecked_set_time(2022, 2, 28, 0, 0, 50);
    row_times[2].unchecked_set_time(2022, 2, 28, 0, 0, 51);
    row_times[3].unchecked_set_time(2022, 2, 28, 0, 0, 52);
    auto column_timestamp = ColumnDateTimeV2::create();
    for (auto& when : row_times) {
        auto packed = when.to_datetime_v2();
        column_timestamp->insert_data(reinterpret_cast<const char*>(&packed), 0);
    }

    // Rows 0 and 1 match event1, row 2 matches event2, row 3 matches event3.
    const bool event1_rows[NUM_ROWS] = {true, true, false, false};
    const bool event2_rows[NUM_ROWS] = {false, false, true, false};
    const bool event3_rows[NUM_ROWS] = {false, false, false, true};
    auto column_event1 = ColumnUInt8::create();
    auto column_event2 = ColumnUInt8::create();
    auto column_event3 = ColumnUInt8::create();
    for (int row = 0; row != NUM_ROWS; ++row) {
        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(event1_rows[row]));
        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(event2_rows[row]));
        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(event3_rows[row]));
    }

    // Allocate and initialise the aggregate state, then feed the rows in order.
    std::unique_ptr<char[]> state_buffer(new char[funnel->size_of_data()]);
    AggregateDataPtr place = state_buffer.get();
    funnel->create(place);
    const IColumn* inputs[6] = {column_window.get(), column_mode.get(),   column_timestamp.get(),
                                column_event1.get(), column_event2.get(), column_event3.get()};
    for (int row = 0; row != NUM_ROWS; ++row) {
        funnel->add(place, inputs, row, arena);
    }

    ColumnInt32 column_result;
    funnel->insert_result_into(place, column_result);
    // The chain from t=0 stays at level 1: e2@51 is 51s later, outside the 5s window.
    // The chain from t=50 completes: e1@50 -> e2@51 (51 > 50, within 5s) -> e3@52 (52 > 51).
    EXPECT_EQ(column_result.get_data()[0], 3);
    funnel->destroy(place);
}
// Test INCREASE mode: old chain is better than new chain (max_level preserved).
// 3 conditions, window=100s, INCREASE mode
// Row 0 (t=0s): event1
// Row 1 (t=10s): event2
// Row 2 (t=50s): event1 (restarts chain, old chain had level=2)
// No more events after the restart — new chain can only reach level 1.
// Correct result: 2 (old chain: e1@0 -> e2@10)
TEST_F(VWindowFunnelV2Test, testIncreaseModeOldChainBetter) {
    // Three-condition variant of window_funnel_v2, shared by both scenarios below.
    AggregateFunctionSimpleFactory factory = AggregateFunctionSimpleFactory::instance();
    DataTypes data_types_3 = {
            std::make_shared<DataTypeInt64>(),      std::make_shared<DataTypeString>(),
            std::make_shared<DataTypeDateTimeV2>(), std::make_shared<DataTypeUInt8>(),
            std::make_shared<DataTypeUInt8>(),      std::make_shared<DataTypeUInt8>()};
    auto agg_func_3 = factory.get("window_funnel_v2", data_types_3, nullptr, false,
                                  BeExecVersionManager::get_newest_version());
    ASSERT_NE(agg_func_3, nullptr);

    // Scenario 1 (3 rows): e1@0s, e2@10s, e1@50s, with nothing after the restart.
    {
        const int NUM_ROWS = 3;

        // Every row: window of 100 seconds, "increase" mode.
        auto column_window = ColumnInt64::create();
        auto column_mode = ColumnString::create();
        for (int i = 0; i < NUM_ROWS; i++) {
            column_window->insert(Field::create_field<TYPE_BIGINT>(100));
            column_mode->insert(Field::create_field<TYPE_STRING>("increase"));
        }

        // Offsets from 2022-02-28 00:00:00 → 0s, 10s, 50s.
        VecDateTimeValue row_times[NUM_ROWS];
        row_times[0].unchecked_set_time(2022, 2, 28, 0, 0, 0);
        row_times[1].unchecked_set_time(2022, 2, 28, 0, 0, 10);
        row_times[2].unchecked_set_time(2022, 2, 28, 0, 0, 50);
        auto column_timestamp = ColumnDateTimeV2::create();
        for (auto& when : row_times) {
            auto packed = when.to_datetime_v2();
            column_timestamp->insert_data(reinterpret_cast<const char*>(&packed), 0);
        }

        // Row 0: e1=T, Row 1: e2=T, Row 2: e1=T (restart); e3 never matches.
        const bool event1_rows[NUM_ROWS] = {true, false, true};
        const bool event2_rows[NUM_ROWS] = {false, true, false};
        const bool event3_rows[NUM_ROWS] = {false, false, false};
        auto column_event1 = ColumnUInt8::create();
        auto column_event2 = ColumnUInt8::create();
        auto column_event3 = ColumnUInt8::create();
        for (int i = 0; i < NUM_ROWS; i++) {
            column_event1->insert(Field::create_field<TYPE_BOOLEAN>(event1_rows[i]));
            column_event2->insert(Field::create_field<TYPE_BOOLEAN>(event2_rows[i]));
            column_event3->insert(Field::create_field<TYPE_BOOLEAN>(event3_rows[i]));
        }

        std::unique_ptr<char[]> memory(new char[agg_func_3->size_of_data()]);
        AggregateDataPtr place = memory.get();
        agg_func_3->create(place);
        const IColumn* column[6] = {column_window.get(),    column_mode.get(),
                                    column_timestamp.get(), column_event1.get(),
                                    column_event2.get(),    column_event3.get()};
        for (int i = 0; i < NUM_ROWS; i++) {
            agg_func_3->add(place, column, i, arena);
        }
        ColumnInt32 column_result;
        agg_func_3->insert_result_into(place, column_result);
        // Old chain: e1@0 -> e2@10 = level 2
        // New chain from t=50: only e1@50, no further events = level 1
        // max(2, 1) = 2
        EXPECT_EQ(column_result.get_data()[0], 2);
        agg_func_3->destroy(place);
    }

    // Scenario 2 (4 rows): same as scenario 1 plus a trailing e3@50s row.
    // The row count, mode column and window column are scoped to this scenario so
    // that all four rows are built consistently and the e3 row is really fed to the
    // aggregate (a shared row count of 3 would silently drop it).
    {
        const int NUM_ROWS = 4;

        // Every row: window of 100 seconds, "increase" mode.
        auto column_window = ColumnInt64::create();
        auto column_mode = ColumnString::create();
        for (int i = 0; i < NUM_ROWS; i++) {
            column_window->insert(Field::create_field<TYPE_BIGINT>(100));
            column_mode->insert(Field::create_field<TYPE_STRING>("increase"));
        }

        // Offsets from 2022-02-28 00:00:00 → 0s, 10s, 50s, 50s.
        VecDateTimeValue row_times[NUM_ROWS];
        row_times[0].unchecked_set_time(2022, 2, 28, 0, 0, 0);
        row_times[1].unchecked_set_time(2022, 2, 28, 0, 0, 10);
        row_times[2].unchecked_set_time(2022, 2, 28, 0, 0, 50);
        row_times[3].unchecked_set_time(2022, 2, 28, 0, 0, 50);
        auto column_timestamp = ColumnDateTimeV2::create();
        for (auto& when : row_times) {
            auto packed = when.to_datetime_v2();
            column_timestamp->insert_data(reinterpret_cast<const char*>(&packed), 0);
        }

        // Row 0: e1=T, Row 1: e2=T, Row 2: e1=T (restart), Row 3: e3=T.
        const bool event1_rows[NUM_ROWS] = {true, false, true, false};
        const bool event2_rows[NUM_ROWS] = {false, true, false, false};
        const bool event3_rows[NUM_ROWS] = {false, false, false, true};
        auto column_event1 = ColumnUInt8::create();
        auto column_event2 = ColumnUInt8::create();
        auto column_event3 = ColumnUInt8::create();
        for (int i = 0; i < NUM_ROWS; i++) {
            column_event1->insert(Field::create_field<TYPE_BOOLEAN>(event1_rows[i]));
            column_event2->insert(Field::create_field<TYPE_BOOLEAN>(event2_rows[i]));
            column_event3->insert(Field::create_field<TYPE_BOOLEAN>(event3_rows[i]));
        }

        std::unique_ptr<char[]> memory(new char[agg_func_3->size_of_data()]);
        AggregateDataPtr place = memory.get();
        agg_func_3->create(place);
        const IColumn* column[6] = {column_window.get(),    column_mode.get(),
                                    column_timestamp.get(), column_event1.get(),
                                    column_event2.get(),    column_event3.get()};
        for (int i = 0; i < NUM_ROWS; i++) {
            agg_func_3->add(place, column, i, arena);
        }
        ColumnInt32 column_result;
        agg_func_3->insert_result_into(place, column_result);
        // Old chain: e1@0 -> e2@10 = level 2
        // New chain from t=50: only e1@50, e3@50 can't advance (no e2 matched)
        // max(2, 1) = 2
        // NOTE(review): this expectation follows the scenario description; confirm it
        // against the implementation now that the e3 row is actually processed.
        EXPECT_EQ(column_result.get_data()[0], 2);
        agg_func_3->destroy(place);
    }
}
// Test DEDUPLICATION mode: same-row multi-event matching should NOT break the chain.
// This is the exact scenario from the bug report where V1 returns 3 but V2 returned 2.
//
// 3 conditions, window=1 day (86400s), DEDUPLICATION mode
// Row 0 (t=10:00): event1=T, event2=T, event3=F → matches E0+E1 on same row
// Row 1 (t=11:00): event1=T, event2=T, event3=F → matches E0+E1 on same row
// Row 2 (t=12:00): event1=T, event2=F, event3=T → matches E0+E2 on same row
//
// V2 stores events in reverse order per row: [E1, E0] for each multi-match row.
// After sort, events_list for row0: (t=10:00, E1-cont), (t=10:00, E0)
// Bug: when processing E0 from row0 after E1 was already used to advance the chain,
// old V2 would break the chain. With the fix, E0 is recognized as same-row and skipped.
//
// Expected chain: E0@row0 → E1@row1 → E2@row2 = 3
TEST_F(VWindowFunnelV2Test, testDeduplicationSameRowMultiEvent) {
    // Three-condition variant of window_funnel_v2, requested directly from the factory.
    AggregateFunctionSimpleFactory factory = AggregateFunctionSimpleFactory::instance();
    DataTypes three_cond_types = {
            std::make_shared<DataTypeInt64>(),      std::make_shared<DataTypeString>(),
            std::make_shared<DataTypeDateTimeV2>(), std::make_shared<DataTypeUInt8>(),
            std::make_shared<DataTypeUInt8>(),      std::make_shared<DataTypeUInt8>()};
    auto funnel = factory.get("window_funnel_v2", three_cond_types, nullptr, false,
                              BeExecVersionManager::get_newest_version());
    ASSERT_NE(funnel, nullptr);

    const int NUM_ROWS = 3;

    // Every row: window of 86400 seconds (1 day), "deduplication" mode.
    auto column_window = ColumnInt64::create();
    auto column_mode = ColumnString::create();
    for (int row = 0; row != NUM_ROWS; ++row) {
        column_window->insert(Field::create_field<TYPE_BIGINT>(86400));
        column_mode->insert(Field::create_field<TYPE_STRING>("deduplication"));
    }

    // 2022-03-12 at 10:00, 11:00 and 12:00.
    VecDateTimeValue row_times[NUM_ROWS];
    row_times[0].unchecked_set_time(2022, 3, 12, 10, 0, 0);
    row_times[1].unchecked_set_time(2022, 3, 12, 11, 0, 0);
    row_times[2].unchecked_set_time(2022, 3, 12, 12, 0, 0);
    auto column_timestamp = ColumnDateTimeV2::create();
    for (auto& when : row_times) {
        auto packed = when.to_datetime_v2();
        column_timestamp->insert_data(reinterpret_cast<const char*>(&packed), 0);
    }

    // Per-row condition matrix:
    //   row 0: event1 + event2  (E0 and E1 on the same row)
    //   row 1: event1 + event2  (E0 and E1 on the same row)
    //   row 2: event1 + event3  (E0 and E2 on the same row)
    const bool event1_rows[NUM_ROWS] = {true, true, true};
    const bool event2_rows[NUM_ROWS] = {true, true, false};
    const bool event3_rows[NUM_ROWS] = {false, false, true};
    auto column_event1 = ColumnUInt8::create();
    auto column_event2 = ColumnUInt8::create();
    auto column_event3 = ColumnUInt8::create();
    for (int row = 0; row != NUM_ROWS; ++row) {
        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(event1_rows[row]));
        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(event2_rows[row]));
        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(event3_rows[row]));
    }

    // Allocate and initialise the aggregate state, then feed the rows in order.
    std::unique_ptr<char[]> state_buffer(new char[funnel->size_of_data()]);
    AggregateDataPtr place = state_buffer.get();
    funnel->create(place);
    const IColumn* inputs[6] = {column_window.get(), column_mode.get(),   column_timestamp.get(),
                                column_event1.get(), column_event2.get(), column_event3.get()};
    for (int row = 0; row != NUM_ROWS; ++row) {
        funnel->add(place, inputs, row, arena);
    }

    ColumnInt32 column_result;
    funnel->insert_result_into(place, column_result);
    // Expected chain: E0@row0 -> E1@row1 -> E2@row2, i.e. 3 levels.
    // Before the fix V2 returned 2: the E0 sharing a row with an already-used E1
    // was treated as a duplicate and broke the chain.
    EXPECT_EQ(column_result.get_data()[0], 3);
    funnel->destroy(place);
}
// Test DEDUPLICATION mode: a true duplicate on a DIFFERENT row should still break the chain.
// This ensures the fix doesn't over-suppress chain breaks.
//
// 3 conditions, window=86400s, DEDUPLICATION mode
// Row 0 (t=10:00): event1=T only
// Row 1 (t=11:00): event2=T only
// Row 2 (t=12:00): event1=T only ← true duplicate E0 from different row → breaks chain
// Row 3 (t=13:00): event3=T only
//
// Expected: 2 (chain E0@row0 → E1@row1, then E0@row2 breaks it; new chain E0@row2 alone = 1)
TEST_F(VWindowFunnelV2Test, testDeduplicationTrueDuplicateStillBreaks) {
    // Three-condition variant of window_funnel_v2, requested directly from the factory.
    AggregateFunctionSimpleFactory factory = AggregateFunctionSimpleFactory::instance();
    DataTypes three_cond_types = {
            std::make_shared<DataTypeInt64>(),      std::make_shared<DataTypeString>(),
            std::make_shared<DataTypeDateTimeV2>(), std::make_shared<DataTypeUInt8>(),
            std::make_shared<DataTypeUInt8>(),      std::make_shared<DataTypeUInt8>()};
    auto funnel = factory.get("window_funnel_v2", three_cond_types, nullptr, false,
                              BeExecVersionManager::get_newest_version());
    ASSERT_NE(funnel, nullptr);

    const int NUM_ROWS = 4;

    // Every row: window of 86400 seconds, "deduplication" mode.
    auto column_window = ColumnInt64::create();
    auto column_mode = ColumnString::create();
    for (int row = 0; row != NUM_ROWS; ++row) {
        column_window->insert(Field::create_field<TYPE_BIGINT>(86400));
        column_mode->insert(Field::create_field<TYPE_STRING>("deduplication"));
    }

    // 2022-03-12 at 10:00, 11:00, 12:00 and 13:00.
    VecDateTimeValue row_times[NUM_ROWS];
    row_times[0].unchecked_set_time(2022, 3, 12, 10, 0, 0);
    row_times[1].unchecked_set_time(2022, 3, 12, 11, 0, 0);
    row_times[2].unchecked_set_time(2022, 3, 12, 12, 0, 0);
    row_times[3].unchecked_set_time(2022, 3, 12, 13, 0, 0);
    auto column_timestamp = ColumnDateTimeV2::create();
    for (auto& when : row_times) {
        auto packed = when.to_datetime_v2();
        column_timestamp->insert_data(reinterpret_cast<const char*>(&packed), 0);
    }

    // Per-row condition matrix (exactly one condition per row):
    //   row 0: event1
    //   row 1: event2
    //   row 2: event1  (a true duplicate of E0, coming from a different row)
    //   row 3: event3
    const bool event1_rows[NUM_ROWS] = {true, false, true, false};
    const bool event2_rows[NUM_ROWS] = {false, true, false, false};
    const bool event3_rows[NUM_ROWS] = {false, false, false, true};
    auto column_event1 = ColumnUInt8::create();
    auto column_event2 = ColumnUInt8::create();
    auto column_event3 = ColumnUInt8::create();
    for (int row = 0; row != NUM_ROWS; ++row) {
        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(event1_rows[row]));
        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(event2_rows[row]));
        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(event3_rows[row]));
    }

    // Allocate and initialise the aggregate state, then feed the rows in order.
    std::unique_ptr<char[]> state_buffer(new char[funnel->size_of_data()]);
    AggregateDataPtr place = state_buffer.get();
    funnel->create(place);
    const IColumn* inputs[6] = {column_window.get(), column_mode.get(),   column_timestamp.get(),
                                column_event1.get(), column_event2.get(), column_event3.get()};
    for (int row = 0; row != NUM_ROWS; ++row) {
        funnel->add(place, inputs, row, arena);
    }

    ColumnInt32 column_result;
    funnel->insert_result_into(place, column_result);
    // E0@row0 -> E1@row1 reaches level 2; E0@row2 is a TRUE duplicate from a
    // different row and breaks that chain. The chain restarted at E0@row2 never
    // sees an E1, so it stays at level 1. max(2, 1) = 2.
    EXPECT_EQ(column_result.get_data()[0], 2);
    funnel->destroy(place);
}
// Dedup mode: same-row E0 skip when chain is advanced by same-row events.
// When a row matches all 3 conditions and its E2 advances the chain, E0 from
// the same row should be skipped (not restart the chain) because the row already
// contributed to chain advancement.
TEST_F(VWindowFunnelV2Test, testDeduplicationE0PendingRestart) {
    // Three-condition variant of window_funnel_v2, requested directly from the factory.
    AggregateFunctionSimpleFactory factory = AggregateFunctionSimpleFactory::instance();
    DataTypes three_cond_types = {
            std::make_shared<DataTypeInt64>(),      std::make_shared<DataTypeString>(),
            std::make_shared<DataTypeDateTimeV2>(), std::make_shared<DataTypeUInt8>(),
            std::make_shared<DataTypeUInt8>(),      std::make_shared<DataTypeUInt8>()};
    auto funnel = factory.get("window_funnel_v2", three_cond_types, nullptr, false,
                              BeExecVersionManager::get_newest_version());
    ASSERT_NE(funnel, nullptr);

    // Scenario with conditions c1/c2/c3 (levels are zero-based in these notes):
    //   Row 0 (10:00): c1 only      → E0@R0 starts the chain, level=0
    //   Row 1 (11:00): c2 only      → E1@R1 advances the chain, level=1
    //   Row 2 (12:00): c1, c2, c3   → E2(cont=0), E1(cont=1), E0(cont=1)
    //       E2:          predecessor E1 matched on another row → promote to level=2
    //       E1 (cont=1): duplicate, but same row as E2@R2 → skipped
    //       E0 (cont=1): same row as the chain's E2@R2 → skipped
    //   Row 3 (13:00): c3 only      → E2 duplicates level 2 from a different row
    //                                 → terminates the chain
    //   Result: max(2+1, 0+1) = 3
    const int NUM_ROWS = 4;

    // Every row: window of 86400 seconds, "deduplication" mode.
    auto column_window = ColumnInt64::create();
    auto column_mode = ColumnString::create();
    for (int row = 0; row != NUM_ROWS; ++row) {
        column_window->insert(Field::create_field<TYPE_BIGINT>(86400));
        column_mode->insert(Field::create_field<TYPE_STRING>("deduplication"));
    }

    // 2022-03-12 at 10:00, 11:00, 12:00 and 13:00.
    VecDateTimeValue row_times[NUM_ROWS];
    row_times[0].unchecked_set_time(2022, 3, 12, 10, 0, 0);
    row_times[1].unchecked_set_time(2022, 3, 12, 11, 0, 0);
    row_times[2].unchecked_set_time(2022, 3, 12, 12, 0, 0);
    row_times[3].unchecked_set_time(2022, 3, 12, 13, 0, 0);
    auto column_timestamp = ColumnDateTimeV2::create();
    for (auto& when : row_times) {
        auto packed = when.to_datetime_v2();
        column_timestamp->insert_data(reinterpret_cast<const char*>(&packed), 0);
    }

    // Per-row condition matrix:
    //   row 0: c1=T, c2=F, c3=F
    //   row 1: c1=F, c2=T, c3=F
    //   row 2: c1=T, c2=T, c3=T
    //   row 3: c1=F, c2=F, c3=T
    const bool event1_rows[NUM_ROWS] = {true, false, true, false};
    const bool event2_rows[NUM_ROWS] = {false, true, true, false};
    const bool event3_rows[NUM_ROWS] = {false, false, true, true};
    auto column_event1 = ColumnUInt8::create();
    auto column_event2 = ColumnUInt8::create();
    auto column_event3 = ColumnUInt8::create();
    for (int row = 0; row != NUM_ROWS; ++row) {
        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(event1_rows[row]));
        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(event2_rows[row]));
        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(event3_rows[row]));
    }

    // Allocate and initialise the aggregate state, then feed the rows in order.
    std::unique_ptr<char[]> state_buffer(new char[funnel->size_of_data()]);
    AggregateDataPtr place = state_buffer.get();
    funnel->create(place);
    const IColumn* inputs[6] = {column_window.get(), column_mode.get(),   column_timestamp.get(),
                                column_event1.get(), column_event2.get(), column_event3.get()};
    for (int row = 0; row != NUM_ROWS; ++row) {
        funnel->add(place, inputs, row, arena);
    }

    ColumnInt32 column_result;
    funnel->insert_result_into(place, column_result);
    // The chain reaches level 2 (three matched steps: E0 -> E1 -> E2) before the
    // duplicate E2@R3 terminates it.
    EXPECT_EQ(column_result.get_data()[0], 3);
    funnel->destroy(place);
}
// Dedup mode: E0 restart after chain terminated by same-row duplicate.
// When a row matches c1+c2, and c2 is a duplicate that terminates the chain,
// the E0 from the same row should start a new chain because _eliminate_chain()
// clears events_timestamp, so _is_same_row_as_chain won't trigger.
TEST_F(VWindowFunnelV2Test, testDeduplicationE0PendingRestartApplied) {
    // Three-condition variant of window_funnel_v2, requested directly from the factory.
    AggregateFunctionSimpleFactory factory = AggregateFunctionSimpleFactory::instance();
    DataTypes three_cond_types = {
            std::make_shared<DataTypeInt64>(),      std::make_shared<DataTypeString>(),
            std::make_shared<DataTypeDateTimeV2>(), std::make_shared<DataTypeUInt8>(),
            std::make_shared<DataTypeUInt8>(),      std::make_shared<DataTypeUInt8>()};
    auto funnel = factory.get("window_funnel_v2", three_cond_types, nullptr, false,
                              BeExecVersionManager::get_newest_version());
    ASSERT_NE(funnel, nullptr);

    // Scenario: a row matches c1+c2 and its c2 is a duplicate that terminates the
    // chain; the E0 pending on that same row should then start a new chain.
    // (Levels are zero-based in these notes.)
    //   Row 0 (10:00): c1 only  → chain level=0
    //   Row 1 (11:00): c2 only  → chain level=1
    //   Row 2 (12:00): c1, c2   → E1(cont=0), E0(cont=1)
    //       E1 (cont=0): duplicate; R2 is neither R0 nor R1, so it is NOT on the
    //                    same row as the chain → chain terminated.
    //       E0 (cont=1): eliminating the chain cleared events_timestamp[0], so the
    //                    _is_same_row_as_chain check does not apply and E0 simply
    //                    starts a new chain.
    //   Row 3 (13:00): c2 only  → chain level=1
    //   Row 4 (14:00): c3 only  → chain level=2
    //   Result: max(1+1, 2+1) = 3
    const int NUM_ROWS = 5;

    // Every row: window of 86400 seconds, "deduplication" mode.
    auto column_window = ColumnInt64::create();
    auto column_mode = ColumnString::create();
    for (int row = 0; row != NUM_ROWS; ++row) {
        column_window->insert(Field::create_field<TYPE_BIGINT>(86400));
        column_mode->insert(Field::create_field<TYPE_STRING>("deduplication"));
    }

    // 2022-03-12, hourly from 10:00 to 14:00.
    VecDateTimeValue row_times[NUM_ROWS];
    row_times[0].unchecked_set_time(2022, 3, 12, 10, 0, 0);
    row_times[1].unchecked_set_time(2022, 3, 12, 11, 0, 0);
    row_times[2].unchecked_set_time(2022, 3, 12, 12, 0, 0);
    row_times[3].unchecked_set_time(2022, 3, 12, 13, 0, 0);
    row_times[4].unchecked_set_time(2022, 3, 12, 14, 0, 0);
    auto column_timestamp = ColumnDateTimeV2::create();
    for (auto& when : row_times) {
        auto packed = when.to_datetime_v2();
        column_timestamp->insert_data(reinterpret_cast<const char*>(&packed), 0);
    }

    // Per-row condition matrix:
    //   row 0: c1=T, c2=F, c3=F
    //   row 1: c1=F, c2=T, c3=F
    //   row 2: c1=T, c2=T, c3=F
    //   row 3: c1=F, c2=T, c3=F
    //   row 4: c1=F, c2=F, c3=T
    const bool event1_rows[NUM_ROWS] = {true, false, true, false, false};
    const bool event2_rows[NUM_ROWS] = {false, true, true, true, false};
    const bool event3_rows[NUM_ROWS] = {false, false, false, false, true};
    auto column_event1 = ColumnUInt8::create();
    auto column_event2 = ColumnUInt8::create();
    auto column_event3 = ColumnUInt8::create();
    for (int row = 0; row != NUM_ROWS; ++row) {
        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(event1_rows[row]));
        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(event2_rows[row]));
        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(event3_rows[row]));
    }

    // Allocate and initialise the aggregate state, then feed the rows in order.
    std::unique_ptr<char[]> state_buffer(new char[funnel->size_of_data()]);
    AggregateDataPtr place = state_buffer.get();
    funnel->create(place);
    const IColumn* inputs[6] = {column_window.get(), column_mode.get(),   column_timestamp.get(),
                                column_event1.get(), column_event2.get(), column_event3.get()};
    for (int row = 0; row != NUM_ROWS; ++row) {
        funnel->add(place, inputs, row, arena);
    }

    ColumnInt32 column_result;
    funnel->insert_result_into(place, column_result);
    // First chain: E0@R0 -> E1@R1 (2 steps); the duplicate E1@R2 terminates it.
    // E0@R2 restarts, and the new chain E0@R2 -> E1@R3 -> E2@R4 has 3 steps.
    // max(2, 3) = 3.
    EXPECT_EQ(column_result.get_data()[0], 3);
    funnel->destroy(place);
}
// Fixed mode regression: level jump caused by several conditions matching on one row.
// If a row satisfies both a high-index condition (c3) and a low-index condition (c2),
// the high-index event used to be handled first and raised a spurious level jump
// before the low-index event got the chance to advance the chain.
TEST_F(VWindowFunnelV2Test, testFixedSameRowMultiCondLevelJump) {
    constexpr int kRowCount = 2;

    // Argument layout: window, mode, timestamp, then three conditions
    // (c1=event1, c2=event2, c3=event3).
    auto factory = AggregateFunctionSimpleFactory::instance();
    DataTypes arg_types = {
            std::make_shared<DataTypeInt64>(),      std::make_shared<DataTypeString>(),
            std::make_shared<DataTypeDateTimeV2>(), std::make_shared<DataTypeUInt8>(),
            std::make_shared<DataTypeUInt8>(),      std::make_shared<DataTypeUInt8>()};
    auto funnel = factory.get("window_funnel_v2", arg_types, nullptr, false,
                              BeExecVersionManager::get_newest_version());
    ASSERT_NE(funnel, nullptr);

    // Every row shares the same mode string and the same one-day window (in seconds).
    auto column_mode = ColumnString::create();
    auto column_window = ColumnInt64::create();
    for (int row = 0; row != kRowCount; ++row) {
        column_mode->insert(Field::create_field<TYPE_STRING>("fixed"));
        column_window->insert(Field::create_field<TYPE_BIGINT>(86400));
    }

    // Row 0 is stamped 2020-01-01 00:00:00, row 1 is stamped 2020-01-02 00:00:00.
    auto column_timestamp = ColumnDateTimeV2::create();
    for (int day : {1, 2}) {
        VecDateTimeValue tv;
        tv.unchecked_set_time(2020, 1, day, 0, 0, 0);
        auto dtv2 = tv.to_datetime_v2();
        column_timestamp->insert_data(reinterpret_cast<char*>(&dtv2), 0);
    }

    // Row 0: c1=T, c2=T, c3=T (all match; the chain starts via c1)
    // Row 1: c1=F, c2=T, c3=T (c3 would trigger the level jump before c2 can advance)
    auto column_event1 = ColumnUInt8::create();
    for (int flag : {1, 0}) {
        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    auto column_event2 = ColumnUInt8::create();
    for (int flag : {1, 1}) {
        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    auto column_event3 = ColumnUInt8::create();
    for (int flag : {1, 1}) {
        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }

    // Raw storage for the aggregate state; the function constructs it in place.
    std::unique_ptr<char[]> state_buf(new char[funnel->size_of_data()]);
    AggregateDataPtr place = state_buf.get();
    funnel->create(place);

    const IColumn* args[6] = {column_window.get(), column_mode.get(), column_timestamp.get(),
                              column_event1.get(), column_event2.get(), column_event3.get()};
    for (int row = 0; row != kRowCount; ++row) {
        funnel->add(place, args, row, arena);
    }

    ColumnInt32 column_result;
    funnel->insert_result_into(place, column_result);
    // Before the fix: c3 of row 1 triggered a level jump (its predecessor c2 was not yet
    // matched), giving 1.
    // After the fix: the same-row look-ahead sees that c2 can advance, skips the level
    // jump, and the result is 2.
    EXPECT_EQ(column_result.get_data()[0], 2);
    funnel->destroy(place);
}
// Fixed mode: a duplicate on the same row must NOT hide a level-jump break.
// Counterexample taken from review: E0@r0, E1@r1, (E3,E1)@r2, E2@r3, E3@r4.
// E1@r2 repeats a level that is already matched (curr_level=2, sr_idx=1 ≤ curr_level),
// so it does not count as an advancement; the chain is expected to break at E3@r2.
TEST_F(VWindowFunnelV2Test, testFixedSameRowDuplicateDoesNotSuppressJump) {
    // Argument layout: window, mode, timestamp, followed by four conditions c1..c4.
    auto factory = AggregateFunctionSimpleFactory::instance();
    DataTypes arg_types = {
            std::make_shared<DataTypeInt64>(),      std::make_shared<DataTypeString>(),
            std::make_shared<DataTypeDateTimeV2>(), std::make_shared<DataTypeUInt8>(),
            std::make_shared<DataTypeUInt8>(),      std::make_shared<DataTypeUInt8>(),
            std::make_shared<DataTypeUInt8>()};
    auto funnel = factory.get("window_funnel_v2", arg_types, nullptr, false,
                              BeExecVersionManager::get_newest_version());
    ASSERT_NE(funnel, nullptr);

    // Condition flags, one entry per input row.
    struct RowFlags {
        int c1, c2, c3, c4;
    };
    // clang-format off
    const RowFlags kRows[] = {
        {1, 0, 0, 0}, // r0 → E0
        {0, 1, 0, 0}, // r1 → E1
        {0, 1, 0, 1}, // r2 → E1(dup) + E3(jump)
        {0, 0, 1, 0}, // r3 → E2
        {0, 0, 0, 1}, // r4 → E3
    };
    // clang-format on
    constexpr int kRowCount = static_cast<int>(sizeof(kRows) / sizeof(kRows[0]));

    auto column_mode = ColumnString::create();
    auto column_timestamp = ColumnDateTimeV2::create();
    auto column_window = ColumnInt64::create();
    auto column_c1 = ColumnUInt8::create();
    auto column_c2 = ColumnUInt8::create();
    auto column_c3 = ColumnUInt8::create();
    auto column_c4 = ColumnUInt8::create();
    for (int row = 0; row != kRowCount; ++row) {
        column_mode->insert(Field::create_field<TYPE_STRING>("fixed"));
        // Ten-day window: every row falls inside it.
        column_window->insert(Field::create_field<TYPE_BIGINT>(86400 * 10));

        // Row r is stamped 2020-01-(1+r) 00:00:00.
        VecDateTimeValue tv;
        tv.unchecked_set_time(2020, 1, 1 + row, 0, 0, 0);
        auto dtv2 = tv.to_datetime_v2();
        column_timestamp->insert_data(reinterpret_cast<char*>(&dtv2), 0);

        RowFlags flags = kRows[row];
        column_c1->insert(Field::create_field<TYPE_BOOLEAN>(flags.c1));
        column_c2->insert(Field::create_field<TYPE_BOOLEAN>(flags.c2));
        column_c3->insert(Field::create_field<TYPE_BOOLEAN>(flags.c3));
        column_c4->insert(Field::create_field<TYPE_BOOLEAN>(flags.c4));
    }

    // Raw storage for the aggregate state; the function constructs it in place.
    std::unique_ptr<char[]> state_buf(new char[funnel->size_of_data()]);
    AggregateDataPtr place = state_buf.get();
    funnel->create(place);

    const IColumn* args[7] = {column_window.get(), column_mode.get(), column_timestamp.get(),
                              column_c1.get(),     column_c2.get(),   column_c3.get(),
                              column_c4.get()};
    for (int row = 0; row != kRowCount; ++row) {
        funnel->add(place, args, row, arena);
    }

    ColumnInt32 column_result;
    funnel->insert_result_into(place, column_result);
    // Chain: E0@r0 → E1@r1, then E3@r2 triggers the level jump. E1@r2 is only a duplicate
    // (sr_idx=1 ≤ curr_level=2) and must NOT suppress the break, so the result is 2.
    EXPECT_EQ(column_result.get_data()[0], 2);
    funnel->destroy(place);
}
// Fixed mode: a row may match several conditions, E0 (c1) among them. If another
// event on that same row has already advanced the chain, the E0 continuation must
// NOT restart it. This mirrors V1's row-based semantics, in which every row is
// checked against exactly one expected condition.
TEST_F(VWindowFunnelV2Test, testFixedContinuationE0DoesNotRestartChain) {
    constexpr int kRowCount = 3;

    auto factory = AggregateFunctionSimpleFactory::instance();
    DataTypes arg_types = {
            std::make_shared<DataTypeInt64>(),      std::make_shared<DataTypeString>(),
            std::make_shared<DataTypeDateTimeV2>(), std::make_shared<DataTypeUInt8>(),
            std::make_shared<DataTypeUInt8>(),      std::make_shared<DataTypeUInt8>()};
    auto funnel = factory.get("window_funnel_v2", arg_types, nullptr, false,
                              BeExecVersionManager::get_newest_version());
    ASSERT_NE(funnel, nullptr);

    // Scenario (3 rows, 3 conditions c1, c2, c3):
    //   r0: c1=T, c2=F, c3=F -> E0 starts the chain
    //   r1: c1=T, c2=T, c3=F -> c2 advances the chain; c1(cont) should be skipped
    //   r2: c1=F, c2=F, c3=T -> c3 advances the chain to level 2
    // Expected result: 3 (c1@r0 -> c2@r1 -> c3@r2).
    // Without the E0-skip fix, c1@r1(cont) would restart the chain and yield 2.

    // Same mode string and ten-day window (in seconds) on every row.
    auto column_mode = ColumnString::create();
    auto column_window = ColumnInt64::create();
    for (int row = 0; row != kRowCount; ++row) {
        column_mode->insert(Field::create_field<TYPE_STRING>("fixed"));
        column_window->insert(Field::create_field<TYPE_BIGINT>(86400 * 10));
    }

    // Rows are stamped on consecutive days starting at 2020-01-01 00:00:00.
    auto column_timestamp = ColumnDateTimeV2::create();
    VecDateTimeValue tv;
    for (int day = 1; day <= kRowCount; ++day) {
        tv.unchecked_set_time(2020, 1, day, 0, 0, 0);
        auto dtv2 = tv.to_datetime_v2();
        column_timestamp->insert_data(reinterpret_cast<char*>(&dtv2), 0);
    }

    // c1: r0=T, r1=T, r2=F
    auto column_c1 = ColumnUInt8::create();
    for (int flag : {1, 1, 0}) {
        column_c1->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    // c2: r0=F, r1=T, r2=F
    auto column_c2 = ColumnUInt8::create();
    for (int flag : {0, 1, 0}) {
        column_c2->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    // c3: r0=F, r1=F, r2=T
    auto column_c3 = ColumnUInt8::create();
    for (int flag : {0, 0, 1}) {
        column_c3->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }

    // Raw storage for the aggregate state; the function constructs it in place.
    std::unique_ptr<char[]> state_buf(new char[funnel->size_of_data()]);
    AggregateDataPtr place = state_buf.get();
    funnel->create(place);

    const IColumn* args[6] = {column_window.get(), column_mode.get(), column_timestamp.get(),
                              column_c1.get(),     column_c2.get(),   column_c3.get()};
    for (int row = 0; row != kRowCount; ++row) {
        funnel->add(place, args, row, arena);
    }

    ColumnInt32 column_result;
    funnel->insert_result_into(place, column_result);
    EXPECT_EQ(column_result.get_data()[0], 3);
    funnel->destroy(place);
}
// Dedup mode multi-pass: V1 reaches level 4 by starting from a later E0 and skipping
// over rows that match higher-level conditions. The old single-pass V2 broke at level 3
// because it greedily promoted from multi-condition rows, creating premature duplicates.
// 7 conditions, 15 valid rows (after null filtering). Window ~35 years (effectively unlimited).
// V1 chain: c1@R5(pk=8) → c2@R9(pk=6) → c3@R10(pk=12) → c4@R12(pk=5), then c1 dup at R13 → level=4.
TEST_F(VWindowFunnelV2Test, testDeduplicationMultiPassGapCheck) {
    AggregateFunctionSimpleFactory factory = AggregateFunctionSimpleFactory::instance();
    // Argument layout: window, mode, timestamp, followed by 7 conditions.
    DataTypes data_types_7 = {
            std::make_shared<DataTypeInt64>(),      std::make_shared<DataTypeString>(),
            std::make_shared<DataTypeDateTimeV2>(), std::make_shared<DataTypeUInt8>(),
            std::make_shared<DataTypeUInt8>(),      std::make_shared<DataTypeUInt8>(),
            std::make_shared<DataTypeUInt8>(),      std::make_shared<DataTypeUInt8>(),
            std::make_shared<DataTypeUInt8>(),      std::make_shared<DataTypeUInt8>()};
    auto agg_func = factory.get("window_funnel_v2", data_types_7, nullptr, false,
                                BeExecVersionManager::get_newest_version());
    ASSERT_NE(agg_func, nullptr);

    // 15 rows sorted by timestamp (null rows excluded at SQL level).
    // Conditions: c1=val<4, c2=val<=2, c3=val!=6, c4=val<=4, c5=date='2023-12-13',
    //             c6=val<=5, c7=val<=8
    struct RowData {
        int year, month, day;    // event timestamp (time of day is always 00:00:00)
        int val;                 // value tested by c1..c4, c6 and c7
        bool date_is_2023_12_13; // precomputed outcome of c5
                                 // NOTE(review): presumably evaluated on a separate date
                                 // column of the originating query, since no timestamp
                                 // below is 2023-12-13 — confirm.
    };
    // clang-format off
    const RowData rows[] = {
        {2000, 3, 19, 3, false}, // pk=10
        {2000, 4,  3, 8, false}, // pk=9
        {2001,10, 12, 3, false}, // pk=2
        {2002,11, 15, 8, false}, // pk=0
        {2003, 9, 10, 0, false}, // pk=19
        {2006, 6,  6, 0, false}, // pk=8  ← V1 best E0 start
        {2008, 7,  3, 8, false}, // pk=3
        {2009, 3,  3, 9, false}, // pk=11
        {2009, 5, 16, 8, true},  // pk=16
        {2009,11,  3, 0, true},  // pk=6  ← all 7 conditions match
        {2011, 6, 25, 7, true},  // pk=12
        {2014, 5, 16, 6, false}, // pk=14
        {2014,11,  2, 0, false}, // pk=5
        {2016, 1,  8, 3, false}, // pk=13
        {2017, 1,  3, 8, true},  // pk=17
    };
    // clang-format on

    // Derive the row count from the table instead of repeating it as a literal, so an
    // edit to `rows` can never make the loops below drop rows or read past the array.
    constexpr int NUM_ROWS = static_cast<int>(sizeof(rows) / sizeof(rows[0]));
    static_assert(NUM_ROWS == 15,
                  "the expected chain (R5 -> R9 -> R10 -> R12) is documented for 15 rows");

    auto column_mode = ColumnString::create();
    auto column_timestamp = ColumnDateTimeV2::create();
    auto column_window = ColumnInt64::create();
    auto col_c1 = ColumnUInt8::create();
    auto col_c2 = ColumnUInt8::create();
    auto col_c3 = ColumnUInt8::create();
    auto col_c4 = ColumnUInt8::create();
    auto col_c5 = ColumnUInt8::create();
    auto col_c6 = ColumnUInt8::create();
    auto col_c7 = ColumnUInt8::create();

    // Build all ten argument columns in one pass over the fixture table.
    for (const RowData& row : rows) {
        column_mode->insert(Field::create_field<TYPE_STRING>("deduplication"));
        column_window->insert(Field::create_field<TYPE_BIGINT>(1119906808));

        VecDateTimeValue tv;
        tv.unchecked_set_time(row.year, row.month, row.day, 0, 0, 0);
        auto dtv2 = tv.to_datetime_v2();
        column_timestamp->insert_data(reinterpret_cast<char*>(&dtv2), 0);

        // c1: val < 4
        col_c1->insert(Field::create_field<TYPE_BOOLEAN>(row.val < 4 ? 1 : 0));
        // c2: val <= 2
        col_c2->insert(Field::create_field<TYPE_BOOLEAN>(row.val <= 2 ? 1 : 0));
        // c3: val != 6
        col_c3->insert(Field::create_field<TYPE_BOOLEAN>(row.val != 6 ? 1 : 0));
        // c4: val <= 4
        col_c4->insert(Field::create_field<TYPE_BOOLEAN>(row.val <= 4 ? 1 : 0));
        // c5: date = '2023-12-13'
        col_c5->insert(Field::create_field<TYPE_BOOLEAN>(row.date_is_2023_12_13 ? 1 : 0));
        // c6: val <= 5
        col_c6->insert(Field::create_field<TYPE_BOOLEAN>(row.val <= 5 ? 1 : 0));
        // c7: val <= 8
        col_c7->insert(Field::create_field<TYPE_BOOLEAN>(row.val <= 8 ? 1 : 0));
    }

    // Raw storage for the aggregate state; the function constructs it in place.
    std::unique_ptr<char[]> memory(new char[agg_func->size_of_data()]);
    AggregateDataPtr place = memory.get();
    agg_func->create(place);

    const IColumn* column[10] = {column_window.get(), column_mode.get(), column_timestamp.get(),
                                 col_c1.get(),        col_c2.get(),      col_c3.get(),
                                 col_c4.get(),        col_c5.get(),      col_c6.get(),
                                 col_c7.get()};
    for (int i = 0; i < NUM_ROWS; i++) {
        agg_func->add(place, column, i, arena);
    }

    ColumnInt32 column_result;
    agg_func->insert_result_into(place, column_result);
    // V1 = 4: c1@R5(pk=8) → c2@R9(pk=6) → c3@R10(pk=12) → c4@R12(pk=5)
    // c1 dup at R13(pk=13) breaks the chain before c5 can be matched.
    EXPECT_EQ(column_result.get_data()[0], 4);
    agg_func->destroy(place);
}
// FIXED mode, "relevant-but-wrong row" break: if the row following a matched condition
// matches some condition other than the expected next one, the chain has to break.
// V1 inspects consecutive rows, so a c2-only row that follows c1→c2 breaks the chain
// while c3 is expected. Before the fix, V2 silently re-promoted c2 and kept searching.
TEST_F(VWindowFunnelV2Test, testFixedRelevantButWrongRowBreaksChain) {
    constexpr int kRowCount = 4;

    auto factory = AggregateFunctionSimpleFactory::instance();
    DataTypes arg_types = {
            std::make_shared<DataTypeInt64>(),      std::make_shared<DataTypeString>(),
            std::make_shared<DataTypeDateTimeV2>(), std::make_shared<DataTypeUInt8>(),
            std::make_shared<DataTypeUInt8>(),      std::make_shared<DataTypeUInt8>()};
    auto funnel = factory.get("window_funnel_v2", arg_types, nullptr, false,
                              BeExecVersionManager::get_newest_version());
    ASSERT_NE(funnel, nullptr);

    // Scenario (4 rows, 3 conditions c1, c2, c3):
    //   r0: c1=T, c2=F, c3=F → E0, chain level 0
    //   r1: c1=F, c2=T, c3=F → c2 advances the chain to level 1
    //   r2: c1=F, c2=T, c3=F → c2 re-matches level 1 (c3 was expected). Chain BREAKS.
    //   r3: c1=F, c2=F, c3=T → c3 could advance, but the chain is already broken
    // Expected: 2 (c1@r0 → c2@r1, break at r2)

    // Same mode string and ten-day window (in seconds) on every row.
    auto column_mode = ColumnString::create();
    auto column_window = ColumnInt64::create();
    for (int row = 0; row != kRowCount; ++row) {
        column_mode->insert(Field::create_field<TYPE_STRING>("fixed"));
        column_window->insert(Field::create_field<TYPE_BIGINT>(86400 * 10));
    }

    // Rows are stamped on consecutive days starting at 2020-01-01 00:00:00.
    auto column_timestamp = ColumnDateTimeV2::create();
    VecDateTimeValue tv;
    for (int day = 1; day <= kRowCount; ++day) {
        tv.unchecked_set_time(2020, 1, day, 0, 0, 0);
        auto dtv2 = tv.to_datetime_v2();
        column_timestamp->insert_data(reinterpret_cast<char*>(&dtv2), 0);
    }

    // c1: r0=T, r1=F, r2=F, r3=F
    auto column_c1 = ColumnUInt8::create();
    for (int flag : {1, 0, 0, 0}) {
        column_c1->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    // c2: r0=F, r1=T, r2=T, r3=F
    auto column_c2 = ColumnUInt8::create();
    for (int flag : {0, 1, 1, 0}) {
        column_c2->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }
    // c3: r0=F, r1=F, r2=F, r3=T
    auto column_c3 = ColumnUInt8::create();
    for (int flag : {0, 0, 0, 1}) {
        column_c3->insert(Field::create_field<TYPE_BOOLEAN>(flag));
    }

    // Raw storage for the aggregate state; the function constructs it in place.
    std::unique_ptr<char[]> state_buf(new char[funnel->size_of_data()]);
    AggregateDataPtr place = state_buf.get();
    funnel->create(place);

    const IColumn* args[6] = {column_window.get(), column_mode.get(), column_timestamp.get(),
                              column_c1.get(),     column_c2.get(),   column_c3.get()};
    for (int row = 0; row != kRowCount; ++row) {
        funnel->add(place, args, row, arena);
    }

    ColumnInt32 column_result;
    funnel->insert_result_into(place, column_result);
    EXPECT_EQ(column_result.get_data()[0], 2);
    funnel->destroy(place);
}
} // namespace doris