blob: 8f895354f182236a221152a072c52083b00556e3 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "common/bvars.h"
#include <brpc/server.h>
#include <bthread/bthread.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include <atomic>
#include <chrono>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <vector>
#include "common/logging.h"
#include "meta-service/meta_service.h"
#include "meta-store/mem_txn_kv.h"
namespace doris::cloud {
// Initializes glog under the name "bvars_test", then hands the command line to
// GoogleTest and runs every registered test.
//
// Returns -1 if glog initialization fails; otherwise returns whatever
// RUN_ALL_TESTS() reports.
//
// NOTE(review): this definition sits inside namespace doris::cloud, so it is
// doris::cloud::main rather than the global ::main -- confirm where the test
// binary actually gets its entry point from.
int main(int argc, char** argv) {
    const bool glog_ready = cloud::init_glog("bvars_test");
    if (glog_ready) {
        ::testing::InitGoogleTest(&argc, argv);
        return RUN_ALL_TESTS();
    }

    // Logging is unavailable at this point, so report straight to stderr.
    std::cerr << "failed to init glog" << std::endl;
    return -1;
}
class BvarsTest : public ::testing::Test {
public:
void SetUp() override {
if (server.Start("0.0.0.0:0", &options) == -1) {
perror("Start brpc server");
}
}
void TearDown() override {
server.Stop(0);
server.Join();
}
brpc::ServerOptions options;
brpc::Server server;
};
// Concurrency smoke test for MBvarLatencyRecorderWithStatus.
//
// 20 writer threads call put() and 20 reader threads call
// get_count()/get_avg()/get_max() on two recorders at the same time, each for
// 1000 iterations spread over 100 distinct instance ids. After all threads are
// joined the test waits 3 * interval_s seconds and then requires that the
// "mBvarLatencyRecorderWithStatus::update" sync point fired more than 200 times.
TEST(BvarsTest, MultiThreadRecordMetrics) {
    constexpr int kThreadsPerRole = 20;
    constexpr size_t kIterationsPerThread = 1000;
    constexpr size_t kInstanceCount = 100;

    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    std::vector<std::thread> threads;
    threads.reserve(2 * kThreadsPerRole);
    std::atomic_long update_count = 0;
    MBvarLatencyRecorderWithStatus<60> mbvarlr_partition_test("partition_test", {"instance_id"});
    MBvarLatencyRecorderWithStatus<60> mbvarlr_tablet_test("tablet_test", {"instance_id"});
    int interval_s = 60;

    // The callbacks registered below capture `interval_s` and `update_count`
    // by reference and live in the process-wide SyncPoint singleton. They must
    // be removed before those locals go out of scope, on every exit path
    // (including an early return from a failed ASSERT_*), otherwise a later
    // sync-point hit would touch dead stack memory. The guard is declared
    // after the captured locals and the recorders, so it is destroyed first.
    struct SyncPointCleanup {
        ~SyncPointCleanup() {
            auto* instance = SyncPoint::get_instance();
            instance->disable_processing();
            instance->clear_all_call_backs();
        }
    } sync_point_cleanup;

    auto* sp = SyncPoint::get_instance();
    // Overwrites the int that the "put" sync point passes as its first
    // argument with interval_s (presumably the recorder's refresh interval in
    // seconds -- confirm against common/bvars.h).
    sp->set_call_back("mBvarLatencyRecorderWithStatus::put", [&interval_s](auto&& args) {
        auto* vault = try_any_cast<int*>(args[0]);
        *vault = interval_s;
    });
    // Counts how many times the "update" sync point is reached.
    sp->set_call_back("mBvarLatencyRecorderWithStatus::update",
                      [&update_count](auto&& /*args*/) { ++update_count; });
    sp->enable_processing();

    // Writers: record one partition sample and one tablet sample per iteration.
    for (int i = 0; i < kThreadsPerRole; ++i) {
        threads.emplace_back([&]() {
            for (size_t j = 0; j < kIterationsPerThread; j++) {
                std::unique_ptr<Transaction> txn;
                ASSERT_EQ(TxnErrorCode::TXN_OK, txn_kv->create_txn(&txn));
                std::string instance_id = "instance_" + std::to_string(j % kInstanceCount);
                int64_t partition_count = 10 + j;
                int64_t tablet_count = 50 + j;
                mbvarlr_partition_test.put({instance_id}, partition_count);
                mbvarlr_tablet_test.put({instance_id}, tablet_count);
                std::this_thread::sleep_for(std::chrono::microseconds(1000));
            }
        });
    }

    // Readers: query the same instance ids concurrently with the writers. The
    // returned values are intentionally ignored; only the concurrent access is
    // being exercised.
    for (int i = 0; i < kThreadsPerRole; ++i) {
        threads.emplace_back([&]() {
            for (size_t j = 0; j < kIterationsPerThread; j++) {
                std::unique_ptr<Transaction> txn;
                ASSERT_EQ(TxnErrorCode::TXN_OK, txn_kv->create_txn(&txn));
                std::string instance_id = "instance_" + std::to_string(j % kInstanceCount);
                mbvarlr_partition_test.get_count({instance_id});
                mbvarlr_partition_test.get_avg({instance_id});
                mbvarlr_partition_test.get_max({instance_id});
                mbvarlr_tablet_test.get_count({instance_id});
                mbvarlr_tablet_test.get_avg({instance_id});
                mbvarlr_tablet_test.get_max({instance_id});
                std::this_thread::sleep_for(std::chrono::microseconds(1000));
            }
        });
    }

    for (auto& t : threads) {
        t.join();
    }

    // Wait three intervals before checking the counter. Uses the standard
    // library sleep instead of POSIX sleep(), whose header is not included.
    std::this_thread::sleep_for(std::chrono::seconds(interval_s * 3));
    ASSERT_GT(update_count.load(), 200);
}
} // namespace doris::cloud