blob: a7afff20c1a830a3197149bbc960eb6d00db7002 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once
#include <dsn/tool-api/task_tracker.h>
#include <dsn/dist/replication.h>
#include <dsn/dist/replication/replication_other_types.h>
#include <dsn/perf_counter/perf_counter_wrapper.h>
#include <stdlib.h>
#include <signal.h>
#include <unistd.h>
#include <evhttp.h>
#include <event2/event.h>
#include <event2/http.h>
#include <event2/bufferevent.h>
#include "../shell/commands.h"
#include "table_stats.h"
namespace pegasus {
namespace server {
class result_writer;
class hotspot_partition_calculator;
// Collector object that owns per-table perf counters plus the timers/settings used for
// the app-stat, capacity-unit and storage-size statistics tasks. Only the declarations
// live in this header; every member function other than those of app_stat_counters is
// implemented elsewhere.
class info_collector
{
public:
    // Bundle of perf counters published for a single table ("app").
    struct app_stat_counters
    {
        // Returns `hit / total` scaled by 1e6 (i.e. expressed in parts per million).
        // When |total| is below 1e-6 the ratio is reported as 0 so that an empty sample
        // does not cause a division by zero.
        //
        // The function reads no instance state, hence it is static; calls through an
        // instance (`obj.convert_to_1M_ratio(a, b)`) keep working unchanged.
        static double convert_to_1M_ratio(double hit, double total)
        {
            return std::abs(total) < 1e-6 ? 0 : hit / total * 1e6;
        }

        // Publishes the values carried by `row_stats` to the counters of this bundle.
        // Most counters receive the `total_*` field of the same name verbatim; the
        // `*_rate` counters receive a parts-per-million ratio computed by
        // convert_to_1M_ratio(); read/write qps and bytes come from the corresponding
        // `get_total_*()` accessors of table_stats.
        //
        // NOTE(review): `rdb_block_cache_mem_usage` is declared below but is never
        // written here - confirm whether that is intentional.
        void set(const table_stats &row_stats)
        {
            // request rates, one counter per operation type
            get_qps->set(row_stats.total_get_qps);
            multi_get_qps->set(row_stats.total_multi_get_qps);
            put_qps->set(row_stats.total_put_qps);
            multi_put_qps->set(row_stats.total_multi_put_qps);
            remove_qps->set(row_stats.total_remove_qps);
            multi_remove_qps->set(row_stats.total_multi_remove_qps);
            incr_qps->set(row_stats.total_incr_qps);
            check_and_set_qps->set(row_stats.total_check_and_set_qps);
            check_and_mutate_qps->set(row_stats.total_check_and_mutate_qps);
            scan_qps->set(row_stats.total_scan_qps);

            // duplication
            duplicate_qps->set(row_stats.total_duplicate_qps);
            dup_shipped_ops->set(row_stats.total_dup_shipped_ops);
            dup_failed_shipping_ops->set(row_stats.total_dup_failed_shipping_ops);

            // "recent" counters
            recent_read_cu->set(row_stats.total_recent_read_cu);
            recent_write_cu->set(row_stats.total_recent_write_cu);
            recent_expire_count->set(row_stats.total_recent_expire_count);
            recent_filter_count->set(row_stats.total_recent_filter_count);
            recent_abnormal_count->set(row_stats.total_recent_abnormal_count);
            recent_write_throttling_delay_count->set(
                row_stats.total_recent_write_throttling_delay_count);
            recent_write_throttling_reject_count->set(
                row_stats.total_recent_write_throttling_reject_count);

            // storage
            storage_mb->set(row_stats.total_storage_mb);
            storage_count->set(row_stats.total_storage_count);

            // rdb_* counters
            // block cache hit rate = hit_count / total_count, in ppm
            rdb_block_cache_hit_rate->set(
                convert_to_1M_ratio(row_stats.total_rdb_block_cache_hit_count,
                                    row_stats.total_rdb_block_cache_total_count));
            rdb_index_and_filter_blocks_mem_usage->set(
                row_stats.total_rdb_index_and_filter_blocks_mem_usage);
            rdb_memtable_mem_usage->set(row_stats.total_rdb_memtable_mem_usage);
            rdb_estimate_num_keys->set(row_stats.total_rdb_estimate_num_keys);
            // seek negatives rate = seek_negatives / seek_total, in ppm
            rdb_bf_seek_negatives_rate->set(convert_to_1M_ratio(
                row_stats.total_rdb_bf_seek_negatives, row_stats.total_rdb_bf_seek_total));
            // point negatives rate = negatives / (negatives + positive_total), in ppm
            rdb_bf_point_negatives_rate->set(
                convert_to_1M_ratio(row_stats.total_rdb_bf_point_negatives,
                                    row_stats.total_rdb_bf_point_negatives +
                                        row_stats.total_rdb_bf_point_positive_total));
            // false positive rate, in ppm, where
            //   false_positives = positive_total - positive_true
            //   rate            = false_positives / (false_positives + negatives)
            rdb_bf_point_false_positive_rate->set(
                convert_to_1M_ratio(row_stats.total_rdb_bf_point_positive_total -
                                        row_stats.total_rdb_bf_point_positive_true,
                                    (row_stats.total_rdb_bf_point_positive_total -
                                     row_stats.total_rdb_bf_point_positive_true) +
                                        row_stats.total_rdb_bf_point_negatives));

            // totals provided by table_stats accessors
            read_qps->set(row_stats.get_total_read_qps());
            write_qps->set(row_stats.get_total_write_qps());
            backup_request_qps->set(row_stats.total_backup_request_qps);

            // traffic in bytes, one counter per operation type
            get_bytes->set(row_stats.total_get_bytes);
            multi_get_bytes->set(row_stats.total_multi_get_bytes);
            scan_bytes->set(row_stats.total_scan_bytes);
            put_bytes->set(row_stats.total_put_bytes);
            multi_put_bytes->set(row_stats.total_multi_put_bytes);
            check_and_set_bytes->set(row_stats.total_check_and_set_bytes);
            check_and_mutate_bytes->set(row_stats.total_check_and_mutate_bytes);
            read_bytes->set(row_stats.get_total_read_bytes());
            write_bytes->set(row_stats.get_total_write_bytes());
        }

