blob: 7d8873488216de85645c9208f79e1a4941305c5f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/client/transaction-internal.h"
#include <algorithm>
#include <functional>
#include <memory>
#include <ostream>
#include <string>
#include <glog/logging.h>
#include "kudu/client/client-internal.h"
#include "kudu/client/client.h"
#include "kudu/client/session-internal.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/client/txn_manager_proxy_rpc.h"
#include "kudu/common/txn_id.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/master/txn_manager.pb.h"
#include "kudu/master/txn_manager.proxy.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/response_callback.h"
#include "kudu/rpc/rpc.h"
#include "kudu/transactions/transactions.pb.h"
#include "kudu/util/async_util.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
#include "kudu/util/status_callback.h"
using kudu::client::internal::AsyncRandomTxnManagerRpc;
using kudu::rpc::BackoffType;
using kudu::transactions::AbortTransactionRequestPB;
using kudu::transactions::AbortTransactionResponsePB;
using kudu::transactions::BeginTransactionRequestPB;
using kudu::transactions::BeginTransactionResponsePB;
using kudu::transactions::CommitTransactionRequestPB;
using kudu::transactions::CommitTransactionResponsePB;
using kudu::transactions::GetTransactionStateRequestPB;
using kudu::transactions::GetTransactionStateResponsePB;
using kudu::transactions::KeepTransactionAliveRequestPB;
using kudu::transactions::KeepTransactionAliveResponsePB;
using kudu::transactions::TxnManagerServiceProxy;
using kudu::transactions::TxnStatePB;
using kudu::transactions::TxnTokenPB;
using std::string;
using std::unique_ptr;
using strings::Substitute;
namespace kudu {
namespace client {
namespace {

// Computes the deadline for an administrative RPC issued on behalf of
// client 'c': the current time advanced by the client's default admin
// operation timeout.
MonoTime GetRpcDeadline(const KuduClient* c) {
  const auto timeout = c->default_admin_operation_timeout();
  const MonoTime now = MonoTime::Now();
  return now + timeout;
}

} // anonymous namespace
KuduTransaction::Data::Data(const sp::shared_ptr<KuduClient>& client)
: weak_client_(client),
txn_keep_alive_ms_(0) {
CHECK(client);
}
// Creates a new session bound to this transaction and stores it in
// '*session'. Returns Status::IllegalState if the associated client has
// already been destroyed.
Status KuduTransaction::Data::CreateSession(sp::shared_ptr<KuduSession>* session) {
  const auto client = weak_client_.lock();
  if (!client) {
    return Status::IllegalState("associated KuduClient is gone");
  }

  // The transaction's status is intentionally not checked here before
  // returning a session for a transaction that may already have been
  // committed or aborted. Doing so would cost an extra RPC to TxnManager,
  // which doesn't scale well. Tablet servers perform the same verification
  // anyway while processing write operations issued in the context of this
  // session, so there is little point in duplicating it on the client side.
  sp::shared_ptr<KuduSession> new_session(new KuduSession(client, txn_id_));
  new_session->data_->Init(new_session);
  *session = std::move(new_session);
  return Status::OK();
}
// Starts a new transaction via TxnManager's BeginTransaction RPC. On success,
// it stores the assigned transaction identifier and keepalive interval, then
// schedules the first keepalive heartbeat on the client's messenger.
//
// 'txn' is the user-facing transaction object owning this Data. Only a weak
// reference to it is captured by the heartbeat task, so the task does not
// keep the transaction alive.
//
// Returns Status::IllegalState if the client is gone or has no messenger.
// Otherwise returns the RPC/response error, if any.
Status KuduTransaction::Data::Begin(const sp::shared_ptr<KuduTransaction>& txn) {
  auto c = weak_client_.lock();
  if (!c) {
    return Status::IllegalState("associated KuduClient is gone");
  }
  // The messenger is needed below to schedule keepalive heartbeats for the
  // new transaction. Check for it before sending BeginTransaction. If this
  // check failed only after the transaction had been started, the caller
  // would get an error while an open transaction was left behind with no
  // keepalive heartbeats scheduled for it.
  auto m = c->data_->messenger_;
  if (PREDICT_FALSE(!m)) {
    return Status::IllegalState("null messenger in Kudu client");
  }

  BeginTransactionResponsePB resp;
  {
    Synchronizer sync;
    BeginTransactionRequestPB req;
    AsyncRandomTxnManagerRpc<BeginTransactionRequestPB,
                             BeginTransactionResponsePB> rpc(
        GetRpcDeadline(c.get()), c.get(), BackoffType::EXPONENTIAL,
        std::move(req), &resp, &TxnManagerServiceProxy::BeginTransactionAsync,
        "BeginTransaction", sync.AsStatusCallback());
    rpc.SendRpc();
    RETURN_NOT_OK(sync.Wait());
  }
  if (resp.has_error()) {
    return StatusFromPB(resp.error().status());
  }
  DCHECK(resp.has_txn_id());
  txn_id_ = resp.txn_id();
  DCHECK(txn_id_.IsValid());
  DCHECK(resp.has_keepalive_millis());
  txn_keep_alive_ms_ = resp.keepalive_millis();
  DCHECK_GT(txn_keep_alive_ms_, 0);

  // Start sending regular heartbeats for the new transaction. The first one
  // is scheduled after half of the keepalive interval, but no sooner than
  // 1 ms.
  const auto next_run_after = MonoDelta::FromMilliseconds(
      std::max<uint32_t>(1, txn_keep_alive_ms_ / 2));
  sp::weak_ptr<KuduTransaction> weak_txn(txn);
  m->ScheduleOnReactor(
      [weak_txn](const Status& s) {
        SendTxnKeepAliveTask(s, weak_txn);
      },
      next_run_after);
  return Status::OK();
}
// Sends CommitTransaction for this transaction to TxnManager. If 'wait' is
// true, it then polls until the commit is finalized. A single deadline,
// computed once up front, bounds both the commit RPC and the wait.
// Returns Status::IllegalState if the associated client is gone.
Status KuduTransaction::Data::Commit(bool wait) {
  DCHECK(txn_id_.IsValid());
  const auto client = weak_client_.lock();
  if (!client) {
    return Status::IllegalState("associated KuduClient is gone");
  }
  const MonoTime deadline = GetRpcDeadline(client.get());

  CommitTransactionResponsePB resp;
  {
    Synchronizer sync;
    CommitTransactionRequestPB req;
    req.set_txn_id(txn_id_);
    AsyncRandomTxnManagerRpc<CommitTransactionRequestPB,
                             CommitTransactionResponsePB> rpc(
        deadline, client.get(), BackoffType::EXPONENTIAL,
        std::move(req), &resp, &TxnManagerServiceProxy::CommitTransactionAsync,
        "CommitTransaction", sync.AsStatusCallback());
    rpc.SendRpc();
    RETURN_NOT_OK(sync.Wait());
  }
  if (resp.has_error()) {
    return StatusFromPB(resp.error().status());
  }
  if (!wait) {
    return Status::OK();
  }
  return WaitForTxnCommitToFinalize(client.get(), deadline, txn_id_);
}
// Queries TxnManager once for the transaction's state. It reports whether
// the commit has completed in '*is_complete' and the commit's outcome in
// '*completion_status'. Returns Status::IllegalState if the associated client
// is gone, otherwise the status of the state query itself.
Status KuduTransaction::Data::IsCommitComplete(
    bool* is_complete, Status* completion_status) {
  DCHECK(is_complete);
  DCHECK(completion_status);
  DCHECK(txn_id_.IsValid());
  const auto client = weak_client_.lock();
  if (!client) {
    return Status::IllegalState("associated KuduClient is gone");
  }
  KuduClient* const raw_client = client.get();
  return IsCommitCompleteImpl(raw_client, GetRpcDeadline(raw_client), txn_id_,
                              is_complete, completion_status);
}
// Asks TxnManager to abort this transaction via the AbortTransaction RPC.
// Returns Status::IllegalState if the associated client is gone. Otherwise
// returns the RPC error or the error carried in the response, if any.
Status KuduTransaction::Data::Rollback() {
  DCHECK(txn_id_.IsValid());
  const auto client = weak_client_.lock();
  if (!client) {
    return Status::IllegalState("associated KuduClient is gone");
  }
  KuduClient* const raw_client = client.get();

  AbortTransactionResponsePB resp;
  {
    Synchronizer sync;
    AbortTransactionRequestPB req;
    req.set_txn_id(txn_id_);
    AsyncRandomTxnManagerRpc<AbortTransactionRequestPB,
                             AbortTransactionResponsePB> rpc(
        GetRpcDeadline(raw_client), raw_client, BackoffType::EXPONENTIAL,
        std::move(req), &resp, &TxnManagerServiceProxy::AbortTransactionAsync,
        "AbortTransaction", sync.AsStatusCallback());
    rpc.SendRpc();
    RETURN_NOT_OK(sync.Wait());
  }
  return resp.has_error() ? StatusFromPB(resp.error().status())
                          : Status::OK();
}
// Encodes the transaction identifier, the keepalive interval, and the
// keepalive setting from 'options' into a TxnTokenPB. The serialized token
// is written into '*serialized_txn'. Returns Status::Corruption if protobuf
// serialization fails.
Status KuduTransaction::Data::Serialize(
    string* serialized_txn,
    const SerializationOptions& options) const {
  DCHECK(serialized_txn);
  DCHECK(txn_id_.IsValid());

  TxnTokenPB token;
  token.set_txn_id(txn_id_);
  token.set_keepalive_millis(txn_keep_alive_ms_);
  token.set_enable_keepalive(options.keepalive());

  const bool serialized = token.SerializeToString(serialized_txn);
  return serialized
      ? Status::OK()
      : Status::Corruption("unable to serialize transaction information");
}
// Reconstructs a transaction handle bound to 'client' from a token produced
// by Serialize(), and stores it in '*txn'. If the token requests it,
// periodic keepalive heartbeats are scheduled for the deserialized
// transaction.
//
// Returns Status::Corruption if the token cannot be parsed, lacks a required
// field, or carries an invalid (negative) transaction identifier.
Status KuduTransaction::Data::Deserialize(
    const sp::shared_ptr<KuduClient>& client,
    const string& serialized_txn,
    sp::shared_ptr<KuduTransaction>* txn) {
  DCHECK(client);
  DCHECK(txn);

  // TODO(aserbin): should the owner of the transaction be taken into account
  //                as well, i.e. not allow other than the user that created
  //                the transaction to deserialize its transaction token?
  TxnTokenPB token;
  if (!token.ParseFromString(serialized_txn)) {
    return Status::Corruption("unable to deserialize transaction information");
  }
  if (!token.has_txn_id()) {
    return Status::Corruption("transaction identifier is missing");
  }
  if (!token.has_keepalive_millis()) {
    return Status::Corruption("keepalive information is missing");
  }
  // The token is supplied by the caller and may be arbitrary bytes. Reject an
  // invalid identifier at runtime instead of relying on a debug-only
  // assertion, which would crash debug builds and silently produce a
  // transaction with an invalid id in release builds.
  if (PREDICT_FALSE(token.txn_id() < 0)) {
    return Status::Corruption(
        Substitute("$0: invalid transaction identifier", token.txn_id()));
  }

  sp::shared_ptr<KuduTransaction> ret(new KuduTransaction(client));
  ret->data_->txn_keep_alive_ms_ = token.keepalive_millis();
  DCHECK_GT(ret->data_->txn_keep_alive_ms_, 0);
  ret->data_->txn_id_ = token.txn_id();
  DCHECK(ret->data_->txn_id_.IsValid());

  // Start sending periodic txn keepalive requests for the deserialized
  // transaction, as specified in the source txn token.
  if (token.has_enable_keepalive() && token.enable_keepalive()) {
    auto m = client->data_->messenger_;
    if (PREDICT_TRUE(m)) {
      sp::weak_ptr<KuduTransaction> weak_txn(ret);
      // First heartbeat after half the keepalive interval, at least 1 ms.
      auto next_run_after = MonoDelta::FromMilliseconds(
          std::max<uint32_t>(1, ret->data_->txn_keep_alive_ms_ / 2));
      m->ScheduleOnReactor(
          [weak_txn](const Status& s) {
            SendTxnKeepAliveTask(s, weak_txn);
          },
          next_run_after);
    }
  }
  *txn = std::move(ret);
  return Status::OK();
}
// Fetches the state of transaction 'txn_id' from TxnManager via the
// GetTransactionState RPC, bounded by 'deadline', and maps it onto
// '*is_complete' / '*completion_status':
//   OPEN               -> not complete, IllegalState
//   COMMIT_IN_PROGRESS -> not complete, Incomplete
//   ABORTED            -> complete, Aborted
//   COMMITTED          -> complete, OK
// The returned status reflects the state query itself. An unrecognized state
// is reported as Status::IllegalState, after logging it with LOG(DFATAL).
Status KuduTransaction::Data::IsCommitCompleteImpl(
    KuduClient* client,
    const MonoTime& deadline,
    const TxnId& txn_id,
    bool* is_complete,
    Status* completion_status) {
  DCHECK(client);
  GetTransactionStateResponsePB resp;
  {
    Synchronizer sync;
    GetTransactionStateRequestPB req;
    req.set_txn_id(txn_id);
    AsyncRandomTxnManagerRpc<GetTransactionStateRequestPB,
                             GetTransactionStateResponsePB> rpc(
        deadline, client, BackoffType::EXPONENTIAL, std::move(req), &resp,
        &TxnManagerServiceProxy::GetTransactionStateAsync,
        "GetTransactionState", sync.AsStatusCallback());
    rpc.SendRpc();
    RETURN_NOT_OK(sync.Wait());
  }
  if (resp.has_error()) {
    return StatusFromPB(resp.error().status());
  }
  DCHECK(resp.has_state());

  const auto state = resp.state();
  switch (state) {
    case TxnStatePB::OPEN:
      *is_complete = false;
      *completion_status = Status::IllegalState("transaction is still open");
      return Status::OK();
    case TxnStatePB::ABORTED:
      *is_complete = true;
      *completion_status = Status::Aborted("transaction has been aborted");
      return Status::OK();
    case TxnStatePB::COMMIT_IN_PROGRESS:
      *is_complete = false;
      *completion_status = Status::Incomplete("commit is still in progress");
      return Status::OK();
    case TxnStatePB::COMMITTED:
      *is_complete = true;
      *completion_status = Status::OK();
      return Status::OK();
    default:
      break;
  }
  const auto errmsg = Substitute("$0: unknown transaction state", state);
  LOG(DFATAL) << errmsg;
  return Status::IllegalState(errmsg);
}
// Repeatedly queries the state of transaction 'txn_id' via
// IsCommitCompleteImpl() until the commit reaches a final state (committed
// or aborted). Polling is driven by RetryFunc() and bounded by 'deadline'.
// Returns the first failure of the state query itself, or else the commit's
// completion status.
Status KuduTransaction::Data::WaitForTxnCommitToFinalize(
    KuduClient* client, const MonoTime& deadline, const TxnId& txn_id) {
  // One polling attempt: retrying makes sense only while the query succeeds
  // and the commit has not completed yet.
  const auto poll_once = [&](const MonoTime& attempt_deadline, bool* retry) {
    bool is_complete = false;
    Status completion_status;
    const Status query_status = KuduTransaction::Data::IsCommitCompleteImpl(
        client, attempt_deadline, txn_id, &is_complete, &completion_status);
    if (!query_status.ok()) {
      *retry = false;
      return query_status;
    }
    *retry = !is_complete;
    return completion_status;
  };
  return RetryFunc(deadline,
                   "waiting for transaction commit to be completed",
                   "timed out waiting for transaction commit to finalize",
                   poll_once);
}
// A structure to pass around metadata on KeepTransactionAlive() when invoking
// the RPC asynchronously.
struct KeepaliveRpcCtx {
KeepTransactionAliveResponsePB resp;
unique_ptr<AsyncRandomTxnManagerRpc<KeepTransactionAliveRequestPB,
KeepTransactionAliveResponsePB>> rpc;
sp::weak_ptr<KuduTransaction> weak_txn;
};
void KuduTransaction::Data::SendTxnKeepAliveTask(
const Status& status,
sp::weak_ptr<KuduTransaction> weak_txn) {
VLOG(2) << Substitute("SendTxnKeepAliveTask() is run");
if (PREDICT_FALSE(!status.ok())) {
// This means there was an error executing the task on reactor. As of now,
// this can only happen if the reactor is being shutdown and the task is
// de-scheduled from the queue, so the only possible error status here is
// Status::Aborted().
VLOG(1) << Substitute("SendTxnKeepAliveTask did not run: $0",
status.ToString());
DCHECK(status.IsAborted());
return;
}
// Check if the transaction object is still around.
sp::shared_ptr<KuduTransaction> txn(weak_txn.lock());
if (PREDICT_FALSE(!txn)) {
return;
}
auto c = txn->data_->weak_client_.lock();
if (PREDICT_FALSE(!c)) {
return;
}
const auto& txn_id = txn->data_->txn_id_;
const auto next_run_after = MonoDelta::FromMilliseconds(
std::max<uint32_t>(1, txn->data_->txn_keep_alive_ms_ / 2));
auto deadline = MonoTime::Now() + next_run_after;
sp::shared_ptr<KeepaliveRpcCtx> ctx(new KeepaliveRpcCtx);
ctx->weak_txn = weak_txn;
{
KeepTransactionAliveRequestPB req;
req.set_txn_id(txn_id);
unique_ptr<AsyncRandomTxnManagerRpc<KeepTransactionAliveRequestPB,
KeepTransactionAliveResponsePB>> rpc(
new AsyncRandomTxnManagerRpc<KeepTransactionAliveRequestPB,
KeepTransactionAliveResponsePB>(
deadline, c.get(), BackoffType::LINEAR, std::move(req), &ctx->resp,
&TxnManagerServiceProxy::KeepTransactionAliveAsync,
"KeepTransactionAlive",
[ctx](const Status& s) {
TxnKeepAliveCb(s, std::move(ctx));
}));
ctx->rpc = std::move(rpc);
}
// Send the RPC and handle the response asynchronously.
ctx->rpc->SendRpc();
}
void KuduTransaction::Data::TxnKeepAliveCb(
const Status& s, sp::shared_ptr<KeepaliveRpcCtx> ctx) {
// Break the circular reference to 'ctx'. The circular reference is there
// because KeepaliveRpcCtx::rpc captures the 'ctx' for ResponseCallback.
ctx->rpc.reset();
// Check if the transaction object is still around.
sp::shared_ptr<KuduTransaction> txn(ctx->weak_txn.lock());
if (PREDICT_FALSE(!txn)) {
return;
}
const auto& resp = ctx->resp;
if (s.ok() && resp.has_error()) {
auto s = StatusFromPB(resp.error().status());
}
const auto& txn_id = txn->data_->txn_id_;
if (s.IsIllegalState() || s.IsAborted()) {
// Transaction's state changed a to terminal one, no need to send
// keepalive requests anymore.
VLOG(1) << Substitute("KeepTransactionAlive() returned $0: "
"stopping keepalive requests for transaction ID $1",
s.ToString(), txn_id.value());
return;
}
// Re-schedule the task, so it will send another keepalive heartbeat as
// necessary.
sp::shared_ptr<KuduClient> c(txn->data_->weak_client_.lock());
if (PREDICT_FALSE(!c)) {
return;
}
auto m = c->data_->messenger_;
if (PREDICT_TRUE(m)) {
auto weak_txn = ctx->weak_txn;
const auto next_run_after = MonoDelta::FromMilliseconds(
std::max<uint32_t>(1, txn->data_->txn_keep_alive_ms_ / 2));
m->ScheduleOnReactor(
[weak_txn](const Status& s) {
SendTxnKeepAliveTask(s, weak_txn);
},
next_run_after);
}
}
} // namespace client
} // namespace kudu