blob: d345ad66af14da095892fb7455c79a8e26c61f46 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once
#include <getopt.h>
#include <thread>
#include <iomanip>
#include <fstream>
#include <queue>
#include <boost/algorithm/string.hpp>
#include <rocksdb/db.h>
#include <rocksdb/sst_dump_tool.h>
#include <rocksdb/env.h>
#include <rocksdb/statistics.h>
#include <dsn/cpp/json_helper.h>
#include <dsn/dist/remote_command.h>
#include <dsn/dist/replication/replication_ddl_client.h>
#include <dsn/dist/replication/mutation_log_tool.h>
#include <dsn/perf_counter/perf_counter_utils.h>
#include <dsn/utility/string_view.h>
#include <dsn/utils/time_utils.h>
#include <rrdb/rrdb.code.definition.h>
#include <rrdb/rrdb_types.h>
#include <pegasus/version.h>
#include <pegasus/git_commit.h>
#include <pegasus/error.h>
#include <geo/lib/geo_client.h>
#include "base/pegasus_key_schema.h"
#include "base/pegasus_value_schema.h"
#include "base/pegasus_utils.h"
#include "command_executor.h"
#include "command_utils.h"
// NOTE(review): a using-directive at header scope leaks dsn::replication names into every
// file that includes this header; code below (e.g. configuration_proposal_action) relies on it.
using namespace dsn::replication;
// Two-level stringification: STR(x) expands x as a macro first, then STR_I quotes the result.
#define STR_I(var) #var
#define STR(var) STR_I(var)
// PEGASUS_BUILD_TYPE is the value of DSN_BUILD_TYPE as a string literal,
// or an empty string when DSN_BUILD_TYPE is not defined.
#ifndef DSN_BUILD_TYPE
#define PEGASUS_BUILD_TYPE ""
#else
#define PEGASUS_BUILD_TYPE STR(DSN_BUILD_TYPE)
#endif
// Task code LPC_SCAN_DATA: common priority, default thread pool.
// NOTE(review): not referenced in this part of the file; presumably used by the scan commands.
DEFINE_TASK_CODE(LPC_SCAN_DATA, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT)
// Selects what scan_data_next() does with every scanned row that passes the filters.
enum scan_data_operator
{
// write the row through the client (async_set, or async_check_and_set if no_overwrite)
SCAN_COPY,
// delete the row through the client (async_del)
SCAN_CLEAR,
// only count the row, optionally collecting size statistics and hash key counts
SCAN_COUNT,
// write the row through the geo client (geoclient->async_set)
SCAN_GEN_GEO
};
// Collects the `count` rows with the largest row_size among all rows pushed into it.
//
// push() is guarded by an internal lock. all() is meant to be called once pushing has
// finished; it exposes the collected rows as a max-heap (largest row_size on top).
class top_container
{
public:
    // One collected row: its keys and its total size.
    struct top_heap_item
    {
        std::string hash_key;
        std::string sort_key;
        long row_size;
        top_heap_item(std::string &&hash_key_, std::string &&sort_key_, long row_size_)
            : hash_key(std::move(hash_key_)), sort_key(std::move(sort_key_)), row_size(row_size_)
        {
        }
    };

    // Orders items by ascending row_size, which makes `top_heap` a max-heap.
    // Items are taken by const reference to avoid copying two strings per comparison.
    struct top_heap_compare
    {
        bool operator()(const top_heap_item &i1, const top_heap_item &i2) const
        {
            return i1.row_size < i2.row_size;
        }
    };

    typedef std::priority_queue<top_heap_item, std::vector<top_heap_item>, top_heap_compare>
        top_heap;

    // count: maximum number of rows to keep; a value <= 0 keeps nothing.
    top_container(int count) : _count(count) {}

    // Offers a row to the container. The keys are moved from only if the row is kept.
    void push(std::string &&hash_key, std::string &&sort_key, long row_size)
    {
        dsn::utils::auto_lock<dsn::utils::ex_lock_nr> l(_lock);
        if (_count <= 0) {
            // nothing can be kept; also avoids calling top() on an empty heap
            return;
        }
        if (_candidates.size() < static_cast<size_t>(_count)) {
            _candidates.emplace(std::move(hash_key), std::move(sort_key), row_size);
        } else if (_candidates.top().row_size < row_size) {
            // `_candidates` is a min-heap, so top() is the smallest kept row: evicting it
            // (rather than the largest) is what keeps the N largest rows.
            _candidates.pop();
            _candidates.emplace(std::move(hash_key), std::move(sort_key), row_size);
        }
    }

    // Returns the collected rows as a max-heap. Rows gathered by push() so far are
    // transferred into the returned heap; callers may pop from it freely.
    top_heap &all()
    {
        dsn::utils::auto_lock<dsn::utils::ex_lock_nr> l(_lock);
        while (!_candidates.empty()) {
            _heap.push(_candidates.top());
            _candidates.pop();
        }
        return _heap;
    }

private:
    // Orders items by descending row_size, which makes `min_heap` a min-heap.
    struct min_heap_compare
    {
        bool operator()(const top_heap_item &i1, const top_heap_item &i2) const
        {
            return i1.row_size > i2.row_size;
        }
    };
    typedef std::priority_queue<top_heap_item, std::vector<top_heap_item>, min_heap_compare>
        min_heap;