        // Counter handles. They are dereferenced unconditionally by set(), so they must
        // have been initialised (outside this header) before set() is called.
        ::dsn::perf_counter_wrapper get_qps;
        ::dsn::perf_counter_wrapper multi_get_qps;
        ::dsn::perf_counter_wrapper put_qps;
        ::dsn::perf_counter_wrapper multi_put_qps;
        ::dsn::perf_counter_wrapper remove_qps;
        ::dsn::perf_counter_wrapper multi_remove_qps;
        ::dsn::perf_counter_wrapper incr_qps;
        ::dsn::perf_counter_wrapper check_and_set_qps;
        ::dsn::perf_counter_wrapper check_and_mutate_qps;
        ::dsn::perf_counter_wrapper scan_qps;
        ::dsn::perf_counter_wrapper duplicate_qps;
        ::dsn::perf_counter_wrapper dup_shipped_ops;
        ::dsn::perf_counter_wrapper dup_failed_shipping_ops;
        ::dsn::perf_counter_wrapper recent_read_cu;
        ::dsn::perf_counter_wrapper recent_write_cu;
        ::dsn::perf_counter_wrapper recent_expire_count;
        ::dsn::perf_counter_wrapper recent_filter_count;
        ::dsn::perf_counter_wrapper recent_abnormal_count;
        ::dsn::perf_counter_wrapper recent_write_throttling_delay_count;
        ::dsn::perf_counter_wrapper recent_write_throttling_reject_count;
        ::dsn::perf_counter_wrapper storage_mb;
        ::dsn::perf_counter_wrapper storage_count;
        ::dsn::perf_counter_wrapper rdb_block_cache_hit_rate;
        // NOTE(review): not updated by set(); see the note on set().
        ::dsn::perf_counter_wrapper rdb_block_cache_mem_usage;
        ::dsn::perf_counter_wrapper rdb_index_and_filter_blocks_mem_usage;
        ::dsn::perf_counter_wrapper rdb_memtable_mem_usage;
        ::dsn::perf_counter_wrapper rdb_estimate_num_keys;
        ::dsn::perf_counter_wrapper rdb_bf_seek_negatives_rate;
        ::dsn::perf_counter_wrapper rdb_bf_point_negatives_rate;
        ::dsn::perf_counter_wrapper rdb_bf_point_false_positive_rate;
        ::dsn::perf_counter_wrapper read_qps;
        ::dsn::perf_counter_wrapper write_qps;
        ::dsn::perf_counter_wrapper backup_request_qps;
        ::dsn::perf_counter_wrapper get_bytes;
        ::dsn::perf_counter_wrapper multi_get_bytes;
        ::dsn::perf_counter_wrapper scan_bytes;
        ::dsn::perf_counter_wrapper put_bytes;
        ::dsn::perf_counter_wrapper multi_put_bytes;
        ::dsn::perf_counter_wrapper check_and_set_bytes;
        ::dsn::perf_counter_wrapper check_and_mutate_bytes;
        ::dsn::perf_counter_wrapper read_bytes;
        ::dsn::perf_counter_wrapper write_bytes;
    };

