blob: cc4f27e3bd87059602bad18953bda3dcb81ccef6 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstdint>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>
#include <boost/optional/optional.hpp>
#include "kudu/gutil/ref_counted.h"
#include "kudu/tablet/txn_coordinator.h"
#include "kudu/transactions/txn_status_entry.h"
#include "kudu/transactions/txn_status_tablet.h"
#include "kudu/util/locks.h"
#include "kudu/util/status.h"
namespace kudu {
// Forward declaration: this header refers to tablet::TabletReplica only
// through pointers, so the complete type is not required here.
namespace tablet {
class TabletReplica;
} // namespace tablet
// Forward declaration: tserver::TabletServerErrorPB appears in this header
// only as a pointer-typed parameter.
namespace tserver {
class TabletServerErrorPB;
} // namespace tserver
namespace transactions {
// Forward declaration: within this header TxnStatusEntryPB is used only in
// function declarations (as a by-value parameter and behind a pointer).
class TxnStatusEntryPB;
// Lookup table keyed by transaction ID whose values are the matching
// (ref-counted) TransactionEntry objects.
using TransactionsMap =
    std::unordered_map<int64_t, scoped_refptr<TransactionEntry>>;
// TransactionsVisitor implementation used to walk the contents of a status
// tablet and accumulate its existing state in memory, so that the state can
// later be handed to the caller via Release().
class TxnStatusManagerBuildingVisitor : public TransactionsVisitor {
 public:
  TxnStatusManagerBuildingVisitor();
  ~TxnStatusManagerBuildingVisitor() = default;

  // Constructs a TransactionEntry from the supplied metadata and records it
  // in txns_by_id_.
  //
  // Not thread-safe: callers must make sure that at most one thread invokes
  // this method at any given time.
  void VisitTransactionEntries(
      int64_t txn_id,
      TxnStatusEntryPB status_entry_pb,
      std::vector<ParticipantIdAndPB> participants) override;

  // Hands the transactions map over to the caller through 'txns_by_id';
  // 'highest_txn_id' is the accompanying output parameter (presumably the
  // highest transaction ID visited -- the implementation is not visible in
  // this header). Should only be called once per call to
  // VisitTransactionEntries().
  void Release(int64_t* highest_txn_id, TransactionsMap* txns_by_id);

 private:
  // State accumulated by this visitor; see Release().
  int64_t highest_txn_id_;
  TransactionsMap txns_by_id_;
};
// Keeps track of ongoing transactions and the participants registered with
// them. The state is backed by an underlying tablet.
class TxnStatusManager final : public tablet::TxnCoordinator {
 public:
  explicit TxnStatusManager(tablet::TabletReplica* tablet_replica);
  ~TxnStatusManager() = default;

  // Reads the contents of the status tablet into memory.
  Status LoadFromTablet() override;

  // Writes an entry to the status tablet and creates the corresponding
  // in-memory transaction.
  //
  // An error is returned if a higher transaction ID has already been
  // attempted (even when that attempt failed); this helps guarantee that at
  // most one call to this method succeeds for any given transaction ID.
  //
  // When 'highest_seen_txn_id' is not null it is populated on both success and
  // failure, with one exception: it is left alone when the method returns
  // Status::ServiceUnavailable() because the data from the backing
  // transaction status tablet has not been loaded yet.
  //
  // TODO(awong): consider computing the next available transaction ID in this
  // partition and using it in case this transaction is already used, or having
  // callers forward a request for the next-highest transaction ID.
  Status BeginTransaction(int64_t txn_id,
                          const std::string& user,
                          int64_t* highest_seen_txn_id,
                          tserver::TabletServerErrorPB* ts_error) override;

  // Starts committing the given transaction. Returns an error if the
  // transaction doesn't exist, isn't open, or isn't owned by the given user.
  Status BeginCommitTransaction(
      int64_t txn_id,
      const std::string& user,
      tserver::TabletServerErrorPB* ts_error) override;

  // Finalizes the commit of the transaction. Returns an error if the
  // transaction isn't in an appropriate state.
  //
  // This call is not user-initiated, unlike the other transaction life-cycle
  // calls, which is why it takes no user argument.
  //
  // TODO(awong): add a commit timestamp.
  Status FinalizeCommitTransaction(
      int64_t txn_id,
      tserver::TabletServerErrorPB* ts_error) override;

