blob: f8e8fb6328894b84dd6f3c9f14c3b946eb050279 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <gen_cpp/AgentService_types.h>
#include <gtest/gtest.h>

#include <cstdint>
#include <cstring>
#include <memory>
#include <string>
#include <thread>
#include <vector>

#include "common/object_pool.h"
#include "gen_cpp/Descriptors_types.h"
#include "gen_cpp/Types_types.h"
#include "gen_cpp/internal_service.pb.h"
#include "olap/tablet_schema.h"
#include "runtime/descriptor_helper.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "service/point_query_executor.h"
#include "vec/core/block.h"
#include "vec/exprs/vexpr.h"
namespace doris {
// Helper class for setting up Reusable objects to test LookupConnectionCache
class ReusableTestHelper {
public:
static TDescriptorTable create_descriptor_tablet() {
TDescriptorTableBuilder dtb;
TTupleDescriptorBuilder tuple_builder;
tuple_builder.add_slot(TSlotDescriptorBuilder()
.type(TYPE_TINYINT)
.column_name("k1")
.column_pos(0)
.build());
tuple_builder.add_slot(TSlotDescriptorBuilder()
.type(TYPE_SMALLINT)
.column_name("k2")
.column_pos(1)
.build());
tuple_builder.add_slot(
TSlotDescriptorBuilder().type(TYPE_INT).column_name("k3").column_pos(2).build());
tuple_builder.add_slot(
TSlotDescriptorBuilder().type(TYPE_BIGINT).column_name("k4").column_pos(3).build());
tuple_builder.add_slot(TSlotDescriptorBuilder()
.type(TYPE_LARGEINT)
.column_name("k5")
.column_pos(4)
.build());
tuple_builder.add_slot(
TSlotDescriptorBuilder().type(TYPE_DATE).column_name("k6").column_pos(5).build());
tuple_builder.add_slot(TSlotDescriptorBuilder()
.type(TYPE_DATETIME)
.column_name("k7")
.column_pos(6)
.build());
tuple_builder.add_slot(
TSlotDescriptorBuilder().string_type(4).column_name("k8").column_pos(7).build());
tuple_builder.add_slot(
TSlotDescriptorBuilder().string_type(65).column_name("k9").column_pos(8).build());
tuple_builder.add_slot(TSlotDescriptorBuilder()
.decimal_type(6, 3)
.column_name("k10")
.column_pos(9)
.build());
tuple_builder.add_slot(TSlotDescriptorBuilder()
.type(TYPE_DATEV2)
.column_name("k11")
.column_pos(10)
.build());
tuple_builder.add_slot(TSlotDescriptorBuilder()
.type(TYPE_TINYINT)
.column_name("v1")
.column_pos(11)
.nullable(false)
.build());
tuple_builder.add_slot(TSlotDescriptorBuilder()
.type(TYPE_SMALLINT)
.column_name("v2")
.column_pos(12)
.nullable(false)
.build());
tuple_builder.add_slot(TSlotDescriptorBuilder()
.type(TYPE_INT)
.column_name("v3")
.column_pos(13)
.nullable(false)
.build());
tuple_builder.add_slot(TSlotDescriptorBuilder()
.type(TYPE_BIGINT)
.column_name("v4")
.column_pos(14)
.nullable(false)
.build());
tuple_builder.add_slot(TSlotDescriptorBuilder()
.type(TYPE_LARGEINT)
.column_name("v5")
.column_pos(15)
.nullable(false)
.build());
tuple_builder.add_slot(TSlotDescriptorBuilder()
.type(TYPE_DATE)
.column_name("v6")
.column_pos(16)
.nullable(false)
.build());
tuple_builder.add_slot(TSlotDescriptorBuilder()
.type(TYPE_DATETIME)
.column_name("v7")
.column_pos(17)
.nullable(false)
.build());
tuple_builder.add_slot(TSlotDescriptorBuilder()
.string_type(4)
.column_name("v8")
.column_pos(18)
.nullable(false)
.build());
tuple_builder.add_slot(TSlotDescriptorBuilder()
.string_type(65)
.column_name("v9")
.column_pos(19)
.nullable(false)
.build());
tuple_builder.add_slot(TSlotDescriptorBuilder()
.decimal_type(6, 3)
.column_name("v10")
.column_pos(20)
.nullable(false)
.build());
tuple_builder.add_slot(TSlotDescriptorBuilder()
.type(TYPE_DATEV2)
.column_name("v11")
.column_pos(21)
.nullable(false)
.build());
tuple_builder.build(&dtb);
return dtb.desc_tbl();
}
static std::shared_ptr<Reusable> create_reusable() {
auto obj_pool = std::make_unique<ObjectPool>();
auto runtime_state = RuntimeState::create_unique();
auto reusable = std::make_shared<Reusable>();
// Create a simple TupleDescriptor for testing
TDescriptorTable t_desc_tbl = create_descriptor_tablet();
// Initialize Reusable
Status st = reusable->init(t_desc_tbl, output_exprs, query_options, *tablet_schema, 2);
if (!st.ok()) {
return nullptr;
}
return reusable;
}
static std::vector<TExpr> output_exprs;
static TQueryOptions query_options;
static std::shared_ptr<TabletSchema> tablet_schema;
};
std::vector<TExpr> ReusableTestHelper::output_exprs = []() {
std::vector<TExpr> list;
TExpr expr;
expr.nodes.emplace_back(TExprNode());
expr.nodes[0].node_type = TExprNodeType::SLOT_REF;
auto type = TTypeDesc();
type.types.emplace_back(TTypeNode());
type.types[0].type = TTypeNodeType::SCALAR;
type.types[0].__isset.scalar_type = true;
type.types[0].scalar_type.type = TPrimitiveType::BIGINT;
expr.nodes[0].type = type;
expr.nodes[0].num_children = 0;
list.push_back(expr);
return list;
}();
// Value-initialized query options handed to Reusable::init() by the helper.
TQueryOptions ReusableTestHelper::query_options {};
std::shared_ptr<TabletSchema> ReusableTestHelper::tablet_schema = []() {
auto schema = std::make_shared<TabletSchema>();
for (int i = 0; i < 11; ++i) {
schema->append_column(TabletColumn(FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE,
FieldType::OLAP_FIELD_TYPE_BIGINT));
}
return schema;
}();
// RowCache test class
class RowCacheTest : public testing::Test {
protected:
void SetUp() override {
// Create RowCache instance
_row_cache = new RowCache(1024, 4); // 1KB cache, 4 shards
}
void TearDown() override { delete _row_cache; }
RowCache* _row_cache = nullptr;
};
// Round-trips a single entry through the fixture's RowCache: insert it, look
// it up and compare the stored bytes, erase it, then verify a lookup misses.
TEST_F(RowCacheTest, PQTestBasicOperations) {
    // One key/value pair under a fixed tablet id.
    const int64_t tablet_id = 12345;
    const std::string key_text = "test_key";
    const std::string value_text = "test_value";
    Slice key(key_text.c_str(), key_text.size());
    Slice value(value_text.c_str(), value_text.size());
    RowCache::RowCacheKey cache_key(tablet_id, key);

    // Insert, then expect a hit with a valid handle.
    _row_cache->insert(cache_key, value);
    RowCache::CacheHandle hit_handle;
    const bool found_after_insert = _row_cache->lookup(cache_key, &hit_handle);
    ASSERT_TRUE(found_after_insert) << "Cache entry should be found";
    ASSERT_TRUE(hit_handle.valid()) << "Cache handle should be valid";

    // The cached bytes must equal what was inserted: same length first, then
    // same content.
    Slice cached = hit_handle.data();
    ASSERT_EQ(cached.size, value.size) << "Cache data size should match";
    ASSERT_EQ(memcmp(cached.data, value.data, value.size), 0)
            << "Cache data content should match";

    // Erase, then expect a miss with an invalid handle.
    _row_cache->erase(cache_key);
    RowCache::CacheHandle miss_handle;
    const bool found_after_erase = _row_cache->lookup(cache_key, &miss_handle);
    ASSERT_FALSE(found_after_erase) << "Cache entry should be deleted";
    ASSERT_FALSE(miss_handle.valid()) << "Cache handle should be invalid";
}
// Exercises RowCache eviction with a local cache built as RowCache(100, 1):
// ten entries are inserted, at least one of the first five must be gone
// afterwards, and the last inserted key ("key_9") is asserted to be absent too.
TEST_F(RowCacheTest, PQTestLRUEviction) {
    const int64_t tablet_id = 12345;
    // Use small capacity cache to test LRU eviction
    RowCache small_cache(100, 1); // 100 bytes capacity, 1 shard

    // Insert ten entries. Each value is "value_<i>" followed by ten 'x'
    // characters, i.e. 17 bytes of payload.
    for (int i = 0; i < 10; ++i) {
        std::string key_text = "key_" + std::to_string(i);
        std::string value_text = "value_" + std::to_string(i) + std::string(10, 'x');
        Slice key(key_text.c_str(), key_text.size());
        Slice value(value_text.c_str(), value_text.size());
        RowCache::RowCacheKey cache_key(tablet_id, key);
        small_cache.insert(cache_key, value);
    }

    // Probe the five oldest keys and stop at the first one that is missing.
    bool at_least_one_evicted = false;
    for (int i = 0; i < 5 && !at_least_one_evicted; ++i) {
        std::string key_text = "key_" + std::to_string(i);
        Slice key(key_text.c_str(), key_text.size());
        RowCache::RowCacheKey cache_key(tablet_id, key);
        RowCache::CacheHandle handle;
        at_least_one_evicted = !small_cache.lookup(cache_key, &handle);
    }
    ASSERT_TRUE(at_least_one_evicted) << "At least one cache entry should be evicted";

    // The most recently inserted key is expected to be missing as well.
    // NOTE(review): presumably the per-entry charge exceeds what a 100-byte
    // cache can retain, so even the newest entry is dropped -- confirm against
    // RowCache's charge accounting.
    std::string last_key_text = "key_9"; // Last inserted key
    Slice last_key(last_key_text.c_str(), last_key_text.size());
    RowCache::RowCacheKey last_cache_key(tablet_id, last_key);
    RowCache::CacheHandle last_handle;
    const bool found = small_cache.lookup(last_cache_key, &last_handle);
    ASSERT_FALSE(found);
}
// LookupConnectionCache test class
class LookupConnectionCacheTest : public testing::Test {
protected:
void SetUp() override {
// Create LookupConnectionCache instance
_lookup_cache = new LookupConnectionCache(1024); // 1KB cache
}
void TearDown() override { delete _lookup_cache; }
LookupConnectionCache* _lookup_cache = nullptr;
};
// Overfills a LookupConnectionCache created with capacity argument 40 and
// checks that its element count stays below 45, both after the initial 45
// inserts and again after one lookup followed by three more inserts.
TEST_F(LookupConnectionCacheTest, PQTestLRUEvictionPolicy) {
    LookupConnectionCache cache(40);

    // Ids 1..45, each with its own Reusable whose block pool holds 10 slots.
    int id = 1;
    while (id <= 45) {
        auto item = std::make_shared<Reusable>();
        item->_block_pool.resize(10);
        cache.add(id, item);
        ++id;
    }
    EXPECT_LT(cache.get_element_count(), 45) << "capacity " << cache.get_capacity();

    // Look up id 10 and keep the returned pointer alive while more entries are
    // added below.
    auto looked_up = cache.get(10);

    // A single Reusable registered under three ids.
    auto shared_item = std::make_shared<Reusable>();
    shared_item->_block_pool.resize(10);
    for (int extra_id = 41; extra_id <= 43; ++extra_id) {
        cache.add(extra_id, shared_item);
    }
    EXPECT_LT(cache.get_element_count(), 45) << "capacity " << cache.get_capacity();
}
// Inserts 45 helper-built entries into a LookupConnectionCache created with
// capacity argument 40 and checks that fewer than 45 of them can still be
// fetched afterwards.
TEST_F(LookupConnectionCacheTest, PQTestCapacityBoundary) {
    LookupConnectionCache cache(40);
    const int num_entries = 45;

    // Add one entry per id in [0, num_entries).
    for (int id = 0; id < num_entries; ++id) {
        cache.add(id, ReusableTestHelper::create_reusable());
    }

    // Count how many ids are still retrievable; at least one must be gone.
    int found_count = 0;
    for (int id = 0; id < num_entries; ++id) {
        found_count += (cache.get(id) != nullptr) ? 1 : 0;
    }
    EXPECT_LT(found_count, num_entries) << "LRU eviction should maintain capacity limit";
}
// Runs 4 threads against one shared LookupConnectionCache, each performing
// 1000 operations that alternate between add() and get(). The test passes when
// all threads finish and every get() result satisfies the check below.
TEST_F(LookupConnectionCacheTest, PQTestConcurrentAccess) {
    const int num_threads = 4;
    const int num_ops_per_thread = 1000;
    LookupConnectionCache cache(1024 * 1024 * 100); // 100MB buffer

    std::vector<std::thread> threads;
    for (int i = 0; i < num_threads; ++i) {
        threads.emplace_back([&, i]() {
            for (int j = 0; j < num_ops_per_thread; ++j) {
                // Thread i works on keys i * 1000 + j, so key ranges of
                // different threads do not overlap.
                const int64_t key = i * 1000 + j;
                // A Reusable is built on every step, including query steps.
                auto reusable = ReusableTestHelper::create_reusable();
                const bool is_query_step = (j % 2 != 0);
                if (is_query_step) {
                    // Odd steps query the key of the current step. Inserts only
                    // happen on even steps, so no thread ever adds these keys;
                    // presumably get() yields nullptr here.
                    auto entry = cache.get(key);
                    EXPECT_TRUE(entry == nullptr || entry.use_count() == 2);
                    continue;
                }
                // Even steps insert.
                cache.add(key, reusable);
            }
        });
    }

    // Wait for every worker before the cache goes out of scope.
    for (std::thread& worker : threads) {
        worker.join();
    }
}
// Feeds unusual values to LookupConnectionCache::add(): a null Reusable and a
// Reusable with a very large block pool. There are no assertions; the test
// passes as long as both calls return.
TEST_F(LookupConnectionCacheTest, PQTestInvalidKeys) {
    LookupConnectionCache cache(1024 * 1024);

    // Null value under id 0.
    cache.add(0, nullptr);

    // Reusable whose block pool is resized to 1024 * 1024 elements, under id 1.
    const int pool_slots = 1024 * 1024;
    auto oversized = std::make_shared<Reusable>();
    oversized->_block_pool.resize(pool_slots);
    cache.add(1, oversized);
}
// Adds two different Reusable objects under the same id and checks that a
// subsequent get() returns the one added last.
TEST_F(LookupConnectionCacheTest, PQTestDuplicateAdd) {
    LookupConnectionCache cache(1024 * 1024);
    auto first = ReusableTestHelper::create_reusable();
    auto second = ReusableTestHelper::create_reusable();

    // Same id twice: the second add() is expected to replace the first.
    cache.add(123, first);
    cache.add(123, second);

    auto entry = cache.get(123);
    ASSERT_NE(entry, nullptr);
    // Compare raw addresses to make sure it is the very same object.
    EXPECT_EQ(entry.get(), second.get()) << "Last write should win in key collision";
}
// Checks shared_ptr reference counts around a cached entry: while the creating
// scope is alive and after it has ended.
TEST_F(LookupConnectionCacheTest, PQTestEntryLifetime) {
    LookupConnectionCache cache(1024 * 1024);
    {
        auto created = ReusableTestHelper::create_reusable();
        cache.add(123, created);
        auto fetched = cache.get(123);
        ASSERT_NE(fetched, nullptr);
        // Expected owners: `created`, `fetched`, and the cache's own reference.
        EXPECT_EQ(fetched.use_count(), 3);
    } // `created` and `fetched` are released here

    // The entry must still be retrievable; the expected owners are now only
    // the cache and the pointer just returned by get().
    auto entry = cache.get(123);
    ASSERT_NE(entry, nullptr);
    EXPECT_EQ(entry.use_count(), 2) << "Cache should maintain sole ownership after scope exit";
}
} // namespace doris