| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| // A server to receive EchoRequest and send back EchoResponse. |
| |
| #include <gflags/gflags.h> |
| #include <butil/logging.h> |
| #include <brpc/server.h> |
| #include <butil/atomicops.h> |
| #include <butil/time.h> |
| #include <butil/logging.h> |
| #include <json2pb/json_to_pb.h> |
| #include <bthread/timer_thread.h> |
| #include <bthread/bthread.h> |
| |
| #include <cstdlib> |
| #include <fstream> |
| #include "cl_test.pb.h" |
| |
| DEFINE_int32(server_bthread_concurrency, 4, |
| "Configuring the value of bthread_concurrency, For compute max qps, "); |
| DEFINE_int32(server_sync_sleep_us, 2500, |
| "Usleep time, each request will be executed once, For compute max qps"); |
| // max qps = 1000 / 2.5 * 4 |
| |
| DEFINE_int32(control_server_port, 9000, ""); |
| DEFINE_int32(echo_port, 9001, "TCP Port of echo server"); |
| DEFINE_int32(cntl_port, 9000, "TCP Port of controller server"); |
| DEFINE_string(case_file, "", "File path for test_cases"); |
| DEFINE_int32(latency_change_interval_us, 50000, "Intervalt for server side changes the latency"); |
| DEFINE_int32(server_max_concurrency, 0, "Echo Server's max_concurrency"); |
| DEFINE_bool(use_usleep, false, |
| "EchoServer uses ::usleep or bthread_usleep to simulate latency " |
| "when processing requests"); |
| |
| |
| bthread::TimerThread g_timer_thread; |
| |
| int cast_func(void* arg) { |
| return *(int*)arg; |
| } |
| |
| void DisplayStage(const test::Stage& stage) { |
| std::string type; |
| switch(stage.type()) { |
| case test::FLUCTUATE: |
| type = "Fluctuate"; |
| break; |
| case test::SMOOTH: |
| type = "Smooth"; |
| break; |
| default: |
| type = "Unknown"; |
| } |
| std::stringstream ss; |
| ss |
| << "Stage:[" << stage.lower_bound() << ':' |
| << stage.upper_bound() << "]" |
| << " , Type:" << type; |
| LOG(INFO) << ss.str(); |
| } |
| |
| butil::atomic<int> cnt(0); |
| butil::atomic<int> atomic_sleep_time(0); |
| bvar::PassiveStatus<int> atomic_sleep_time_bvar(cast_func, &atomic_sleep_time); |
| |
| namespace bthread { |
| DECLARE_int32(bthread_concurrency); |
| } |
| |
| void TimerTask(void* data); |
| |
| class EchoServiceImpl : public test::EchoService { |
| public: |
| EchoServiceImpl() |
| : _stage_index(0) |
| , _running_case(false) { |
| }; |
| |
| virtual ~EchoServiceImpl() {} |
| |
| void SetTestCase(const test::TestCase& test_case) { |
| _test_case = test_case; |
| _next_stage_start = _test_case.latency_stage_list(0).duration_sec() + |
| butil::gettimeofday_s(); |
| _stage_index = 0; |
| _running_case = false; |
| DisplayStage(_test_case.latency_stage_list(_stage_index)); |
| } |
| |
| void StartTestCase() { |
| CHECK(!_running_case); |
| _running_case = true; |
| UpdateLatency(); |
| } |
| |
| void StopTestCase() { |
| _running_case = false; |
| } |
| |
| void UpdateLatency() { |
| if (!_running_case) { |
| return; |
| } |
| ComputeLatency(); |
| g_timer_thread.schedule(TimerTask, (void*)this, |
| butil::microseconds_from_now(FLAGS_latency_change_interval_us)); |
| } |
| |
| virtual void Echo(google::protobuf::RpcController* cntl_base, |
| const test::NotifyRequest* request, |
| test::NotifyResponse* response, |
| google::protobuf::Closure* done) { |
| brpc::ClosureGuard done_guard(done); |
| response->set_message("hello"); |
| ::usleep(FLAGS_server_sync_sleep_us); |
| if (FLAGS_use_usleep) { |
| ::usleep(_latency.load(butil::memory_order_relaxed)); |
| } else { |
| bthread_usleep(_latency.load(butil::memory_order_relaxed)); |
| } |
| } |
| |
| void ComputeLatency() { |
| if (_stage_index < _test_case.latency_stage_list_size() && |
| butil::gettimeofday_s() > _next_stage_start) { |
| ++_stage_index; |
| if (_stage_index < _test_case.latency_stage_list_size()) { |
| _next_stage_start += _test_case.latency_stage_list(_stage_index).duration_sec(); |
| DisplayStage(_test_case.latency_stage_list(_stage_index)); |
| } |
| } |
| |
| if (_stage_index == _test_case.latency_stage_list_size()) { |
| const test::Stage& latency_stage = |
| _test_case.latency_stage_list(_stage_index - 1); |
| if (latency_stage.type() == test::ChangeType::FLUCTUATE) { |
| _latency.store((latency_stage.lower_bound() + latency_stage.upper_bound()) / 2, |
| butil::memory_order_relaxed); |
| } else if (latency_stage.type() == test::ChangeType::SMOOTH) { |
| _latency.store(latency_stage.upper_bound(), butil::memory_order_relaxed); |
| } |
| return; |
| } |
| |
| const test::Stage& latency_stage = _test_case.latency_stage_list(_stage_index); |
| const int lower_bound = latency_stage.lower_bound(); |
| const int upper_bound = latency_stage.upper_bound(); |
| if (latency_stage.type() == test::FLUCTUATE) { |
| _latency.store(butil::fast_rand_less_than(upper_bound - lower_bound) + lower_bound, |
| butil::memory_order_relaxed); |
| } else if (latency_stage.type() == test::SMOOTH) { |
| int latency = lower_bound + (upper_bound - lower_bound) / |
| double(latency_stage.duration_sec()) * |
| (latency_stage.duration_sec() - _next_stage_start + |
| butil::gettimeofday_s()); |
| _latency.store(latency, butil::memory_order_relaxed); |
| } else { |
| LOG(FATAL) << "Wrong Type:" << latency_stage.type(); |
| } |
| } |
| |
| private: |
| int _stage_index; |
| int _next_stage_start; |
| butil::atomic<int> _latency; |
| test::TestCase _test_case; |
| bool _running_case; |
| }; |
| |
| void TimerTask(void* data) { |
| EchoServiceImpl* echo_service = (EchoServiceImpl*)data; |
| echo_service->UpdateLatency(); |
| } |
| |
| class ControlServiceImpl : public test::ControlService { |
| public: |
| ControlServiceImpl() |
| : _case_index(0) { |
| LoadCaseSet(FLAGS_case_file); |
| _echo_service = new EchoServiceImpl; |
| if (_server.AddService(_echo_service, |
| brpc::SERVER_OWNS_SERVICE) != 0) { |
| LOG(FATAL) << "Fail to add service"; |
| } |
| g_timer_thread.start(NULL); |
| } |
| virtual ~ControlServiceImpl() { |
| _echo_service->StopTestCase(); |
| g_timer_thread.stop_and_join(); |
| }; |
| |
| virtual void Notify(google::protobuf::RpcController* cntl_base, |
| const test::NotifyRequest* request, |
| test::NotifyResponse* response, |
| google::protobuf::Closure* done) { |
| brpc::ClosureGuard done_guard(done); |
| const std::string& message = request->message(); |
| LOG(INFO) << message; |
| if (message == "ResetCaseSet") { |
| _server.Stop(0); |
| _server.Join(); |
| _echo_service->StopTestCase(); |
| |
| LoadCaseSet(FLAGS_case_file); |
| _case_index = 0; |
| response->set_message("CaseSetReset"); |
| } else if (message == "StartCase") { |
| CHECK(!_server.IsRunning()) << "Continuous StartCase"; |
| const test::TestCase& test_case = _case_set.test_case(_case_index++); |
| _echo_service->SetTestCase(test_case); |
| brpc::ServerOptions options; |
| options.max_concurrency = FLAGS_server_max_concurrency; |
| _server.MaxConcurrencyOf("test.EchoService.Echo") = test_case.max_concurrency(); |
| |
| _server.Start(FLAGS_echo_port, &options); |
| _echo_service->StartTestCase(); |
| response->set_message("CaseStarted"); |
| } else if (message == "StopCase") { |
| CHECK(_server.IsRunning()) << "Continuous StopCase"; |
| _server.Stop(0); |
| _server.Join(); |
| |
| _echo_service->StopTestCase(); |
| response->set_message("CaseStopped"); |
| } else { |
| LOG(FATAL) << "Invalid message:" << message; |
| response->set_message("Invalid Cntl Message"); |
| } |
| } |
| |
| private: |
| void LoadCaseSet(const std::string& file_path) { |
| std::ifstream ifs(file_path.c_str(), std::ios::in); |
| if (!ifs) { |
| LOG(FATAL) << "Fail to open case set file: " << file_path; |
| } |
| std::string case_set_json((std::istreambuf_iterator<char>(ifs)), |
| std::istreambuf_iterator<char>()); |
| test::TestCaseSet case_set; |
| std::string err; |
| if (!json2pb::JsonToProtoMessage(case_set_json, &case_set, &err)) { |
| LOG(FATAL) |
| << "Fail to trans case_set from json to protobuf message: " |
| << err; |
| } |
| _case_set = case_set; |
| ifs.close(); |
| } |
| |
| brpc::Server _server; |
| EchoServiceImpl* _echo_service; |
| test::TestCaseSet _case_set; |
| int _case_index; |
| }; |
| |
| |
| int main(int argc, char* argv[]) { |
| // Parse gflags. We recommend you to use gflags as well. |
| GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true); |
| bthread::FLAGS_bthread_concurrency= FLAGS_server_bthread_concurrency; |
| |
| brpc::Server server; |
| |
| ControlServiceImpl control_service_impl; |
| |
| if (server.AddService(&control_service_impl, |
| brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { |
| LOG(ERROR) << "Fail to add service"; |
| return -1; |
| } |
| |
| if (server.Start(FLAGS_cntl_port, NULL) != 0) { |
| LOG(ERROR) << "Fail to start EchoServer"; |
| return -1; |
| } |
| |
| server.RunUntilAskedToQuit(); |
| return 0; |
| } |
| |