blob: d33325ddf903a5a0238af7a803dfa0e7c26fc880 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// A client sending requests to server asynchronously every 1 second.
#include <gflags/gflags.h>
#include <butil/logging.h>
#include <butil/time.h>
#include <brpc/channel.h>
#include <bvar/bvar.h>
#include <bthread/timer_thread.h>
#include <json2pb/json_to_pb.h>
#include <fstream>
#include "cl_test.pb.h"
DEFINE_string(protocol, "baidu_std", "Protocol type. Defined in src/brpc/options.proto");
DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short");
DEFINE_string(cntl_server, "0.0.0.0:9000", "IP Address of server");
DEFINE_string(echo_server, "0.0.0.0:9001", "IP Address of server");
DEFINE_int32(timeout_ms, 3000, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 0, "Max retries(not including the first RPC)");
DEFINE_int32(case_interval, 20, "Intervals for different test cases");
DEFINE_int32(client_qps_change_interval_us, 50000,
"The interval for client changes the sending speed");
DEFINE_string(case_file, "", "File path for test_cases");
void DisplayStage(const test::Stage& stage) {
std::string type;
switch(stage.type()) {
case test::FLUCTUATE:
type = "Fluctuate";
break;
case test::SMOOTH:
type = "Smooth";
break;
default:
type = "Unknown";
}
std::stringstream ss;
ss
<< "Stage:[" << stage.lower_bound() << ':'
<< stage.upper_bound() << "]"
<< " , Type:" << type;
LOG(INFO) << ss.str();
}
uint32_t cast_func(void* arg) {
return *(uint32_t*)arg;
}
butil::atomic<uint32_t> g_timeout(0);
butil::atomic<uint32_t> g_error(0);
butil::atomic<uint32_t> g_succ(0);
bvar::PassiveStatus<uint32_t> g_timeout_bvar(cast_func, &g_timeout);
bvar::PassiveStatus<uint32_t> g_error_bvar(cast_func, &g_error);
bvar::PassiveStatus<uint32_t> g_succ_bvar(cast_func, &g_succ);
bvar::LatencyRecorder g_latency_rec;
void LoadCaseSet(test::TestCaseSet* case_set, const std::string& file_path) {
std::ifstream ifs(file_path.c_str(), std::ios::in);
if (!ifs) {
LOG(FATAL) << "Fail to open case set file: " << file_path;
}
std::string case_set_json((std::istreambuf_iterator<char>(ifs)),
std::istreambuf_iterator<char>());
std::string err;
if (!json2pb::JsonToProtoMessage(case_set_json, case_set, &err)) {
LOG(FATAL)
<< "Fail to trans case_set from json to protobuf message: "
<< err;
}
}
void HandleEchoResponse(
brpc::Controller* cntl,
test::NotifyResponse* response) {
// std::unique_ptr makes sure cntl/response will be deleted before returning.
std::unique_ptr<brpc::Controller> cntl_guard(cntl);
std::unique_ptr<test::NotifyResponse> response_guard(response);
if (cntl->Failed() && cntl->ErrorCode() == brpc::ERPCTIMEDOUT) {
g_timeout.fetch_add(1, butil::memory_order_relaxed);
LOG_EVERY_N(INFO, 1000) << cntl->ErrorText();
} else if (cntl->Failed()) {
g_error.fetch_add(1, butil::memory_order_relaxed);
LOG_EVERY_N(INFO, 1000) << cntl->ErrorText();
} else {
g_succ.fetch_add(1, butil::memory_order_relaxed);
g_latency_rec << cntl->latency_us();
}
}
void Expose() {
g_timeout_bvar.expose_as("cl", "timeout");
g_error_bvar.expose_as("cl", "failed");
g_succ_bvar.expose_as("cl", "succ");
g_latency_rec.expose("cl");
}
struct TestCaseContext {
TestCaseContext(const test::TestCase& tc)
: running(true)
, stage_index(0)
, test_case(tc)
, next_stage_sec(test_case.qps_stage_list(0).duration_sec() +
butil::gettimeofday_s()) {
DisplayStage(test_case.qps_stage_list(stage_index));
Update();
}
bool Update() {
if (butil::gettimeofday_s() >= next_stage_sec) {
++stage_index;
if (stage_index < test_case.qps_stage_list_size()) {
next_stage_sec += test_case.qps_stage_list(stage_index).duration_sec();
DisplayStage(test_case.qps_stage_list(stage_index));
} else {
return false;
}
}
int qps = 0;
const test::Stage& qps_stage = test_case.qps_stage_list(stage_index);
const int lower_bound = qps_stage.lower_bound();
const int upper_bound = qps_stage.upper_bound();
if (qps_stage.type() == test::FLUCTUATE) {
qps = butil::fast_rand_less_than(upper_bound - lower_bound) + lower_bound;
} else if (qps_stage.type() == test::SMOOTH) {
qps = lower_bound + (upper_bound - lower_bound) /
double(qps_stage.duration_sec()) * (qps_stage.duration_sec() - next_stage_sec
+ butil::gettimeofday_s());
}
interval_us.store(1.0 / qps * 1000000, butil::memory_order_relaxed);
return true;
}
butil::atomic<bool> running;
butil::atomic<int64_t> interval_us;
int stage_index;
const test::TestCase test_case;
int next_stage_sec;
};
void RunUpdateTask(void* data) {
TestCaseContext* context = (TestCaseContext*)data;
bool should_continue = context->Update();
if (should_continue) {
bthread::get_global_timer_thread()->schedule(RunUpdateTask, data,
butil::microseconds_from_now(FLAGS_client_qps_change_interval_us));
} else {
context->running.store(false, butil::memory_order_release);
}
}
void RunCase(test::ControlService_Stub &cntl_stub,
const test::TestCase& test_case) {
LOG(INFO) << "Running case:`" << test_case.case_name() << '\'';
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = FLAGS_protocol;
options.connection_type = FLAGS_connection_type;
options.timeout_ms = FLAGS_timeout_ms;
options.max_retry = FLAGS_max_retry;
if (channel.Init(FLAGS_echo_server.c_str(), &options) != 0) {
LOG(FATAL) << "Fail to initialize channel";
}
test::EchoService_Stub echo_stub(&channel);
test::NotifyRequest cntl_req;
test::NotifyResponse cntl_rsp;
brpc::Controller cntl;
cntl_req.set_message("StartCase");
cntl_stub.Notify(&cntl, &cntl_req, &cntl_rsp, NULL);
CHECK(!cntl.Failed()) << "control failed";
TestCaseContext context(test_case);
bthread::get_global_timer_thread()->schedule(RunUpdateTask, &context,
butil::microseconds_from_now(FLAGS_client_qps_change_interval_us));
while (context.running.load(butil::memory_order_acquire)) {
test::NotifyRequest echo_req;
echo_req.set_message("hello");
brpc::Controller* echo_cntl = new brpc::Controller;
test::NotifyResponse* echo_rsp = new test::NotifyResponse;
google::protobuf::Closure* done = brpc::NewCallback(
&HandleEchoResponse, echo_cntl, echo_rsp);
echo_stub.Echo(echo_cntl, &echo_req, echo_rsp, done);
::usleep(context.interval_us.load(butil::memory_order_relaxed));
}
LOG(INFO) << "Waiting to stop case: `" << test_case.case_name() << '\'';
::sleep(FLAGS_case_interval);
cntl.Reset();
cntl_req.set_message("StopCase");
cntl_stub.Notify(&cntl, &cntl_req, &cntl_rsp, NULL);
CHECK(!cntl.Failed()) << "control failed";
LOG(INFO) << "Case `" << test_case.case_name() << "' finshed:";
}
int main(int argc, char* argv[]) {
// Parse gflags. We recommend you to use gflags as well.
GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true);
Expose();
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = FLAGS_protocol;
options.connection_type = FLAGS_connection_type;
options.timeout_ms = FLAGS_timeout_ms;
if (channel.Init(FLAGS_cntl_server.c_str(), &options) != 0) {
LOG(ERROR) << "Fail to initialize channel";
return -1;
}
test::ControlService_Stub cntl_stub(&channel);
test::TestCaseSet case_set;
LoadCaseSet(&case_set, FLAGS_case_file);
brpc::Controller cntl;
test::NotifyRequest cntl_req;
test::NotifyResponse cntl_rsp;
cntl_req.set_message("ResetCaseSet");
cntl_stub.Notify(&cntl, &cntl_req, &cntl_rsp, NULL);
CHECK(!cntl.Failed()) << "Cntl Failed";
for (int i = 0; i < case_set.test_case_size(); ++i) {
RunCase(cntl_stub, case_set.test_case(i));
}
LOG(INFO) << "EchoClient is going to quit";
return 0;
}