blob: 68e1555a4fe89f63cdce4dec3c4d0e322acb79c7 [file] [log] [blame]
/*
* Copyright (c) 2019-2022 ExpoLab, UC Davis
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
* DEALINGS IN THE SOFTWARE.
*
*/
#include "platform/statistic/stats.h"

#include <glog/logging.h>

#include <chrono>
#include <ctime>
#include <thread>
#include <utility>

#include "common/utils/utils.h"
#include "proto/kv/kv.pb.h"
// Short aliases for the Boost.Asio / Boost.Beast names used by the websocket
// helper threads (SocketManagementWrite / SocketManagementRead) in this file.
namespace asio = boost::asio;
namespace beast = boost::beast;
using tcp = asio::ip::tcp;
namespace resdb {
// Serializes callers of Stats::GetGlobalStats().
std::mutex g_mutex;

// Returns a pointer to the process-wide Stats instance.
//
// The instance is a function-local static, so it is constructed on the first
// call only; `seconds` is forwarded to the constructor on that first call and
// has no effect on later calls.
Stats* Stats::GetGlobalStats(int seconds) {
  std::lock_guard<std::mutex> guard(g_mutex);
  static Stats instance(seconds);
  return &instance;
}
// Constructs the statistics collector and starts the periodic monitor thread.
//
// @param sleep_time  Interval passed to sleep() between two monitor reports
//                    (forced to 1 when TEST_MODE is defined).
//
// All counters and the transaction summary are initialised BEFORE the monitor
// thread is launched, so the thread never observes a half-constructed object.
//
// NOTE(review): enable_resview / enable_faulty_switch are not assigned here;
// they are only written by SetProps(). Confirm the header gives them defaults.
Stats::Stats(int sleep_time) {
  monitor_sleep_time_ = sleep_time;
#ifdef TEST_MODE
  monitor_sleep_time_ = 1;
#endif

  num_call_ = 0;
  num_commit_ = 0;
  run_time_ = 0;
  run_call_ = 0;
  run_call_time_ = 0;
  server_call_ = 0;
  server_process_ = 0;
  run_req_num_ = 0;
  run_req_run_time_ = 0;
  seq_gap_ = 0;
  total_request_ = 0;
  total_geo_request_ = 0;
  geo_request_ = 0;
  socket_recv_ = 0;
  broad_cast_msg_ = 0;
  send_broad_cast_msg_ = 0;

  // These counters are also sampled by MonitorGlobal(); zero them explicitly
  // so the first reported delta does not depend on indeterminate values.
  seq_fail_ = 0;
  client_call_ = 0;
  num_client_req_ = 0;
  num_propose_ = 0;
  num_prepare_ = 0;
  pending_execute_ = 0;
  execute_ = 0;
  execute_done_ = 0;
  send_broad_cast_msg_per_rep_ = 0;

  stop_ = false;
  begin_ = false;
  prometheus_ = nullptr;

  // Transaction summary starts out "empty": no port, no timestamps.
  transaction_summary_.port = -1;
  send_summary_.store(false);
  make_faulty_.store(false);
  transaction_summary_.request_pre_prepare_state_time =
      std::chrono::system_clock::time_point::min();
  transaction_summary_.prepare_state_time =
      std::chrono::system_clock::time_point::min();
  transaction_summary_.commit_state_time =
      std::chrono::system_clock::time_point::min();
  transaction_summary_.execution_time =
      std::chrono::system_clock::time_point::min();
  transaction_summary_.txn_number = 0;

  // Launch the monitor last: it receives `this` and reads the members above.
  global_thread_ = std::thread(&Stats::MonitorGlobal, this);
}
// Raises the stop flag that the background loops in this file
// (MonitorGlobal, SocketManagementWrite/Read, CrowRoute) poll.
// Does not join any thread; the destructor does that.
void Stats::Stop() {
  stop_ = true;
}
// Signals all background loops to stop and joins every thread that was
// actually started.
//
// Each thread is joined based on its own joinable() state rather than on the
// enable_resview / enable_faulty_switch flags: SetProps() can overwrite those
// flags after the threads were launched, and destroying a still-joinable
// std::thread calls std::terminate.
//
// NOTE(review): the websocket threads sit in acceptor.accept() and the Crow
// thread in app.run(); neither is interrupted by stop_, so these joins may
// wait until those calls return. Confirm how shutdown is expected to unblock
// them.
Stats::~Stats() {
  stop_ = true;
  if (global_thread_.joinable()) {
    global_thread_.join();
  }
  if (summary_thread_.joinable()) {
    summary_thread_.join();
  }
  if (crow_thread_.joinable()) {
    crow_thread_.join();
  }
  if (faulty_thread_.joinable()) {
    faulty_thread_.join();
  }
}
void Stats::SocketManagementWrite(){
while(!stop_){
try{
int count=0;
LOG(ERROR)<<"Port:" <<transaction_summary_.port;
asio::io_context io_context;
tcp::acceptor acceptor(io_context, {{}, (boost::asio::ip::port_type)(11000+transaction_summary_.port)});
tcp::socket socket(io_context);
acceptor.accept(socket);
beast::websocket::stream<tcp::socket> ws(std::move(socket));
ws.accept();
while(!stop_){
if(!ws.is_open()){
break;
}
if(send_summary_.load()){
ws.write(asio::buffer(summary_json_.dump()));
summary_json_={};
LOG(ERROR)<<"SENT MESSAGE";
send_summary_.store(false);
}
}
}
catch( const std::exception& e){
LOG(ERROR)<<"Exception: " <<e.what();
}
}
}
void Stats::SocketManagementRead(){
while(!stop_){
try{
LOG(ERROR)<<"Read Port:" <<transaction_summary_.port;
asio::io_context io_context;
tcp::acceptor acceptor(io_context, {{}, (boost::asio::ip::port_type)(12000+transaction_summary_.port)});
tcp::socket socket(io_context);
acceptor.accept(socket);
beast::websocket::stream<tcp::socket> ws(std::move(socket));
ws.accept();
beast::flat_buffer data;
ws.read(data);
make_faulty_.store(!make_faulty_.load());
LOG(ERROR)<<"Received Message on port "<<transaction_summary_.port;
ws.close("Message Received");
}
catch( const std::exception& e){
LOG(ERROR)<<"Exception: " <<e.what();
}
}
}
void Stats::CrowRoute(){
crow::SimpleApp app;
while(!stop_){
try{
CROW_ROUTE(app, "/consensus_data")([this](){
LOG(ERROR)<<"API";
return consensus_history_.dump();
});
app.port(8500+transaction_summary_.port).multithreaded().run();
sleep(1);
}
catch( const std::exception& e){
}
}
}
// Returns the current value of the make_faulty_ switch (toggled by
// SocketManagementRead, cleared by ChangePrimary).
bool Stats::IsFaulty() {
  const bool faulty = make_faulty_.load();
  return faulty;
}
void Stats::ChangePrimary(int primary_id){
transaction_summary_.primary_id=primary_id;
make_faulty_.store(false);
}
// Stores this replica's identity in the transaction summary and starts the
// optional helper threads.
//
// @param replica_id    Replica identifier copied into the summary.
// @param ip            Replica address copied into the summary.
// @param port          Replica port; also the offset for the helper ports
//                      (11000+port, 12000+port, 8500+port).
// @param resview_flag  When true, starts the summary websocket thread and the
//                      Crow HTTP thread.
// @param faulty_flag   When true, starts the faulty-switch websocket thread.
//
// A helper thread is only launched if it is not already running: assigning a
// new std::thread to one that is still joinable calls std::terminate, so a
// second SetProps() call would otherwise abort the process.
void Stats::SetProps(int replica_id, std::string ip, int port,
                     bool resview_flag, bool faulty_flag) {
  transaction_summary_.replica_id = replica_id;
  transaction_summary_.ip = std::move(ip);
  transaction_summary_.port = port;
  enable_resview = resview_flag;
  enable_faulty_switch = faulty_flag;

  if (resview_flag) {
    if (!summary_thread_.joinable()) {
      summary_thread_ = std::thread(&Stats::SocketManagementWrite, this);
    }
    if (!crow_thread_.joinable()) {
      crow_thread_ = std::thread(&Stats::CrowRoute, this);
    }
  }
  if (faulty_flag && !faulty_thread_.joinable()) {
    faulty_thread_ = std::thread(&Stats::SocketManagementRead, this);
  }
}
// Stores `primary_id` in the transaction summary.
void Stats::SetPrimaryId(int primary_id) {
  auto& summary = transaction_summary_;
  summary.primary_id = primary_id;
}
// Timestamps the given consensus phase in the transaction summary.
//
// Recognised values of `state`: "request" and "pre-prepare" (both write the
// same slot), "prepare", and "commit". Any other value is ignored. Does
// nothing when resview is disabled.
void Stats::RecordStateTime(std::string state) {
  if (!enable_resview) {
    return;
  }

  if (state == "request" || state == "pre-prepare") {
    transaction_summary_.request_pre_prepare_state_time =
        std::chrono::system_clock::now();
    return;
  }
  if (state == "prepare") {
    transaction_summary_.prepare_state_time = std::chrono::system_clock::now();
    return;
  }
  if (state == "commit") {
    transaction_summary_.commit_state_time = std::chrono::system_clock::now();
  }
}
// Extracts the KV operations of a batch into the transaction summary.
//
// Sets the summary's txn_number to the batch sequence number, clears the
// command/key/value lists, then appends one entry per sub-request whose
// command is SET, GET, GETALLVALUES or GETRANGE (other commands add nothing).
// GET and GETALLVALUES record an empty value. Processing stops at the first
// sub-request that fails to parse as a KVRequest. No-op when resview is
// disabled.
void Stats::GetTransactionDetails(BatchUserRequest batch_request) {
  if (!enable_resview) {
    return;
  }

  auto& summary = transaction_summary_;
  summary.txn_number = batch_request.seq();
  summary.txn_command.clear();
  summary.txn_key.clear();
  summary.txn_value.clear();

  for (const auto& entry : batch_request.user_requests()) {
    KVRequest kv;
    const bool parsed = kv.ParseFromString(entry.request().data());
    if (!parsed) {
      break;
    }

    switch (kv.cmd()) {
      case KVRequest::SET:
        summary.txn_command.push_back("SET");
        summary.txn_key.push_back(kv.key());
        summary.txn_value.push_back(kv.value());
        break;
      case KVRequest::GET:
        summary.txn_command.push_back("GET");
        summary.txn_key.push_back(kv.key());
        summary.txn_value.push_back("");
        break;
      case KVRequest::GETALLVALUES:
        summary.txn_command.push_back("GETALLVALUES");
        summary.txn_key.push_back(kv.key());
        summary.txn_value.push_back("");
        break;
      case KVRequest::GETRANGE:
        summary.txn_command.push_back("GETRANGE");
        summary.txn_key.push_back(kv.key());
        summary.txn_value.push_back(kv.value());
        break;
      default:
        // Other commands are not recorded.
        break;
    }
  }
}
void Stats::SendSummary(){
if(!enable_resview){
return;
}
transaction_summary_.execution_time=std::chrono::system_clock::now();
//transaction_summary_.txn_number=transaction_summary_.txn_number+1;
//Convert Transaction Summary to JSON
summary_json_["replica_id"]=transaction_summary_.replica_id;
summary_json_["ip"]=transaction_summary_.ip;
summary_json_["port"]=transaction_summary_.port;
summary_json_["primary_id"]=transaction_summary_.primary_id;
summary_json_["propose_pre_prepare_time"]=transaction_summary_.request_pre_prepare_state_time.time_since_epoch().count();
summary_json_["prepare_time"]=transaction_summary_.prepare_state_time.time_since_epoch().count();
summary_json_["commit_time"]=transaction_summary_.commit_state_time.time_since_epoch().count();
summary_json_["execution_time"]=transaction_summary_.execution_time.time_since_epoch().count();
for(size_t i=0; i<transaction_summary_.prepare_message_count_times_list.size(); i++){
summary_json_["prepare_message_timestamps"].push_back(transaction_summary_.prepare_message_count_times_list[i].time_since_epoch().count());
}
for(size_t i=0; i<transaction_summary_.commit_message_count_times_list.size(); i++){
summary_json_["commit_message_timestamps"].push_back(transaction_summary_.commit_message_count_times_list[i].time_since_epoch().count());
}
summary_json_["txn_number"]=transaction_summary_.txn_number;
for(size_t i=0; i<transaction_summary_.txn_command.size(); i++){
summary_json_["txn_commands"].push_back(transaction_summary_.txn_command[i]);
}
for(size_t i=0; i<transaction_summary_.txn_key.size(); i++){
summary_json_["txn_keys"].push_back(transaction_summary_.txn_key[i]);
}
for(size_t i=0; i<transaction_summary_.txn_value.size(); i++){
summary_json_["txn_values"].push_back(transaction_summary_.txn_value[i]);
}
consensus_history_[std::to_string(transaction_summary_.txn_number)]=summary_json_;
LOG(ERROR)<<summary_json_.dump();
//Send Summary via Websocket
send_summary_.store(true);
int count =0;
while(send_summary_.load() && count<5){
sleep(1);
count=count+1;
}
if(send_summary_.load()){
send_summary_.store(false);
}
//Reset Transaction Summary Parameters
transaction_summary_.request_pre_prepare_state_time=std::chrono::system_clock::time_point::min();
transaction_summary_.prepare_state_time=std::chrono::system_clock::time_point::min();
transaction_summary_.commit_state_time=std::chrono::system_clock::time_point::min();
transaction_summary_.execution_time=std::chrono::system_clock::time_point::min();
transaction_summary_.prepare_message_count_times_list.clear();
transaction_summary_.commit_message_count_times_list.clear();
}
// Thread body of the periodic monitor (started from the constructor).
//
// Until stop_ is set, each iteration sleeps for monitor_sleep_time_, takes a
// snapshot of every counter, logs the difference to the previous snapshot as
// one "monitor" record, and then remembers the snapshot for the next round.
// seq gap and the accumulated time are logged as absolute values, everything
// else as a per-interval delta.
void Stats::MonitorGlobal() {
LOG(ERROR) << "monitor:" << name_ << " sleep time:" << monitor_sleep_time_;
// Current snapshot of the member counters.
uint64_t seq_fail = 0;
uint64_t client_call = 0, socket_recv = 0;
uint64_t num_client_req = 0, num_propose = 0, num_prepare = 0, num_commit = 0,
pending_execute = 0, execute = 0, execute_done = 0;
uint64_t broad_cast_msg = 0, send_broad_cast_msg = 0;
uint64_t send_broad_cast_msg_per_rep = 0;
uint64_t server_call = 0, server_process = 0;
uint64_t seq_gap = 0;
uint64_t total_request = 0, total_geo_request = 0, geo_request = 0;
// ====== for client proxy ======
uint64_t run_req_num = 0, run_req_run_time = 0;
uint64_t last_run_req_num = 0, last_run_req_run_time = 0;
// =============================
// Snapshot from the previous iteration, used to compute the deltas.
uint64_t last_seq_fail = 0;
uint64_t last_num_client_req = 0, last_num_propose = 0, last_num_prepare = 0,
last_num_commit = 0;
uint64_t last_pending_execute = 0, last_execute = 0, last_execute_done = 0;
uint64_t last_client_call = 0, last_socket_recv = 0;
uint64_t last_broad_cast_msg = 0, last_send_broad_cast_msg = 0;
uint64_t last_send_broad_cast_msg_per_rep = 0;
uint64_t last_server_call = 0, last_server_process = 0;
uint64_t last_total_request = 0, last_total_geo_request = 0,
last_geo_request = 0;
// Sum of the sleep intervals so far (not wall-clock time).
uint64_t time = 0;
while (!stop_) {
sleep(monitor_sleep_time_);
time += monitor_sleep_time_;
// Copy the members into locals so the report and the "last_" update below
// use the same values even if the counters keep changing.
seq_fail = seq_fail_;
socket_recv = socket_recv_;
client_call = client_call_;
num_client_req = num_client_req_;
num_propose = num_propose_;
num_prepare = num_prepare_;
num_commit = num_commit_;
pending_execute = pending_execute_;
execute = execute_;
execute_done = execute_done_;
broad_cast_msg = broad_cast_msg_;
send_broad_cast_msg = send_broad_cast_msg_;
send_broad_cast_msg_per_rep = send_broad_cast_msg_per_rep_;
server_call = server_call_;
server_process = server_process_;
seq_gap = seq_gap_;
total_request = total_request_;
total_geo_request = total_geo_request_;
geo_request = geo_request_;
run_req_num = run_req_num_;
run_req_run_time = run_req_run_time_;
// NOTE(review): "txn:" and "total geo request per:" divide by a hard-coded 5
// rather than by monitor_sleep_time_; presumably a per-second rate that
// assumes a 5-second interval -- confirm (TEST_MODE forces the interval to 1).
LOG(ERROR) << "=========== monitor =========\n"
<< "server call:" << server_call - last_server_call
<< " server process:" << server_process - last_server_process
<< " socket recv:" << socket_recv - last_socket_recv
<< " "
"client call:"
<< client_call - last_client_call
<< " "
"client req:"
<< num_client_req - last_num_client_req
<< " "
"broad_cast:"
<< broad_cast_msg - last_broad_cast_msg
<< " "
"send broad_cast:"
<< send_broad_cast_msg - last_send_broad_cast_msg
<< " "
"per send broad_cast:"
<< send_broad_cast_msg_per_rep - last_send_broad_cast_msg_per_rep
<< " "
"propose:"
<< num_propose - last_num_propose
<< " "
"prepare:"
<< (num_prepare - last_num_prepare)
<< " "
"commit:"
<< (num_commit - last_num_commit)
<< " "
"pending execute:"
<< pending_execute - last_pending_execute
<< " "
"execute:"
<< execute - last_execute
<< " "
"execute done:"
<< execute_done - last_execute_done << " seq gap:" << seq_gap
<< " total request:" << total_request - last_total_request
<< " txn:" << (total_request - last_total_request) / 5
<< " total geo request:"
<< total_geo_request - last_total_geo_request
<< " total geo request per:"
<< (total_geo_request - last_total_geo_request) / 5
<< " geo request:" << (geo_request - last_geo_request)
<< " "
"seq fail:"
<< seq_fail - last_seq_fail << " time:" << time
<< " "
"\n--------------- monitor ------------";
// Average latency per request recorded through AddLatency() during this
// interval; only logged when at least one request was recorded.
// NOTE(review): the division by 1e9 presumably converts nanoseconds to
// seconds -- confirm the unit callers pass to AddLatency().
if (run_req_num - last_run_req_num > 0) {
LOG(ERROR) << " req client latency:"
<< static_cast<double>(run_req_run_time -
last_run_req_run_time) /
(run_req_num - last_run_req_num) / 1000000000.0;
}
// Remember this snapshot as the baseline for the next interval.
last_seq_fail = seq_fail;
last_socket_recv = socket_recv;
last_client_call = client_call;
last_num_client_req = num_client_req;
last_num_propose = num_propose;
last_num_prepare = num_prepare;
last_num_commit = num_commit;
last_pending_execute = pending_execute;
last_execute = execute;
last_execute_done = execute_done;
last_broad_cast_msg = broad_cast_msg;
last_send_broad_cast_msg = send_broad_cast_msg;
last_send_broad_cast_msg_per_rep = send_broad_cast_msg_per_rep;
last_server_call = server_call;
last_server_process = server_process;
last_run_req_num = run_req_num;
last_run_req_run_time = run_req_run_time;
last_total_request = total_request;
last_total_geo_request = total_geo_request;
last_geo_request = geo_request;
}
}
// Counts one client call; mirrored to Prometheus (CLIENT_CALL) when a handler
// has been configured via SetPrometheus().
void Stats::IncClientCall() {
  if (prometheus_ != nullptr) {
    prometheus_->Inc(CLIENT_CALL, 1);
  }
  ++client_call_;
}
// Counts one client request; mirrored to Prometheus (CLIENT_REQ) when a
// handler has been configured.
void Stats::IncClientRequest() {
  if (prometheus_ != nullptr) {
    prometheus_->Inc(CLIENT_REQ, 1);
  }
  ++num_client_req_;
}
// Counts one propose message; mirrored to Prometheus (PROPOSE) when a handler
// has been configured.
void Stats::IncPropose() {
  if (prometheus_ != nullptr) {
    prometheus_->Inc(PROPOSE, 1);
  }
  ++num_propose_;
}
void Stats::IncPrepare() {
if (prometheus_) {
prometheus_->Inc(PREPARE, 1);
}
num_prepare_++;
transaction_summary_.prepare_message_count_times_list.push_back(std::chrono::system_clock::now());
}
void Stats::IncCommit() {
if (prometheus_) {
prometheus_->Inc(COMMIT, 1);
}
num_commit_++;
transaction_summary_.commit_message_count_times_list.push_back(std::chrono::system_clock::now());
}
// Adds one to the pending-execute counter reported by MonitorGlobal().
void Stats::IncPendingExecute() { ++pending_execute_; }
// Adds one to the execute counter reported by MonitorGlobal().
void Stats::IncExecute() {
  ++execute_;
}
// Counts one finished execution; mirrored to Prometheus (EXECUTE) when a
// handler has been configured.
void Stats::IncExecuteDone() {
  if (prometheus_ != nullptr) {
    prometheus_->Inc(EXECUTE, 1);
  }
  ++execute_done_;
}
// Counts one broadcast message; mirrored to Prometheus (BROAD_CAST) when a
// handler has been configured.
void Stats::BroadCastMsg() {
  if (prometheus_ != nullptr) {
    prometheus_->Inc(BROAD_CAST, 1);
  }
  ++broad_cast_msg_;
}
// Adds `num` to the sent-broadcast counter reported by MonitorGlobal().
void Stats::SendBroadCastMsg(uint32_t num) {
  send_broad_cast_msg_ += num;
}
// Adds one to the per-replica sent-broadcast counter reported by
// MonitorGlobal().
void Stats::SendBroadCastMsgPerRep() {
  ++send_broad_cast_msg_per_rep_;
}
// Adds one to the sequence-failure counter reported by MonitorGlobal().
void Stats::SeqFail() {
  ++seq_fail_;
}
// Adds `num` to the total-request counter; the same amount is mirrored to
// Prometheus (NUM_EXECUTE_TX) when a handler has been configured.
void Stats::IncTotalRequest(uint32_t num) {
  if (prometheus_ != nullptr) {
    prometheus_->Inc(NUM_EXECUTE_TX, num);
  }
  total_request_ += num;
}
// Adds `num` to the total-geo-request counter reported by MonitorGlobal().
void Stats::IncTotalGeoRequest(uint32_t num) {
  total_geo_request_ += num;
}
// Adds one to the geo-request counter reported by MonitorGlobal().
void Stats::IncGeoRequest() {
  ++geo_request_;
}
// Counts one server call; mirrored to Prometheus (SERVER_CALL_NAME) when a
// handler has been configured.
void Stats::ServerCall() {
  if (prometheus_ != nullptr) {
    prometheus_->Inc(SERVER_CALL_NAME, 1);
  }
  ++server_call_;
}
// Counts one processed server message; mirrored to Prometheus
// (SERVER_PROCESS) when a handler has been configured.
void Stats::ServerProcess() {
  if (prometheus_ != nullptr) {
    prometheus_->Inc(SERVER_PROCESS, 1);
  }
  ++server_process_;
}
// Overwrites the stored sequence gap; MonitorGlobal() logs it as an absolute
// value rather than as a delta.
void Stats::SeqGap(uint64_t seq_gap) {
  seq_gap_ = seq_gap;
}
// Records one completed request and adds its `run_time` to the running total;
// MonitorGlobal() derives the average request latency from these two values.
void Stats::AddLatency(uint64_t run_time) {
  ++run_req_num_;
  run_req_run_time_ += run_time;
}
// Creates a PrometheusHandler for `prometheus_address` and installs it,
// replacing any previously installed handler. From then on the Inc*/Server*
// methods that check prometheus_ also report to it.
void Stats::SetPrometheus(const std::string& prometheus_address) {
  auto handler = std::make_unique<PrometheusHandler>(prometheus_address);
  prometheus_ = std::move(handler);
}
} // namespace resdb