| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| |
| #include <gflags/gflags.h> |
| #include <butil/logging.h> |
| #include <butil/time.h> |
| #include <butil/macros.h> |
| #include <butil/file_util.h> |
| #include <bvar/bvar.h> |
| #include <bthread/bthread.h> |
| #include <brpc/channel.h> |
| #include <brpc/server.h> |
| #include <brpc/rpc_dump.h> |
| #include <brpc/serialized_request.h> |
| #include <brpc/nshead_message.h> |
| #include <brpc/details/http_message.h> |
| #include "brpc/options.pb.h" |
| #include "info_thread.h" |
| |
| DEFINE_string(dir, "", "The directory of dumped requests"); |
| DEFINE_int32(times, 1, "Repeat replaying for so many times"); |
| DEFINE_int32(qps, 0, "Limit QPS if this flag is positive"); |
| DEFINE_int32(thread_num, 0, "Number of threads for replaying"); |
| DEFINE_bool(use_bthread, true, "Use bthread to replay"); |
| DEFINE_string(connection_type, "", "Connection type, choose automatically " |
| "according to protocol by default"); |
| DEFINE_string(server, "0.0.0.0:8002", "IP Address of server"); |
| DEFINE_string(load_balancer, "", "The algorithm for load balancing"); |
| DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds"); |
| DEFINE_int32(max_retry, 3, "Maximum retry times"); |
| DEFINE_int32(dummy_port, 8899, "Port of dummy server(to monitor replaying)"); |
| DEFINE_string(http_host, "", "Host field for http protocol"); |
| |
| bvar::LatencyRecorder g_latency_recorder("rpc_replay"); |
| bvar::Adder<int64_t> g_error_count("rpc_replay_error_count"); |
| bvar::Adder<int64_t> g_sent_count; |
| |
| // Include channels for all protocols that support both client and server. |
| class ChannelGroup { |
| public: |
| int Init(); |
| |
| ~ChannelGroup(); |
| |
| // Get channel by protocol type. |
| brpc::Channel* channel(brpc::ProtocolType type) { |
| if ((size_t)type < _chans.size()) { |
| return _chans[(size_t)type]; |
| } |
| return NULL; |
| } |
| |
| private: |
| std::vector<brpc::Channel*> _chans; |
| }; |
| |
| int ChannelGroup::Init() { |
| { |
| // force global initialization of rpc. |
| brpc::Channel dummy_channel; |
| } |
| std::vector<std::pair<brpc::ProtocolType, brpc::Protocol> > protocols; |
| brpc::ListProtocols(&protocols); |
| size_t max_protocol_size = 0; |
| for (size_t i = 0; i < protocols.size(); ++i) { |
| max_protocol_size = std::max(max_protocol_size, |
| (size_t)protocols[i].first); |
| } |
| _chans.resize(max_protocol_size + 1); |
| for (size_t i = 0; i < protocols.size(); ++i) { |
| const brpc::ProtocolType protocol_type = protocols[i].first; |
| const brpc::Protocol protocol = protocols[i].second; |
| brpc::ChannelOptions options; |
| options.protocol = protocol_type; |
| options.connection_type = FLAGS_connection_type; |
| options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/; |
| options.max_retry = FLAGS_max_retry; |
| if ((options.connection_type == brpc::CONNECTION_TYPE_UNKNOWN || |
| options.connection_type & protocol.supported_connection_type) && |
| protocol.support_client() && |
| protocol.support_server()) { |
| brpc::Channel* chan = new brpc::Channel; |
| if (chan->Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), |
| &options) != 0) { |
| LOG(ERROR) << "Fail to initialize channel"; |
| delete chan; |
| return -1; |
| } |
| _chans[protocol_type] = chan; |
| } |
| } |
| return 0; |
| } |
| |
| ChannelGroup::~ChannelGroup() { |
| for (size_t i = 0; i < _chans.size(); ++i) { |
| delete _chans[i]; |
| } |
| _chans.clear(); |
| } |
| |
| static void handle_response(brpc::Controller* cntl, int64_t start_time, |
| bool sleep_on_error/*note*/) { |
| // TODO(gejun): some bthreads are starved when new bthreads are created |
| // continuously, which happens when server is down and RPC keeps failing. |
| // Sleep a while on error to avoid that now. |
| const int64_t end_time = butil::gettimeofday_us(); |
| const int64_t elp = end_time - start_time; |
| if (!cntl->Failed()) { |
| g_latency_recorder << elp; |
| } else { |
| g_error_count << 1; |
| if (sleep_on_error) { |
| bthread_usleep(10000); |
| } |
| } |
| delete cntl; |
| } |
| |
| butil::atomic<int> g_thread_offset(0); |
| |
| static void* replay_thread(void* arg) { |
| ChannelGroup* chan_group = static_cast<ChannelGroup*>(arg); |
| const int thread_offset = g_thread_offset.fetch_add(1, butil::memory_order_relaxed); |
| double req_rate = FLAGS_qps / (double)FLAGS_thread_num; |
| brpc::SerializedRequest req; |
| brpc::NsheadMessage nshead_req; |
| int64_t last_expected_time = butil::monotonic_time_ns(); |
| const int64_t interval = (int64_t) (1000000000L / req_rate); |
| // the max tolerant delay between end_time and expected_time. 10ms or 10 intervals |
| int64_t max_tolerant_delay = std::max((int64_t) 10000000L, 10 * interval); |
| for (int i = 0; !brpc::IsAskedToQuit() && i < FLAGS_times; ++i) { |
| brpc::SampleIterator it(FLAGS_dir); |
| int j = 0; |
| for (brpc::SampledRequest* sample = it.Next(); |
| !brpc::IsAskedToQuit() && sample != NULL; sample = it.Next(), ++j) { |
| std::unique_ptr<brpc::SampledRequest> sample_guard(sample); |
| if ((j % FLAGS_thread_num) != thread_offset) { |
| continue; |
| } |
| brpc::Channel* chan = |
| chan_group->channel(sample->meta.protocol_type()); |
| if (chan == NULL) { |
| LOG(ERROR) << "No channel on protocol=" |
| << sample->meta.protocol_type(); |
| continue; |
| } |
| |
| brpc::Controller* cntl = new brpc::Controller; |
| req.Clear(); |
| |
| google::protobuf::Message* req_ptr = &req; |
| cntl->reset_sampled_request(sample_guard.release()); |
| if (sample->meta.protocol_type() == brpc::PROTOCOL_HTTP) { |
| brpc::HttpMessage http_message; |
| http_message.ParseFromIOBuf(sample->request); |
| cntl->http_request().Swap(http_message.header()); |
| if (!FLAGS_http_host.empty()) { |
| // reset Host in header |
| cntl->http_request().SetHeader("Host", FLAGS_http_host); |
| } |
| cntl->request_attachment() = http_message.body().movable(); |
| req_ptr = NULL; |
| } else if (sample->meta.protocol_type() == brpc::PROTOCOL_NSHEAD) { |
| nshead_req.Clear(); |
| memcpy(&nshead_req.head, sample->meta.nshead().c_str(), sample->meta.nshead().length()); |
| nshead_req.body = sample->request; |
| req_ptr = &nshead_req; |
| } else if (sample->meta.attachment_size() > 0) { |
| sample->request.cutn( |
| &req.serialized_data(), |
| sample->request.size() - sample->meta.attachment_size()); |
| cntl->request_attachment() = sample->request.movable(); |
| } else { |
| req.serialized_data() = sample->request.movable(); |
| } |
| g_sent_count << 1; |
| const int64_t start_time = butil::gettimeofday_us(); |
| if (FLAGS_qps <= 0) { |
| chan->CallMethod(NULL/*use rpc_dump_context in cntl instead*/, |
| cntl, req_ptr, NULL/*ignore response*/, NULL); |
| handle_response(cntl, start_time, true); |
| } else { |
| google::protobuf::Closure* done = |
| brpc::NewCallback(handle_response, cntl, start_time, false); |
| chan->CallMethod(NULL/*use rpc_dump_context in cntl instead*/, |
| cntl, req_ptr, NULL/*ignore response*/, done); |
| int64_t end_time = butil::monotonic_time_ns(); |
| int64_t expected_time = last_expected_time + interval; |
| if (end_time < expected_time) { |
| usleep((expected_time - end_time)/1000); |
| } |
| if (end_time - expected_time > max_tolerant_delay) { |
| expected_time = end_time; |
| } |
| last_expected_time = expected_time; |
| } |
| } |
| } |
| return NULL; |
| } |
| |
| int main(int argc, char* argv[]) { |
| // Parse gflags. We recommend you to use gflags as well. |
| GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true); |
| |
| if (FLAGS_dir.empty() || |
| !butil::DirectoryExists(butil::FilePath(FLAGS_dir))) { |
| LOG(ERROR) << "--dir=<dir-of-dumped-files> is required"; |
| return -1; |
| } |
| |
| if (FLAGS_dummy_port >= 0) { |
| brpc::StartDummyServerAt(FLAGS_dummy_port); |
| } |
| |
| ChannelGroup chan_group; |
| if (chan_group.Init() != 0) { |
| LOG(ERROR) << "Fail to init ChannelGroup"; |
| return -1; |
| } |
| |
| if (FLAGS_thread_num <= 0) { |
| if (FLAGS_qps <= 0) { // unlimited qps |
| FLAGS_thread_num = 50; |
| } else { |
| FLAGS_thread_num = FLAGS_qps / 10000; |
| if (FLAGS_thread_num < 1) { |
| FLAGS_thread_num = 1; |
| } |
| if (FLAGS_thread_num > 50) { |
| FLAGS_thread_num = 50; |
| } |
| } |
| } |
| |
| const int rate_limit_per_thread = 1000000; |
| int req_rate_per_thread = FLAGS_qps / FLAGS_thread_num; |
| if (req_rate_per_thread > rate_limit_per_thread) { |
| LOG(ERROR) << "req_rate: " << (int64_t) req_rate_per_thread << " is too large in one thread. The rate limit is " |
| << rate_limit_per_thread << " in one thread"; |
| return -1; |
| } |
| |
| std::vector<bthread_t> bids; |
| std::vector<pthread_t> pids; |
| if (!FLAGS_use_bthread) { |
| pids.resize(FLAGS_thread_num); |
| for (int i = 0; i < FLAGS_thread_num; ++i) { |
| if (pthread_create(&pids[i], NULL, replay_thread, &chan_group) != 0) { |
| LOG(ERROR) << "Fail to create pthread"; |
| return -1; |
| } |
| } |
| } else { |
| bids.resize(FLAGS_thread_num); |
| for (int i = 0; i < FLAGS_thread_num; ++i) { |
| if (bthread_start_background( |
| &bids[i], NULL, replay_thread, &chan_group) != 0) { |
| LOG(ERROR) << "Fail to create bthread"; |
| return -1; |
| } |
| } |
| } |
| brpc::InfoThread info_thr; |
| brpc::InfoThreadOptions info_thr_opt; |
| info_thr_opt.latency_recorder = &g_latency_recorder; |
| info_thr_opt.error_count = &g_error_count; |
| info_thr_opt.sent_count = &g_sent_count; |
| |
| if (!info_thr.start(info_thr_opt)) { |
| LOG(ERROR) << "Fail to create info_thread"; |
| return -1; |
| } |
| |
| for (int i = 0; i < FLAGS_thread_num; ++i) { |
| if (!FLAGS_use_bthread) { |
| pthread_join(pids[i], NULL); |
| } else { |
| bthread_join(bids[i], NULL); |
| } |
| } |
| info_thr.stop(); |
| |
| return 0; |
| } |