    int _count;
    min_heap _candidates; // rows currently kept, smallest on top
    top_heap _heap;       // rows handed out through all(), largest on top
    dsn::utils::ex_lock_nr _lock;
};
// Histogram identifiers used by scan_data_next(): each value is cast to uint32_t and passed
// to rocksdb::Statistics::measureTime() together with the measured size.
enum class histogram_type
{
// size of the hash key
HASH_KEY_SIZE,
// size of the sort key
SORT_KEY_SIZE,
// size of the value
VALUE_SIZE,
// hash key size + sort key size + value size
ROW_SIZE
};
// Per-split state shared by scan_data_next() and all of the async callbacks it issues.
struct scan_data_context
{
    scan_data_operator op; // what to do with every scanned row
    int split_id;          // only used in error messages
    int max_batch_count;   // upper bound of split_request_count
    int timeout_ms;        // timeout passed to the client's async write/delete calls
    // if set true, then use check_and_set() instead of set()
    // when inserting data to destination table for copy_data,
    // to not overwrite old data if it already exists.
    bool no_overwrite = false;
    pegasus::pegasus_client::filter_type sort_key_filter_type =
        pegasus::pegasus_client::FT_NO_FILTER;
    std::string sort_key_filter_pattern;
    pegasus::pegasus_client::filter_type value_filter_type =
        pegasus::pegasus_client::FT_NO_FILTER;
    std::string value_filter_pattern;
    pegasus::pegasus_client::pegasus_scanner_wrapper scanner;
    pegasus::pegasus_client *client;
    pegasus::geo::geo_client *geoclient;
    std::atomic_bool *error_occurred; // set to true when any request of this split fails
    std::atomic_long split_rows{0};
    std::atomic_long split_request_count{0}; // requests issued but not yet finished
    std::atomic_bool split_completed{false};
    bool stat_size;
    std::shared_ptr<rocksdb::Statistics> statistics;
    int top_count;
    top_container top_rows;
    bool count_hash_key;
    std::string last_hash_key;
    std::atomic_long split_hash_key_count{0};

    scan_data_context(scan_data_operator op_,
                      int split_id_,
                      int max_batch_count_,
                      int timeout_ms_,
                      pegasus::pegasus_client::pegasus_scanner_wrapper scanner_,
                      pegasus::pegasus_client *client_,
                      pegasus::geo::geo_client *geoclient_,
                      std::atomic_bool *error_occurred_,
                      bool stat_size_ = false,
                      std::shared_ptr<rocksdb::Statistics> statistics_ = nullptr,
                      int top_count_ = 0,
                      bool count_hash_key_ = false)
        : op(op_),
          split_id(split_id_),
          max_batch_count(max_batch_count_),
          timeout_ms(timeout_ms_),
          scanner(scanner_),
          client(client_),
          geoclient(geoclient_),
          error_occurred(error_occurred_),
          stat_size(stat_size_),
          statistics(statistics_),
          top_count(top_count_),
          top_rows(top_count_),
          count_hash_key(count_hash_key_)
    {
        // max_batch_count should > 1 because scan may be terminated
        // when split_request_count = 1
        dassert(max_batch_count > 1, "");
    }

    // Sets the filter applied to sort keys.
    void set_sort_key_filter(pegasus::pegasus_client::filter_type type, const std::string &pattern)
    {
        sort_key_filter_type = type;
        sort_key_filter_pattern = pattern;
    }

    // Sets the filter applied to values.
    void set_value_filter(pegasus::pegasus_client::filter_type type, const std::string &pattern)
    {
        value_filter_type = type;
        value_filter_pattern = pattern;
    }

