blob: 6a2eb4b620b957cd5ad95fe3fa95e85e97a0832b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <atomic>
#include <cstdint>
#include <memory>
#include <optional>
#include <ostream>
#include <string>
#include <thread>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/port.h"
#include "kudu/rpc/inbound_call.h"
#include "kudu/rpc/service_queue.h"
#include "kudu/util/monotime.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/test_util.h"
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
DEFINE_int32(num_producers, 4,
"Number of producer threads");
DEFINE_int32(num_consumers, 20,
"Number of consumer threads");
DEFINE_int32(max_queue_size, 50,
"Max queue length");
namespace kudu {
namespace rpc {
static std::atomic<uint32_t> inprogress;
static std::atomic<uint32_t> total;
template <typename Queue>
void ProducerThread(Queue* queue) {
int max_inprogress = FLAGS_max_queue_size - FLAGS_num_producers;
while (true) {
while (inprogress > max_inprogress) {
base::subtle::PauseCPU();
}
inprogress++;
InboundCall* call = new InboundCall(nullptr);
std::optional<InboundCall*> evicted;
auto status = queue->Put(call, &evicted);
if (status == QUEUE_FULL) {
LOG(INFO) << "queue full: producer exiting";
delete call;
break;
}
if (PREDICT_TRUE(evicted)) {
LOG(INFO) << "call evicted: producer exiting";
delete *evicted;
break;
}
if (PREDICT_TRUE(status == QUEUE_SHUTDOWN)) {
delete call;
break;
}
}
}
template <typename Queue>
void ConsumerThread(Queue* queue) {
unique_ptr<InboundCall> call;
while (queue->BlockingGet(&call)) {
inprogress--;
total++;
call.reset();
}
}
TEST(TestServiceQueue, LifoServiceQueuePerf) {
LifoServiceQueue queue(FLAGS_max_queue_size);
vector<std::thread> producers;
vector<std::thread> consumers;
for (int i = 0; i < FLAGS_num_producers; i++) {
producers.emplace_back(&ProducerThread<LifoServiceQueue>, &queue);
}
for (int i = 0; i < FLAGS_num_consumers; i++) {
consumers.emplace_back(&ConsumerThread<LifoServiceQueue>, &queue);
}
int seconds = AllowSlowTests() ? 10 : 1;
uint64_t total_sample = 0;
uint64_t total_queue_len = 0;
uint64_t total_idle_workers = 0;
Stopwatch sw(Stopwatch::ALL_THREADS);
sw.start();
int32_t before = total;
for (int i = 0; i < seconds * 50; i++) {
SleepFor(MonoDelta::FromMilliseconds(20));
total_sample++;
total_queue_len += queue.estimated_queue_length();
total_idle_workers += queue.estimated_idle_worker_count();
}
sw.stop();
int32_t delta = total - before;
queue.Shutdown();
for (int i = 0; i < FLAGS_num_producers; i++) {
producers[i].join();
}
for (int i = 0; i < FLAGS_num_consumers; i++) {
consumers[i].join();
}
float reqs_per_second = static_cast<float>(delta / sw.elapsed().wall_seconds());
float user_cpu_micros_per_req = static_cast<float>(sw.elapsed().user / 1000.0 / delta);
float sys_cpu_micros_per_req = static_cast<float>(sw.elapsed().system / 1000.0 / delta);
LOG(INFO) << "Reqs/sec: " << (int32_t)reqs_per_second;
LOG(INFO) << "User CPU per req: " << user_cpu_micros_per_req << "us";
LOG(INFO) << "Sys CPU per req: " << sys_cpu_micros_per_req << "us";
LOG(INFO) << "Avg rpc queue length: " << total_queue_len / static_cast<double>(total_sample);
LOG(INFO) << "Avg idle workers: " << total_idle_workers / static_cast<double>(total_sample);
}
} // namespace rpc
} // namespace kudu