blob: ee5998ceb9ea91273ba49b91b2b24cc237be5b44 [file] [log] [blame]
/*
* Copyright (c) 2019-2022 ExpoLab, UC Davis
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
* DEALINGS IN THE SOFTWARE.
*
*/
#include "execution/transaction_executor.h"
#include <glog/logging.h>
namespace resdb {
// Constructs the executor and starts its worker threads.
//
//   config          stored in config_.
//   post_exec_func  callback stored in post_exec_func_; Execute() invokes it
//                   with the request and its response.
//   system_info     raw pointer stored in system_info_; not dereferenced
//                   here.
//   executor_impl   implementation used to run batches; a null pointer is
//                   tolerated by the null checks below and in the workers.
//
// The ordering and execute threads are always started. The out-of-order
// execute thread is started only when an implementation is present and it
// reports IsOutOfOrder().
TransactionExecutor::TransactionExecutor(
    const ResDBConfig& config, PostExecuteFunc post_exec_func,
    SystemInfo* system_info,
    std::unique_ptr<TransactionExecutorImpl> executor_impl)
    : config_(config),
      // post_exec_func is a by-value sink parameter: move it into the member
      // rather than copying it a second time.
      post_exec_func_(std::move(post_exec_func)),
      system_info_(system_info),
      executor_impl_(std::move(executor_impl)),
      commit_queue_("order"),
      execute_queue_("execute"),
      stop_(false) {
  global_stats_ = Stats::GetGlobalStats();
  ordering_thread_ = std::thread(&TransactionExecutor::OrderMessage, this);
  execute_thread_ = std::thread(&TransactionExecutor::ExecuteMessage, this);
  if (executor_impl_ && executor_impl_->IsOutOfOrder()) {
    execute_OOO_thread_ =
        std::thread(&TransactionExecutor::ExecuteMessageOutOfOrder, this);
  }
}
// Destruction goes through Stop(), which raises the stop flag and joins any
// worker thread that is still joinable.
TransactionExecutor::~TransactionExecutor() {
  Stop();
}
// Raises the stop flag, then joins the ordering, execute and out-of-order
// execute threads (in that order). Threads that are not joinable are skipped.
void TransactionExecutor::Stop() {
  stop_ = true;

  const auto join_if_joinable = [](std::thread& worker) {
    if (worker.joinable()) {
      worker.join();
    }
  };

  join_if_joinable(ordering_thread_);
  join_if_joinable(execute_thread_);
  join_if_joinable(execute_OOO_thread_);
}
// Installs the hook that GetNextData() calls on a request just before
// handing it out. The argument is taken by value, so it is moved into the
// member instead of being copied again.
void TransactionExecutor::SetPreExecuteFunc(PreExecuteFunc pre_exec_func) {
  pre_exec_func_ = std::move(pre_exec_func);
}
// Installs the callback that OrderMessage() invokes with the new value of
// next_execute_seq_ each time it advances. The argument is taken by value, so
// it is moved into the member instead of being copied again.
void TransactionExecutor::SetSeqUpdateNotifyFunc(SeqUpdateNotifyFunc func) {
  seq_update_notify_func_ = std::move(func);
}
bool TransactionExecutor::IsStop() { return stop_; }
// Returns the sequence number immediately before next_execute_seq_, i.e. the
// last sequence that OrderMessage() has already forwarded for execution.
uint64_t TransactionExecutor::GetMaxPendingExecutedSeq() {
  const uint64_t next_seq = next_execute_seq_;
  return next_seq - 1;
}
// Accepts a committed request and queues it for the worker threads.
//
// When the implementation reports IsOutOfOrder(), the original request goes
// to the out-of-order execute queue and a copy of it goes to the commit
// (ordering) queue. Otherwise the request goes only to the commit queue.
// Always returns 0.
int TransactionExecutor::Commit(std::unique_ptr<Request> message) {
  global_stats_->IncPendingExecute();

  const bool out_of_order = executor_impl_ && executor_impl_->IsOutOfOrder();
  if (!out_of_order) {
    commit_queue_.Push(std::move(message));
    return 0;
  }

  // Take the copy before the original is handed away.
  auto ordering_copy = std::make_unique<Request>(*message);
  execute_OOO_queue_.Push(std::move(message));
  commit_queue_.Push(std::move(ordering_copy));
  return 0;
}
// Files the request in candidates_ under its own sequence number.
void TransactionExecutor::AddNewData(std::unique_ptr<Request> message) {
  // Read the key before the pointer is moved into the pair.
  const auto seq = message->seq();
  candidates_.insert(std::make_pair(seq, std::move(message)));
}
// Hands out the first candidate if its key equals next_execute_seq_.
//
// Returns nullptr when candidates_ is empty or its first entry carries a
// different sequence number. Otherwise the request is taken out of the
// container, passed to the pre-execute hook (if one is set), its entry is
// erased, and the request is returned.
std::unique_ptr<Request> TransactionExecutor::GetNextData() {
  if (candidates_.empty()) {
    return nullptr;
  }
  if (candidates_.begin()->first != next_execute_seq_) {
    return nullptr;
  }

  auto next = std::move(candidates_.begin()->second);
  if (pre_exec_func_) {
    pre_exec_func_(next.get());
  }
  // The hook runs before the (now empty) entry is removed.
  candidates_.erase(candidates_.begin());
  return next;
}
// Body of the ordering thread; runs until IsStop() is true.
//
// Each round pops one request from the commit queue. A request whose sequence
// number is below next_execute_seq_ is logged and dropped; any other request
// is stored via AddNewData(). The loop then forwards every request that
// GetNextData() yields to the execute queue, incrementing next_execute_seq_
// and invoking the sequence-update callback (if set) after each one.
void TransactionExecutor::OrderMessage() {
  while (!IsStop()) {
    auto committed = commit_queue_.Pop();
    if (committed != nullptr) {
      global_stats_->IncExecute();
      const uint64_t seq = committed->seq();
      if (seq < next_execute_seq_) {
        LOG(INFO) << "request seq:" << seq << " has been executed"
                  << " next seq:" << next_execute_seq_;
        // Nothing new was stored, so skip the forwarding pass this round.
        continue;
      }
      AddNewData(std::move(committed));
    }

    // Forward requests for as long as the next expected sequence is
    // available. The stop flag is checked before each GetNextData() call.
    std::unique_ptr<Request> ready;
    while (!IsStop() && (ready = GetNextData()) != nullptr) {
      execute_queue_.Push(std::move(ready));
      ++next_execute_seq_;
      if (seq_update_notify_func_) {
        seq_update_notify_func_(next_execute_seq_);
      }
    }
  }
}
// Body of the execute thread; runs until IsStop() is true.
//
// Pops requests from the execute queue and passes each to Execute(). The
// need_execute flag is false when the implementation reports IsOutOfOrder()
// and true otherwise (including when there is no implementation).
void TransactionExecutor::ExecuteMessage() {
  while (!IsStop()) {
    auto ordered = execute_queue_.Pop();
    if (ordered != nullptr) {
      const bool need_execute =
          !(executor_impl_ && executor_impl_->IsOutOfOrder());
      Execute(std::move(ordered), need_execute);
    }
  }
}
// Body of the out-of-order execute thread; runs until IsStop() is true.
// Every non-null request popped from the out-of-order queue is passed to
// OnlyExecute().
void TransactionExecutor::ExecuteMessageOutOfOrder() {
  while (!IsStop()) {
    auto pending = execute_OOO_queue_.Pop();
    if (pending != nullptr) {
      OnlyExecute(std::move(pending));
    }
  }
}
// Executes a request's batch through the implementation without producing a
// reply: no statistics are updated and post_exec_func_ is not called here.
//
// The batch is parsed from request->data(); seq, hash, proxy_id and (when
// present) committed_certs are copied from the request onto it. A parse
// failure is only logged and processing continues with whatever the batch
// holds, as before.
void TransactionExecutor::OnlyExecute(std::unique_ptr<Request> request) {
  BatchClientRequest batch_request;
  if (!batch_request.ParseFromString(request->data())) {
    LOG(ERROR) << "parse data fail";
  }
  batch_request.set_seq(request->seq());
  batch_request.set_hash(request->hash());
  batch_request.set_proxy_id(request->proxy_id());
  if (request->has_committed_certs()) {
    *batch_request.mutable_committed_certs() = request->committed_certs();
  }

  // LOG(INFO) << " get request batch size:"
  //           << batch_request.client_requests_size()
  //           << " proxy id:" << request->proxy_id();

  if (executor_impl_) {
    // The returned response is not used on this path, so it is not stored.
    executor_impl_->ExecuteBatch(batch_request);
  }
}
// Processes one ordered request and, when a reply is required, hands the
// request and its response to post_exec_func_.
//
//   request       the ordered request; its data() is parsed as a
//                 BatchClientRequest.
//   need_execute  when false, the batch is not run through the
//                 implementation and an empty response is used if a reply is
//                 still required.
//
// A parse failure is only logged and processing continues with whatever the
// batch holds, as before.
void TransactionExecutor::Execute(std::unique_ptr<Request> request,
                                  bool need_execute) {
  BatchClientRequest batch_request;
  if (!batch_request.ParseFromString(request->data())) {
    LOG(ERROR) << "parse data fail";
  }
  batch_request.set_seq(request->seq());
  batch_request.set_hash(request->hash());
  batch_request.set_proxy_id(request->proxy_id());
  if (request->has_committed_certs()) {
    *batch_request.mutable_committed_certs() = request->committed_certs();
  }

  std::unique_ptr<BatchClientResponse> response;
  if (executor_impl_ && need_execute) {
    response = executor_impl_->ExecuteBatch(batch_request);
  }

  global_stats_->IncTotalRequest(batch_request.client_requests_size());

  // A reply is sent when there is no implementation at all, or when the
  // implementation asks for one.
  if (executor_impl_ == nullptr || executor_impl_->NeedResponse()) {
    if (response == nullptr) {
      // Nothing was executed (or nothing was returned): reply with an empty
      // response that still carries the batch's identifying fields.
      response = std::make_unique<BatchClientResponse>();
    }
    response->set_createtime(batch_request.createtime());
    response->set_local_id(batch_request.local_id());
    post_exec_func_(std::move(request), std::move(response));
  }

  global_stats_->IncExecuteDone();
}
} // namespace resdb