    // Makes SCAN_COPY use check_and_set() so that existing rows are not overwritten.
    void set_no_overwrite() { no_overwrite = true; }
};
// Raises `max` to `value` if `value` is larger than what `max` currently holds;
// otherwise leaves it untouched.
inline void update_atomic_max(std::atomic_long &max, long value)
{
    long observed = max.load();
    // A failed compare_exchange_weak refreshes `observed`, so the bound is re-checked
    // against the latest stored value on every round.
    while (observed < value && !max.compare_exchange_weak(observed, value)) {
    }
}
// Converts a filter name given on the command line into a filter type.
// "exact" is accepted only when include_exact is true; every other name is looked up as
// "ft_match_<name>", and FT_NO_FILTER is the default handed to the lookup.
inline pegasus::pegasus_client::filter_type parse_filter_type(const std::string &name,
                                                              bool include_exact)
{
    if (include_exact && name == "exact") {
        return pegasus::pegasus_client::FT_MATCH_EXACT;
    }
    const std::string enum_name = std::string("ft_match_") + name;
    return static_cast<pegasus::pegasus_client::filter_type>(
        type_from_string(dsn::apps::_filter_type_VALUES_TO_NAMES,
                         enum_name,
                         ::dsn::apps::filter_type::FT_NO_FILTER));
}
// Returns true if `value` satisfies the filter described by filter_type/filter_pattern.
// An empty pattern matches everything for the anywhere/prefix/postfix filters.
inline bool validate_filter(pegasus::pegasus_client::filter_type filter_type,
                            const std::string &filter_pattern,
                            const std::string &value)
{
    const std::string::size_type pattern_len = filter_pattern.length();
    const std::string::size_type value_len = value.length();
    switch (filter_type) {
    case pegasus::pegasus_client::FT_NO_FILTER:
        return true;
    case pegasus::pegasus_client::FT_MATCH_EXACT:
        return value == filter_pattern;
    case pegasus::pegasus_client::FT_MATCH_ANYWHERE:
        if (pattern_len == 0)
            return true;
        if (value_len < pattern_len)
            return false;
        return value.find(filter_pattern) != std::string::npos;
    case pegasus::pegasus_client::FT_MATCH_PREFIX:
        if (pattern_len == 0)
            return true;
        if (value_len < pattern_len)
            return false;
        return value.compare(0, pattern_len, filter_pattern) == 0;
    case pegasus::pegasus_client::FT_MATCH_POSTFIX:
        if (pattern_len == 0)
            return true;
        if (value_len < pattern_len)
            return false;
        return value.compare(value_len - pattern_len, pattern_len, filter_pattern) == 0;
    default:
        dassert(false, "unsupported filter type: %d", filter_type);
    }
    return false;
}
// Returns true if the scanned row passes the filters stored in `context`.
inline bool
validate_filter(scan_data_context *context, const std::string &sort_key, const std::string &value)
{
    // For the sort key only MATCH_EXACT has to be checked here, because it is not supported
    // on the server side; MATCH_PREFIX is already satisfied, so an exact match only
    // requires that the sort key is not longer than the pattern.
    const bool exact_sort_key =
        context->sort_key_filter_type == pegasus::pegasus_client::FT_MATCH_EXACT;
    if (exact_sort_key && sort_key.length() > context->sort_key_filter_pattern.length()) {
        return false;
    }
    return validate_filter(context->value_filter_type, context->value_filter_pattern, value);
}
// Drives the scan of one split.
//
// Keeps issuing scanner->async_next() requests while the split is neither completed nor
// failed and fewer than max_batch_count requests are outstanding. For each returned row
// that passes validate_filter(), the operation selected by context->op is applied; the
// callbacks of successful operations call scan_data_next() again to keep the pipeline
// full.
//
// context->split_request_count counts outstanding requests: it is incremented before each
// async call and decremented as the very last statement of the matching callback.
// On the first failure split_completed and *error_occurred are set and an error is
// printed to stderr.
inline void scan_data_next(scan_data_context *context)
{
    while (!context->split_completed.load() && !context->error_occurred->load() &&
           context->split_request_count.load() < context->max_batch_count) {
        context->split_request_count++;
        context->scanner->async_next([context](int ret,
                                               std::string &&hash_key,
                                               std::string &&sort_key,
                                               std::string &&value,
                                               pegasus::pegasus_client::internal_info &&info) {
            if (ret == pegasus::PERR_OK) {
                if (validate_filter(context, sort_key, value)) {
                    switch (context->op) {
                    case SCAN_COPY:
                        context->split_request_count++;
                        if (context->no_overwrite) {
                            auto callback = [context](
                                int err,
                                pegasus::pegasus_client::check_and_set_results &&results,
                                pegasus::pegasus_client::internal_info &&info) {
                                if (err != pegasus::PERR_OK) {
                                    if (!context->split_completed.exchange(true)) {
                                        fprintf(stderr,
                                                "ERROR: split[%d] async check and set failed: %s\n",
                                                context->split_id,
                                                context->client->get_error_string(err));
                                        context->error_occurred->store(true);
                                    }
                                } else {
                                    if (results.set_succeed) {
                                        context->split_rows++;
                                    }
                                    scan_data_next(context);
                                }
                                // should put "split_request_count--" at end of the scope,
                                // to prevent that split_request_count becomes 0 in the middle.
                                context->split_request_count--;
                            };
                            pegasus::pegasus_client::check_and_set_options options;
                            context->client->async_check_and_set(
                                hash_key,
                                sort_key,
                                pegasus::pegasus_client::cas_check_type::CT_VALUE_NOT_EXIST,
                                "",
                                sort_key,
                                value,
                                options,
                                std::move(callback),
                                context->timeout_ms);
                        } else {
                            auto callback =
                                [context](int err, pegasus::pegasus_client::internal_info &&info) {
                                    if (err != pegasus::PERR_OK) {
                                        if (!context->split_completed.exchange(true)) {
                                            fprintf(stderr,
                                                    "ERROR: split[%d] async set failed: %s\n",
                                                    context->split_id,
                                                    context->client->get_error_string(err));
                                            context->error_occurred->store(true);
                                        }
                                    } else {
                                        context->split_rows++;
                                        scan_data_next(context);
                                    }
                                    // should put "split_request_count--" at end of the scope,
                                    // to prevent that split_request_count becomes 0 in the middle.
                                    context->split_request_count--;
                                };
                            context->client->async_set(hash_key,
                                                       sort_key,
                                                       value,
                                                       std::move(callback),
                                                       context->timeout_ms);
                        }
                        break;
                    case SCAN_CLEAR:
                        context->split_request_count++;
                        context->client->async_del(
                            hash_key,
                            sort_key,
                            [context](int err, pegasus::pegasus_client::internal_info &&info) {
                                if (err != pegasus::PERR_OK) {
                                    if (!context->split_completed.exchange(true)) {
                                        fprintf(stderr,
                                                "ERROR: split[%d] async del failed: %s\n",
                                                context->split_id,
                                                context->client->get_error_string(err));
                                        context->error_occurred->store(true);
                                    }
                                } else {
                                    context->split_rows++;
                                    scan_data_next(context);
                                }
                                // should put "split_request_count--" at end of the scope,
                                // to prevent that split_request_count becomes 0 in the middle.
                                context->split_request_count--;
                            },
                            context->timeout_ms);
                        break;
                    case SCAN_COUNT:
                        context->split_rows++;
                        // Count distinct hash keys BEFORE hash_key may be moved into
                        // top_rows below; comparing a moved-from string would corrupt the
                        // count. The key is copied here because it is still needed
                        // afterwards.
                        if (context->count_hash_key) {
                            if (hash_key != context->last_hash_key) {
                                context->split_hash_key_count++;
                                context->last_hash_key = hash_key;
                            }
                        }
                        if (context->stat_size && context->statistics) {
                            long hash_key_size = hash_key.size();
                            context->statistics->measureTime(
                                static_cast<uint32_t>(histogram_type::HASH_KEY_SIZE),
                                hash_key_size);

                            long sort_key_size = sort_key.size();
                            context->statistics->measureTime(
                                static_cast<uint32_t>(histogram_type::SORT_KEY_SIZE),
                                sort_key_size);

                            long value_size = value.size();
                            context->statistics->measureTime(
                                static_cast<uint32_t>(histogram_type::VALUE_SIZE), value_size);

                            long row_size = hash_key_size + sort_key_size + value_size;
                            context->statistics->measureTime(
                                static_cast<uint32_t>(histogram_type::ROW_SIZE), row_size);

                            if (context->top_count > 0) {
                                // last use of hash_key/sort_key: they may be moved from here
                                context->top_rows.push(
                                    std::move(hash_key), std::move(sort_key), row_size);
                            }
                        }
                        scan_data_next(context);
                        break;
                    case SCAN_GEN_GEO:
                        context->split_request_count++;
                        context->geoclient->async_set(
                            hash_key,
                            sort_key,
                            value,
                            [context](int err, pegasus::pegasus_client::internal_info &&info) {
                                if (err != pegasus::PERR_OK) {
                                    if (!context->split_completed.exchange(true)) {
                                        fprintf(stderr,
                                                "ERROR: split[%d] async set failed: %s\n",
                                                context->split_id,
                                                context->client->get_error_string(err));
                                        context->error_occurred->store(true);
                                    }
                                } else {
                                    context->split_rows++;
                                    scan_data_next(context);
                                }
                                // should put "split_request_count--" at end of the scope,
                                // to prevent that split_request_count becomes 0 in the middle.
                                context->split_request_count--;
                            },
                            context->timeout_ms);
                        break;
                    default:
                        dassert(false, "op = %d", context->op);
                        break;
                    }
                } else {
                    // row filtered out: just go on with the next one
                    scan_data_next(context);
                }
            } else if (ret == pegasus::PERR_SCAN_COMPLETE) {
                context->split_completed.store(true);
            } else {
                if (!context->split_completed.exchange(true)) {
                    fprintf(stderr,
                            "ERROR: split[%d] scan next failed: %s\n",
                            context->split_id,
                            context->client->get_error_string(ret));
                    context->error_occurred->store(true);
                }
            }
            // should put "split_request_count--" at end of the scope,
            // to prevent that split_request_count becomes 0 in the middle.
            context->split_request_count--;
        });

        if (context->count_hash_key) {
            // disable parallel scan if count_hash_key == true
            break;
        }
    }
}
struct node_desc
{
std::string desc;
dsn::rpc_address address;
node_desc(const std::string &s, const dsn::rpc_address &n) : desc(s), address(n) {}
};
// Appends the nodes of the requested kind to `nodes`.
// type: all | replica-server | meta-server
// Meta servers come from sc->meta_list; replica servers are the alive nodes reported by
// the ddl client. Returns false (after printing to stderr) if listing replica servers fails.
inline bool fill_nodes(shell_context *sc, const std::string &type, std::vector<node_desc> &nodes)
{
    const bool want_all = (type == "all");

    if (want_all || type == "meta-server") {
        for (auto &meta_addr : sc->meta_list) {
            nodes.emplace_back("meta-server", meta_addr);
        }
    }

    if (!want_all && type != "replica-server") {
        return true;
    }

    std::map<dsn::rpc_address, dsn::replication::node_status::type> alive_nodes;
    ::dsn::error_code err =
        sc->ddl_client->list_nodes(dsn::replication::node_status::NS_ALIVE, alive_nodes);
    if (err != ::dsn::ERR_OK) {
        fprintf(stderr, "ERROR: list node failed: %s\n", err.to_string());
        return false;
    }
    for (auto &entry : alive_nodes) {
        nodes.emplace_back("replica-server", entry.first);
    }
    return true;
}
// Sends the remote command `cmd` with `arguments` to every node (5000 ms timeout each)
// and waits for all of the calls to finish.
// Returns one entry per node, in the order of `nodes`: {true, response} on success,
// {false, error string} on failure.
inline std::vector<std::pair<bool, std::string>>
call_remote_command(shell_context *sc,
                    const std::vector<node_desc> &nodes,
                    const std::string &cmd,
                    const std::vector<std::string> &arguments)
{
    const size_t node_count = nodes.size();
    std::vector<std::pair<bool, std::string>> results(node_count);
    std::vector<dsn::task_ptr> tasks(node_count);

    for (size_t idx = 0; idx < node_count; ++idx) {
        // each callback only writes its own slot of `results`
        auto on_reply = [&results, idx](::dsn::error_code err, const std::string &resp) {
            std::pair<bool, std::string> &slot = results[idx];
            if (err == ::dsn::ERR_OK) {
                slot.first = true;
                slot.second = resp;
            } else {
                slot.first = false;
                slot.second = err.to_string();
            }
        };
        tasks[idx] = dsn::dist::cmd::async_call_remote(
            nodes[idx].address, cmd, arguments, on_reply, std::chrono::milliseconds(5000));
    }

    for (size_t idx = 0; idx < node_count; ++idx) {
        tasks[idx]->wait();
    }
    return results;
}
// Parses a perf counter name that ends in "*{counter_name}@{app_id}.{partition_index}".
// On success fills app_id, partition_index and counter_name and returns true.
// Returns false if there is no '@', if the text after the last '@' does not start with
// two '.'-separated integers, or if there is no '*'.
// Note: app_id/partition_index may already have been written when false is returned
// because of a missing '*'.
inline bool parse_app_pegasus_perf_counter_name(const std::string &name,
                                                int32_t &app_id,
                                                int32_t &partition_index,
                                                std::string &counter_name)
{
    const std::string::size_type at_pos = name.rfind('@');
    if (at_pos == std::string::npos) {
        return false;
    }

    const char *id_part = name.c_str() + at_pos + 1;
    if (sscanf(id_part, "%d.%d", &app_id, &partition_index) != 2) {
        return false;
    }

    const std::string::size_type star_pos = name.rfind('*');
    if (star_pos == std::string::npos) {
        return false;
    }

    // the counter name sits between the last '*' and the last '@'
    counter_name = name.substr(star_pos + 1, at_pos - star_pos - 1);
    return true;
}
// Parses an app-level perf counter name.
//
// name format:
//   1.{node}*{section}*{counter_name}@{app_name}.{percent_line}
//   2.{node}*{section}*{counter_name}@{app_name}
//
// On success fills app_name and counter_name and returns true. Returns false if the name
// has no '@' or no '*'; in the latter case app_name has already been written.
inline bool parse_app_perf_counter_name(const std::string &name,
                                        std::string &app_name,
                                        std::string &counter_name)
{
    const std::string::size_type at_pos = name.rfind('@');
    if (at_pos == std::string::npos) {
        return false;
    }

    // Only a '.' located after the '@' separates the app name from a percent line;
    // otherwise the app name runs to the end of the string.
    const std::string::size_type dot_pos = name.rfind('.');
    const bool has_suffix = (dot_pos != std::string::npos) && (dot_pos > at_pos);
    if (has_suffix) {
        app_name = name.substr(at_pos + 1, dot_pos - at_pos - 1);
    } else {
        app_name = name.substr(at_pos + 1);
    }

    const std::string::size_type star_pos = name.rfind('*');
    if (star_pos == std::string::npos) {
        return false;
    }

    // the counter name sits between the last '*' and the last '@'
    counter_name = name.substr(star_pos + 1, at_pos - star_pos - 1);
    return true;
}
// One row of statistics: the perf counter values accumulated for an app or a partition
// (filled by update_app_pegasus_perf_counter), plus a few derived totals.
struct row_data
{
    std::string row_name;
    int32_t app_id = 0;
    int32_t partition_count = 0;
    double get_qps = 0;
    double multi_get_qps = 0;
    double put_qps = 0;
    double multi_put_qps = 0;
    double remove_qps = 0;
    double multi_remove_qps = 0;
    double incr_qps = 0;
    double check_and_set_qps = 0;
    double check_and_mutate_qps = 0;
    double scan_qps = 0;
    double duplicate_qps = 0;
    double dup_shipped_ops = 0;
    double dup_failed_shipping_ops = 0;
    double recent_read_cu = 0;
    double recent_write_cu = 0;
    double recent_expire_count = 0;
    double recent_filter_count = 0;
    double recent_abnormal_count = 0;
    double recent_write_throttling_delay_count = 0;
    double recent_write_throttling_reject_count = 0;
    double storage_mb = 0;
    double storage_count = 0;
    double rdb_block_cache_hit_count = 0;
    double rdb_block_cache_total_count = 0;
    double rdb_index_and_filter_blocks_mem_usage = 0;
    double rdb_memtable_mem_usage = 0;
    double rdb_estimate_num_keys = 0;
    double rdb_bf_seek_negatives = 0;
    double rdb_bf_seek_total = 0;
    double rdb_bf_point_positive_true = 0;
    double rdb_bf_point_positive_total = 0;
    double rdb_bf_point_negatives = 0;
    double backup_request_qps = 0;
    double get_bytes = 0;
    double multi_get_bytes = 0;
    double scan_bytes = 0;
    double put_bytes = 0;
    double multi_put_bytes = 0;
    double check_and_set_bytes = 0;
    double check_and_mutate_bytes = 0;