    info_collector();
    ~info_collector();

    // Lifecycle entry points (defined outside this header).
    void start();
    void stop();

    // Per-table statistics. get_app_counters() returns the counter bundle for
    // `app_name`; presumably the entry kept in _app_stat_counters - verify against
    // the implementation.
    void on_app_stat();
    app_stat_counters *get_app_counters(const std::string &app_name);

    // Capacity-unit statistics. `remaining_retry_count` is the number of retries
    // still allowed for this round (presumably bounded by
    // _capacity_unit_retry_max_count - TODO confirm).
    void on_capacity_unit_stat(int remaining_retry_count);
    bool has_capacity_unit_updated(const std::string &node_address,
                                   const std::string &timestamp);

    // Storage-size statistics; `remaining_retry_count` as above.
    void on_storage_size_stat(int remaining_retry_count);

private:
    dsn::task_tracker _tracker;
    ::dsn::rpc_address _meta_servers;
    std::string _cluster_name;
    std::shared_ptr<shell_context> _shell_context;

    uint32_t _app_stat_interval_seconds;
    ::dsn::task_ptr _app_stat_timer_task;
    // presumably guards _app_stat_counters - verify against the implementation
    ::dsn::utils::ex_lock_nr _app_stat_counter_lock;
    // Counter bundles held by raw pointer; ownership/cleanup is handled outside this
    // header. Keys are presumably app names (see get_app_counters()).
    std::map<std::string, app_stat_counters *> _app_stat_counters;

    // app for recording usage statistics, including read/write capacity unit and storage size.
    std::string _usage_stat_app;
    // client to access server.
    pegasus_client *_client;
    // for writing cu stat result
    std::unique_ptr<result_writer> _result_writer;

    uint32_t _capacity_unit_fetch_interval_seconds;
    uint32_t _capacity_unit_retry_wait_seconds;
    uint32_t _capacity_unit_retry_max_count;
    ::dsn::task_ptr _capacity_unit_stat_timer_task;

    uint32_t _storage_size_fetch_interval_seconds;
    uint32_t _storage_size_retry_wait_seconds;
    uint32_t _storage_size_retry_max_count;
    ::dsn::task_ptr _storage_size_stat_timer_task;

    ::dsn::utils::ex_lock_nr _capacity_unit_update_info_lock;
    // mapping 'node address' --> 'last updated timestamp'
    // (value type spelled std::string explicitly so this header does not depend on a
    // using-declaration leaked by another include)
    std::map<std::string, std::string> _capacity_unit_update_info;

    // _hotspot_calculator_store keeps one hotspot_partition_calculator per table; a
    // hotspot_partition_calculator saves the historical hotspot data and the alert
    // perf_counters of its table.
    std::map<std::string, std::shared_ptr<hotspot_partition_calculator>> _hotspot_calculator_store;
    std::shared_ptr<hotspot_partition_calculator>
    get_hotspot_calculator(const std::string &app_name, const int partition_count);
};
} // namespace server
} // namespace pegasus