blob: af0fddc8a00c67b5a8a13048deec2cb80c5cfb10 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/client/transaction-internal.h"
#include <functional>
#include <memory>
#include <ostream>
#include <string>
#include <utility>
#include <glog/logging.h>
#include "kudu/client/client-internal.h"
#include "kudu/client/client.h"
#include "kudu/client/session-internal.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/client/txn_manager_proxy_rpc.h"
#include "kudu/common/txn_id.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/master/txn_manager.pb.h"
#include "kudu/master/txn_manager.proxy.h"
#include "kudu/rpc/response_callback.h"
#include "kudu/rpc/rpc.h"
#include "kudu/transactions/transactions.pb.h"
#include "kudu/util/async_util.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
using kudu::client::internal::AsyncRandomTxnManagerRpc;
using kudu::rpc::BackoffType;
using kudu::transactions::AbortTransactionRequestPB;
using kudu::transactions::AbortTransactionResponsePB;
using kudu::transactions::BeginTransactionRequestPB;
using kudu::transactions::BeginTransactionResponsePB;
using kudu::transactions::CommitTransactionRequestPB;
using kudu::transactions::CommitTransactionResponsePB;
using kudu::transactions::GetTransactionStateRequestPB;
using kudu::transactions::GetTransactionStateResponsePB;
using kudu::transactions::TxnManagerServiceProxy;
using kudu::transactions::TxnStatePB;
using kudu::transactions::TxnTokenPB;
using std::string;
using strings::Substitute;
namespace kudu {
namespace client {
namespace {
MonoTime GetRpcDeadline(const KuduClient* c) {
return MonoTime::Now() + c->default_admin_operation_timeout();
}
} // anonymous namespace
KuduTransaction::Data::Data(const sp::shared_ptr<KuduClient>& client)
: weak_client_(client),
txn_keep_alive_ms_(0) {
CHECK(client);
}
Status KuduTransaction::Data::CreateSession(sp::shared_ptr<KuduSession>* session) {
auto c = weak_client_.lock();
if (!c) {
return Status::IllegalState("associated KuduClient is gone");
}
// We could check for the transaction status here before trying to return
// a session for a transaction that has been committed or abored already.
// However, it would mean to incur an extra RPC to TxnManager which isn't
// a good idea if thinking about this at scale. So, since tablet servers
// should perform the same kind of verification while processing write
// operations issued from the context of this session anyways,
// there isn't much sense duplicating that at the client side.
sp::shared_ptr<KuduSession> ret(new KuduSession(c, txn_id_));
ret->data_->Init(ret);
*session = std::move(ret);
return Status::OK();
}
Status KuduTransaction::Data::Begin() {
auto c = weak_client_.lock();
if (!c) {
return Status::IllegalState("associated KuduClient is gone");
}
const auto deadline = GetRpcDeadline(c.get());
Synchronizer sync;
BeginTransactionRequestPB req;
BeginTransactionResponsePB resp;
AsyncRandomTxnManagerRpc<
BeginTransactionRequestPB, BeginTransactionResponsePB> rpc(
deadline, c.get(), BackoffType::EXPONENTIAL, req, &resp,
&TxnManagerServiceProxy::BeginTransactionAsync, "BeginTransaction",
sync.AsStatusCallback());
rpc.SendRpc();
RETURN_NOT_OK(sync.Wait());
if (resp.has_error()) {
return StatusFromPB(resp.error().status());
}
// TODO(aserbin): start sending regular hearbeats for the started transaction
CHECK(resp.has_txn_id());
txn_id_ = resp.txn_id();
CHECK(resp.has_keepalive_millis());
txn_keep_alive_ms_ = resp.keepalive_millis();
CHECK(txn_id_.IsValid());
return Status::OK();
}
Status KuduTransaction::Data::Commit(bool wait) {
CHECK(txn_id_.IsValid());
auto c = weak_client_.lock();
if (!c) {
return Status::IllegalState("associated KuduClient is gone");
}
const auto deadline = GetRpcDeadline(c.get());
Synchronizer sync;
CommitTransactionRequestPB req;
req.set_txn_id(txn_id_);
CommitTransactionResponsePB resp;
AsyncRandomTxnManagerRpc<CommitTransactionRequestPB,
CommitTransactionResponsePB> rpc(
deadline, c.get(), BackoffType::EXPONENTIAL, req, &resp,
&TxnManagerServiceProxy::CommitTransactionAsync, "CommitTransaction",
sync.AsStatusCallback());
rpc.SendRpc();
RETURN_NOT_OK(sync.Wait());
if (resp.has_error()) {
return StatusFromPB(resp.error().status());
}
if (wait) {
RETURN_NOT_OK(WaitForTxnCommitToFinalize(c.get(), deadline, txn_id_));
}
return Status::OK();
}
Status KuduTransaction::Data::IsCommitComplete(
bool* is_complete, Status* completion_status) {
DCHECK(is_complete);
DCHECK(completion_status);
CHECK(txn_id_.IsValid());
auto c = weak_client_.lock();
if (!c) {
return Status::IllegalState("associated KuduClient is gone");
}
const auto deadline = GetRpcDeadline(c.get());
return IsCommitCompleteImpl(
c.get(), deadline, txn_id_, is_complete, completion_status);
}
Status KuduTransaction::Data::Rollback() {
DCHECK(txn_id_.IsValid());
auto c = weak_client_.lock();
if (!c) {
return Status::IllegalState("associated KuduClient is gone");
}
const auto deadline = GetRpcDeadline(c.get());
Synchronizer sync;
AbortTransactionRequestPB req;
req.set_txn_id(txn_id_);
AbortTransactionResponsePB resp;
AsyncRandomTxnManagerRpc<AbortTransactionRequestPB,
AbortTransactionResponsePB> rpc(
deadline, c.get(), BackoffType::EXPONENTIAL, req, &resp,
&TxnManagerServiceProxy::AbortTransactionAsync, "AbortTransaction",
sync.AsStatusCallback());
rpc.SendRpc();
RETURN_NOT_OK(sync.Wait());
if (resp.has_error()) {
return StatusFromPB(resp.error().status());
}
return Status::OK();
}
Status KuduTransaction::Data::Serialize(string* serialized_txn) const {
CHECK(txn_id_.IsValid());
DCHECK(serialized_txn);
TxnTokenPB token;
token.set_txn_id(txn_id_);
if (txn_keep_alive_ms_ > 0) {
token.set_keepalive_millis(txn_keep_alive_ms_);
}
if (!token.SerializeToString(serialized_txn)) {
return Status::Corruption("unable to serialize transaction information");
}
return Status::OK();
}
Status KuduTransaction::Data::Deserialize(
const sp::shared_ptr<KuduClient>& client,
const string& serialized_txn,
sp::shared_ptr<KuduTransaction>* txn) {
// TODO(aserbin): should the owner of the transaction be taken into account
// as well, i.e. not allow other than the user that created
// the transaction to deserialize its transaction token?
TxnTokenPB token;
if (!token.ParseFromString(serialized_txn)) {
return Status::Corruption("unable to deserialize transaction information");
}
if (!token.has_txn_id()) {
return Status::Corruption("transaction identifier is missing");
}
if (!token.has_keepalive_millis()) {
return Status::Corruption("keepalive information is missing");
}
sp::shared_ptr<KuduTransaction> ret(new KuduTransaction(client));
ret->data_->txn_id_ = token.txn_id();
ret->data_->txn_keep_alive_ms_ = token.keepalive_millis();
*txn = std::move(ret);
return Status::OK();
}
Status KuduTransaction::Data::IsCommitCompleteImpl(
KuduClient* client,
const MonoTime& deadline,
const TxnId& txn_id,
bool* is_complete,
Status* completion_status) {
DCHECK(client);
Synchronizer sync;
GetTransactionStateRequestPB req;
req.set_txn_id(txn_id);
GetTransactionStateResponsePB resp;
AsyncRandomTxnManagerRpc<GetTransactionStateRequestPB,
GetTransactionStateResponsePB> rpc(
deadline, client, BackoffType::EXPONENTIAL, req, &resp,
&TxnManagerServiceProxy::GetTransactionStateAsync, "GetTransactionState",
sync.AsStatusCallback());
rpc.SendRpc();
RETURN_NOT_OK(sync.Wait());
if (resp.has_error()) {
return StatusFromPB(resp.error().status());
}
DCHECK(resp.has_state());
const auto state = resp.state();
switch (state) {
case TxnStatePB::OPEN:
*is_complete = false;
*completion_status = Status::IllegalState("transaction is still open");
break;
case TxnStatePB::ABORTED:
*is_complete = true;
*completion_status = Status::Aborted("transaction has been aborted");
break;
case TxnStatePB::COMMIT_IN_PROGRESS:
*is_complete = false;
*completion_status = Status::Incomplete("commit is still in progress");
break;
case TxnStatePB::COMMITTED:
*is_complete = true;
*completion_status = Status::OK();
break;
default: {
auto errmsg = Substitute("$0: unknown transaction state", state);
LOG(DFATAL) << errmsg;
return Status::IllegalState(errmsg);
}
}
return Status::OK();
}
Status KuduTransaction::Data::WaitForTxnCommitToFinalize(
KuduClient* client, const MonoTime& deadline, const TxnId& txn_id) {
return RetryFunc(
deadline,
"waiting for transaction commit to be completed",
"timed out waiting for transaction commit to finalize",
[&](const MonoTime& deadline, bool* retry) {
bool is_complete = false;
Status status;
const auto s = KuduTransaction::Data::IsCommitCompleteImpl(
client, deadline, txn_id, &is_complete, &status);
if (!s.ok()) {
*retry = false;
return s;
}
*retry = !is_complete;
return status;
});
}
} // namespace client
} // namespace kudu