    // Sum of all read, write, incr and duplicate qps counters.
    double get_total_qps() const
    {
        double total = get_qps;
        total += multi_get_qps;
        total += scan_qps;
        total += put_qps;
        total += multi_put_qps;
        total += remove_qps;
        total += multi_remove_qps;
        total += incr_qps;
        total += check_and_set_qps;
        total += check_and_mutate_qps;
        total += duplicate_qps;
        return total;
    }

    // Recent read capacity units plus recent write capacity units.
    double get_total_cu() const
    {
        double total = recent_read_cu;
        total += recent_write_cu;
        return total;
    }

    // get + multi_get + scan qps.
    double get_total_read_qps() const
    {
        double total = get_qps;
        total += multi_get_qps;
        total += scan_qps;
        return total;
    }

    // put + remove + multi_put + multi_remove + check_and_set + check_and_mutate qps.
    // NOTE(review): unlike get_total_qps(), incr_qps and duplicate_qps are not included
    // here -- confirm whether that is intended.
    double get_total_write_qps() const
    {
        double total = put_qps;
        total += remove_qps;
        total += multi_put_qps;
        total += multi_remove_qps;
        total += check_and_set_qps;
        total += check_and_mutate_qps;
        return total;
    }
};
// Adds `value` to the field of `row` that corresponds to `counter_name`.
// Returns false, leaving `row` untouched, if the counter name is not a known one.
inline bool
update_app_pegasus_perf_counter(row_data &row, const std::string &counter_name, double value)
{
    typedef double row_data::*counter_field;
    // perf counter name --> the row_data field it accumulates into
    static const std::map<std::string, counter_field> counter_fields = {
        {"get_qps", &row_data::get_qps},
        {"multi_get_qps", &row_data::multi_get_qps},
        {"put_qps", &row_data::put_qps},
        {"multi_put_qps", &row_data::multi_put_qps},
        {"remove_qps", &row_data::remove_qps},
        {"multi_remove_qps", &row_data::multi_remove_qps},
        {"incr_qps", &row_data::incr_qps},
        {"check_and_set_qps", &row_data::check_and_set_qps},
        {"check_and_mutate_qps", &row_data::check_and_mutate_qps},
        {"scan_qps", &row_data::scan_qps},
        {"duplicate_qps", &row_data::duplicate_qps},
        {"dup_shipped_ops", &row_data::dup_shipped_ops},
        {"dup_failed_shipping_ops", &row_data::dup_failed_shipping_ops},
        {"recent.read.cu", &row_data::recent_read_cu},
        {"recent.write.cu", &row_data::recent_write_cu},
        {"recent.expire.count", &row_data::recent_expire_count},
        {"recent.filter.count", &row_data::recent_filter_count},
        {"recent.abnormal.count", &row_data::recent_abnormal_count},
        {"recent.write.throttling.delay.count", &row_data::recent_write_throttling_delay_count},
        {"recent.write.throttling.reject.count",
         &row_data::recent_write_throttling_reject_count},
        {"disk.storage.sst(MB)", &row_data::storage_mb},
        {"disk.storage.sst.count", &row_data::storage_count},
        {"rdb.block_cache.hit_count", &row_data::rdb_block_cache_hit_count},
        {"rdb.block_cache.total_count", &row_data::rdb_block_cache_total_count},
        {"rdb.index_and_filter_blocks.memory_usage",
         &row_data::rdb_index_and_filter_blocks_mem_usage},
        {"rdb.memtable.memory_usage", &row_data::rdb_memtable_mem_usage},
        {"rdb.estimate_num_keys", &row_data::rdb_estimate_num_keys},
        {"rdb.bf_seek_negatives", &row_data::rdb_bf_seek_negatives},
        {"rdb.bf_seek_total", &row_data::rdb_bf_seek_total},
        {"rdb.bf_point_positive_true", &row_data::rdb_bf_point_positive_true},
        {"rdb.bf_point_positive_total", &row_data::rdb_bf_point_positive_total},
        {"rdb.bf_point_negatives", &row_data::rdb_bf_point_negatives},
        {"backup_request_qps", &row_data::backup_request_qps},
        {"get_bytes", &row_data::get_bytes},
        {"multi_get_bytes", &row_data::multi_get_bytes},
        {"scan_bytes", &row_data::scan_bytes},
        {"put_bytes", &row_data::put_bytes},
        {"multi_put_bytes", &row_data::multi_put_bytes},
        {"check_and_set_bytes", &row_data::check_and_set_bytes},
        {"check_and_mutate_bytes", &row_data::check_and_mutate_bytes},
    };

    const auto found = counter_fields.find(counter_name);
    if (found == counter_fields.end()) {
        return false;
    }
    row.*(found->second) += value;
    return true;
}
// Fills `apps` with the available apps and `nodes` with the alive replica servers.
// Returns false (after logging the reason) if either query fails.
inline bool get_apps_and_nodes(shell_context *sc,
                               std::vector<::dsn::app_info> &apps,
                               std::vector<node_desc> &nodes)
{
    dsn::error_code list_err = sc->ddl_client->list_apps(dsn::app_status::AS_AVAILABLE, apps);
    if (list_err != dsn::ERR_OK) {
        derror("list apps failed, error = %s", list_err.to_string());
        return false;
    }

    const bool got_nodes = fill_nodes(sc, "replica-server", nodes);
    if (!got_nodes) {
        derror("get replica server node list failed");
    }
    return got_nodes;
}
// For every app in `apps`, queries its partition configurations and stores them in
// app_partitions[app_id]. Returns false (after logging) as soon as one query fails.
// Asserts that the app id and partition count reported by the query match the app info.
inline bool
get_app_partitions(shell_context *sc,
                   const std::vector<::dsn::app_info> &apps,
                   std::map<int32_t, std::vector<dsn::partition_configuration>> &app_partitions)
{
    for (const ::dsn::app_info &app : apps) {
        int32_t listed_app_id = 0;
        int32_t listed_partition_count = 0;
        std::vector<dsn::partition_configuration> &partitions = app_partitions[app.app_id];

        dsn::error_code err = sc->ddl_client->list_app(
            app.app_name, listed_app_id, listed_partition_count, partitions);
        if (err != ::dsn::ERR_OK) {
            derror("list app %s failed, error = %s", app.app_name.c_str(), err.to_string());
            return false;
        }

        dassert(listed_app_id == app.app_id, "%d VS %d", listed_app_id, app.app_id);
        dassert(listed_partition_count == app.partition_count,
                "%d VS %d",
                listed_partition_count,
                app.partition_count);
    }
    return true;
}
// Decodes the JSON response of a perf counter query sent to `node_addr` into `info`.
// Returns false (after logging) if the remote call failed, if the response cannot be
// decoded, or if the decoded result is not "OK".
inline bool decode_node_perf_counter_info(const dsn::rpc_address &node_addr,
                                          const std::pair<bool, std::string> &result,
                                          dsn::perf_counter_info &info)
{
    const bool call_succeeded = result.first;
    const std::string &payload = result.second;

    if (!call_succeeded) {
        derror("query perf counter info from node %s failed", node_addr.to_string());
        return false;
    }

    dsn::blob bb(payload.data(), 0, payload.size());
    const bool decoded = dsn::json::json_forwarder<dsn::perf_counter_info>::decode(bb, info);
    if (!decoded) {
        derror("decode perf counter info from node %s failed, result = %s",
               node_addr.to_string(),
               payload.c_str());
        return false;
    }

    if (info.result != "OK") {
        derror("query perf counter info from node %s returns error, error = %s",
               node_addr.to_string(),
               info.result.c_str());
        return false;
    }
    return true;
}
// Collects per-partition perf counters of all available apps.
//
// rows: key-app name, value-perf counters for each partition (index = partition index).
// Counters named "...@{app_id}.{partition_index}" are accumulated into the matching
// partition, but only when reported by that partition's primary. Counters named
// "...@{app_name}[.{percent_line}]" are accumulated into partition index 0 of the app.
// Counters that refer to an unknown app, or to a partition index outside of the known
// range, are ignored.
//
// Returns false if the apps/nodes/partitions cannot be fetched or if the perf counters of
// any node cannot be queried or decoded.
inline bool get_app_partition_stat(shell_context *sc,
                                   std::map<std::string, std::vector<row_data>> &rows)
{
    // get apps and nodes
    std::vector<::dsn::app_info> apps;
    std::vector<node_desc> nodes;
    if (!get_apps_and_nodes(sc, apps, nodes)) {
        return false;
    }

    // get the relationship between app_id and app_name
    std::map<int32_t, std::string> app_id_name;
    std::map<std::string, int32_t> app_name_id;
    for (::dsn::app_info &app : apps) {
        app_id_name[app.app_id] = app.app_name;
        app_name_id[app.app_name] = app.app_id;
        rows[app.app_name].resize(app.partition_count);
    }

    // get app_id --> partitions
    std::map<int32_t, std::vector<dsn::partition_configuration>> app_partitions;
    if (!get_app_partitions(sc, apps, app_partitions)) {
        return false;
    }

    // get all of the perf counters with format ".*@.*"
    std::vector<std::pair<bool, std::string>> results =
        call_remote_command(sc, nodes, "perf-counters", {".*@.*"});

    for (size_t i = 0; i < nodes.size(); ++i) {
        // decode info of perf-counters on node i
        dsn::perf_counter_info info;
        if (!decode_node_perf_counter_info(nodes[i].address, results[i], info)) {
            return false;
        }

        for (dsn::perf_counter_metric &m : info.counters) {
            // get app_id/partition_id/counter_name/app_name from the name of perf-counter
            int32_t app_id_x = 0;
            int32_t partition_index_x = 0;
            std::string counter_name;
            std::string app_name;
            if (parse_app_pegasus_perf_counter_name(
                    m.name, app_id_x, partition_index_x, counter_name)) {
                auto find = app_partitions.find(app_id_x);
                if (find == app_partitions.end()) {
                    continue;
                }
                // The index comes from a remote counter name: never trust it blindly.
                if (partition_index_x < 0 ||
                    static_cast<size_t>(partition_index_x) >= find->second.size()) {
                    continue;
                }
                // only primary partition will be counted
                if (find->second[partition_index_x].primary != nodes[i].address) {
                    continue;
                }
                std::vector<row_data> &app_rows = rows[app_id_name[app_id_x]];
                if (static_cast<size_t>(partition_index_x) >= app_rows.size()) {
                    continue;
                }
                row_data &row = app_rows[partition_index_x];
                row.row_name = std::to_string(partition_index_x);
                row.app_id = app_id_x;
                update_app_pegasus_perf_counter(row, counter_name, m.value);
            } else if (parse_app_perf_counter_name(m.name, app_name, counter_name)) {
                // if the app_name from perf-counter isn't existed(maybe the app was dropped), it
                // will be ignored.
                auto id_it = app_name_id.find(app_name);
                if (id_it == app_name_id.end()) {
                    continue;
                }
                // perf-counter value will be set into partition index 0.
                std::vector<row_data> &app_rows = rows[app_name];
                if (app_rows.empty()) {
                    // an app without partitions has no row to accumulate into
                    continue;
                }
                row_data &row = app_rows[0];
                row.app_id = id_it->second;
                update_app_pegasus_perf_counter(row, counter_name, m.value);
            }
        }
    }
    return true;
}
// Collects perf counter statistics into `rows`.
//
// - app_name empty: one row per available app (row_name = app name), accumulating the
//   counters of all of its partitions.
// - app_name given: one row per partition of that app (row_name = partition index).
//
// Only counters reported by the primary of a partition are counted. In the all-apps mode,
// counters that refer to an unknown app or to an out-of-range partition index are ignored.
//
// Returns false if the app is not found, if apps/nodes/partitions cannot be fetched, or
// if the perf counters of any node cannot be queried or decoded.
inline bool
get_app_stat(shell_context *sc, const std::string &app_name, std::vector<row_data> &rows)
{
    std::vector<::dsn::app_info> apps;
    std::vector<node_desc> nodes;
    if (!get_apps_and_nodes(sc, apps, nodes))
        return false;

    ::dsn::app_info *app_info = nullptr;
    if (!app_name.empty()) {
        for (auto &app : apps) {
            if (app.app_name == app_name) {
                app_info = &app;
                break;
            }
        }
        if (app_info == nullptr) {
            derror("app %s not found", app_name.c_str());
            return false;
        }
    }

    // Counter name pattern: every app, or only counters of "@{app_id}.".
    std::vector<std::string> arguments;
    if (app_name.empty()) {
        arguments.emplace_back(".*@.*");
    } else {
        arguments.emplace_back(".*@" + std::to_string(app_info->app_id) + "\\..*");
    }
    std::vector<std::pair<bool, std::string>> results =
        call_remote_command(sc, nodes, "perf-counters", arguments);

    if (app_name.empty()) {
        std::map<int32_t, std::vector<dsn::partition_configuration>> app_partitions;
        if (!get_app_partitions(sc, apps, app_partitions))
            return false;

        // one row per listed app; sized from `apps` because that is what is indexed below
        rows.resize(apps.size());
        int idx = 0;
        std::map<int32_t, int> app_row_idx; // app_id --> row_idx
        for (::dsn::app_info &app : apps) {
            rows[idx].row_name = app.app_name;
            rows[idx].app_id = app.app_id;
            rows[idx].partition_count = app.partition_count;
            app_row_idx[app.app_id] = idx;
            idx++;
        }

        for (size_t i = 0; i < nodes.size(); ++i) {
            dsn::rpc_address node_addr = nodes[i].address;
            dsn::perf_counter_info info;
            if (!decode_node_perf_counter_info(node_addr, results[i], info))
                return false;
            for (dsn::perf_counter_metric &m : info.counters) {
                int32_t app_id_x = 0;
                int32_t partition_index_x = 0;
                std::string counter_name;
                if (!parse_app_pegasus_perf_counter_name(
                        m.name, app_id_x, partition_index_x, counter_name)) {
                    continue;
                }
                auto find = app_partitions.find(app_id_x);
                if (find == app_partitions.end())
                    continue;
                // The index comes from a remote counter name: never trust it blindly.
                if (partition_index_x < 0 ||
                    static_cast<size_t>(partition_index_x) >= find->second.size())
                    continue;
                dsn::partition_configuration &pc = find->second[partition_index_x];
                if (pc.primary != node_addr)
                    continue;
                update_app_pegasus_perf_counter(rows[app_row_idx[app_id_x]], counter_name, m.value);
            }
        }
    } else {
        rows.resize(app_info->partition_count);
        for (int i = 0; i < app_info->partition_count; i++)
            rows[i].row_name = std::to_string(i);

        int32_t app_id = 0;
        int32_t partition_count = 0;
        std::vector<dsn::partition_configuration> partitions;
        dsn::error_code err =
            sc->ddl_client->list_app(app_name, app_id, partition_count, partitions);
        if (err != ::dsn::ERR_OK) {
            derror("list app %s failed, error = %s", app_name.c_str(), err.to_string());
            return false;
        }
        dassert(app_id == app_info->app_id, "%d VS %d", app_id, app_info->app_id);
        dassert(partition_count == app_info->partition_count,
                "%d VS %d",
                partition_count,
                app_info->partition_count);

        for (size_t i = 0; i < nodes.size(); ++i) {
            dsn::rpc_address node_addr = nodes[i].address;
            dsn::perf_counter_info info;
            if (!decode_node_perf_counter_info(node_addr, results[i], info))
                return false;
            for (dsn::perf_counter_metric &m : info.counters) {
                int32_t app_id_x = 0;
                int32_t partition_index_x = 0;
                std::string counter_name;
                bool parse_ret = parse_app_pegasus_perf_counter_name(
                    m.name, app_id_x, partition_index_x, counter_name);
                dassert(parse_ret, "name = %s", m.name.c_str());
                dassert(app_id_x == app_id, "name = %s", m.name.c_str());
                // also reject negative indexes, which "< partition_count" alone lets through
                dassert(partition_index_x >= 0 && partition_index_x < partition_count,
                        "name = %s",
                        m.name.c_str());
                if (static_cast<size_t>(partition_index_x) >= partitions.size())
                    continue;
                if (partitions[partition_index_x].primary != node_addr)
                    continue;
                update_app_pegasus_perf_counter(rows[partition_index_x], counter_name, m.value);
            }
        }
    }
    return true;
}
// Capacity unit statistics of one replica node.
struct node_capacity_unit_stat
{
    // timestamp when node perf_counter_info has updated.
    std::string timestamp;
    std::string node_address;
    // mapping: app_id --> (read_cu, write_cu)
    std::map<int32_t, std::pair<int64_t, int64_t>> cu_value_by_app;

