// blob: de260658f2fdab6fb620299759fd075bca85f32d
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/master/txn_manager_service.h"
#include <cstdint>
#include <string>
#include <glog/logging.h>
#include "kudu/common/wire_protocol.h"
#include "kudu/gutil/port.h"
#include "kudu/master/master.h"
#include "kudu/master/txn_manager.h"
#include "kudu/master/txn_manager.pb.h"
#include "kudu/rpc/remote_user.h"
#include "kudu/rpc/rpc_context.h"
#include "kudu/server/server_base.h"
#include "kudu/transactions/transactions.pb.h"
#include "kudu/util/status.h"
using kudu::master::Master;
using kudu::rpc::RpcContext;
using kudu::server::ServerBase;
using kudu::transactions::TxnStatusEntryPB;
using std::string;
namespace kudu {
namespace transactions {
namespace {
// Propagates a failed status into an RPC response.
//
// When 's' is OK, or when 'resp' already has its error field set, this is a
// no-op. Otherwise the status is serialized into the response's error field
// and the error code is set to TxnManagerErrorPB::UNKNOWN_ERROR.
template<class RespClass>
void CheckRespErrorOrSetUnknown(const Status& s, RespClass* resp) {
  // Nothing to report, or an application-specific error is already in place.
  if (s.ok() || resp->has_error()) {
    return;
  }
  auto* err = resp->mutable_error();
  StatusToPB(s, err->mutable_status());
  err->set_code(TxnManagerErrorPB::UNKNOWN_ERROR);
}
const auto kMissingTxnId =
Status::InvalidArgument("missing transaction identifier");
} // anonymous namespace
// Constructs the service on top of the given master: the base RPC service is
// initialized with the master's metric entity and result tracker, and the
// master pointer itself is retained for use by the RPC handlers.
TxnManagerServiceImpl::TxnManagerServiceImpl(Master* server)
    : TxnManagerServiceIf(server->metric_entity(),
                          server->result_tracker()),
      server_(server) {}
// Handles the BeginTransaction() RPC.
//
// Asks the TxnManager to begin a transaction for the remote user, bounded by
// the client's deadline. On success the response carries the transaction
// identifier and the keep-alive interval (in milliseconds). On failure the
// status is reported via the response's error field. In both cases the RPC
// itself is responded to as a success.
void TxnManagerServiceImpl::BeginTransaction(
    const BeginTransactionRequestPB* /* req */,
    BeginTransactionResponsePB* resp,
    RpcContext* ctx) {
  // Output parameters: these are read only if the call below succeeds.
  int64_t new_txn_id;
  uint32_t keepalive_interval_ms;
  const auto status = server_->txn_manager()->BeginTransaction(
      ctx->remote_user().username(),
      ctx->GetClientDeadline(),
      &new_txn_id,
      &keepalive_interval_ms);
  if (PREDICT_TRUE(status.ok())) {
    resp->set_txn_id(new_txn_id);
    resp->set_keepalive_millis(keepalive_interval_ms);
  } else {
    CheckRespErrorOrSetUnknown(status, resp);
  }
  ctx->RespondSuccess();
}
// Handles the CommitTransaction() RPC.
//
// A request without a transaction identifier is answered with the
// 'kMissingTxnId' error in the response. Otherwise the request is forwarded
// to the TxnManager on behalf of the remote user, and any failure is reported
// via the response's error field. The RPC itself is always responded to as a
// success.
void TxnManagerServiceImpl::CommitTransaction(
    const CommitTransactionRequestPB* req,
    CommitTransactionResponsePB* resp,
    RpcContext* ctx) {
  if (req->has_txn_id()) {
    // This only initiates the commit phase for the transaction. To find out
    // whether the commit phase has completed, the caller is expected to use
    // the GetTransactionState() RPC.
    const auto commit_status = server_->txn_manager()->CommitTransaction(
        req->txn_id(),
        ctx->remote_user().username(),
        ctx->GetClientDeadline());
    CheckRespErrorOrSetUnknown(commit_status, resp);
  } else {
    CheckRespErrorOrSetUnknown(kMissingTxnId, resp);
  }
  ctx->RespondSuccess();
}
void TxnManagerServiceImpl::GetTransactionState(
const GetTransactionStateRequestPB* req,
GetTransactionStateResponsePB* resp,
rpc::RpcContext* ctx) {
if (!req->has_txn_id()) {
CheckRespErrorOrSetUnknown(kMissingTxnId, resp);
return ctx->RespondSuccess();
}
TxnStatusEntryPB txn_status;
const auto s = server_->txn_manager()->GetTransactionState(
req->txn_id(),
ctx->remote_user().username(),
ctx->GetClientDeadline(),
&txn_status);
if (PREDICT_TRUE(s.ok())) {
DCHECK(txn_status.has_state());
resp->set_state(txn_status.state());
}
CheckRespErrorOrSetUnknown(s, resp);
return ctx->RespondSuccess();
}
// Handles the AbortTransaction() RPC.
//
// A request without a transaction identifier is answered with the
// 'kMissingTxnId' error in the response. Otherwise the request is forwarded
// to the TxnManager on behalf of the remote user, and any failure is reported
// via the response's error field. The RPC itself is always responded to as a
// success.
void TxnManagerServiceImpl::AbortTransaction(
    const AbortTransactionRequestPB* req,
    AbortTransactionResponsePB* resp,
    RpcContext* ctx) {
  if (!req->has_txn_id()) {
    CheckRespErrorOrSetUnknown(kMissingTxnId, resp);
  } else {
    CheckRespErrorOrSetUnknown(
        server_->txn_manager()->AbortTransaction(
            req->txn_id(),
            ctx->remote_user().username(),
            ctx->GetClientDeadline()),
        resp);
  }
  ctx->RespondSuccess();
}
// Handles the KeepTransactionAlive() RPC.
//
// A request without a transaction identifier is answered with the
// 'kMissingTxnId' error in the response. Otherwise the request is forwarded
// to the TxnManager on behalf of the remote user, and any failure is reported
// via the response's error field. The RPC itself is always responded to as a
// success.
void TxnManagerServiceImpl::KeepTransactionAlive(
    const KeepTransactionAliveRequestPB* req,
    KeepTransactionAliveResponsePB* resp,
    rpc::RpcContext* ctx) {
  const bool has_id = req->has_txn_id();
  if (!has_id) {
    CheckRespErrorOrSetUnknown(kMissingTxnId, resp);
    ctx->RespondSuccess();
    return;
  }

  const auto keepalive_status = server_->txn_manager()->KeepTransactionAlive(
      req->txn_id(),
      ctx->remote_user().username(),
      ctx->GetClientDeadline());
  CheckRespErrorOrSetUnknown(keepalive_status, resp);
  ctx->RespondSuccess();
}
// Authorization hook for the service's RPCs: defers the decision to the
// master's Authorize() method, passing ServerBase::USER. The request and
// response messages are not examined.
bool TxnManagerServiceImpl::AuthorizeClient(
    const google::protobuf::Message* /* req */,
    google::protobuf::Message* /* resp */,
    RpcContext* ctx) {
  const bool is_authorized = server_->Authorize(ctx, ServerBase::USER);
  return is_authorized;
}
} // namespace transactions
} // namespace kudu