blob: 0f7e14389057953b8d12dd9c972f67b30e72fa3c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <atomic>
#include <cstdint>
#include <memory>
#include <mutex>
#include <optional>
#include <unordered_map>
#include <utility>
#include <vector>
#include <glog/logging.h>
#include "kudu/common/timestamp.h"
#include "kudu/consensus/log_anchor_registry.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/lock_manager.h"
#include "kudu/tablet/mvcc.h"
#include "kudu/tablet/tablet_metadata.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/util/locks.h"
#include "kudu/util/rw_semaphore.h"
#include "kudu/util/status.h"
namespace kudu {
namespace tablet {
// Lifecycle states of a transaction on a participant.
//
// The bracketed labels below are the state names as they are used elsewhere
// in this file (kNone, kInitializing, kOpen, kCommitInProgress, kCommitted,
// kAborted); each label marks the paragraph that describes that state.
//
// NOTE: we define the type explicitly so we can forward declare this enum.
enum TxnState : int8_t {
// [kNone]
// Not a real state; useful for representing optionality.
//
// [kInitializing]
// Each transaction starts in this state. While in this state, the
// transaction is not yet ready to be used, e.g. the initial op to begin
// the transaction may not have successfully replicated yet.
//
// [kOpen]
// Each transaction is moved into this state once it is ready to begin
// accepting ops.
//
// [kCommitInProgress]
// A transaction is moved into this state when a client has signified the
// intent to begin committing it. While in this state, the transaction may
// not accept new ops.
//
// [kCommitted]
// A transaction is moved into this state when it becomes finalized -- all
// participants have acknowledged the intent to commit and have guaranteed
// that all ops in the transaction will succeed. While in this state, the
// transaction may not accept new ops and may not be aborted.
//
// [kAborted]
// A transaction is moved into this state when a client has signified
// intent to cancel the transaction. While in this state, the transaction
// may not accept new ops, begin committing, or finalize a commit.
// Presumably returns a human-readable name for 's' (name-based; only the
// declaration lives in this header, the definition is elsewhere).
const char* TxnStateToString(TxnState s);
// Tracks the state associated with a transaction.
// This class will primarily be accessed via op drivers. As such, locking
// primitives are exposed publicly, to be called in different stages of
// replication.
class Txn : public RefCountedThreadSafe<Txn> {
// Constructs a transaction instance with the given transaction ID and WAL
// anchor registry.
// The WAL anchor registry is used to ensure that the WAL segment that
// contains the participant op that replays to the transaction's current
// in-memory state is not GCed, allowing us to rebuild this transaction's
// in-memory state upon rebooting a server.
Txn(int64_t txn_id, log::LogAnchorRegistry* log_anchor_registry,
scoped_refptr<TabletMetadata> tablet_metadata, TxnState init_state = kInitializing)
: txn_id_(txn_id),
commit_timestamp_(-1) {}
// Takes the state lock and returns it. As transaction state is meant to be
// driven via an op driver, lock acquisition is expected to be serialized in
// a single thread.
//
// 'txn_lock' is an out-parameter through which the acquired lock is handed
// back to the caller. Going by the lock types in the signatures,
// AcquireWriteLock() yields an exclusive lock and AcquireReadLock() a shared
// one. Presumably both operate on 'state_lock_'; the definitions are not in
// this header.
void AcquireWriteLock(std::unique_lock<rw_semaphore>* txn_lock);
void AcquireReadLock(shared_lock<rw_semaphore>* txn_lock);
// Adopts the input partition lock, maintaining it until the transaction is
// complete (aborted or finalized). Rather than maintaining multiple
// ScopedPartitionLocks, this will release any currently-held lock and
// acquire the new one. It is thus expected that repeat callers are taking
// the same lock.
//
// 'partition_lock' is taken by value, so the caller hands the lock over to
// this Txn. Presumably it ends up in 'partition_lock_' -- confirm against the
// definition, which is not in this header.
void AdoptPartitionLock(ScopedPartitionLock partition_lock);
// Validates that the transaction is in the appropriate state to perform the
// given operation. Should be called while holding the state lock before
// replicating a participant op.
//
// Checks whether the transaction may be begun. Returns Status::OK() if so.
// Otherwise returns Status::IllegalState and stores the matching tablet
// server error code in '*code'; '*code' is only written on those rejection
// paths.
Status ValidateBeginTransaction(tserver::TabletServerErrorPB::Code* code) const {
// The transaction is already open: report the op as already applied rather
// than as a generic illegal state.
if (PREDICT_FALSE(state_ == kOpen)) {
*code = tserver::TabletServerErrorPB::TXN_OP_ALREADY_APPLIED;
return Status::IllegalState(
strings::Substitute("Transaction $0 already open", txn_id_));
// Metadata for this transaction ID has already been recorded in the tablet
// metadata, so it cannot be begun again.
if (PREDICT_FALSE(tablet_metadata_->HasTxnMetadata(txn_id_))) {
*code = tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE;
return Status::IllegalState(
strings::Substitute("Transaction metadata for transaction $0 already exists",
// Any remaining state other than kInitializing cannot transition to open.
if (PREDICT_FALSE(state_ != kInitializing)) {
*code = tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE;
return Status::IllegalState(
strings::Substitute("Cannot begin transaction in state: $0",
return Status::OK();
// Checks whether a commit may be begun on this transaction. Like the other
// Validate*() methods, this reads 'state_' and is meant to be called with the
// state lock held.
//
// Returns Status::OK() only when the transaction is kOpen. Otherwise returns
// Status::IllegalState and sets '*code':
//  - TXN_OP_ALREADY_APPLIED when a commit has already been begun or
//    finalized; in that case '*begin_commit_ts' is additionally set to the
//    timestamp already associated with that commit.
//  - TXN_ILLEGAL_STATE for every other rejection; '*begin_commit_ts' is not
//    written in that case.
Status ValidateBeginCommit(tserver::TabletServerErrorPB::Code* code,
Timestamp* begin_commit_ts) const {
// Set when some earlier op already established a commit timestamp.
std::optional<Timestamp> already_applied_timestamp;
// No in-flight transaction state: fall back to the persisted metadata.
if (PREDICT_FALSE(state_ == kInitializing)) {
Timestamp timestamp;
TxnState meta_state;
// Nothing persisted either, so the transaction was never started.
if (!tablet_metadata_->HasTxnMetadata(txn_id_, &meta_state, &timestamp)) {
*code = tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE;
return Status::IllegalState("Transaction hasn't been successfully started");
// Persisted metadata exists but is in a state from which a commit cannot
// have been begun.
if (PREDICT_FALSE(meta_state != kCommitted && meta_state != kCommitInProgress)) {
*code = tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE;
return Status::IllegalState(
strings::Substitute("Cannot begin committing transaction in state: $0",
// There's no in-flight transaction, but we've already committed the
// transaction and persisted a commit timestamp. Return the commit
// timestamp.
already_applied_timestamp = timestamp;
// If we're in the process of committing, return the commit timestamp we
// have available to us.
if (PREDICT_FALSE(state_ == kCommitInProgress)) {
// The timestamp comes from the commit op; it must be set in this state.
already_applied_timestamp = DCHECK_NOTNULL(commit_op_)->timestamp();
// Already finalized: the timestamp is the one stored in 'commit_timestamp_'.
if (PREDICT_FALSE(state_ == kCommitted)) {
DCHECK_NE(-1, commit_timestamp_);
already_applied_timestamp = Timestamp(commit_timestamp_);
// Any of the cases above means the begin-commit has effectively happened
// already: hand back the timestamp and report "already applied".
if (already_applied_timestamp) {
DCHECK_NE(Timestamp::kInvalidTimestamp, *already_applied_timestamp);
*begin_commit_ts = *already_applied_timestamp;
*code = tserver::TabletServerErrorPB::TXN_OP_ALREADY_APPLIED;
return Status::IllegalState(
strings::Substitute("Transaction $0 commit already in progress", txn_id_));
// If the transaction is otherwise not open, return an error.
if (PREDICT_FALSE(state_ != kOpen)) {
*code = tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE;
return Status::IllegalState(
strings::Substitute("Cannot begin committing transaction in state: $0",
return Status::OK();
// Checks whether the commit of this transaction may be finalized. Like the
// other Validate*() methods, this reads 'state_' and is meant to be called
// with the state lock held.
//
// Returns Status::OK() if so. Otherwise returns Status::IllegalState and sets
// '*code' to TXN_OP_ALREADY_APPLIED (already committed) or TXN_ILLEGAL_STATE
// (any other rejection).
Status ValidateFinalize(tserver::TabletServerErrorPB::Code* code) const {
// When there is no in-flight state (kInitializing), the persisted metadata
// decides; any error from that check is returned as-is.
RETURN_NOT_OK(CheckPersistedMetadataState(code, kCommitted));
if (PREDICT_FALSE(state_ == kCommitted)) {
*code = tserver::TabletServerErrorPB::TXN_OP_ALREADY_APPLIED;
return Status::IllegalState(
strings::Substitute("Transaction $0 has already been committed", txn_id_));
// Only kInitializing and kCommitInProgress get past this point.
// NOTE(review): with kCommitted as the expected metadata state, the check
// above appears to reject every kInitializing transaction already, so in
// practice only kCommitInProgress should reach the OK below -- confirm.
if (PREDICT_FALSE(state_ != kInitializing && state_ != kCommitInProgress)) {
*code = tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE;
return Status::IllegalState(
strings::Substitute("Cannot finalize transaction in state: $0",
return Status::OK();
// Checks whether this transaction may be aborted. Like the other Validate*()
// methods, this reads 'state_' and is meant to be called with the state lock
// held.
//
// Returns Status::OK() if so. Otherwise returns Status::IllegalState and sets
// '*code' to TXN_OP_ALREADY_APPLIED (already aborted) or TXN_ILLEGAL_STATE
// (already committed, or rejected by the persisted-metadata check).
Status ValidateAbort(tserver::TabletServerErrorPB::Code* code) const {
// NOTE: it's allowed that we replicate an ABORT_TXN op even if the
// transaction doesn't exist. This allows us to ensure we remove any
// pending TxnOpDispatchers.
RETURN_NOT_OK(CheckPersistedMetadataState(code, kAborted));
if (PREDICT_FALSE(state_ == kAborted)) {
*code = tserver::TabletServerErrorPB::TXN_OP_ALREADY_APPLIED;
return Status::IllegalState(
strings::Substitute("Transaction $0 has already been aborted", txn_id_));
// A committed transaction can no longer be aborted; every other state may.
if (PREDICT_FALSE(state_ == kCommitted)) {
*code = tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE;
return Status::IllegalState(
strings::Substitute("Cannot abort transaction in state: $0",
return Status::OK();
// Applies the given state transitions. Should be called while holding the
// state lock in write mode after successfully replicating a participant op.
//
// Applies the "begin transaction" transition (presumably moving the
// transaction to kOpen, the state ValidateBeginTransaction() reports as
// "already open").
void BeginTransaction() {
// Applies the "begin commit" transition for the op identified by 'op_id'.
// The arguments below pair the op's log index with a label of the form
// "BEGIN_COMMIT-<txn id>-<address of this Txn>" -- presumably to register
// 'begin_commit_anchor_' with the log anchor registry; confirm against the
// full call.
void BeginCommit(const consensus::OpId& op_id) {
op_id.index(), strings::Substitute("BEGIN_COMMIT-$0-$1", txn_id_, this),
// Applies the "finalize commit" transition, recording
// 'finalized_commit_timestamp' as this transaction's commit timestamp.
void FinalizeCommit(int64_t finalized_commit_timestamp) {
commit_timestamp_ = finalized_commit_timestamp;
// Applies the "abort" transition (presumably moving the transaction to
// kAborted).
void AbortTransaction() {
// Simple accessors for state. No locks are required to call these.
//
// Returns the current transaction state, read from the atomic 'state_'.
TxnState state() const {
return state_;
// Returns the commit timestamp, read from the atomic 'commit_timestamp_'.
// The value is initialized to -1 and overwritten by FinalizeCommit().
int64_t commit_timestamp() const {
return commit_timestamp_;
// Returns the ID this transaction was constructed with.
int64_t txn_id() const {
return txn_id_;
// Takes ownership of 'commit_op'. Must only be called while no commit op is
// currently held; this is checked in debug builds.
void SetCommitOp(std::unique_ptr<ScopedOp> commit_op) {
DCHECK(nullptr == commit_op_.get());
commit_op_ = std::move(commit_op);
// Returns the commit op currently held, or null if there is none. Ownership
// stays with this Txn.
ScopedOp* commit_op() {
return commit_op_.get();
// Takes 'lock_' and, presumably, releases the partition lock held in
// 'partition_lock_' (name-based; confirm against the full body).
void ReleasePartitionLock() {
std::lock_guard<simple_spinlock> l(lock_);
friend class RefCountedThreadSafe<Txn>;
// Sets the transaction state by storing 's' into the atomic 'state_'.
// Per the comment on 'state_lock_', ops that mutate the state are expected to
// hold that lock in write mode.
void SetState(TxnState s) {
state_ = s;
// If we're initializing a new transaction, checks for any persisted
// transaction metadata, ensuring it's compatible with our expected state,
// and returning an error otherwise.
//
// NOTE: we only need to check the expected metadata state when we're
// aborting or finalizing a commit, since these end the in-flight
// transaction. In other cases, we should be able to check the state of the
// in-flight transaction.
//
// 'expected_metadata_state' is the terminal state the calling op would
// produce (kAborted or kCommitted). Returns Status::OK() when the op may
// proceed; otherwise returns Status::IllegalState and sets '*code' to
// TXN_OP_ALREADY_APPLIED (the persisted state already equals the expected
// one) or TXN_ILLEGAL_STATE (any other mismatch).
//
// NOTE(review): the default argument kNone would trip the DCHECK below; both
// callers in this file pass kAborted or kCommitted explicitly.
Status CheckPersistedMetadataState(
tserver::TabletServerErrorPB::Code* code,
TxnState expected_metadata_state = kNone) const {
DCHECK(expected_metadata_state == kAborted || expected_metadata_state == kCommitted);
// If we're not initializing a new transaction, there's an already
// in-flight, non-persisted transaction, and we'll check its state
// elsewhere.
if (PREDICT_TRUE(state_ != kInitializing)) {
return Status::OK();
TxnState meta_state;
if (tablet_metadata_->HasTxnMetadata(txn_id_, &meta_state)) {
// The persisted state is exactly what this op would produce, i.e. the op
// has effectively been applied before.
if (expected_metadata_state == meta_state) {
*code = tserver::TabletServerErrorPB::TXN_OP_ALREADY_APPLIED;
return Status::IllegalState(
strings::Substitute("Transaction metadata for transaction $0 already set",
// We created this transaction as a part of this op (i.e. it was not
// already in flight), and there is existing metadata for it that doesn't
// match the expected state.
*code = tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE;
return Status::IllegalState(
strings::Substitute("Transaction metadata for transaction $0 already exists",
// If we don't have metadata for the transaction, it depends on what the
// expected state is. E.g. ABORT_TXN ops can and should be replicated even
// if we haven't successfully initialized the transaction. This is possible
// if we've successfully registered the participant, but haven't yet
// replicated the BEGIN_TXN op. The same can't be said for FINALIZE_COMMIT
// ops, so return an error here in that case.
if (expected_metadata_state == kCommitted) {
*code = tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE;
return Status::IllegalState(
strings::Substitute("Transaction metadata for transaction $0 does not exist",
return Status::OK();
// Transaction ID for this transaction. Fixed at construction.
const int64_t txn_id_;
// Log anchor registry with which to anchor WAL segments, and an anchor to
// update upon applying a state change. Held as a raw pointer.
log::LogAnchorRegistry* log_anchor_registry_;
// Anchor used to prevent GC of a BEGIN_COMMIT op.
log::LogAnchor begin_commit_anchor_;
// Tablet metadata used to persist this transaction's metadata.
scoped_refptr<TabletMetadata> tablet_metadata_;
// Lock protecting access to 'state_' and 'commit_timestamp_'. Ops that intend
// on mutating 'state_' must take this lock in write mode. Ops that intend on
// reading 'state_' and relying on it remaining constant must take this lock
// in read mode.
mutable rw_semaphore state_lock_;
// Current state of this transaction. Stored atomically; the state() accessor
// reads it without taking any lock.
std::atomic<TxnState> state_;
// If this transaction was successfully committed, the timestamp at which the
// transaction should be applied, and -1 otherwise.
std::atomic<int64_t> commit_timestamp_;
// Scoped op with a lifecycle that spans between the BEGIN_COMMIT op and
// corresponding FINALIZE_COMMIT or ABORT_TXN op, used to ensure that
// scanners wait until we finish the transaction if we've begun committing,
// before proceeding with a scan. This ensures scans on this participant are
// repeatable.
std::unique_ptr<ScopedOp> commit_op_;
// Holds the partition lock acquired for this transaction. 'lock_' is the
// spinlock taken by ReleasePartitionLock(); presumably it guards all access
// to 'partition_lock_'.
simple_spinlock lock_;
ScopedPartitionLock partition_lock_;
// Tracks the on-going transactions in which a given tablet is participating.
// Transactions are kept in 'txns_', keyed by transaction ID; 'lock_' guards
// insertions into and removals from that map.
class TxnParticipant {
// Constructs a participant that uses 'tmeta' as its tablet metadata.
explicit TxnParticipant(scoped_refptr<TabletMetadata> tmeta)
: tablet_metadata_(std::move(tmeta)) {}
// Convenience struct representing a Txn of this participant. This is useful
// for testing, as it is easy to construct. The fields mirror the values
// exposed by Txn's txn_id(), state() and commit_timestamp() accessors.
struct TxnEntry {
int64_t txn_id;
TxnState state;
int64_t commit_timestamp;
// Creates a transaction in kOpen.
void CreateOpenTransaction(int64_t txn_id,
log::LogAnchorRegistry* log_anchor_registry);
// Gets the transaction for the given transaction ID, creating it in
// the kInitializing state if one doesn't already exist.
scoped_refptr<Txn> GetOrCreateTransaction(int64_t txn_id,
log::LogAnchorRegistry* log_anchor_registry);
// Gets the transaction for the given transaction ID, or returns null if it
// does not exist.
scoped_refptr<Txn> GetTransaction(int64_t txn_id);
// Removes the given transaction if it failed to initialize, e.g. the op that
// created it failed to replicate, leaving it in the kInitializing state but
// with no op actively mutating it.
//
// It is expected that the caller, e.g. a ParticipantOp, has released any Txn
// references before calling this, ensuring that when we check the state of
// the Txn, we can thread-safely determine whether it has been abandoned.
void ClearIfInitFailed(int64_t txn_id);
// Removes the given transaction if it is in a terminal state, i.e. it is
// either kAborted or kCommitted, freeing any WAL anchors it may have held.
// Assumes there are no active op drivers updating state (i.e. that the
// transaction reference in our map is the only one).
//
// Returns whether or not this call actually cleared the transaction (i.e.
// returns 'false' if the transaction was not found).
bool ClearIfComplete(int64_t txn_id);
// Returns the transactions, sorted by transaction ID. This returns both
// in-flight transactions tracked by 'txns_' as well as transactions that
// have terminated and persisted metadata via abort or commit.
std::vector<TxnEntry> GetTxnsForTests() const;
// Protects insertions and removals from 'txns_'.
mutable simple_spinlock lock_;
// Maps from transaction ID to the corresponding transaction state.
std::unordered_map<int64_t, scoped_refptr<Txn>> txns_;
// Tablet metadata used to persist the metadata of this participant's
// transactions.
scoped_refptr<TabletMetadata> tablet_metadata_;
// Two TxnEntry values are equal when their transaction ID, state, and commit
// timestamp all match.
inline bool operator==(const TxnParticipant::TxnEntry& lhs, const TxnParticipant::TxnEntry& rhs) {
return lhs.txn_id == rhs.txn_id &&
lhs.state == rhs.state &&
lhs.commit_timestamp == rhs.commit_timestamp;
} // namespace tablet
} // namespace kudu