    // Encodes the stats as JSON: app_id --> [read_cu, write_cu].
    // Apps whose read and write capacity units are both 0 are left out.
    std::string dump_to_json() const
    {
        std::map<int32_t, std::vector<int64_t>> non_zero_values;
        for (auto &entry : cu_value_by_app) {
            const int64_t read_cu = entry.second.first;
            const int64_t write_cu = entry.second.second;
            if (read_cu == 0 && write_cu == 0) {
                continue;
            }
            non_zero_values.emplace(entry.first, std::vector<int64_t>{read_cu, write_cu});
        }

        std::stringstream json_stream;
        rapidjson::OStreamWrapper wrapper(json_stream);
        dsn::json::JsonWriter writer(wrapper);
        dsn::json::json_encode(writer, non_zero_values);
        return json_stream.str();
    }
};
// Collects the recent read/write capacity units of every app on every replica server.
// nodes_stat gets one entry per node; the entry of a node whose perf counters cannot be
// decoded is left untouched (a warning is logged). Returns false only if the replica
// server list cannot be fetched.
inline bool get_capacity_unit_stat(shell_context *sc,
                                   std::vector<node_capacity_unit_stat> &nodes_stat)
{
    std::vector<node_desc> nodes;
    if (!fill_nodes(sc, "replica-server", nodes)) {
        derror("get replica server node list failed");
        return false;
    }

    std::vector<std::pair<bool, std::string>> results =
        call_remote_command(sc, nodes, "perf-counters-by-substr", {".cu@"});

    const size_t node_count = nodes.size();
    nodes_stat.resize(node_count);
    for (size_t i = 0; i < node_count; ++i) {
        dsn::rpc_address node_addr = nodes[i].address;
        dsn::perf_counter_info info;
        if (!decode_node_perf_counter_info(node_addr, results[i], info)) {
            dwarn("decode perf counter from node(%s) failed, just ignore it",
                  node_addr.to_string());
            continue;
        }

        node_capacity_unit_stat &stat = nodes_stat[i];
        stat.timestamp = info.timestamp_str;
        stat.node_address = node_addr.to_string();
        for (dsn::perf_counter_metric &m : info.counters) {
            int32_t app_id, pidx;
            std::string counter_name;
            bool r = parse_app_pegasus_perf_counter_name(m.name, app_id, pidx, counter_name);
            dassert(r, "name = %s", m.name.c_str());
            // an app entry is created only for the two capacity unit counters
            if (counter_name == "recent.read.cu") {
                stat.cu_value_by_app[app_id].first += static_cast<int64_t>(m.value);
            } else if (counter_name == "recent.write.cu") {
                stat.cu_value_by_app[app_id].second += static_cast<int64_t>(m.value);
            }
        }
    }
    return true;
}
// Storage size statistics of all apps.
struct app_storage_size_stat
{
    // timestamp when this stat is generated.
    std::string timestamp;
    // mapping: app_id --> [app_partition_count, stat_partition_count, storage_size_in_mb]
    std::map<int32_t, std::vector<int64_t>> st_value_by_app;

