blob: b0420ccbd5a7cd60900f2e25395b384698ad416a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "benchmark.h"
// IWYU pragma: no_include <ext/alloc_traits.h>
#include <fmt/core.h>
#include <pegasus/error.h>
#include <rocksdb/env.h>
#include <rocksdb/statistics.h>
#include <stdio.h>
#include <map>
#include <set>
#include <sstream>
#include <utility>
#include <vector>
#include "pegasus/client.h"
#include "rand.h"
#include "runtime/app_model.h"
#include "test/bench_test/config.h"
#include "test/bench_test/statistics.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/strings.h"
namespace pegasus {
namespace test {
DSN_DEFINE_uint64(pegasus.benchmark,
benchmark_num,
10000,
"Number of key/values to place in database");
DSN_DEFINE_uint64(pegasus.benchmark,
benchmark_seed,
1000,
"Seed base for random number generators. When 0 it is deterministic");
DSN_DEFINE_string(pegasus.benchmark, pegasus_cluster_name, "onebox", "The Pegasus cluster name");
DSN_DEFINE_validator(pegasus_cluster_name,
[](const char *value) -> bool { return !dsn::utils::is_empty(value); });
DSN_DEFINE_string(pegasus.benchmark, pegasus_app_name, "temp", "pegasus app name");
DSN_DEFINE_validator(pegasus_app_name,
[](const char *value) -> bool { return !dsn::utils::is_empty(value); });
DSN_DEFINE_string(
pegasus.benchmark,
benchmarks,
"fillrandom_pegasus,readrandom_pegasus,deleterandom_pegasus",
"Comma-separated list of operations to run in the specified order. Available benchmarks:\n"
"\tfillrandom_pegasus -- pegasus write N values in random key order\n"
"\treadrandom_pegasus -- pegasus read N times in random order\n"
"\tdeleterandom_pegasus -- pegasus delete N keys in random order\n"
"\tmultisetrandom_pegasus -- pegasus write N random values with multi_count hash keys list\n"
"\tmultigetrandom_pegasus -- pegasus read N random keys with multi_count hash list\n");
DSN_DEFINE_validator(benchmarks,
[](const char *value) -> bool { return !dsn::utils::is_empty(value); });
DSN_DEFINE_int32(pegasus.benchmark,
pegasus_timeout_ms,
1000,
"pegasus read/write timeout in milliseconds");
DSN_DEFINE_int32(pegasus.benchmark, threads, 1, "Number of concurrent threads to run");
DSN_DEFINE_int32(pegasus.benchmark, hashkey_size, 16, "Size of each hashkey");
DSN_DEFINE_int32(pegasus.benchmark, sortkey_size, 16, "Size of each sortkey");
DSN_DEFINE_int32(pegasus.benchmark, value_size, 100, "Size of each value");
DSN_DEFINE_int32(pegasus.benchmark, multi_count, 100, "Values count of the same hashkey");
DSN_DEFINE_group_validator(multi_count, [](std::string &message) -> bool {
std::string operation_type = FLAGS_benchmarks;
if ((operation_type == "multisetrandom_pegasus" ||
operation_type == "multigetrandom_pegasus") &&
FLAGS_benchmark_num % FLAGS_multi_count != 0) {
message = fmt::format("[pegasus.benchmark].benchmark_num {} should be a multiple of "
"[pegasus.benchmark].multi_count({}).",
FLAGS_benchmark_num,
FLAGS_multi_count);
return false;
}
return true;
});
benchmark::benchmark()
{
_client =
pegasus_client_factory::get_client(FLAGS_pegasus_cluster_name, FLAGS_pegasus_app_name);
CHECK_NOTNULL(_client, "");
// init operation method map
_operation_method = {{kUnknown, nullptr},
{kRead, &benchmark::read_random},
{kWrite, &benchmark::write_random},
{kMultiSet, &benchmark::multi_set_random},
{kMultiGet, &benchmark::multi_get_random},
{kDelete, &benchmark::delete_random}};
}
void benchmark::run()
{
// print summarize information
print_header();
std::stringstream benchmark_stream(FLAGS_benchmarks);
std::string name;
while (std::getline(benchmark_stream, name, ',')) {
// run the specified benchmark
operation_type op_type = get_operation_type(name);
run_benchmark(FLAGS_threads, op_type);
}
}
void benchmark::run_benchmark(int thread_count, operation_type op_type)
{
// get method by operation type
bench_method method = _operation_method[op_type];
CHECK_NOTNULL(method, "");
// create histogram statistic
std::shared_ptr<rocksdb::Statistics> hist_stats = rocksdb::CreateDBStatistics();
// create thread args for each thread, and run them
std::vector<std::shared_ptr<thread_arg>> args;
for (int i = 0; i < thread_count; i++) {
args.push_back(std::make_shared<thread_arg>(
i + (FLAGS_benchmark_seed == 0 ? 1000 : FLAGS_benchmark_seed),
hist_stats,
method,
this));
config::instance().env->StartThread(thread_body, args[i].get());
}
// wait all threads are done
config::instance().env->WaitForJoin();
// merge statistics
statistics merge_stats(hist_stats);
for (int i = 0; i < thread_count; i++) {
merge_stats.merge(args[i]->stats);
}
merge_stats.report(op_type);
}
void benchmark::thread_body(void *v)
{
thread_arg *arg = static_cast<thread_arg *>(v);
// reseed local random generator
reseed_thread_local_rng(arg->seed);
// progress the method
arg->stats.start();
(arg->bm->*(arg->method))(arg);
arg->stats.stop();
}
void benchmark::write_random(thread_arg *thread)
{
// do write operation num times
uint64_t bytes = 0;
int count = 0;
for (int i = 0; i < FLAGS_benchmark_num; i++) {
// generate hash key and sort key
std::string hashkey, sortkey, value;
generate_kv_pair(hashkey, sortkey, value);
// write to pegasus
int try_count = 0;
while (true) {
try_count++;
int ret = _client->set(hashkey, sortkey, value, FLAGS_pegasus_timeout_ms);
if (ret == ::pegasus::PERR_OK) {
bytes += FLAGS_value_size + FLAGS_hashkey_size + FLAGS_sortkey_size;
count++;
break;
} else if (ret != ::pegasus::PERR_TIMEOUT || try_count > 3) {
fmt::print(stderr, "Set returned an error: {}\n", _client->get_error_string(ret));
dsn_exit(1);
} else {
fmt::print(stderr, "Set timeout, retry({})\n", try_count);
}
}
// count this operation
thread->stats.finished_ops(1, kWrite);
}
// count total write bytes
thread->stats.add_bytes(bytes);
}
void benchmark::multi_set_random(thread_arg *thread)
{
uint64_t bytes = 0;
for (int i = 0; i < FLAGS_benchmark_num / FLAGS_multi_count; i++) {
// Generate hash key.
std::string hashkey = generate_string(FLAGS_hashkey_size);
// Generate sort key and value.
std::map<std::string, std::string> kvs;
std::string sortkey, value;
for (int j = 0; j < FLAGS_multi_count; j++) {
sortkey = generate_string(FLAGS_sortkey_size);
value = generate_string(FLAGS_value_size);
kvs.emplace(sortkey, value);
}
// Write to Pegasus.
int try_count = 0;
while (true) {
try_count++;
int ret = _client->multi_set(hashkey, kvs, FLAGS_pegasus_timeout_ms);
if (ret == ::pegasus::PERR_OK) {
bytes += (FLAGS_value_size + FLAGS_hashkey_size + FLAGS_sortkey_size) *
FLAGS_multi_count;
break;
}
if (ret != ::pegasus::PERR_TIMEOUT || try_count > 3) {
fmt::print(
stderr, "multi_set returned an error: {}\n", _client->get_error_string(ret));
dsn_exit(1);
}
fmt::print(stderr, "multi_set timeout, retry({})\n", try_count);
}
// Count this operation.
thread->stats.finished_ops(1, kMultiSet);
}
// Count total write bytes.
thread->stats.add_bytes(bytes);
}
void benchmark::read_random(thread_arg *thread)
{
uint64_t bytes = 0;
uint64_t found = 0;
for (int i = 0; i < FLAGS_benchmark_num; i++) {
// generate hash key and sort key
// generate value for random to keep in peace with write
std::string hashkey, sortkey, value;
generate_kv_pair(hashkey, sortkey, value);
// read from pegasus
int try_count = 0;
while (true) {
try_count++;
int ret = _client->get(hashkey, sortkey, value, FLAGS_pegasus_timeout_ms);
if (ret == ::pegasus::PERR_OK) {
found++;
bytes += hashkey.size() + sortkey.size() + value.size();
break;
} else if (ret == ::pegasus::PERR_NOT_FOUND) {
break;
} else if (ret != ::pegasus::PERR_TIMEOUT || try_count > 3) {
fmt::print(stderr, "Get returned an error: {}\n", _client->get_error_string(ret));
dsn_exit(1);
} else {
fmt::print(stderr, "Get timeout, retry({})\n", try_count);
}
}
// count this operation
thread->stats.finished_ops(1, kRead);
}
// count total read bytes and hit rate
std::string msg = fmt::format("({} of {} found)", found, FLAGS_benchmark_num);
thread->stats.add_bytes(bytes);
thread->stats.add_message(msg);
}
void benchmark::multi_get_random(thread_arg *thread)
{
uint64_t bytes = 0;
uint64_t found = 0;
int max_fetch_count = 100;
int max_fetch_size = 1000000;
for (int i = 0; i < FLAGS_benchmark_num / FLAGS_multi_count; i++) {
// Generate hash key.
std::string hashkey = generate_string(FLAGS_hashkey_size);
// Generate sort key.
// Generate value for random to keep in peace with write.
std::map<std::string, std::string> kvs;
std::set<std::string> sortkeys;
for (int j = 0; j < FLAGS_multi_count; j++) {
sortkeys.insert(generate_string(FLAGS_sortkey_size));
// Make output string be sorted like multi_set_random.
generate_string(FLAGS_value_size);
}
// Read from Pegasus.
int try_count = 0;
while (true) {
try_count++;
int ret = _client->multi_get(
hashkey, sortkeys, kvs, max_fetch_count, max_fetch_size, FLAGS_pegasus_timeout_ms);
if (ret == ::pegasus::PERR_OK) {
found += kvs.size();
bytes += FLAGS_multi_count * hashkey.size();
for (const auto &kv : kvs) {
bytes = kv.first.size() + kv.second.size() + bytes;
}
break;
}
if (ret == ::pegasus::PERR_NOT_FOUND) {
break;
}
if (ret != ::pegasus::PERR_TIMEOUT || try_count > 3) {
fmt::print(
stderr, "multi_get returned an error: {}\n", _client->get_error_string(ret));
dsn_exit(1);
}
fmt::print(stderr, "multi_get timeout, retry({})\n", try_count);
}
// Count this operation.
thread->stats.finished_ops(1, kMultiGet);
}
// Count total read bytes and hit rate.
std::string msg = fmt::format("({} of {} found)", found, FLAGS_benchmark_num);
thread->stats.add_bytes(bytes);
thread->stats.add_message(msg);
}
void benchmark::delete_random(thread_arg *thread)
{
// do delete operation num times
for (int i = 0; i < FLAGS_benchmark_num; i++) {
// generate hash key and sort key
// generate value for random to keep in peace with write
std::string hashkey, sortkey, value;
generate_kv_pair(hashkey, sortkey, value);
int try_count = 0;
while (true) {
try_count++;
int ret = _client->del(hashkey, sortkey, FLAGS_pegasus_timeout_ms);
if (ret == ::pegasus::PERR_OK) {
break;
} else if (ret != ::pegasus::PERR_TIMEOUT || try_count > 3) {
fmt::print(stderr, "Del returned an error: {}\n", _client->get_error_string(ret));
dsn_exit(1);
} else {
fmt::print(stderr, "Get timeout, retry({})\n", try_count);
}
}
// count this operation
thread->stats.finished_ops(1, kDelete);
}
}
void benchmark::generate_kv_pair(std::string &hashkey, std::string &sortkey, std::string &value)
{
hashkey = generate_string(FLAGS_hashkey_size);
sortkey = generate_string(FLAGS_sortkey_size);
value = generate_string(FLAGS_value_size);
}
operation_type benchmark::get_operation_type(const std::string &name)
{
operation_type op_type = kUnknown;
if (name == "fillrandom_pegasus") {
op_type = kWrite;
} else if (name == "readrandom_pegasus") {
op_type = kRead;
} else if (name == "deleterandom_pegasus") {
op_type = kDelete;
} else if (name == "multisetrandom_pegasus") {
op_type = kMultiSet;
} else if (name == "multigetrandom_pegasus") {
op_type = kMultiGet;
} else if (!name.empty()) { // No error message for empty name
fmt::print(stderr, "unknown benchmark '{}'\n", name);
dsn_exit(1);
}
return op_type;
}
void benchmark::print_header()
{
const config &config_ = config::instance();
fmt::print(stdout, "Hashkeys: {} bytes each\n", FLAGS_hashkey_size);
fmt::print(stdout, "Sortkeys: {} bytes each\n", FLAGS_sortkey_size);
fmt::print(stdout, "Values: {} bytes each\n", FLAGS_value_size);
fmt::print(stdout, "Entries: {}\n", FLAGS_benchmark_num);
fmt::print(
stdout,
"FileSize: {} MB (estimated)\n",
((FLAGS_hashkey_size + FLAGS_sortkey_size + FLAGS_value_size) * FLAGS_benchmark_num) >> 20);
print_warnings();
fmt::print(stdout, "------------------------------------------------\n");
}
void benchmark::print_warnings()
{
#if defined(__GNUC__) && !defined(__OPTIMIZE__)
fmt::print(stdout, "WARNING: Optimization is disabled: benchmarks unnecessarily slow\n");
#endif
#ifndef NDEBUG
fmt::print(stdout, "WARNING: Assertions are enabled; benchmarks unnecessarily slow\n");
#endif
}
} // namespace test
} // namespace pegasus