Redirect query requests from proxy nodes to the primary replica
diff --git a/platform/consensus/ordering/pbft/checkpoint_manager.cpp b/platform/consensus/ordering/pbft/checkpoint_manager.cpp
index cc88156..fac4640 100644
--- a/platform/consensus/ordering/pbft/checkpoint_manager.cpp
+++ b/platform/consensus/ordering/pbft/checkpoint_manager.cpp
@@ -310,14 +310,14 @@
}
std::string hash_ = request->hash();
uint64_t current_seq = request->seq();
- if (current_seq != last_seq_ + 1) {
+ if (current_seq < last_seq_ + 1) {
LOG(ERROR) << "seq invalid:" << last_seq_ << " current:" << current_seq;
continue;
}
{
std::lock_guard<std::mutex> lk(lt_mutex_);
last_hash_ = GetHash(last_hash_, request->hash());
- last_seq_++;
+ last_seq_ = current_seq;
}
txn_db_->Put(std::move(request));
diff --git a/platform/consensus/ordering/pbft/message_manager.cpp b/platform/consensus/ordering/pbft/message_manager.cpp
index b7b7bfe..5807520 100644
--- a/platform/consensus/ordering/pbft/message_manager.cpp
+++ b/platform/consensus/ordering/pbft/message_manager.cpp
@@ -45,6 +45,9 @@
[&](std::unique_ptr<Request> request,
std::unique_ptr<BatchUserResponse> resp_msg) {
if (request->is_recovery()) {
+ if (checkpoint_manager_) {
+ checkpoint_manager_->AddCommitData(std::move(request));
+ }
return;
}
resp_msg->set_proxy_id(request->proxy_id());
diff --git a/platform/consensus/ordering/pbft/query.cpp b/platform/consensus/ordering/pbft/query.cpp
index f01cc23..6dacf1d 100644
--- a/platform/consensus/ordering/pbft/query.cpp
+++ b/platform/consensus/ordering/pbft/query.cpp
@@ -56,12 +56,49 @@
int Query::ProcessQuery(std::unique_ptr<Context> context,
std::unique_ptr<Request> request) {
+
+ if (config_.GetPublicKeyCertificateInfo()
+ .public_key()
+ .public_key_info()
+ .type() == CertificateKeyInfo::CLIENT) {
+
+ // Returns the configured replica whose id is 1 (the node queries are redirected to).
+ auto find_primary = [&]() -> ReplicaInfo {
+   const auto& config_data = config_.GetConfigData();
+   for (const auto& r : config_data.region()) {
+     for (const auto& replica : r.replica_info()) {
+       if (replica.id() == 1) return replica;
+     }
+   }
+   return ReplicaInfo();  // no replica with id 1: return a default value instead of falling off the end (UB)
+ };
+ ReplicaInfo primary = find_primary();
+ std::string ip = primary.ip();
+ int port = primary.port();
+
+ LOG(ERROR)<<"redirect to primary:"<<ip<<" port:"<<port;
+ auto client = std::make_unique<NetChannel>(ip, port);
+ if (client->SendRawMessage(*request) == 0) {
+ QueryResponse resp;
+ if (client->RecvRawMessage(&resp) == 0) {
+ if (context != nullptr && context->client != nullptr) {
+ LOG(ERROR) << "send response from primary:"<<resp.transactions_size();
+ int ret = context->client->SendRawMessage(resp);
+ if (ret) {
+ LOG(ERROR) << "send resp fail ret:" << ret;
+ }
+ }
+ }
+ }
+ return 0;
+ }
+
QueryRequest query;
if (!query.ParseFromString(request->data())) {
LOG(ERROR) << "parse data fail";
return -2;
}
- // LOG(ERROR) << "request:" << query.DebugString();
+ //LOG(ERROR) << "request:" << query.DebugString();
QueryResponse response;
for (uint64_t i = query.min_seq(); i <= query.max_seq(); ++i) {
@@ -74,10 +111,11 @@
txn->set_hash(ret_request->hash());
txn->set_seq(ret_request->seq());
txn->set_proxy_id(ret_request->proxy_id());
+ LOG(ERROR)<<"add seq:"<<ret_request->seq();
}
if (context != nullptr && context->client != nullptr) {
- // LOG(ERROR) << "send response:" << response.DebugString();
+ //LOG(ERROR) << "send response:" << response.DebugString();
int ret = context->client->SendRawMessage(response);
if (ret) {
LOG(ERROR) << "send resp fail ret:" << ret;