blob: be42224aafac74fd35a60d7325edc0d10fa92263 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "interface/rdbc/transaction_constructor.h"
#include <glog/logging.h>
namespace resdb {
// Constructs the channel on top of NetChannel (created with an empty address
// and port 0), keeps a copy of/reference to `config` in config_, and caches
// the client timeout reported by the config.
TransactionConstructor::TransactionConstructor(const ResDBConfig& config)
    : NetChannel("", 0),
      config_(config),
      timeout_ms_(config.GetClientTimeoutMs()) {
  // Receive timeout for the underlying socket, in milliseconds. The upstream
  // note says the process timeout defaults to 2s.
  socket_->SetRecvTimeout(timeout_ms_);
}
// Extracts the payload carried by a batch of replica responses.
//
// Every entry in `response.resp()` is hashed; all entries must hash to the
// same value. The payload of the first entry is returned once the number of
// distinct signer node ids reaches config_.GetMinClientReceiveNum().
//
// Returns:
//   - the payload string on success;
//   - InvalidArgumentError("hash not match") when two entries disagree;
//   - InvalidArgumentError("data not enough") when too few distinct nodes
//     have answered.
absl::StatusOr<std::string> TransactionConstructor::GetResponseData(
    const Response& response) {
  std::string expected_digest;       // hash every entry has to agree with
  std::string payload;               // data of the entry that set the hash
  std::set<int64_t> responder_ids;   // distinct node ids seen so far

  for (const auto& reply : response.resp()) {
    // Note: only the data hashes are compared here; the signature itself is
    // not verified in this function.
    const std::string digest =
        SignatureVerifier::CalculateHash(reply.data());
    if (expected_digest.empty()) {
      expected_digest = digest;
      payload = reply.data();
    } else if (digest != expected_digest) {
      LOG(ERROR) << "hash not the same";
      return absl::InvalidArgumentError("hash not match");
    }
    responder_ids.insert(reply.signature().node_id());
  }

  // LOG(INFO) << "recv hash:" << responder_ids.size()
  //           << " need:" << config_.GetMinClientReceiveNum()
  //           << " data len:" << payload.size();
  const size_t required =
      static_cast<size_t>(config_.GetMinClientReceiveNum());
  if (responder_ids.size() < required) {
    return absl::InvalidArgumentError("data not enough");
  }
  return payload;
}
// Sends `message` with request type `type` to the first replica listed in
// the config and returns whatever NetChannel::SendRequest returns.
// The trailing `false` is presumably the "need response" flag, since the
// overload that reads a reply passes `true` — confirm against NetChannel.
int TransactionConstructor::SendRequest(
    const google::protobuf::Message& message, Request::Type type) {
  // Use the replica obtained from the server.
  auto&& replicas = config_.GetReplicaInfos();
  NetChannel::SetDestReplicaInfo(replicas[0]);

  const int status = NetChannel::SendRequest(message, type, false);
  return status;
}
// Sends `message` with request type `type` to the first replica listed in
// the config, then reads one raw reply and parses it into `response`.
//
// Parameters:
//   message  - request payload to send.
//   response - output message that receives the parsed reply; must not be
//              null.
//   type     - request type forwarded to NetChannel::SendRequest.
//
// Returns:
//    0 - request sent, reply received and parsed into `response`.
//   -1 - `response` is null, sending failed, or receiving failed.
//   -2 - a reply was received but could not be parsed into `response`.
int TransactionConstructor::SendRequest(
    const google::protobuf::Message& message,
    google::protobuf::Message* response, Request::Type type) {
  // Reject a null output before touching the network: the reply could not be
  // stored, and dereferencing it below would be undefined behaviour.
  if (response == nullptr) {
    LOG(ERROR) << "response is null";
    return -1;
  }

  NetChannel::SetDestReplicaInfo(config_.GetReplicaInfos()[0]);

  const int send_ret = NetChannel::SendRequest(message, type, true);
  if (send_ret != 0) {
    return -1;
  }

  std::string resp_str;
  // Separate name on purpose: the original re-declared `ret` here and
  // shadowed the send status.
  const int recv_ret = NetChannel::RecvRawMessageData(&resp_str);
  if (recv_ret < 0) {
    return -1;
  }

  if (!response->ParseFromString(resp_str)) {
    LOG(ERROR) << "parse response fail:" << resp_str.size();
    return -2;
  }
  return 0;
}
} // namespace resdb