  // Aborts the given transaction. Returns an error if the transaction doesn't
  // exist, is committed or not yet opened, or isn't owned by the given user.
  Status AbortTransaction(int64_t txn_id,
                          const std::string& user,
                          tserver::TabletServerErrorPB* ts_error) override;

  // Fetches the status of the specified transaction into 'txn_status'.
  // Returns an error if the transaction doesn't exist or isn't owned by the
  // specified user.
  Status GetTransactionStatus(int64_t txn_id,
                              const std::string& user,
                              transactions::TxnStatusEntryPB* txn_status,
                              tserver::TabletServerErrorPB* ts_error) override;

  // Handles a keep-alive heartbeat for the specified transaction.
  Status KeepTransactionAlive(int64_t txn_id,
                              const std::string& user,
                              tserver::TabletServerErrorPB* ts_error) override;

  // Creates an in-memory participant, writes an entry to the status table,
  // and attaches the in-memory participant to the transaction.
  //
  // An open transaction is guaranteed to stay active for the duration of this
  // call. Returns an error if the given transaction isn't open.
  Status RegisterParticipant(int64_t txn_id,
                             const std::string& tablet_id,
                             const std::string& user,
                             tserver::TabletServerErrorPB* ts_error) override;

  // Aborts transactions that are still in a non-terminal state but have not
  // received keep-alive updates (see KeepTransactionAlive()) for a long time.
  void AbortStaleTransactions() override;

  // Returns the current value of 'highest_txn_id_', read while holding
  // 'lock_'.
  int64_t highest_txn_id() const override {
    std::lock_guard<simple_spinlock> guard(lock_);
    return highest_txn_id_;
  }

  // Builds a map from transaction ID to the sorted list of participants
  // associated with that transaction ID.
  tablet::ParticipantIdsByTxnId GetParticipantsByTxnIdForTests() const override;

 private:
  // Checks that the transaction status data has already been loaded from the
  // underlying tablet and that the replica is a leader.
  //
  // Returns Status::OK() when both conditions hold. If the data hasn't been
  // loaded yet, returns Status::ServiceUnavailable(). If the data has been
  // loaded but the replica isn't a leader, returns
  // Status::ServiceUnavailable() and sets the code in 'ts_error' to
  // TabletServerErrorPB::NOT_THE_LEADER.
  Status CheckTxnStatusDataLoadedUnlocked(
      tserver::TabletServerErrorPB* ts_error) const;

  // Looks up the transaction entry for 'txn_id' and returns it through 'txn'.
  //
  // Returns an error if the transaction ID doesn't exist, or if 'user' is
  // specified but isn't the owner of the transaction. Additionally, when the
  // underlying replica isn't a leader, the code in 'ts_error' is set to
  // TabletServerErrorPB::NOT_THE_LEADER.
  Status GetTransaction(int64_t txn_id,
                        const boost::optional<std::string>& user,
                        scoped_refptr<TransactionEntry>* txn,
                        tserver::TabletServerErrorPB* ts_error) const;

  // Guards 'highest_txn_id_' and 'txns_by_id_'.
  mutable simple_spinlock lock_;

  // Highest transaction ID this status manager has seen so far. A request to
  // create a new transaction must supply an ID higher than this one.
  int64_t highest_txn_id_;

  // The transactions that are currently on-going.
  TransactionsMap txns_by_id_;

  // Provides access to the underlying storage.
  TxnStatusTablet status_tablet_;
};
// tablet::TxnCoordinatorFactory implementation whose product is a
// TxnStatusManager, exposed through the tablet::TxnCoordinator interface.
class TxnStatusManagerFactory : public tablet::TxnCoordinatorFactory {
 public:
  TxnStatusManagerFactory() {}

  // Constructs a new TxnStatusManager from 'replica' and returns it to the
  // caller, who becomes its sole owner.
  std::unique_ptr<tablet::TxnCoordinator> Create(
      tablet::TabletReplica* replica) override {
    std::unique_ptr<tablet::TxnCoordinator> coordinator(
        new TxnStatusManager(replica));
    return coordinator;
  }
};
} // namespace transactions
} // namespace kudu