Fix Bug (#142)
* fix viewchange bug
* format code
* rm unused log
* rm unused log
* fix ut
diff --git a/WORKSPACE b/WORKSPACE
index 6a7883f..ea67f3e 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -199,7 +199,7 @@
http_archive(
name = "pybind11_bazel",
strip_prefix = "pybind11_bazel-2.11.1.bzl.1",
- urls = ["https://github.com/pybind/pybind11_bazel/archive/refs/tags/v2.11.1.bzl.1.zip"]
+ urls = ["https://github.com/pybind/pybind11_bazel/archive/refs/tags/v2.11.1.bzl.1.zip"],
)
http_archive(
@@ -244,4 +244,4 @@
sha256 = "babcdfd2c744905a73d20de211b51367bda0d5200f11d654c4314b909d8c963c",
strip_prefix = "asio-asio-1-26-0",
url = "https://github.com/chriskohlhoff/asio/archive/refs/tags/asio-1-26-0.zip",
-)
\ No newline at end of file
+)
diff --git a/api/BUILD b/api/BUILD
index d69120c..93b83cd 100644
--- a/api/BUILD
+++ b/api/BUILD
@@ -1,19 +1,20 @@
package(default_visibility = ["//visibility:public"])
+
cc_binary(
name = "pybind_kv.so",
srcs = ["pybind_kv_service.cpp"],
- linkshared =1,
+ linkshared = 1,
linkstatic = 1,
deps = [
"@//common/proto:signature_info_cc_proto",
"@//interface/kv:kv_client",
"@//platform/config:resdb_config_utils",
- "@pybind11//:pybind11",
+ "@pybind11",
],
)
+
py_library(
name = "pybind_kv_so",
data = [":pybind_kv.so"],
imports = ["."],
)
-
diff --git a/api/pybind_kv_service.cpp b/api/pybind_kv_service.cpp
index bad50dc..40073d6 100644
--- a/api/pybind_kv_service.cpp
+++ b/api/pybind_kv_service.cpp
@@ -1,11 +1,12 @@
#include <fcntl.h>
#include <getopt.h>
+#include <pybind11/pybind11.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
-#include <pybind11/pybind11.h>
#include <fstream>
+
#include "common/proto/signature_info.pb.h"
#include "interface/kv/kv_client.h"
#include "platform/config/resdb_config_utils.h"
@@ -16,35 +17,31 @@
using resdb::ReplicaInfo;
using resdb::ResDBConfig;
-
-
-
std::string get(std::string key, std::string config_path) {
- ResDBConfig config = GenerateResDBConfig(config_path);
- config.SetClientTimeoutMs(100000);
- KVClient client(config);
- auto result_ptr = client.Get(key);
- if (result_ptr) {
- return *result_ptr;
- } else {
- return "";
- }
+ ResDBConfig config = GenerateResDBConfig(config_path);
+ config.SetClientTimeoutMs(100000);
+ KVClient client(config);
+ auto result_ptr = client.Get(key);
+ if (result_ptr) {
+ return *result_ptr;
+ } else {
+ return "";
+ }
}
bool set(std::string key, std::string value, std::string config_path) {
- ResDBConfig config = GenerateResDBConfig(config_path);
- config.SetClientTimeoutMs(100000);
- KVClient client(config);
- int result = client.Set(key, value);
- if (result == 0) {
- return true;
- } else {
- return false;
- }
+ ResDBConfig config = GenerateResDBConfig(config_path);
+ config.SetClientTimeoutMs(100000);
+ KVClient client(config);
+ int result = client.Set(key, value);
+ if (result == 0) {
+ return true;
+ } else {
+ return false;
+ }
}
PYBIND11_MODULE(pybind_kv, m) {
- m.def("get", &get, "A function that gets a value from the key-value store");
- m.def("set", &set, "A function that sets a value in the key-value store");
+ m.def("get", &get, "A function that gets a value from the key-value store");
+ m.def("set", &set, "A function that sets a value in the key-value store");
}
-
diff --git a/benchmark/protocols/poe/BUILD b/benchmark/protocols/poe/BUILD
index e7bbde3..f63f6fb 100644
--- a/benchmark/protocols/poe/BUILD
+++ b/benchmark/protocols/poe/BUILD
@@ -13,4 +13,3 @@
"//service/utils:server_factory",
],
)
-
diff --git a/benchmark/protocols/poe/kv_server_performance.cpp b/benchmark/protocols/poe/kv_server_performance.cpp
index fa13e9a..b2eb62d 100644
--- a/benchmark/protocols/poe/kv_server_performance.cpp
+++ b/benchmark/protocols/poe/kv_server_performance.cpp
@@ -82,7 +82,7 @@
request.SerializeToString(&request_data);
return request_data;
});
-
+
auto server =
std::make_unique<ServiceNetwork>(*config, std::move(performance_consens));
server->Run();
diff --git a/common/BUILD b/common/BUILD
index 84acee1..4e6a18a 100644
--- a/common/BUILD
+++ b/common/BUILD
@@ -16,10 +16,10 @@
)
cc_library(
- name= "beast",
+ name = "beast",
deps = [
- "@boost//:beast"
- ]
+ "@boost//:beast",
+ ],
)
cc_library(
diff --git a/interface/common/resdb_txn_accessor.cpp b/interface/common/resdb_txn_accessor.cpp
index 532d432..3b77519 100644
--- a/interface/common/resdb_txn_accessor.cpp
+++ b/interface/common/resdb_txn_accessor.cpp
@@ -59,15 +59,12 @@
ths.push_back(std::thread(
[&](NetChannel* client) {
std::string response_str;
- int ret = -1;
- for (int i = 0; i < 3 && ret < 0; ++i) {
- ret = client->SendRequest(request, Request::TYPE_QUERY);
- if (ret) {
- return;
- }
- client->SetRecvTimeout(1000);
- ret = client->RecvRawMessageStr(&response_str);
+ int ret = client->SendRequest(request, Request::TYPE_QUERY);
+ if (ret) {
+ return;
}
+ client->SetRecvTimeout(1000);
+ ret = client->RecvRawMessageStr(&response_str);
if (ret == 0) {
std::unique_lock<std::mutex> lck(mtx);
recv_count[response_str]++;
@@ -160,11 +157,12 @@
std::string final_str;
std::mutex mtx;
std::condition_variable resp_cv;
+ bool success = false;
std::unique_ptr<NetChannel> client =
GetNetChannel(replicas_[0].ip(), replicas_[0].port());
- LOG(ERROR) << "ip:" << replicas_[0].ip() << " port:" << replicas_[0].port();
+ LOG(INFO) << "ip:" << replicas_[0].ip() << " port:" << replicas_[0].port();
std::string response_str;
int ret = 0;
@@ -175,7 +173,7 @@
}
client->SetRecvTimeout(100000);
ret = client->RecvRawMessageStr(&response_str);
- LOG(ERROR) << "receive str:" << ret << " len:" << response_str.size();
+ LOG(INFO) << "receive str:" << ret << " len:" << response_str.size();
if (ret != 0) {
continue;
}
diff --git a/interface/common/resdb_txn_accessor_test.cpp b/interface/common/resdb_txn_accessor_test.cpp
index e0b4925..39c407b 100644
--- a/interface/common/resdb_txn_accessor_test.cpp
+++ b/interface/common/resdb_txn_accessor_test.cpp
@@ -62,9 +62,11 @@
auto client = std::make_unique<MockNetChannel>(ip, port);
EXPECT_CALL(*client,
SendRequest(EqualsProto(request), Request::TYPE_QUERY, _))
- .Times(AtLeast(1)).WillRepeatedly(Return(0));
+ .Times(AtLeast(1))
+ .WillRepeatedly(Return(0));
EXPECT_CALL(*client, RecvRawMessageStr)
- .Times(AtLeast(1)).WillRepeatedly(Invoke([&](std::string* resp) { return -1; }));
+ .Times(AtLeast(1))
+ .WillRepeatedly(Invoke([&](std::string* resp) { return -1; }));
return client;
}));
absl::StatusOr<std::vector<std::pair<uint64_t, std::string>>> resp =
diff --git a/platform/consensus/ordering/common/algorithm/BUILD b/platform/consensus/ordering/common/algorithm/BUILD
index 9abd871..1ba4e6f 100644
--- a/platform/consensus/ordering/common/algorithm/BUILD
+++ b/platform/consensus/ordering/common/algorithm/BUILD
@@ -9,4 +9,3 @@
"//common/crypto:signature_verifier",
],
)
-
diff --git a/platform/consensus/ordering/common/algorithm/protocol_base.cpp b/platform/consensus/ordering/common/algorithm/protocol_base.cpp
index 3c6c2fc..5419413 100644
--- a/platform/consensus/ordering/common/algorithm/protocol_base.cpp
+++ b/platform/consensus/ordering/common/algorithm/protocol_base.cpp
@@ -5,43 +5,36 @@
namespace resdb {
namespace common {
-ProtocolBase::ProtocolBase(
- int id,
- int f,
- int total_num,
- SingleCallFuncType single_call,
- BroadcastCallFuncType broadcast_call,
- CommitFuncType commit) :
- id_(id),
+ProtocolBase::ProtocolBase(int id, int f, int total_num,
+ SingleCallFuncType single_call,
+ BroadcastCallFuncType broadcast_call,
+ CommitFuncType commit)
+ : id_(id),
f_(f),
total_num_(total_num),
- single_call_(single_call),
- broadcast_call_(broadcast_call),
+ single_call_(single_call),
+ broadcast_call_(broadcast_call),
commit_(commit) {
- stop_ = false;
+ stop_ = false;
}
-ProtocolBase::ProtocolBase( int id, int f, int total_num) : ProtocolBase(id, f, total_num, nullptr, nullptr, nullptr){
+ProtocolBase::ProtocolBase(int id, int f, int total_num)
+ : ProtocolBase(id, f, total_num, nullptr, nullptr, nullptr) {}
-}
+ProtocolBase::~ProtocolBase() { Stop(); }
-ProtocolBase::~ProtocolBase() {
- Stop();
-}
+void ProtocolBase::Stop() { stop_ = true; }
-void ProtocolBase::Stop(){
- stop_ = true;
-}
+bool ProtocolBase::IsStop() { return stop_; }
-bool ProtocolBase::IsStop(){
- return stop_;
-}
-
-int ProtocolBase::SendMessage(int msg_type, const google::protobuf::Message& msg, int node_id) {
+int ProtocolBase::SendMessage(int msg_type,
+ const google::protobuf::Message& msg,
+ int node_id) {
return single_call_(msg_type, msg, node_id);
}
-int ProtocolBase::Broadcast(int msg_type, const google::protobuf::Message& msg) {
+int ProtocolBase::Broadcast(int msg_type,
+ const google::protobuf::Message& msg) {
return broadcast_call_(msg_type, msg);
}
@@ -49,5 +42,5 @@
return commit_(msg);
}
-} // namespace protocol
+} // namespace common
} // namespace resdb
diff --git a/platform/consensus/ordering/common/algorithm/protocol_base.h b/platform/consensus/ordering/common/algorithm/protocol_base.h
index a93be82..621ecd1 100644
--- a/platform/consensus/ordering/common/algorithm/protocol_base.h
+++ b/platform/consensus/ordering/common/algorithm/protocol_base.h
@@ -1,7 +1,9 @@
#pragma once
-#include <functional>
#include <google/protobuf/message.h>
+
+#include <functional>
+
#include "common/crypto/signature_verifier.h"
namespace resdb {
@@ -9,50 +11,52 @@
class ProtocolBase {
public:
- typedef std::function<int(int, const google::protobuf::Message& msg, int)> SingleCallFuncType;
- typedef std::function<int(int, const google::protobuf::Message& msg)> BroadcastCallFuncType;
- typedef std::function<int(const google::protobuf::Message& msg)> CommitFuncType;
+ typedef std::function<int(int, const google::protobuf::Message& msg, int)>
+ SingleCallFuncType;
+ typedef std::function<int(int, const google::protobuf::Message& msg)>
+ BroadcastCallFuncType;
+ typedef std::function<int(const google::protobuf::Message& msg)>
+ CommitFuncType;
- ProtocolBase(
- int id,
- int f,
- int total_num,
- SingleCallFuncType single_call,
- BroadcastCallFuncType broadcast_call,
- CommitFuncType commit
- );
+ ProtocolBase(int id, int f, int total_num, SingleCallFuncType single_call,
+ BroadcastCallFuncType broadcast_call, CommitFuncType commit);
- ProtocolBase( int id, int f, int total_num);
-
+ ProtocolBase(int id, int f, int total_num);
virtual ~ProtocolBase();
void Stop();
- inline
- void SetSingleCallFunc(SingleCallFuncType single_call) { single_call_ = single_call; }
-
- inline
- void SetBroadcastCallFunc(BroadcastCallFuncType broadcast_call) { broadcast_call_ = broadcast_call; }
+ inline void SetSingleCallFunc(SingleCallFuncType single_call) {
+ single_call_ = single_call;
+ }
- inline
- void SetCommitFunc(CommitFuncType commit_func) { commit_ = commit_func; }
+ inline void SetBroadcastCallFunc(BroadcastCallFuncType broadcast_call) {
+ broadcast_call_ = broadcast_call;
+ }
- inline
- void SetSignatureVerifier(SignatureVerifier* verifier) { verifier_ = verifier;}
+ inline void SetCommitFunc(CommitFuncType commit_func) {
+ commit_ = commit_func;
+ }
- protected:
- int SendMessage(int msg_type, const google::protobuf::Message& msg, int node_id);
- int Broadcast(int msg_type, const google::protobuf::Message& msg);
- int Commit(const google::protobuf::Message& msg);
+ inline void SetSignatureVerifier(SignatureVerifier* verifier) {
+ verifier_ = verifier;
+ }
- bool IsStop();
+ protected:
+ int SendMessage(int msg_type, const google::protobuf::Message& msg,
+ int node_id);
+ int Broadcast(int msg_type, const google::protobuf::Message& msg);
+ int Commit(const google::protobuf::Message& msg);
+
+ bool IsStop();
protected:
int id_;
int f_;
int total_num_;
- std::function<int(int, const google::protobuf::Message& msg, int)> single_call_;
+ std::function<int(int, const google::protobuf::Message& msg, int)>
+ single_call_;
std::function<int(int, const google::protobuf::Message& msg)> broadcast_call_;
std::function<int(const google::protobuf::Message& msg)> commit_;
std::atomic<bool> stop_;
@@ -60,5 +64,5 @@
SignatureVerifier* verifier_;
};
-} // namespace protocol
+} // namespace common
} // namespace resdb
diff --git a/platform/consensus/ordering/common/framework/BUILD b/platform/consensus/ordering/common/framework/BUILD
index 82e03a0..e4a5382 100644
--- a/platform/consensus/ordering/common/framework/BUILD
+++ b/platform/consensus/ordering/common/framework/BUILD
@@ -26,7 +26,6 @@
],
)
-
cc_library(
name = "response_manager",
srcs = ["response_manager.cpp"],
diff --git a/platform/consensus/ordering/common/framework/consensus.cpp b/platform/consensus/ordering/common/framework/consensus.cpp
index 683bce2..caea1a4 100644
--- a/platform/consensus/ordering/common/framework/consensus.cpp
+++ b/platform/consensus/ordering/common/framework/consensus.cpp
@@ -46,56 +46,51 @@
nullptr, std::move(executor))) {
LOG(INFO) << "is running is performance mode:"
<< config_.IsPerformanceRunning();
- is_stop_ = false;
+ is_stop_ = false;
global_stats_ = Stats::GetGlobalStats();
}
-void Consensus::Init(){
- if(performance_manager_ == nullptr){
- performance_manager_ =
- config_.IsPerformanceRunning()
- ? std::make_unique<PerformanceManager>(
- config_, GetBroadCastClient(), GetSignatureVerifier())
- : nullptr;
+void Consensus::Init() {
+ if (performance_manager_ == nullptr) {
+ performance_manager_ =
+ config_.IsPerformanceRunning()
+ ? std::make_unique<PerformanceManager>(
+ config_, GetBroadCastClient(), GetSignatureVerifier())
+ : nullptr;
}
- if(response_manager_ == nullptr){
- response_manager_ =
- !config_.IsPerformanceRunning()
- ? std::make_unique<ResponseManager>(config_, GetBroadCastClient(),
- GetSignatureVerifier())
- : nullptr;
+ if (response_manager_ == nullptr) {
+ response_manager_ =
+ !config_.IsPerformanceRunning()
+ ? std::make_unique<ResponseManager>(config_, GetBroadCastClient(),
+ GetSignatureVerifier())
+ : nullptr;
}
}
-void Consensus::InitProtocol(ProtocolBase * protocol){
+void Consensus::InitProtocol(ProtocolBase* protocol) {
protocol->SetSingleCallFunc(
[&](int type, const google::protobuf::Message& msg, int node_id) {
- return SendMsg(type, msg, node_id);
+ return SendMsg(type, msg, node_id);
});
protocol->SetBroadcastCallFunc(
[&](int type, const google::protobuf::Message& msg) {
- return Broadcast(type, msg);
+ return Broadcast(type, msg);
});
protocol->SetCommitFunc(
- [&](const google::protobuf::Message& msg) {
- return CommitMsg(msg);
- });
+ [&](const google::protobuf::Message& msg) { return CommitMsg(msg); });
}
-Consensus::~Consensus(){
- is_stop_ = true;
-}
+Consensus::~Consensus() { is_stop_ = true; }
-void Consensus::SetPerformanceManager(std::unique_ptr<PerformanceManager> performance_manager){
+void Consensus::SetPerformanceManager(
+ std::unique_ptr<PerformanceManager> performance_manager) {
performance_manager_ = std::move(performance_manager);
}
-bool Consensus::IsStop(){
- return is_stop_;
-}
+bool Consensus::IsStop() { return is_stop_; }
void Consensus::SetupPerformanceDataFunc(std::function<std::string()> func) {
performance_manager_->SetDataFunc(func);
@@ -131,9 +126,7 @@
return config_.GetReplicaInfos();
}
-int Consensus::CommitMsg(const google::protobuf::Message &txn) {
- return 0;
-}
+int Consensus::CommitMsg(const google::protobuf::Message& txn) { return 0; }
// The implementation of PBFT.
int Consensus::ConsensusCommit(std::unique_ptr<Context> context,
diff --git a/platform/consensus/ordering/common/framework/consensus.h b/platform/consensus/ordering/common/framework/consensus.h
index 881dc72..bb06592 100644
--- a/platform/consensus/ordering/common/framework/consensus.h
+++ b/platform/consensus/ordering/common/framework/consensus.h
@@ -49,12 +49,12 @@
void SetCommunicator(ReplicaCommunicator* replica_communicator);
- void InitProtocol(ProtocolBase * protocol);
+ void InitProtocol(ProtocolBase* protocol);
- protected:
- virtual int ProcessCustomConsensus(std::unique_ptr<Request> request);
- virtual int ProcessNewTransaction(std::unique_ptr<Request> request);
- virtual int CommitMsg(const google::protobuf::Message& msg);
+ protected:
+ virtual int ProcessCustomConsensus(std::unique_ptr<Request> request);
+ virtual int ProcessNewTransaction(std::unique_ptr<Request> request);
+ virtual int CommitMsg(const google::protobuf::Message& msg);
protected:
int SendMsg(int type, const google::protobuf::Message& msg, int node_id);
@@ -65,7 +65,8 @@
protected:
void Init();
- void SetPerformanceManager(std::unique_ptr<PerformanceManager> performance_manger);
+ void SetPerformanceManager(
+ std::unique_ptr<PerformanceManager> performance_manger);
protected:
ReplicaCommunicator* replica_communicator_;
diff --git a/platform/consensus/ordering/common/framework/performance_manager.cpp b/platform/consensus/ordering/common/framework/performance_manager.cpp
index 089f3bd..b386171 100644
--- a/platform/consensus/ordering/common/framework/performance_manager.cpp
+++ b/platform/consensus/ordering/common/framework/performance_manager.cpp
@@ -37,7 +37,7 @@
PerformanceManager::PerformanceManager(
const ResDBConfig& config, ReplicaCommunicator* replica_communicator,
SignatureVerifier* verifier)
- : config_(config),
+ : config_(config),
replica_communicator_(replica_communicator),
batch_queue_("user request"),
verifier_(verifier) {
@@ -91,7 +91,6 @@
}
eval_started_ = true;
for (int i = 0; i < 100000000; ++i) {
- // for (int i = 0; i < 60000000000; ++i) {
std::unique_ptr<QueueItem> queue_item = std::make_unique<QueueItem>();
queue_item->context = nullptr;
queue_item->user_request = GenerateUserRequest();
@@ -119,51 +118,49 @@
return 0;
}
- //LOG(INFO) << "get response:" << request->seq() << " sender:"<<request->sender_id();
+ // LOG(INFO) << "get response:" << request->seq() << "
+ // sender:"<<request->sender_id();
std::unique_ptr<BatchUserResponse> batch_response = nullptr;
- CollectorResultCode ret =
- AddResponseMsg(std::move(request), [&](std::unique_ptr<BatchUserResponse> request) {
+ CollectorResultCode ret = AddResponseMsg(
+ std::move(request), [&](std::unique_ptr<BatchUserResponse> request) {
batch_response = std::move(request);
return;
});
if (ret == CollectorResultCode::STATE_CHANGED) {
assert(batch_response);
- SendResponseToClient(*batch_response);
+ SendResponseToClient(*batch_response);
}
return ret == CollectorResultCode::INVALID ? -2 : 0;
}
CollectorResultCode PerformanceManager::AddResponseMsg(
std::unique_ptr<Request> request,
- std::function<void(std::unique_ptr<BatchUserResponse>)> response_call_back) {
+ std::function<void(std::unique_ptr<BatchUserResponse>)>
+ response_call_back) {
if (request == nullptr) {
return CollectorResultCode::INVALID;
}
- //uint64_t seq = request->seq();
-
- std::unique_ptr<BatchUserResponse> batch_response = std::make_unique<BatchUserResponse>();
+ std::unique_ptr<BatchUserResponse> batch_response =
+ std::make_unique<BatchUserResponse>();
if (!batch_response->ParseFromString(request->data())) {
- LOG(ERROR) << "parse response fail:"<<request->data().size()
- <<" seq:"<<request->seq(); return CollectorResultCode::INVALID;
+ LOG(ERROR) << "parse response fail:" << request->data().size()
+ << " seq:" << request->seq();
+ return CollectorResultCode::INVALID;
}
uint64_t seq = batch_response->local_id();
- //LOG(ERROR)<<"receive seq:"<<seq;
bool done = false;
{
int idx = seq % response_set_size_;
std::unique_lock<std::mutex> lk(response_lock_[idx]);
if (response_[idx].find(seq) == response_[idx].end()) {
- //LOG(ERROR)<<"has done local seq:"<<seq<<" global seq:"<<request->seq();
return CollectorResultCode::OK;
}
response_[idx][seq]++;
- //LOG(ERROR)<<"get seq :"<<request->seq()<<" local id:"<<seq<<" num:"<<response_[idx][seq]<<" send:"<<send_num_;
if (response_[idx][seq] >= config_.GetMinClientReceiveNum()) {
- //LOG(ERROR)<<"get seq :"<<request->seq()<<" local id:"<<seq<<" num:"<<response_[idx][seq]<<" done:"<<send_num_;
response_[idx].erase(response_[idx].find(seq));
done = true;
}
@@ -180,11 +177,11 @@
uint64_t create_time = batch_response.createtime();
if (create_time > 0) {
uint64_t run_time = GetCurrentTime() - create_time;
- LOG(ERROR)<<"receive current:"<<GetCurrentTime()<<" create time:"<<create_time<<" run time:"<<run_time<<" local id:"<<batch_response.local_id();
+ LOG(ERROR) << "receive current:" << GetCurrentTime()
+ << " create time:" << create_time << " run time:" << run_time
+ << " local id:" << batch_response.local_id();
global_stats_->AddLatency(run_time);
- } else {
}
- //send_num_-=10;
send_num_--;
}
@@ -198,7 +195,6 @@
bool start = false;
while (!stop_) {
if (send_num_ > config_.GetMaxProcessTxn()) {
- // LOG(ERROR)<<"wait send num:"<<send_num_;
usleep(100000);
continue;
}
@@ -206,8 +202,8 @@
std::unique_ptr<QueueItem> item =
batch_queue_.Pop(config_.ClientBatchWaitTimeMS());
if (item == nullptr) {
- if(start){
- LOG(ERROR)<<"no data";
+ if (start) {
+ LOG(ERROR) << "no data";
}
continue;
}
@@ -217,9 +213,7 @@
}
}
start = true;
- for(int i = 0; i < 1;++i){
- int ret = DoBatch(batch_req);
- }
+ DoBatch(batch_req);
batch_req.clear();
}
return 0;
@@ -269,7 +263,6 @@
global_stats_->BroadCastMsg();
send_num_++;
sum_ += batch_req.size();
- //LOG(ERROR)<<"send num:"<<send_num_<<" total num:"<<total_num_<<" sum:"<<sum_<<" to:"<<GetPrimary();
if (total_num_++ == 1000000) {
stop_ = true;
LOG(WARNING) << "total num is done:" << total_num_;
@@ -281,7 +274,7 @@
return 0;
}
-void PerformanceManager::SendMessage(const Request& request){
+void PerformanceManager::SendMessage(const Request& request) {
replica_communicator_->SendMessage(request, GetPrimary());
}
diff --git a/platform/consensus/ordering/common/framework/performance_manager.h b/platform/consensus/ordering/common/framework/performance_manager.h
index 5a874ba..7cacc84 100644
--- a/platform/consensus/ordering/common/framework/performance_manager.h
+++ b/platform/consensus/ordering/common/framework/performance_manager.h
@@ -50,7 +50,7 @@
std::unique_ptr<Request> request);
void SetDataFunc(std::function<std::string()> func);
- protected:
+ protected:
virtual void SendMessage(const Request& request);
private:
diff --git a/platform/consensus/ordering/common/framework/response_manager.cpp b/platform/consensus/ordering/common/framework/response_manager.cpp
index ae6b55c..9838514 100644
--- a/platform/consensus/ordering/common/framework/response_manager.cpp
+++ b/platform/consensus/ordering/common/framework/response_manager.cpp
@@ -238,5 +238,5 @@
return 0;
}
-} // namespace common
+} // namespace common
} // namespace resdb
diff --git a/platform/consensus/ordering/common/framework/response_manager.h b/platform/consensus/ordering/common/framework/response_manager.h
index 1c70439..93f7ed9 100644
--- a/platform/consensus/ordering/common/framework/response_manager.h
+++ b/platform/consensus/ordering/common/framework/response_manager.h
@@ -32,7 +32,7 @@
#include "platform/statistic/stats.h"
namespace resdb {
-namespace common {
+namespace common {
class ResponseManager {
public:
@@ -81,5 +81,5 @@
std::mutex response_lock_[response_set_size_];
};
-} // common
+} // namespace common
} // namespace resdb
diff --git a/platform/consensus/ordering/pbft/checkpoint_manager.cpp b/platform/consensus/ordering/pbft/checkpoint_manager.cpp
index ef74830..a5a24ca 100644
--- a/platform/consensus/ordering/pbft/checkpoint_manager.cpp
+++ b/platform/consensus/ordering/pbft/checkpoint_manager.cpp
@@ -312,11 +312,15 @@
last_hash_ = GetHash(last_hash_, request->hash());
last_seq_++;
}
+ bool is_recovery = request->is_recovery();
txn_db_->Put(std::move(request));
if (current_seq == last_ckpt_seq + water_mark) {
last_ckpt_seq = current_seq;
- BroadcastCheckPoint(last_ckpt_seq, last_hash_, stable_hashs, stable_seqs);
+ if (!is_recovery) {
+ BroadcastCheckPoint(last_ckpt_seq, last_hash_, stable_hashs,
+ stable_seqs);
+ }
}
}
return;
diff --git a/platform/consensus/ordering/pbft/commitment.cpp b/platform/consensus/ordering/pbft/commitment.cpp
index 081c11d..4a1aacc 100644
--- a/platform/consensus/ordering/pbft/commitment.cpp
+++ b/platform/consensus/ordering/pbft/commitment.cpp
@@ -41,7 +41,10 @@
duplicate_manager_ = std::make_unique<DuplicateManager>(config);
message_manager_->SetDuplicateManager(duplicate_manager_.get());
- global_stats_->SetProps(config_.GetSelfInfo().id(), config_.GetSelfInfo().ip(), config_.GetSelfInfo().port(), config_.GetConfigData().enable_resview(), config_.GetConfigData().enable_faulty_switch());
+ global_stats_->SetProps(
+ config_.GetSelfInfo().id(), config_.GetSelfInfo().ip(),
+ config_.GetSelfInfo().port(), config_.GetConfigData().enable_resview(),
+ config_.GetConfigData().enable_faulty_switch());
global_stats_->SetPrimaryId(message_manager_->GetCurrentPrimary());
}
@@ -81,7 +84,8 @@
// << message_manager_->GetCurrentPrimary()
// << " seq:" << user_request->seq()
// << " hash:" << user_request->hash();
- LOG(ERROR)<<"NOT PRIMARY, Primary is "<<message_manager_->GetCurrentPrimary();
+ LOG(INFO) << "NOT PRIMARY, Primary is "
+ << message_manager_->GetCurrentPrimary();
replica_communicator_->SendMessage(*user_request,
message_manager_->GetCurrentPrimary());
{
@@ -117,7 +121,6 @@
global_stats_->IncClientRequest();
if (duplicate_manager_->CheckAndAddProposed(user_request->hash())) {
- LOG(ERROR) << "duplicate check fail:";
return -2;
}
auto seq = message_manager_->AssignNextSeq();
@@ -155,14 +158,20 @@
// TODO check whether the sender is the primary.
int Commitment::ProcessProposeMsg(std::unique_ptr<Context> context,
std::unique_ptr<Request> request) {
- if (global_stats_->IsFaulty() || context == nullptr || context->signature.signature().empty()) {
+ if (global_stats_->IsFaulty() || context == nullptr ||
+ context->signature.signature().empty()) {
LOG(ERROR) << "user request doesn't contain signature, reject";
return -2;
}
if (request->is_recovery()) {
- if (static_cast<int64_t>(request->seq()) >=
- message_manager_->GetNextSeq()) {
+ if (message_manager_->GetNextSeq() == 0 ||
+ request->seq() == message_manager_->GetNextSeq()) {
message_manager_->SetNextSeq(request->seq() + 1);
+ } else {
+ LOG(ERROR) << " recovery request not valid:"
+ << " current seq:" << message_manager_->GetNextSeq()
+ << " data seq:" << request->seq();
+ return 0;
}
return message_manager_->AddConsensusMsg(context->signature,
std::move(request));
@@ -187,7 +196,7 @@
LOG(ERROR) << " check by the user func fail";
return -2;
}
- //global_stats_->GetTransactionDetails(std::move(request));
+ // global_stats_->GetTransactionDetails(std::move(request));
BatchUserRequest batch_request;
batch_request.ParseFromString(request->data());
batch_request.clear_createtime();
@@ -286,8 +295,8 @@
CollectorResultCode ret =
message_manager_->AddConsensusMsg(context->signature, std::move(request));
if (ret == CollectorResultCode::STATE_CHANGED) {
- //LOG(ERROR)<<request->data().size();
- //global_stats_->GetTransactionDetails(request->data());
+ // LOG(ERROR)<<request->data().size();
+ // global_stats_->GetTransactionDetails(request->data());
global_stats_->RecordStateTime("commit");
}
return ret == CollectorResultCode::INVALID ? -2 : 0;
diff --git a/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp b/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp
index 09feae4..72103ea 100644
--- a/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp
+++ b/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp
@@ -208,7 +208,7 @@
int ret = commitment_->ProcessNewRequest(std::move(context),
std::move(request));
if (ret == -3) {
- LOG(ERROR)<<"BAD RETURN";
+ LOG(ERROR) << "BAD RETURN";
std::pair<std::unique_ptr<Context>, std::unique_ptr<Request>>
request_complained;
{
diff --git a/platform/consensus/ordering/pbft/message_manager.cpp b/platform/consensus/ordering/pbft/message_manager.cpp
index 37d3b5d..cc5e187 100644
--- a/platform/consensus/ordering/pbft/message_manager.cpp
+++ b/platform/consensus/ordering/pbft/message_manager.cpp
@@ -39,6 +39,9 @@
[&](std::unique_ptr<Request> request,
std::unique_ptr<BatchUserResponse> resp_msg) {
if (request->is_recovery()) {
+ if (checkpoint_manager_) {
+ checkpoint_manager_->AddCommitData(std::move(request));
+ }
return;
}
resp_msg->set_proxy_id(request->proxy_id());
@@ -231,8 +234,6 @@
}
int MessageManager::GetReplicaState(ReplicaState* state) {
- state->set_view(GetCurrentView());
- *state->mutable_replica_info() = config_.GetSelfInfo();
*state->mutable_replica_config() = config_.GetConfigData();
return 0;
}
diff --git a/platform/consensus/ordering/pbft/performance_manager.cpp b/platform/consensus/ordering/pbft/performance_manager.cpp
index a5ba979..26d0ebc 100644
--- a/platform/consensus/ordering/pbft/performance_manager.cpp
+++ b/platform/consensus/ordering/pbft/performance_manager.cpp
@@ -191,8 +191,17 @@
return CollectorResultCode::INVALID;
}
+ std::unique_ptr<BatchUserResponse> batch_response =
+ std::make_unique<BatchUserResponse>();
+ if (!batch_response->ParseFromString(request->data())) {
+ LOG(ERROR) << "parse response fail:" << request->data().size()
+ << " seq:" << request->seq();
+ return CollectorResultCode::INVALID;
+ }
+
+ uint64_t seq = batch_response->local_id();
+
int type = request->type();
- uint64_t seq = request->seq();
int resp_received_count = 0;
int ret = collector_pool_->GetCollector(seq)->AddRequest(
std::move(request), signature, false,
diff --git a/platform/consensus/ordering/pbft/query.cpp b/platform/consensus/ordering/pbft/query.cpp
index 72bb4ed..c2d6027 100644
--- a/platform/consensus/ordering/pbft/query.cpp
+++ b/platform/consensus/ordering/pbft/query.cpp
@@ -50,24 +50,65 @@
int Query::ProcessQuery(std::unique_ptr<Context> context,
std::unique_ptr<Request> request) {
+ if (config_.GetPublicKeyCertificateInfo()
+ .public_key()
+ .public_key_info()
+ .type() == CertificateKeyInfo::CLIENT) {
+ auto find_primary = [&]() {
+ auto config_data = config_.GetConfigData();
+ for (const auto& r : config_data.region()) {
+ for (const auto& replica : r.replica_info()) {
+ if (replica.id() == 1) {
+ return replica;
+ }
+ }
+ }
+ };
+ ReplicaInfo primary = find_primary();
+ std::string ip = primary.ip();
+ int port = primary.port();
+
+ LOG(ERROR) << "redirect to primary:" << ip << " port:" << port;
+ auto client = std::make_unique<NetChannel>(ip, port);
+ if (client->SendRawMessage(*request) == 0) {
+ QueryResponse resp;
+ if (client->RecvRawMessage(&resp) == 0) {
+ if (context != nullptr && context->client != nullptr) {
+ LOG(ERROR) << "send response from primary:"
+ << resp.transactions_size();
+ int ret = context->client->SendRawMessage(resp);
+ if (ret) {
+ LOG(ERROR) << "send resp fail ret:" << ret;
+ }
+ }
+ }
+ }
+ return 0;
+ }
+
QueryRequest query;
if (!query.ParseFromString(request->data())) {
LOG(ERROR) << "parse data fail";
return -2;
}
- // LOG(ERROR) << "request:" << query.DebugString();
QueryResponse response;
- for (uint64_t i = query.min_seq(); i <= query.max_seq(); ++i) {
- Request* ret_request = message_manager_->GetRequest(i);
- if (ret_request == nullptr) {
- break;
+ if (query.max_seq() == 0 && query.min_seq() == 0) {
+ uint64_t mseq = message_manager_->GetNextSeq();
+ response.set_max_seq(mseq - 1);
+ LOG(ERROR) << "get max seq:" << mseq;
+ } else {
+ for (uint64_t i = query.min_seq(); i <= query.max_seq(); ++i) {
+ Request* ret_request = message_manager_->GetRequest(i);
+ if (ret_request == nullptr) {
+ break;
+ }
+ Request* txn = response.add_transactions();
+ txn->set_data(ret_request->data());
+ txn->set_hash(ret_request->hash());
+ txn->set_seq(ret_request->seq());
+ txn->set_proxy_id(ret_request->proxy_id());
}
- Request* txn = response.add_transactions();
- txn->set_data(ret_request->data());
- txn->set_hash(ret_request->hash());
- txn->set_seq(ret_request->seq());
- txn->set_proxy_id(ret_request->proxy_id());
}
if (context != nullptr && context->client != nullptr) {
diff --git a/platform/consensus/ordering/pbft/query_test.cpp b/platform/consensus/ordering/pbft/query_test.cpp
index 4cc1a11..871dfe5 100644
--- a/platform/consensus/ordering/pbft/query_test.cpp
+++ b/platform/consensus/ordering/pbft/query_test.cpp
@@ -152,10 +152,12 @@
TEST_F(QueryTest, QueryState) {
ReplicaState replica_state;
- replica_state.set_view(1);
- replica_state.mutable_replica_info()->set_id(1);
- replica_state.mutable_replica_info()->set_ip("127.0.0.1");
- replica_state.mutable_replica_info()->set_port(1234);
+ replica_state.mutable_replica_config()->set_view_change_timeout_ms(100);
+ replica_state.mutable_replica_config()->set_client_batch_num(100);
+ replica_state.mutable_replica_config()->set_worker_num(64);
+ replica_state.mutable_replica_config()->set_input_worker_num(1);
+ replica_state.mutable_replica_config()->set_output_worker_num(1);
+ replica_state.mutable_replica_config()->set_tcp_batch_num(100);
std::unique_ptr<MockNetChannel> channel =
std::make_unique<MockNetChannel>("127.0.0.1", 0);
diff --git a/platform/consensus/ordering/pbft/response_manager.cpp b/platform/consensus/ordering/pbft/response_manager.cpp
index 6a7162d..f490c00 100644
--- a/platform/consensus/ordering/pbft/response_manager.cpp
+++ b/platform/consensus/ordering/pbft/response_manager.cpp
@@ -26,7 +26,7 @@
namespace resdb {
ResponseClientTimeout::ResponseClientTimeout(std::string hash_,
- uint64_t time_) {
+ uint64_t time_) {
this->hash = hash_;
this->timeout_time = time_;
}
@@ -57,6 +57,7 @@
verifier_(verifier) {
stop_ = false;
local_id_ = 1;
+ timeout_length_ = 5000000;
if (config_.GetPublicKeyCertificateInfo()
.public_key()
@@ -65,9 +66,9 @@
config_.IsTestMode()) {
user_req_thread_ = std::thread(&ResponseManager::BatchProposeMsg, this);
}
- if(config_.GetConfigData().enable_viewchange()){
+ if (config_.GetConfigData().enable_viewchange()) {
checking_timeout_thread_ =
- std::thread(&ResponseManager::MonitoringClientTimeOut, this);
+ std::thread(&ResponseManager::MonitoringClientTimeOut, this);
}
global_stats_ = Stats::GetGlobalStats();
send_num_ = 0;
@@ -78,7 +79,7 @@
if (user_req_thread_.joinable()) {
user_req_thread_.join();
}
- if(checking_timeout_thread_.joinable()){
+ if (checking_timeout_thread_.joinable()) {
checking_timeout_thread_.join();
}
}
@@ -172,8 +173,21 @@
return CollectorResultCode::INVALID;
}
+ std::string hash = request->hash();
+
+ std::unique_ptr<BatchUserResponse> batch_response =
+ std::make_unique<BatchUserResponse>();
+ if (!batch_response->ParseFromString(request->data())) {
+ LOG(ERROR) << "parse response fail:" << request->data().size()
+ << " seq:" << request->seq();
+ RemoveWaitingResponseRequest(hash);
+ return CollectorResultCode::INVALID;
+ }
+
+ uint64_t seq = batch_response->local_id();
+ request->set_seq(seq);
+
int type = request->type();
- uint64_t seq = request->seq();
int resp_received_count = 0;
int ret = collector_pool_->GetCollector(seq)->AddRequest(
std::move(request), signature, false,
@@ -190,6 +204,7 @@
}
if (resp_received_count > 0) {
collector_pool_->Update(seq);
+ RemoveWaitingResponseRequest(hash);
return CollectorResultCode::STATE_CHANGED;
}
return CollectorResultCode::OK;
@@ -295,7 +310,8 @@
if (!config_.IsPerformanceRunning()) {
LOG(ERROR) << "add context list:" << new_request->seq()
- << " list size:" << context_list.size();
+ << " list size:" << context_list.size()
+ << " local_id:" << local_id_;
batch_request.set_local_id(local_id_);
int ret = AddContextList(std::move(context_list), local_id_++);
if (ret != 0) {
@@ -318,13 +334,10 @@
batch_request.SerializeToString(new_request->mutable_data());
new_request->set_hash(SignatureVerifier::CalculateHash(new_request->data()));
new_request->set_proxy_id(config_.GetSelfInfo().id());
- /*for(int i=1; i<=4; i++){
- replica_communicator_->SendMessage(*new_request, i);
- }*/
replica_communicator_->SendMessage(*new_request, GetPrimary());
send_num_++;
- LOG(INFO) << "send msg to primary:" << GetPrimary()
- << " batch size:" << batch_req.size();
+ // LOG(INFO) << "send msg to primary:" << GetPrimary()
+ // << " batch size:" << batch_req.size();
AddWaitingResponseRequest(std::move(new_request));
return 0;
}
@@ -335,16 +348,16 @@
return;
}
pm_lock_.lock();
- uint64_t time = GetCurrentTime() + this->timeout_length_;
- client_timeout_min_heap_.push(
- ResponseClientTimeout(request->hash(), time));
+ assert(timeout_length_ > 0);
+ uint64_t time = GetCurrentTime() + timeout_length_;
+ client_timeout_min_heap_.push(ResponseClientTimeout(request->hash(), time));
waiting_response_batches_.insert(
make_pair(request->hash(), std::move(request)));
pm_lock_.unlock();
sem_post(&request_sent_signal_);
}
-void ResponseManager::RemoveWaitingResponseRequest(std::string hash) {
+void ResponseManager::RemoveWaitingResponseRequest(const std::string& hash) {
if (!config_.GetConfigData().enable_viewchange()) {
return;
}
@@ -363,8 +376,7 @@
return value;
}
-std::unique_ptr<Request> ResponseManager::GetTimeOutRequest(
- std::string hash) {
+std::unique_ptr<Request> ResponseManager::GetTimeOutRequest(std::string hash) {
pm_lock_.lock();
auto value = std::move(waiting_response_batches_.find(hash)->second);
pm_lock_.unlock();
@@ -390,7 +402,6 @@
if (CheckTimeOut(client_timeout.hash)) {
auto request = GetTimeOutRequest(client_timeout.hash);
if (request) {
- LOG(ERROR) << "Client Request Timeout " << client_timeout.hash;
replica_communicator_->BroadCast(*request);
}
}
diff --git a/platform/consensus/ordering/pbft/response_manager.h b/platform/consensus/ordering/pbft/response_manager.h
index b238e0d..36412ea 100644
--- a/platform/consensus/ordering/pbft/response_manager.h
+++ b/platform/consensus/ordering/pbft/response_manager.h
@@ -77,7 +77,7 @@
int GetPrimary();
void AddWaitingResponseRequest(std::unique_ptr<Request> request);
- void RemoveWaitingResponseRequest(std::string hash);
+ void RemoveWaitingResponseRequest(const std::string& hash);
bool CheckTimeOut(std::string hash);
void ResponseTimer(std::string hash);
void MonitoringClientTimeOut();
diff --git a/platform/consensus/ordering/pbft/viewchange_manager.cpp b/platform/consensus/ordering/pbft/viewchange_manager.cpp
index 53060c2..3084c7a 100644
--- a/platform/consensus/ordering/pbft/viewchange_manager.cpp
+++ b/platform/consensus/ordering/pbft/viewchange_manager.cpp
@@ -109,7 +109,7 @@
return;
}
started_ = true;
- LOG(ERROR)<<"MAYSTART";
+ LOG(ERROR) << "MAYSTART";
if (config_.GetPublicKeyCertificateInfo()
.public_key()
@@ -149,7 +149,7 @@
bool ViewChangeManager::ChangeStatue(ViewChangeStatus status) {
if (status == ViewChangeStatus::READY_VIEW_CHANGE) {
if (status_ != ViewChangeStatus::READY_VIEW_CHANGE) {
- LOG(ERROR)<<"CHANGE STATUS";
+ LOG(ERROR) << "CHANGE STATUS";
status_ = status;
}
} else {
@@ -228,7 +228,7 @@
config_.GetReplicaInfos()[(view_number - 1) % replicas.size()].id();
system_info_->SetPrimary(id);
global_stats_->ChangePrimary(id);
- LOG(ERROR)<<"View Change Happened";
+ LOG(ERROR) << "View Change Happened";
}
std::vector<std::unique_ptr<Request>> ViewChangeManager::GetPrepareMsg(
@@ -509,7 +509,7 @@
}
void ViewChangeManager::AddComplaintTimer(uint64_t proxy_id, std::string hash) {
- LOG(ERROR)<<"ADDING COMPLAINT";
+ LOG(ERROR) << "ADDING COMPLAINT";
std::lock_guard<std::mutex> lk(vc_mutex_);
if (complaining_clients_.count(proxy_id) == 0) {
complaining_clients_[proxy_id].set_proxy_id(proxy_id);
diff --git a/platform/consensus/ordering/poe/algorithm/BUILD b/platform/consensus/ordering/poe/algorithm/BUILD
index 357f56d..335cbca 100644
--- a/platform/consensus/ordering/poe/algorithm/BUILD
+++ b/platform/consensus/ordering/poe/algorithm/BUILD
@@ -5,11 +5,11 @@
srcs = ["poe.cpp"],
hdrs = ["poe.h"],
deps = [
- "//platform/statistic:stats",
"//common:comm",
- "//platform/consensus/ordering/poe/proto:proposal_cc_proto",
"//common/crypto:signature_verifier",
- "//platform/consensus/ordering/common/algorithm:protocol_base",
"//platform/common/queue:lock_free_queue",
+ "//platform/consensus/ordering/common/algorithm:protocol_base",
+ "//platform/consensus/ordering/poe/proto:proposal_cc_proto",
+ "//platform/statistic:stats",
],
)
diff --git a/platform/consensus/ordering/poe/algorithm/poe.cpp b/platform/consensus/ordering/poe/algorithm/poe.cpp
index 8a9b3a3..6aa9d69 100644
--- a/platform/consensus/ordering/poe/algorithm/poe.cpp
+++ b/platform/consensus/ordering/poe/algorithm/poe.cpp
@@ -10,7 +10,6 @@
PoE::PoE(int id, int f, int total_num, SignatureVerifier* verifier)
: ProtocolBase(id, f, total_num), verifier_(verifier) {
-
LOG(ERROR) << "get proposal graph";
id_ = id;
total_num_ = total_num;
@@ -19,9 +18,7 @@
seq_ = 0;
}
-PoE::~PoE() {
- is_stop_ = true;
-}
+PoE::~PoE() { is_stop_ = true; }
bool PoE::IsStop() { return is_stop_; }
@@ -41,7 +38,7 @@
int proposer = txn->proposer();
{
std::unique_lock<std::mutex> lk(mutex_);
- data_[txn->hash()]=std::move(txn);
+ data_[txn->hash()] = std::move(txn);
}
Proposal proposal;
@@ -58,14 +55,14 @@
std::unique_lock<std::mutex> lk(mutex_);
received_[proposal->hash()].insert(proposal->proposer());
auto it = data_.find(proposal->hash());
- if(it != data_.end()){
- if(received_[proposal->hash()].size()>=2*f_+1){
+ if (it != data_.end()) {
+ if (received_[proposal->hash()].size() >= 2 * f_ + 1) {
txn = std::move(it->second);
data_.erase(it);
}
}
}
- if(txn != nullptr){
+ if (txn != nullptr) {
commit_(*txn);
}
return true;
diff --git a/platform/consensus/ordering/poe/algorithm/poe.h b/platform/consensus/ordering/poe/algorithm/poe.h
index 20cc71a..65f7d01 100644
--- a/platform/consensus/ordering/poe/algorithm/poe.h
+++ b/platform/consensus/ordering/poe/algorithm/poe.h
@@ -13,7 +13,7 @@
namespace resdb {
namespace poe {
-class PoE: public common::ProtocolBase {
+class PoE : public common::ProtocolBase {
public:
PoE(int id, int f, int total_num, SignatureVerifier* verifier);
~PoE();
@@ -31,10 +31,10 @@
std::map<std::string, std::unique_ptr<Transaction> > data_;
int64_t seq_;
- bool is_stop_;
+ bool is_stop_;
SignatureVerifier* verifier_;
Stats* global_stats_;
};
-} // namespace cassandra
+} // namespace poe
} // namespace resdb
diff --git a/platform/consensus/ordering/poe/framework/BUILD b/platform/consensus/ordering/poe/framework/BUILD
index 7030d2a..833a121 100644
--- a/platform/consensus/ordering/poe/framework/BUILD
+++ b/platform/consensus/ordering/poe/framework/BUILD
@@ -13,4 +13,3 @@
"//platform/consensus/ordering/poe/algorithm:poe",
],
)
-
diff --git a/platform/consensus/ordering/poe/framework/consensus.cpp b/platform/consensus/ordering/poe/framework/consensus.cpp
index b401ada..21763d4 100644
--- a/platform/consensus/ordering/poe/framework/consensus.cpp
+++ b/platform/consensus/ordering/poe/framework/consensus.cpp
@@ -35,7 +35,7 @@
Consensus::Consensus(const ResDBConfig& config,
std::unique_ptr<TransactionManager> executor)
- : common::Consensus(config, std::move(executor)){
+ : common::Consensus(config, std::move(executor)) {
int total_replicas = config_.GetReplicaNum();
int f = (total_replicas - 1) / 3;
@@ -47,9 +47,8 @@
.public_key()
.public_key_info()
.type() != CertificateKeyInfo::CLIENT) {
- poe_ = std::make_unique<PoE>(
- config_.GetSelfInfo().id(), f,
- total_replicas, GetSignatureVerifier());
+ poe_ = std::make_unique<PoE>(config_.GetSelfInfo().id(), f, total_replicas,
+ GetSignatureVerifier());
InitProtocol(poe_.get());
}
}
@@ -73,7 +72,7 @@
}
poe_->ReceivePrepare(std::move(proposal));
return 0;
- }
+ }
return 0;
}
diff --git a/platform/consensus/ordering/poe/framework/consensus.h b/platform/consensus/ordering/poe/framework/consensus.h
index 72e56e1..df51be3 100644
--- a/platform/consensus/ordering/poe/framework/consensus.h
+++ b/platform/consensus/ordering/poe/framework/consensus.h
@@ -55,5 +55,5 @@
int send_num_[200];
};
-} // namespace cassandra
+} // namespace poe
} // namespace resdb
diff --git a/platform/consensus/recovery/recovery.cpp b/platform/consensus/recovery/recovery.cpp
index fb1f6d5..51faad5 100644
--- a/platform/consensus/recovery/recovery.cpp
+++ b/platform/consensus/recovery/recovery.cpp
@@ -511,15 +511,18 @@
if (request_list.size() == 0) {
ftruncate(fd, 0);
}
+ uint64_t max_seq = 0;
for (std::unique_ptr<RecoveryData>& recovery_data : request_list) {
if (ckpt < recovery_data->request->seq()) {
recovery_data->request->set_is_recovery(true);
+ max_seq = recovery_data->request->seq();
call_back(std::move(recovery_data->context),
std::move(recovery_data->request));
}
}
- LOG(INFO) << "read log from files:" << path << " done";
+ LOG(ERROR) << "read log from files:" << path << " done"
+ << " recovery max seq:" << max_seq;
close(fd);
}
diff --git a/platform/networkstrate/async_acceptor.cpp b/platform/networkstrate/async_acceptor.cpp
index 9fd85a0..b61a510 100644
--- a/platform/networkstrate/async_acceptor.cpp
+++ b/platform/networkstrate/async_acceptor.cpp
@@ -77,7 +77,7 @@
delete recv_buffer_;
} else {
data_size_ = *reinterpret_cast<size_t*>(recv_buffer_);
- if (data_size_ > 1e6) {
+ if (data_size_ > 1e10) {
LOG(ERROR) << "read data size:" << data_size_
<< " data size:" << sizeof(data_size_) << " close socket";
Close();
diff --git a/platform/networkstrate/consensus_manager.cpp b/platform/networkstrate/consensus_manager.cpp
index 00b7081..b3fb106 100644
--- a/platform/networkstrate/consensus_manager.cpp
+++ b/platform/networkstrate/consensus_manager.cpp
@@ -90,46 +90,9 @@
std::mutex mutex;
std::condition_variable cv;
while (IsRunning()) {
- auto keys = verifier_->GetAllPublicKeys();
-
- std::vector<ReplicaInfo> replicas = GetAllReplicas();
- LOG(ERROR) << "all replicas:" << replicas.size();
- std::vector<ReplicaInfo> client_replicas = GetClientReplicas();
- HeartBeatInfo hb_info;
- for (const auto& key : keys) {
- *hb_info.add_public_keys() = key;
- }
- for (const auto& client : client_replicas) {
- replicas.push_back(client);
- }
- auto client = GetReplicaClient(replicas, false);
- if (client == nullptr) {
- continue;
- }
-
- // If it is not a client node, broadcost the current primary to the client.
- if (config_.GetPublicKeyCertificateInfo()
- .public_key()
- .public_key_info()
- .type() == CertificateKeyInfo::REPLICA) {
- hb_info.set_primary(GetPrimary());
- hb_info.set_version(GetVersion());
- }
- LOG(ERROR) << " server:" << config_.GetSelfInfo().id() << " sends HB"
- << " is ready:" << is_ready_
- << " client size:" << client_replicas.size()
- << " svr size:" << replicas.size();
-
- Request request;
- request.set_type(Request::TYPE_HEART_BEAT);
- request.mutable_region_info()->set_region_id(
- config_.GetConfigData().self_region_id());
- hb_info.SerializeToString(request.mutable_data());
-
- int ret = client->SendHeartBeat(request);
- if (ret <= 0) {
- LOG(ERROR) << " server:" << config_.GetSelfInfo().id()
- << " sends HB fail:" << ret;
+ {
+ std::unique_lock<std::mutex> lk(hb_mutex_);
+ SendHeartBeat();
}
std::unique_lock<std::mutex> lk(mutex);
cv.wait_for(lk, std::chrono::microseconds(sleep_time * 1000000),
@@ -138,12 +101,61 @@
if (config_.IsTestMode()) {
sleep_time = 1;
} else {
- sleep_time = 60 * 2;
+ sleep_time = 60;
}
}
}
}
+void ConsensusManager::SendHeartBeat() {
+ auto keys = verifier_->GetAllPublicKeys();
+
+ std::vector<ReplicaInfo> replicas = GetAllReplicas();
+ LOG(ERROR) << "all replicas:" << replicas.size();
+ std::vector<ReplicaInfo> client_replicas = GetClientReplicas();
+ HeartBeatInfo hb_info;
+ hb_info.set_sender(config_.GetSelfInfo().id());
+ hb_info.set_ip(config_.GetSelfInfo().ip());
+ hb_info.set_port(config_.GetSelfInfo().port());
+ hb_info.set_hb_version(version_);
+ for (const auto& key : keys) {
+ *hb_info.add_public_keys() = key;
+ hb_info.add_node_version(hb_[key.public_key_info().node_id()]);
+ }
+ for (const auto& client : client_replicas) {
+ replicas.push_back(client);
+ }
+ auto client = GetReplicaClient(replicas, false);
+ if (client == nullptr) {
+ return;
+ }
+
+ // If it is not a client node, broadcost the current primary to the client.
+ if (config_.GetPublicKeyCertificateInfo()
+ .public_key()
+ .public_key_info()
+ .type() == CertificateKeyInfo::REPLICA) {
+ hb_info.set_primary(GetPrimary());
+ hb_info.set_version(GetVersion());
+ }
+ LOG(ERROR) << " server:" << config_.GetSelfInfo().id() << " sends HB"
+ << " is ready:" << is_ready_
+ << " client size:" << client_replicas.size()
+ << " svr size:" << replicas.size();
+
+ Request request;
+ request.set_type(Request::TYPE_HEART_BEAT);
+ request.mutable_region_info()->set_region_id(
+ config_.GetConfigData().self_region_id());
+ hb_info.SerializeToString(request.mutable_data());
+
+ int ret = client->SendHeartBeat(request);
+ if (ret <= 0) {
+ LOG(ERROR) << " server:" << config_.GetSelfInfo().id()
+ << " sends HB fail:" << ret;
+ }
+}
+
// Porcess the packages received from the network.
// context contains the client socket which can be used for sending response to
// the client, the signature for the request will be filled inside the context
@@ -158,27 +170,27 @@
return -1;
}
- // Check if the certificate is valid.
- if (message.has_signature() && verifier_) {
- bool valid = verifier_->VerifyMessage(message.data(), message.signature());
- if (!valid) {
- LOG(ERROR) << "request is not valid:"
- << message.signature().DebugString();
- LOG(ERROR) << " msg:" << message.data().size();
- return -2;
- }
- } else {
- }
-
std::unique_ptr<Request> request = std::make_unique<Request>();
if (!request->ParseFromString(message.data())) {
LOG(ERROR) << "parse data info fail";
return -1;
}
- std::string tmp;
- if (!request->SerializeToString(&tmp)) {
- return -1;
+ if (request->type() == Request::TYPE_HEART_BEAT) {
+ return Dispatch(std::move(context), std::move(request));
+ }
+
+ // Check if the certificate is valid.
+ if (message.has_signature() && verifier_) {
+ bool valid = verifier_->VerifyMessage(message.data(), message.signature());
+ if (!valid) {
+ LOG(ERROR) << "request is not valid:"
+ << message.signature().DebugString();
+ LOG(ERROR) << " msg:" << message.data().size()
+ << " is recovery:" << request->is_recovery();
+ return -2;
+ }
+ } else {
}
// forward the signature to the request so that it can be included in the
@@ -187,6 +199,7 @@
// LOG(ERROR) << "======= server:" << config_.GetSelfInfo().id()
// << " get request type:" << request->type()
// << " from:" << request->sender_id();
+
return Dispatch(std::move(context), std::move(request));
}
@@ -202,6 +215,7 @@
int ConsensusManager::ProcessHeartBeat(std::unique_ptr<Context> context,
std::unique_ptr<Request> request) {
+ std::unique_lock<std::mutex> lk(hb_mutex_);
std::vector<ReplicaInfo> replicas = GetReplicas();
HeartBeatInfo hb_info;
if (!hb_info.ParseFromString(request->data())) {
@@ -212,7 +226,10 @@
LOG(ERROR) << "receive public size:" << hb_info.public_keys().size()
<< " primary:" << hb_info.primary()
<< " version:" << hb_info.version()
- << " from region:" << request->region_info().region_id();
+ << " from region:" << request->region_info().region_id()
+ << " sender:" << hb_info.sender()
+ << " last send:" << hb_info.hb_version()
+ << " current v:" << hb_[hb_info.sender()];
if (request->region_info().region_id() ==
config_.GetConfigData().self_region_id()) {
@@ -261,6 +278,18 @@
}
}
}
+
+ if (!hb_info.ip().empty() && hb_info.hb_version() > 0 &&
+ hb_[hb_info.sender()] != hb_info.hb_version()) {
+ ReplicaInfo info;
+ info.set_ip(hb_info.ip());
+ info.set_port(hb_info.port());
+ info.set_id(hb_info.sender());
+ // bc_client_->Flush(info);
+ hb_[hb_info.sender()] = hb_info.hb_version();
+ SendHeartBeat();
+ }
+
if (!is_ready_ && replica_num >= config_.GetMinDataReceiveNum()) {
LOG(ERROR) << "============ Server " << config_.GetSelfInfo().id()
<< " is ready "
diff --git a/platform/networkstrate/consensus_manager.h b/platform/networkstrate/consensus_manager.h
index fffc9fc..57ecc83 100644
--- a/platform/networkstrate/consensus_manager.h
+++ b/platform/networkstrate/consensus_manager.h
@@ -89,6 +89,7 @@
private:
void HeartBeat();
+ void SendHeartBeat();
void BroadCastThread();
protected:
@@ -105,6 +106,9 @@
std::unique_ptr<ReplicaCommunicator> bc_client_;
std::vector<ReplicaInfo> clients_;
Stats* global_stats_;
+ uint64_t version_;
+ std::map<int, uint64_t> hb_;
+ std::mutex hb_mutex_;
};
} // namespace resdb
diff --git a/platform/networkstrate/replica_communicator.cpp b/platform/networkstrate/replica_communicator.cpp
index 7083fe6..de09612 100644
--- a/platform/networkstrate/replica_communicator.cpp
+++ b/platform/networkstrate/replica_communicator.cpp
@@ -257,7 +257,7 @@
}
if (target_replica.ip().empty()) {
- LOG(ERROR) << "no replica info node:"<<node_id;
+ LOG(ERROR) << "no replica info node:" << node_id;
return;
}
diff --git a/platform/proto/resdb.proto b/platform/proto/resdb.proto
index 71ed459..0efea6b 100644
--- a/platform/proto/resdb.proto
+++ b/platform/proto/resdb.proto
@@ -115,8 +115,13 @@
message HeartBeatInfo{
repeated CertificateKey public_keys = 1;
+ repeated int64 node_version = 8;
uint32 primary = 2;
uint64 version= 3;
+ int32 sender = 4;
+ string ip = 5;
+ int32 port = 6;
+ int64 hb_version = 7;
}
message ClientCertInfo {
diff --git a/platform/statistic/BUILD b/platform/statistic/BUILD
index e20fd91..a0618d3 100644
--- a/platform/statistic/BUILD
+++ b/platform/statistic/BUILD
@@ -8,17 +8,17 @@
srcs = ["stats.cpp"],
hdrs = ["stats.h"],
deps = [
- "//proto/kv:kv_cc_proto",
- "//platform/proto:resdb_cc_proto",
- "//common:json",
":prometheus_handler",
- "//common:comm",
- "//common/utils",
- "//third_party:prometheus",
- "//platform/common/network:tcp_socket",
"//common:asio",
"//common:beast",
+ "//common:comm",
+ "//common:json",
+ "//common/utils",
+ "//platform/common/network:tcp_socket",
+ "//platform/proto:resdb_cc_proto",
+ "//proto/kv:kv_cc_proto",
"//third_party:crow",
+ "//third_party:prometheus",
],
)
diff --git a/platform/statistic/stats.cpp b/platform/statistic/stats.cpp
index d8de42a..200ea52 100644
--- a/platform/statistic/stats.cpp
+++ b/platform/statistic/stats.cpp
@@ -20,7 +20,9 @@
#include "platform/statistic/stats.h"
#include <glog/logging.h>
+
#include <ctime>
+
#include "common/utils/utils.h"
#include "proto/kv/kv.pb.h"
@@ -67,16 +69,19 @@
global_thread_ =
std::thread(&Stats::MonitorGlobal, this); // pass by reference
- transaction_summary_.port=-1;
+ transaction_summary_.port = -1;
- //Setup websocket here
+ // Setup websocket here
make_faulty_.store(false);
- transaction_summary_.request_pre_prepare_state_time=std::chrono::system_clock::time_point::min();
- transaction_summary_.prepare_state_time=std::chrono::system_clock::time_point::min();
- transaction_summary_.commit_state_time=std::chrono::system_clock::time_point::min();
- transaction_summary_.execution_time=std::chrono::system_clock::time_point::min();
- transaction_summary_.txn_number=0;
-
+ transaction_summary_.request_pre_prepare_state_time =
+ std::chrono::system_clock::time_point::min();
+ transaction_summary_.prepare_state_time =
+ std::chrono::system_clock::time_point::min();
+ transaction_summary_.commit_state_time =
+ std::chrono::system_clock::time_point::min();
+ transaction_summary_.execution_time =
+ std::chrono::system_clock::time_point::min();
+ transaction_summary_.txn_number = 0;
}
void Stats::Stop() { stop_ = true; }
@@ -86,107 +91,122 @@
if (global_thread_.joinable()) {
global_thread_.join();
}
- if(enable_resview && crow_thread_.joinable()){
+ if (enable_resview && crow_thread_.joinable()) {
crow_thread_.join();
}
}
-void Stats::CrowRoute(){
+void Stats::CrowRoute() {
crow::SimpleApp app;
- while(!stop_){
- try{
- CROW_ROUTE(app, "/consensus_data").methods("GET"_method)([this](const crow::request& req, crow::response& res){
- LOG(ERROR)<<"API 1";
- res.set_header("Access-Control-Allow-Origin", "*"); // Allow requests from any origin
- res.set_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS"); // Specify allowed methods
- res.set_header("Access-Control-Allow-Headers", "Content-Type, Authorization"); // Specify allowed headers
+ while (!stop_) {
+ try {
+ CROW_ROUTE(app, "/consensus_data")
+ .methods("GET"_method)([this](const crow::request& req,
+ crow::response& res) {
+ LOG(ERROR) << "API 1";
+ res.set_header("Access-Control-Allow-Origin",
+ "*"); // Allow requests from any origin
+ res.set_header("Access-Control-Allow-Methods",
+ "GET, POST, OPTIONS"); // Specify allowed methods
+ res.set_header(
+ "Access-Control-Allow-Headers",
+ "Content-Type, Authorization"); // Specify allowed headers
- // Send your response
- res.body=consensus_history_.dump();
- res.end();
- });
- CROW_ROUTE(app, "/get_status").methods("GET"_method)([this](const crow::request& req, crow::response& res){
- LOG(ERROR)<<"API 2";
- res.set_header("Access-Control-Allow-Origin", "*"); // Allow requests from any origin
- res.set_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS"); // Specify allowed methods
- res.set_header("Access-Control-Allow-Headers", "Content-Type, Authorization"); // Specify allowed headers
+ // Send your response
+ res.body = consensus_history_.dump();
+ res.end();
+ });
+ CROW_ROUTE(app, "/get_status")
+ .methods("GET"_method)([this](const crow::request& req,
+ crow::response& res) {
+ LOG(ERROR) << "API 2";
+ res.set_header("Access-Control-Allow-Origin",
+ "*"); // Allow requests from any origin
+ res.set_header("Access-Control-Allow-Methods",
+ "GET, POST, OPTIONS"); // Specify allowed methods
+ res.set_header(
+ "Access-Control-Allow-Headers",
+ "Content-Type, Authorization"); // Specify allowed headers
- // Send your response
- res.body= IsFaulty() ? "Faulty" : "Not Faulty";
- res.end();
- });
- CROW_ROUTE(app, "/make_faulty").methods("GET"_method)([this](const crow::request& req, crow::response& res){
- LOG(ERROR)<<"API 3";
- res.set_header("Access-Control-Allow-Origin", "*"); // Allow requests from any origin
- res.set_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS"); // Specify allowed methods
- res.set_header("Access-Control-Allow-Headers", "Content-Type, Authorization"); // Specify allowed headers
+ // Send your response
+ res.body = IsFaulty() ? "Faulty" : "Not Faulty";
+ res.end();
+ });
+ CROW_ROUTE(app, "/make_faulty")
+ .methods("GET"_method)([this](const crow::request& req,
+ crow::response& res) {
+ LOG(ERROR) << "API 3";
+ res.set_header("Access-Control-Allow-Origin",
+ "*"); // Allow requests from any origin
+ res.set_header("Access-Control-Allow-Methods",
+ "GET, POST, OPTIONS"); // Specify allowed methods
+ res.set_header(
+ "Access-Control-Allow-Headers",
+ "Content-Type, Authorization"); // Specify allowed headers
- // Send your response
- if(enable_faulty_switch_){
- make_faulty_.store(!make_faulty_.load());
- }
- res.body= "Success";
- res.end();
- });
- app.port(8500+transaction_summary_.port).multithreaded().run();
+ // Send your response
+ if (enable_faulty_switch_) {
+ make_faulty_.store(!make_faulty_.load());
+ }
+ res.body = "Success";
+ res.end();
+ });
+ app.port(8500 + transaction_summary_.port).multithreaded().run();
sleep(1);
- }
- catch( const std::exception& e){
+ } catch (const std::exception& e) {
}
}
app.stop();
}
-bool Stats::IsFaulty(){
- return make_faulty_.load();
-}
+bool Stats::IsFaulty() { return make_faulty_.load(); }
-void Stats::ChangePrimary(int primary_id){
- transaction_summary_.primary_id=primary_id;
+void Stats::ChangePrimary(int primary_id) {
+ transaction_summary_.primary_id = primary_id;
make_faulty_.store(false);
}
-void Stats::SetProps(int replica_id, std::string ip, int port, bool resview_flag, bool faulty_flag){
- transaction_summary_.replica_id=replica_id;
- transaction_summary_.ip=ip;
- transaction_summary_.port=port;
- enable_resview=resview_flag;
- enable_faulty_switch_=faulty_flag;
- if(resview_flag){
+void Stats::SetProps(int replica_id, std::string ip, int port,
+ bool resview_flag, bool faulty_flag) {
+ transaction_summary_.replica_id = replica_id;
+ transaction_summary_.ip = ip;
+ transaction_summary_.port = port;
+ enable_resview = resview_flag;
+ enable_faulty_switch_ = faulty_flag;
+ if (resview_flag) {
crow_thread_ = std::thread(&Stats::CrowRoute, this);
}
}
-void Stats::SetPrimaryId(int primary_id){
- transaction_summary_.primary_id=primary_id;
+void Stats::SetPrimaryId(int primary_id) {
+ transaction_summary_.primary_id = primary_id;
}
-void Stats::RecordStateTime(std::string state){
- if(!enable_resview){
+void Stats::RecordStateTime(std::string state) {
+ if (!enable_resview) {
return;
}
- if(state=="request" || state=="pre-prepare"){
- transaction_summary_.request_pre_prepare_state_time=std::chrono::system_clock::now();
- }
- else if(state=="prepare"){
- transaction_summary_.prepare_state_time=std::chrono::system_clock::now();
- }
- else if(state=="commit"){
- transaction_summary_.commit_state_time=std::chrono::system_clock::now();
+ if (state == "request" || state == "pre-prepare") {
+ transaction_summary_.request_pre_prepare_state_time =
+ std::chrono::system_clock::now();
+ } else if (state == "prepare") {
+ transaction_summary_.prepare_state_time = std::chrono::system_clock::now();
+ } else if (state == "commit") {
+ transaction_summary_.commit_state_time = std::chrono::system_clock::now();
}
}
-void Stats::GetTransactionDetails(BatchUserRequest batch_request){
- if(!enable_resview){
+void Stats::GetTransactionDetails(BatchUserRequest batch_request) {
+ if (!enable_resview) {
return;
}
- transaction_summary_.txn_number=batch_request.seq();
+ transaction_summary_.txn_number = batch_request.seq();
transaction_summary_.txn_command.clear();
transaction_summary_.txn_key.clear();
transaction_summary_.txn_value.clear();
for (auto& sub_request : batch_request.user_requests()) {
KVRequest kv_request;
- if(!kv_request.ParseFromString(sub_request.request().data())){
+ if (!kv_request.ParseFromString(sub_request.request().data())) {
break;
}
if (kv_request.cmd() == KVRequest::SET) {
@@ -209,53 +229,70 @@
}
}
-void Stats::SendSummary(){
- if(!enable_resview){
+void Stats::SendSummary() {
+ if (!enable_resview) {
return;
}
- transaction_summary_.execution_time=std::chrono::system_clock::now();
+ transaction_summary_.execution_time = std::chrono::system_clock::now();
- //Convert Transaction Summary to JSON
- summary_json_["replica_id"]=transaction_summary_.replica_id;
- summary_json_["ip"]=transaction_summary_.ip;
- summary_json_["port"]=transaction_summary_.port;
- summary_json_["primary_id"]=transaction_summary_.primary_id;
- summary_json_["propose_pre_prepare_time"]=transaction_summary_.request_pre_prepare_state_time.time_since_epoch().count();
- summary_json_["prepare_time"]=transaction_summary_.prepare_state_time.time_since_epoch().count();
- summary_json_["commit_time"]=transaction_summary_.commit_state_time.time_since_epoch().count();
- summary_json_["execution_time"]=transaction_summary_.execution_time.time_since_epoch().count();
- for(size_t i=0; i<transaction_summary_.prepare_message_count_times_list.size(); i++){
- summary_json_["prepare_message_timestamps"].push_back(transaction_summary_.prepare_message_count_times_list[i].time_since_epoch().count());
+ // Convert Transaction Summary to JSON
+ summary_json_["replica_id"] = transaction_summary_.replica_id;
+ summary_json_["ip"] = transaction_summary_.ip;
+ summary_json_["port"] = transaction_summary_.port;
+ summary_json_["primary_id"] = transaction_summary_.primary_id;
+ summary_json_["propose_pre_prepare_time"] =
+ transaction_summary_.request_pre_prepare_state_time.time_since_epoch()
+ .count();
+ summary_json_["prepare_time"] =
+ transaction_summary_.prepare_state_time.time_since_epoch().count();
+ summary_json_["commit_time"] =
+ transaction_summary_.commit_state_time.time_since_epoch().count();
+ summary_json_["execution_time"] =
+ transaction_summary_.execution_time.time_since_epoch().count();
+ for (size_t i = 0;
+ i < transaction_summary_.prepare_message_count_times_list.size(); i++) {
+ summary_json_["prepare_message_timestamps"].push_back(
+ transaction_summary_.prepare_message_count_times_list[i]
+ .time_since_epoch()
+ .count());
}
- for(size_t i=0; i<transaction_summary_.commit_message_count_times_list.size(); i++){
- summary_json_["commit_message_timestamps"].push_back(transaction_summary_.commit_message_count_times_list[i].time_since_epoch().count());
+ for (size_t i = 0;
+ i < transaction_summary_.commit_message_count_times_list.size(); i++) {
+ summary_json_["commit_message_timestamps"].push_back(
+ transaction_summary_.commit_message_count_times_list[i]
+ .time_since_epoch()
+ .count());
}
- summary_json_["txn_number"]=transaction_summary_.txn_number;
- for(size_t i=0; i<transaction_summary_.txn_command.size(); i++){
- summary_json_["txn_commands"].push_back(transaction_summary_.txn_command[i]);
+ summary_json_["txn_number"] = transaction_summary_.txn_number;
+ for (size_t i = 0; i < transaction_summary_.txn_command.size(); i++) {
+ summary_json_["txn_commands"].push_back(
+ transaction_summary_.txn_command[i]);
}
- for(size_t i=0; i<transaction_summary_.txn_key.size(); i++){
+ for (size_t i = 0; i < transaction_summary_.txn_key.size(); i++) {
summary_json_["txn_keys"].push_back(transaction_summary_.txn_key[i]);
}
- for(size_t i=0; i<transaction_summary_.txn_value.size(); i++){
+ for (size_t i = 0; i < transaction_summary_.txn_value.size(); i++) {
summary_json_["txn_values"].push_back(transaction_summary_.txn_value[i]);
}
- consensus_history_[std::to_string(transaction_summary_.txn_number)]=summary_json_;
+ consensus_history_[std::to_string(transaction_summary_.txn_number)] =
+ summary_json_;
+ LOG(ERROR) << summary_json_.dump();
- LOG(ERROR)<<summary_json_.dump();
-
- //Reset Transaction Summary Parameters
- transaction_summary_.request_pre_prepare_state_time=std::chrono::system_clock::time_point::min();
- transaction_summary_.prepare_state_time=std::chrono::system_clock::time_point::min();
- transaction_summary_.commit_state_time=std::chrono::system_clock::time_point::min();
- transaction_summary_.execution_time=std::chrono::system_clock::time_point::min();
+ // Reset Transaction Summary Parameters
+ transaction_summary_.request_pre_prepare_state_time =
+ std::chrono::system_clock::time_point::min();
+ transaction_summary_.prepare_state_time =
+ std::chrono::system_clock::time_point::min();
+ transaction_summary_.commit_state_time =
+ std::chrono::system_clock::time_point::min();
+ transaction_summary_.execution_time =
+ std::chrono::system_clock::time_point::min();
transaction_summary_.prepare_message_count_times_list.clear();
transaction_summary_.commit_message_count_times_list.clear();
summary_json_.clear();
-
}
void Stats::MonitorGlobal() {
@@ -423,7 +460,8 @@
prometheus_->Inc(PREPARE, 1);
}
num_prepare_++;
- transaction_summary_.prepare_message_count_times_list.push_back(std::chrono::system_clock::now());
+ transaction_summary_.prepare_message_count_times_list.push_back(
+ std::chrono::system_clock::now());
}
void Stats::IncCommit() {
@@ -431,12 +469,11 @@
prometheus_->Inc(COMMIT, 1);
}
num_commit_++;
- transaction_summary_.commit_message_count_times_list.push_back(std::chrono::system_clock::now());
+ transaction_summary_.commit_message_count_times_list.push_back(
+ std::chrono::system_clock::now());
}
-void Stats::IncPendingExecute() {
- pending_execute_++;
-}
+void Stats::IncPendingExecute() { pending_execute_++; }
void Stats::IncExecute() { execute_++; }
diff --git a/platform/statistic/stats.h b/platform/statistic/stats.h
index 621aaf8..0ca8dd1 100644
--- a/platform/statistic/stats.h
+++ b/platform/statistic/stats.h
@@ -19,17 +19,18 @@
#pragma once
+#include <crow.h>
+
#include <chrono>
#include <future>
-
-#include "platform/statistic/prometheus_handler.h"
-#include "platform/proto/resdb.pb.h"
-#include "proto/kv/kv.pb.h"
-#include "platform/common/network/tcp_socket.h"
#include <nlohmann/json.hpp>
+
#include "boost/asio.hpp"
#include "boost/beast.hpp"
-#include <crow.h>
+#include "platform/common/network/tcp_socket.h"
+#include "platform/proto/resdb.pb.h"
+#include "platform/statistic/prometheus_handler.h"
+#include "proto/kv/kv.pb.h"
namespace asio = boost::asio;
namespace beast = boost::beast;
@@ -37,44 +38,46 @@
namespace resdb {
-struct VisualData{
- //Set when initializing
- int replica_id;
- int primary_id;
- std::string ip;
- int port;
+struct VisualData {
+ // Set when initializing
+ int replica_id;
+ int primary_id;
+ std::string ip;
+ int port;
- //Set when new txn is received
- int txn_number;
- std::vector<std::string> txn_command;
- std::vector<std::string> txn_key;
- std::vector<std::string> txn_value;
+ // Set when new txn is received
+ int txn_number;
+ std::vector<std::string> txn_command;
+ std::vector<std::string> txn_key;
+ std::vector<std::string> txn_value;
- //Request state if primary_id==replica_id, pre_prepare state otherwise
- std::chrono::system_clock::time_point request_pre_prepare_state_time;
- std::chrono::system_clock::time_point prepare_state_time;
- std::vector<std::chrono::system_clock::time_point> prepare_message_count_times_list;
- std::chrono::system_clock::time_point commit_state_time;
- std::vector<std::chrono::system_clock::time_point> commit_message_count_times_list;
- std::chrono::system_clock::time_point execution_time;
+ // Request state if primary_id==replica_id, pre_prepare state otherwise
+ std::chrono::system_clock::time_point request_pre_prepare_state_time;
+ std::chrono::system_clock::time_point prepare_state_time;
+ std::vector<std::chrono::system_clock::time_point>
+ prepare_message_count_times_list;
+ std::chrono::system_clock::time_point commit_state_time;
+ std::vector<std::chrono::system_clock::time_point>
+ commit_message_count_times_list;
+ std::chrono::system_clock::time_point execution_time;
};
-class Stats{
+class Stats {
public:
static Stats* GetGlobalStats(int sleep_seconds = 5);
void Stop();
- void RetrieveProgress();
- void SetProps(int replica_id, std::string ip, int port, bool resview_flag, bool faulty_flag);
- void SetPrimaryId(int primary_id);
- void RecordStateTime(std::string state);
- void GetTransactionDetails(BatchUserRequest batch_request);
- void SendSummary();
- void CrowRoute();
- bool IsFaulty();
- void ChangePrimary(int primary_id);
-
+ void RetrieveProgress();
+ void SetProps(int replica_id, std::string ip, int port, bool resview_flag,
+ bool faulty_flag);
+ void SetPrimaryId(int primary_id);
+ void RecordStateTime(std::string state);
+ void GetTransactionDetails(BatchUserRequest batch_request);
+ void SendSummary();
+ void CrowRoute();
+ bool IsFaulty();
+ void ChangePrimary(int primary_id);
void AddLatency(uint64_t run_time);
diff --git a/scripts/deploy/config/kv_server.conf b/scripts/deploy/config/kv_server.conf
index a811f76..500e34d 100644
--- a/scripts/deploy/config/kv_server.conf
+++ b/scripts/deploy/config/kv_server.conf
@@ -1,8 +1,8 @@
iplist=(
-172.31.52.247
-172.31.54.193
-172.31.55.48
-172.31.53.140
+172.31.57.186
+172.31.57.186
+172.31.57.186
+172.31.57.186
172.31.57.186
)
diff --git a/scripts/deploy/script/deploy.sh b/scripts/deploy/script/deploy.sh
index 87ab7dc..6da8deb 100755
--- a/scripts/deploy/script/deploy.sh
+++ b/scripts/deploy/script/deploy.sh
@@ -14,6 +14,7 @@
fi
# obtain the src path
+main_folder=resilientdb_app
server_path=`echo "$server" | sed 's/:/\//g'`
server_path=${server_path:1}
server_name=`echo "$server" | awk -F':' '{print $NF}'`
@@ -60,10 +61,12 @@
# commands functions
function run_cmd(){
count=1
+ idx=1
for ip in ${deploy_iplist[@]};
do
- ssh -i ${key} -n -o BatchMode=yes -o StrictHostKeyChecking=no ubuntu@${ip} "$1" &
+ ssh -i ${key} -n -o BatchMode=yes -o StrictHostKeyChecking=no ubuntu@${ip} "cd ${main_folder}/$idx; $1" &
((count++))
+ ((idx++))
done
while [ $count -gt 0 ]; do
@@ -76,6 +79,15 @@
ssh -i ${key} -n -o BatchMode=yes -o StrictHostKeyChecking=no ubuntu@${ip} "$1"
}
+idx=1
+for ip in ${deploy_iplist[@]};
+do
+ run_one_cmd "mkdir -p ${main_folder}/$idx" &
+ ((count++))
+ ((idx++))
+done
+
+
run_cmd "killall -9 ${server_bin}"
run_cmd "rm -rf ${server_bin}; rm ${server_bin}*.log; rm -rf server.config; rm -rf cert;"
@@ -83,11 +95,13 @@
# upload config files and binary
echo "upload configs"
+idx=1
count=0
for ip in ${deploy_iplist[@]};
do
- scp -i ${key} -r ${bin_path} ${output_path}/server.config ${output_path}/cert ubuntu@${ip}:/home/ubuntu/ &
+ scp -i ${key} -r ${bin_path} ${output_path}/server.config ${output_path}/cert ubuntu@${ip}:/home/ubuntu/${main_folder}/$idx &
((count++))
+ ((idx++))
done
while [ $count -gt 0 ]; do
@@ -103,9 +117,10 @@
do
private_key="cert/node_"${idx}".key.pri"
cert="cert/cert_"${idx}".cert"
- run_one_cmd "nohup ./${server_bin} server.config ${private_key} ${cert} ${grafna_port} > ${server_bin}.log 2>&1 &" &
+ run_one_cmd "cd ${main_folder}/$idx; nohup ./${server_bin} server.config ${private_key} ${cert} ${grafna_port} > ${server_bin}.log 2>&1 &" &
((count++))
((idx++))
+ ((grafna_port++))
done
while [ $count -gt 0 ]; do
@@ -120,7 +135,7 @@
resp=""
while [ "$resp" = "" ]
do
- resp=`ssh -i ${key} -n -o BatchMode=yes -o StrictHostKeyChecking=no ubuntu@${ip} "grep \"receive public size:${#iplist[@]}\" ${server_bin}.log"`
+ resp=`ssh -i ${key} -n -o BatchMode=yes -o StrictHostKeyChecking=no ubuntu@${ip} "cd ${main_folder}/$idx; grep \"receive public size:${#iplist[@]}\" ${server_bin}.log"`
if [ "$resp" = "" ]; then
sleep 1
fi
diff --git a/service/kv/kv_service.cpp b/service/kv/kv_service.cpp
index 11272aa..6cd20e1 100644
--- a/service/kv/kv_service.cpp
+++ b/service/kv/kv_service.cpp
@@ -60,6 +60,7 @@
exit(0);
}
google::InitGoogleLogging(argv[0]);
+ FLAGS_minloglevel = 1;
char* config_file = argv[1];
char* private_key_file = argv[2];
diff --git a/service/tools/kv/api_tools/kv_client_txn_tools.cpp b/service/tools/kv/api_tools/kv_client_txn_tools.cpp
index 4ebff4d..55f0f6c 100644
--- a/service/tools/kv/api_tools/kv_client_txn_tools.cpp
+++ b/service/tools/kv/api_tools/kv_client_txn_tools.cpp
@@ -49,8 +49,6 @@
ResDBTxnAccessor client(config);
auto resp = client.GetTxn(min_seq, max_seq);
- absl::StatusOr<std::vector<std::pair<uint64_t, std::string>>> GetTxn(
- uint64_t min_seq, uint64_t max_seq);
if (!resp.ok()) {
LOG(ERROR) << "get replica state fail";
exit(1);
diff --git a/third_party/BUILD b/third_party/BUILD
index b43e923..0ec7614 100644
--- a/third_party/BUILD
+++ b/third_party/BUILD
@@ -51,4 +51,4 @@
deps = [
"@com_crowcpp_crow//:crow",
],
-)
\ No newline at end of file
+)