    // Encodes st_value_by_app as JSON.
    std::string dump_to_json() const
    {
        std::stringstream json_stream;
        rapidjson::OStreamWrapper wrapper(json_stream);
        dsn::json::JsonWriter writer(wrapper);
        dsn::json::json_encode(writer, st_value_by_app);
        return json_stream.str();
    }
};
// Collects the SST storage size of every available app.
//
// For each app, st_stat.st_value_by_app[app_id] holds
// [app_partition_count, stat_partition_count, storage_size_in_mb]; each partition is
// counted at most once, and only when reported by its primary. Nodes whose perf counters
// cannot be decoded are skipped with a warning. Counters that refer to an unknown app or
// to an out-of-range partition index are ignored.
//
// Returns false if the apps, nodes or partitions cannot be fetched.
inline bool get_storage_size_stat(shell_context *sc, app_storage_size_stat &st_stat)
{
    std::vector<::dsn::app_info> apps;
    std::vector<node_desc> nodes;
    if (!get_apps_and_nodes(sc, apps, nodes)) {
        derror("get apps and nodes failed");
        return false;
    }

    std::map<int32_t, std::vector<dsn::partition_configuration>> app_partitions;
    if (!get_app_partitions(sc, apps, app_partitions)) {
        derror("get app partitions failed");
        return false;
    }
    for (auto &kv : app_partitions) {
        auto &v = kv.second;
        for (auto &c : v) {
            // use partition_flags to record if this partition's storage size is calculated,
            // because `app_partitions' is a temporary variable, so we can re-use partition_flags.
            c.partition_flags = 0;
        }
    }

    std::vector<std::pair<bool, std::string>> results = call_remote_command(
        sc, nodes, "perf-counters-by-prefix", {"replica*app.pegasus*disk.storage.sst(MB)"});

    for (size_t i = 0; i < nodes.size(); ++i) {
        dsn::rpc_address node_addr = nodes[i].address;
        dsn::perf_counter_info info;
        if (!decode_node_perf_counter_info(node_addr, results[i], info)) {
            dwarn("decode perf counter from node(%s) failed, just ignore it",
                  node_addr.to_string());
            continue;
        }
        for (dsn::perf_counter_metric &m : info.counters) {
            int32_t app_id_x = 0;
            int32_t partition_index_x = 0;
            std::string counter_name;
            bool parse_ret = parse_app_pegasus_perf_counter_name(
                m.name, app_id_x, partition_index_x, counter_name);
            dassert(parse_ret, "name = %s", m.name.c_str());
            if (counter_name != "disk.storage.sst(MB)")
                continue;
            auto find = app_partitions.find(app_id_x);
            if (find == app_partitions.end()) // app id not found
                continue;
            // The index comes from a remote counter name: never trust it blindly.
            if (partition_index_x < 0 ||
                static_cast<size_t>(partition_index_x) >= find->second.size())
                continue;
            dsn::partition_configuration &pc = find->second[partition_index_x];
            if (pc.primary != node_addr) // not primary replica
                continue;
            if (pc.partition_flags != 0) // already calculated
                continue;
            pc.partition_flags = 1;
            int64_t app_partition_count = find->second.size();
            // emplace() keeps an existing entry, so the counts accumulate per app
            auto st_it = st_stat.st_value_by_app
                             .emplace(app_id_x, std::vector<int64_t>{app_partition_count, 0, 0})
                             .first;
            st_it->second[1]++;          // stat_partition_count
            st_it->second[2] += m.value; // storage_size_in_mb
        }
    }

    char buf[20];
    dsn::utils::time_ms_to_date_time(dsn_now_ms(), buf, sizeof(buf));
    st_stat.timestamp = buf;
    return true;
}
// Builds a configuration_proposal_action with its target, node and type fields set.
inline configuration_proposal_action new_proposal_action(const dsn::rpc_address &target,
                                                         const dsn::rpc_address &node,
                                                         config_type::type type)
{
    configuration_proposal_action action;
    action.__set_target(target);
    action.__set_node(node);
    action.__set_type(type);
    return action;
}